Uh oh!
There was an error while loading. Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork 34k
gh-74028: concurrent.futures.Executor.map: introduce buffersize param for lazier behavior #125663
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Uh oh!
There was an error while loading. Please reload this page.
Changes from all commits
45c3ec5bfb2c5c8539663022b8c67ced787cb5f26ef46ebe6eb26e860821f95d95c55bc80f46690e6d7cab9169401b8adf2f8a63f1fb53a5365c85dbf5f838a0057f11aa1275e0a9a9e8d6ea976124868c11276febb5337602968cb14e368d37ce09cdf239c0a49784178d6fe516a94bba4ac8195880590427bf1af88fdf1fcf3fe0892b2b579ba31332826aef814e526c8d8d7b1d5f68dac531bb756f4File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading. Please reload this page.
Jump to
Uh oh!
There was an error while loading. Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -8,6 +8,8 @@ | ||
| import threading | ||
| import time | ||
| import types | ||
| import weakref | ||
| from itertools import islice | ||
| FIRST_COMPLETED = 'FIRST_COMPLETED' | ||
| FIRST_EXCEPTION = 'FIRST_EXCEPTION' | ||
| @@ -572,7 +574,7 @@ def submit(self, fn, /, *args, **kwargs): | ||
| """ | ||
| raise NotImplementedError() | ||
| def map(self, fn, *iterables, timeout=None, chunksize=1): | ||
| def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): | ||
| """Returns an iterator equivalent to map(fn, iter). | ||
| Args: | ||
| @@ -584,6 +586,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): | ||
| before being passed to a child process. This argument is only | ||
| used by ProcessPoolExecutor; it is ignored by | ||
| ThreadPoolExecutor. | ||
| buffersize: The number of submitted tasks whose results have not | ||
| yet been yielded. If the buffer is full, iteration over the | ||
| iterables pauses until a result is yielded from the buffer. | ||
| If None, all input elements are eagerly collected, and a task is | ||
| submitted for each. | ||
| Returns: | ||
| An iterator equivalent to: map(func, *iterables) but the calls may | ||
| @@ -594,10 +601,25 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): | ||
| before the given timeout. | ||
| Exception: If fn(*args) raises for any values. | ||
| """ | ||
| if buffersize is not None and not isinstance(buffersize, int): | ||
| raise TypeError("buffersize must be an integer or None") | ||
| if buffersize is not None and buffersize < 1: | ||
| raise ValueError("buffersize must be None or > 0") | ||
| if timeout is not None: | ||
| end_time = timeout + time.monotonic() | ||
| fs = [self.submit(fn, *args) for args in zip(*iterables)] | ||
| zipped_iterables = zip(*iterables) | ||
| if buffersize: | ||
| fs = collections.deque( | ||
| self.submit(fn, *args) for args in islice(zipped_iterables, buffersize) | ||
| ) | ||
| else: | ||
| fs = [self.submit(fn, *args) for args in zipped_iterables] | ||
| # Use a weak reference to ensure that the executor can be garbage | ||
| # collected independently of the result_iterator closure. | ||
| executor_weakref = weakref.ref(self) | ||
ebonnal marked this conversation as resolved. Show resolved Hide resolved Uh oh! There was an error while loading. Please reload this page. | ||
| # Yield must be hidden in closure so that the futures are submitted | ||
| # before the first iterator value is required. | ||
| @@ -606,6 +628,12 @@ def result_iterator(): | ||
| # reverse to keep finishing order | ||
| fs.reverse() | ||
| while fs: | ||
| if ( | ||
Contributor There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

@ebonnal I believe you got this part slightly wrong, "off-by-one". IIUC, the number of pending futures cannot be larger than `buffersize`. Fortunately, looks like the fix is trivial: you simply have to yield first, next append to the queue:

```diff
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d98b1ebdd58..3b9ccf4d651 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -628,17 +628,17 @@ def result_iterator():
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
+                    # Careful not to keep a reference to the popped future
+                    if timeout is None:
+                        yield _result_or_cancel(fs.pop())
+                    else:
+                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
                     if (
                         buffersize
                         and (executor := executor_weakref())
                         and (args := next(zipped_iterables, None))
                     ):
                         fs.appendleft(executor.submit(fn, *args))
-                    # Careful not to keep a reference to the popped future
-                    if timeout is None:
-                        yield _result_or_cancel(fs.pop())
-                    else:
-                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
             finally:
                 for future in fs:
                     future.cancel()
```

Contributor Author
| ||
| buffersize | ||
| and (executor := executor_weakref()) | ||
| and (args := next(zipped_iterables, None)) | ||
| ): | ||
| fs.appendleft(executor.submit(fn, *args)) | ||
gpshead marked this conversation as resolved. Show resolved Hide resolved Uh oh! There was an error while loading. Please reload this page. | ||
| # Careful not to keep a reference to the popped future | ||
| if timeout is None: | ||
| yield _result_or_cancel(fs.pop()) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,9 @@ | ||
| import itertools | ||
| import threading | ||
| import time | ||
| import weakref | ||
| from concurrent import futures | ||
| from operator import add | ||
| from test import support | ||
| from test.support import Py_GIL_DISABLED | ||
| @@ -71,6 +73,74 @@ def test_map_timeout(self): | ||
| self.assertEqual([None, None], results) | ||
| def test_map_buffersize_type_validation(self): | ||
| for buffersize in ("foo", 2.0): | ||
| with self.subTest(buffersize=buffersize): | ||
| with self.assertRaisesRegex( | ||
| TypeError, | ||
| "buffersize must be an integer or None", | ||
| ): | ||
| self.executor.map(str, range(4), buffersize=buffersize) | ||
| def test_map_buffersize_value_validation(self): | ||
| for buffersize in (0, -1): | ||
| with self.subTest(buffersize=buffersize): | ||
| with self.assertRaisesRegex( | ||
| ValueError, | ||
| "buffersize must be None or > 0", | ||
| ): | ||
| self.executor.map(str, range(4), buffersize=buffersize) | ||
| def test_map_buffersize(self): | ||
| ints = range(4) | ||
| for buffersize in (1, 2, len(ints), len(ints) * 2): | ||
| with self.subTest(buffersize=buffersize): | ||
| res = self.executor.map(str, ints, buffersize=buffersize) | ||
| self.assertListEqual(list(res), ["0", "1", "2", "3"]) | ||
| def test_map_buffersize_on_multiple_iterables(self): | ||
| ints = range(4) | ||
| for buffersize in (1, 2, len(ints), len(ints) * 2): | ||
| with self.subTest(buffersize=buffersize): | ||
| res = self.executor.map(add, ints, ints, buffersize=buffersize) | ||
| self.assertListEqual(list(res), [0, 2, 4, 6]) | ||
| def test_map_buffersize_on_infinite_iterable(self): | ||
| res = self.executor.map(str, itertools.count(), buffersize=2) | ||
| self.assertEqual(next(res, None), "0") | ||
ebonnal marked this conversation as resolved. Show resolved Hide resolved Uh oh! There was an error while loading. Please reload this page. | ||
| self.assertEqual(next(res, None), "1") | ||
| self.assertEqual(next(res, None), "2") | ||
| def test_map_buffersize_on_multiple_infinite_iterables(self): | ||
| res = self.executor.map( | ||
| add, | ||
| itertools.count(), | ||
| itertools.count(), | ||
| buffersize=2 | ||
| ) | ||
| self.assertEqual(next(res, None), 0) | ||
| self.assertEqual(next(res, None), 2) | ||
| self.assertEqual(next(res, None), 4) | ||
| def test_map_buffersize_on_empty_iterable(self): | ||
| res = self.executor.map(str, [], buffersize=2) | ||
| self.assertIsNone(next(res, None)) | ||
| def test_map_buffersize_without_iterable(self): | ||
| res = self.executor.map(str, buffersize=2) | ||
| self.assertIsNone(next(res, None)) | ||
| def test_map_buffersize_when_buffer_is_full(self): | ||
| ints = iter(range(4)) | ||
| buffersize = 2 | ||
| self.executor.map(str, ints, buffersize=buffersize) | ||
| self.executor.shutdown(wait=True) # wait for tasks to complete | ||
| self.assertEqual( | ||
| next(ints), | ||
| buffersize, | ||
| msg="should have fetched only `buffersize` elements from `ints`.", | ||
| ) | ||
| def test_shutdown_race_issue12456(self): | ||
| # Issue #12456: race condition at shutdown where trying to post a | ||
| # sentinel in the call queue blocks (the queue is full while processes | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| Add the optional ``buffersize`` parameter to | ||
| :meth:`concurrent.futures.Executor.map` to limit the number of submitted tasks | ||
| whose results have not yet been yielded. If the buffer is full, iteration over | ||
| the *iterables* pauses until a result is yielded from the buffer. |
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.