bpo-29842: Make Executor.map less eager so it handles large/unbounded… #707
Changes from all commits
Doc/library/concurrent.futures.rst:

@@ -38,7 +38,7 @@ Executor Objects
       future = executor.submit(pow, 323, 1235)
       print(future.result())

-   .. method:: map(func, *iterables, timeout=None, chunksize=1)
+   .. method:: map(func, *iterables, timeout=None, chunksize=1, prefetch=None)

      Similar to :func:`map(func, *iterables) <map>` except:

@@ -64,9 +64,16 @@ Executor Objects
      performance compared to the default size of 1. With
      :class:`ThreadPoolExecutor`, *chunksize* has no effect.

+     By default, a reasonable number of tasks are
+     queued beyond the number of workers, an explicit *prefetch* count may be
+     provided to specify how many extra tasks should be queued.

      .. versionchanged:: 3.5
         Added the *chunksize* argument.

+     .. versionchanged:: 3.8
+        Added the *prefetch* argument.

   .. method:: shutdown(wait=True)

      Signal the executor that it should free any resources that it is using
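To make the documented signature concrete, here is a rough usage sketch. Note that *prefetch* exists only with this patch applied; released versions of concurrent.futures do not accept it, so this assumes a patched interpreter.

```python
# Hypothetical usage of the signature documented above; `prefetch` is the
# keyword proposed by this PR and is not part of the released stdlib API.
import itertools
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=4) as executor:
    # With the patch, only max_workers + prefetch tasks are queued at any
    # time, so an unbounded input such as itertools.count() stays bounded
    # in memory instead of being submitted all at once.
    results = executor.map(square, itertools.count(), prefetch=8)
    for value in itertools.islice(results, 5):
        print(value)  # 0 1 4 9 16
```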
Lib/concurrent/futures/_base.py:

@@ -4,6 +4,7 @@
 __author__ = 'Brian Quinlan ([email protected])'

 import collections
+import itertools
 import logging
 import threading
 import time
@@ -568,7 +569,7 @@ def submit(*args, **kwargs):
         raise NotImplementedError()

-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
         """Returns an iterator equivalent to map(fn, iter).

         Args:
@@ -580,6 +581,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                 before being passed to a child process. This argument is only
                 used by ProcessPoolExecutor; it is ignored by
                 ThreadPoolExecutor.
+            prefetch: The number of chunks to queue beyond the number of
+                workers on the executor. If None, a reasonable default is used.

         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
@@ -592,21 +595,41 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
         """
         if timeout is not None:
             end_time = timeout + time.monotonic()
+        if prefetch is None:
+            prefetch = self._max_workers
+        if prefetch < 0:
+            raise ValueError("prefetch count may not be negative")

-        fs = [self.submit(fn, *args) for args in zip(*iterables)]
+        argsiter = zip(*iterables)
+        initialargs = itertools.islice(argsiter, self._max_workers + prefetch)
+        fs = collections.deque(self.submit(fn, *args) for args in initialargs)

         # Yield must be hidden in closure so that the futures are submitted
         # before the first iterator value is required.
         def result_iterator():
+            nonlocal argsiter
             try:
-                # reverse to keep finishing order
-                fs.reverse()
                 while fs:
                     # Careful not to keep a reference to the popped future
                     if timeout is None:
-                        yield fs.pop().result()
+                        res = [fs[0].result()]
                     else:
-                        yield fs.pop().result(end_time - time.monotonic())
+                        res = [fs[0].result(end_time - time.monotonic())]
+                    # Got a result, future needn't be cancelled
+                    del fs[0]
+                    # Dispatch next task before yielding to keep
+                    # pipeline full
+                    if argsiter:
+                        try:
+                            args = next(argsiter)
+                        except StopIteration:
+                            argsiter = None
+                        else:
+                            fs.append(self.submit(fn, *args))
+                    yield res.pop()
             finally:
                 for future in fs:
                     future.cancel()
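The core of the change is the bounded-prefetch loop above: submit an initial window of tasks, then submit one new task per result consumed, and cancel whatever remains if the caller stops early. For readers who want this behavior without the patch, the following is a minimal standalone sketch of the same technique built only on the public submit() API; the helper name lazy_map and the max_outstanding parameter are illustrative, not part of the PR or the stdlib.

```python
# Minimal sketch of the bounded-prefetch pattern the hunk above implements,
# written against the public Executor API so it runs on an unpatched stdlib.
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor

def lazy_map(executor, fn, *iterables, max_outstanding=8):
    # Unlike the patched Executor.map, nothing is submitted until the first
    # next() call, because this is a plain generator rather than a closure.
    argsiter = zip(*iterables)
    # Submit an initial window of work instead of the whole input.
    futures = collections.deque(
        executor.submit(fn, *args)
        for args in itertools.islice(argsiter, max_outstanding)
    )
    try:
        while futures:
            result = futures.popleft().result()
            # Top up the pipeline: one new submission per consumed result.
            for args in itertools.islice(argsiter, 1):
                futures.append(executor.submit(fn, *args))
            yield result
    finally:
        # If the caller stops iterating early, cancel work that hasn't started.
        for future in futures:
            future.cancel()

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = lazy_map(pool, str, itertools.count())
        print(list(itertools.islice(results, 3)))  # ['0', '1', '2']
        results.close()  # runs the finally block, cancelling pending futures
```

Popping from the left of the deque preserves submission order, so results are still yielded in input order, exactly as map() promises.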
Lib/concurrent/futures/process.py:

@@ -632,7 +632,7 @@ def submit(*args, **kwargs):
         return f
     submit.__doc__ = _base.Executor.submit.__doc__

-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
         """Returns an iterator equivalent to map(fn, iter).

         Args:
@@ -643,6 +643,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
             chunksize: If greater than one, the iterables will be chopped into
                 chunks of size chunksize and submitted to the process pool.
                 If set to one, the items in the list will be sent one at a time.
+            prefetch: The number of chunks to queue beyond the number of
+                workers on the executor. If None, a reasonable default is used.

         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
@@ -658,7 +660,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
         results = super().map(partial(_process_chunk, fn),
                               _get_chunks(*iterables, chunksize=chunksize),
-                              timeout=timeout)
+                              timeout=timeout, prefetch=prefetch)
         return _chain_from_iterable_of_lists(results)

     def shutdown(self, wait=True):
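Because ProcessPoolExecutor feeds the base implementation whole chunks, *prefetch* here counts chunks rather than individual items: roughly (max_workers + prefetch) * chunksize input elements are consumed before the first result is requested. The sketch below uses a local stand-in for the grouping, not the stdlib's private _get_chunks helper, just to illustrate what one "chunk" (one task for the process pool) contains.

```python
# Local illustration of chunking; `chunked` is a stand-in, not the stdlib's
# private _get_chunks helper.
import itertools

def chunked(iterable, chunksize):
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk

print(list(chunked(range(7), 3)))   # [(0, 1, 2), (3, 4, 5), (6,)]
# With 4 workers, prefetch=2 and chunksize=10, about (4 + 2) * 10 = 60
# input items would be pulled up front before the first result is demanded.
```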
Lib/test/test_concurrent_futures.py:

@@ -746,7 +746,22 @@ def record_finished(n):
         self.executor.map(record_finished, range(10))
         self.executor.shutdown(wait=True)

-        self.assertCountEqual(finished, range(10))
+        # No guarantees on how many tasks dispatched,
+        # but at least one should have been dispatched
+        self.assertGreater(len(finished), 0)
Member: I think this change breaks compatibility. The doc for map() says that "the iterables are collected immediately rather than lazily". So all futures should have executed, instead of being cancelled.

Contributor (author): At the time I wrote it, it didn't conflict with the documentation precisely; the original documentation said that map was "Equivalent to map(func, *iterables) except func is executed asynchronously and several calls to func may be made concurrently.", but that doesn't guarantee that any actual futures exist (it's implemented in terms of submit and futures, but doesn't actually require such a design). That said, it looks like you updated the documentation to add "the iterables are collected immediately rather than lazily;", which, if considered a guarantee rather than a warning, would make this a breaking change even ignoring the "cancel vs. wait" issue. Do you have any suggestions? If strict adherence to your newly (as of late 2017) documented behavior is needed, I suppose I could change the default behavior from "reasonable prefetch" to "exhaustive prefetch", so that when prefetch isn't passed, every task is submitted, but it would be annoying to lose the "good by default" behavior of limited prefetching. The reason I cancelled rather than waiting on the result is that I was trying to follow the normal use pattern for map: since the results are yielded lazily, if the iterator goes away or is closed explicitly (or you explicitly shut down the executor), you're done; having the outstanding futures complete when you're not able to see the results means you're either: […]

Member: Actually, I think there are two problems to discuss: […]

I think 2) might easily be worked around by introducing a separate method ([…]). It seems it would be good to discuss those questions on the mailing-list.
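A practical note on the compatibility question above: code that calls map() purely for side effects (as the changed test did) is relying on eager submission. A pattern that does not depend on how lazily map() consumes its input is to submit explicitly and wait; a small sketch using only the public API:

```python
# Side-effect-driven work spelled with submit() + wait(), which behaves the
# same whether or not map() prefetches lazily.
from concurrent.futures import ThreadPoolExecutor, wait

finished = []

def record_finished(n):
    finished.append(n)

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(record_finished, n) for n in range(10)]
    wait(futures)  # block until every submitted task has run

assert sorted(finished) == list(range(10))
```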
Lib/test/test_concurrent_futures.py (continued):

+    def test_infinite_map_input_completes_work(self):
+        import itertools
+        def identity(x):
+            return x
+
+        mapobj = self.executor.map(identity, itertools.count(0))
+        # Get one result, which shows we handle infinite inputs
+        # without waiting for all work to be dispatched
+        res = next(mapobj)
+        mapobj.close()  # Make sure futures cancelled
+        self.assertEqual(res, 0)

     def test_default_workers(self):
         executor = self.executor_type()
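The mapobj.close() call in the new test works because the patched map() returns a generator: closing a generator raises GeneratorExit at the suspended yield, which lets the try/finally around the loop cancel the remaining futures. A tiny self-contained illustration of that mechanism (plain generator, no executor involved):

```python
# Plain-generator illustration of why close() triggers cleanup: GeneratorExit
# is raised at the suspended yield, so the finally block runs.
def worker_results():
    try:
        while True:
            yield "result"
    finally:
        # In the patched map() this is where outstanding futures get cancelled.
        print("cleanup: cancel pending work")

gen = worker_results()
print(next(gen))  # "result"
gen.close()       # prints "cleanup: cancel pending work"
```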
NEWS entry (new file):

@@ -0,0 +1,4 @@
+Executor.map no longer creates all futures eagerly prior to yielding any
+results. This allows it to work with huge or infinite iterables without
+consuming excessive resources or crashing, making it more suitable as a drop
+in replacement for the built-in map. Patch by Josh Rosenberg.
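To see the behavior the NEWS entry describes, the experiment below instruments the input with a counting generator; on an unpatched interpreter the entire input is consumed before the first result is yielded. The counting wrapper is purely demo instrumentation, not part of the PR.

```python
# Instrumented demo of the eager behaviour described above: without the
# patch, Executor.map() consumes its whole input up front.
from concurrent.futures import ThreadPoolExecutor

consumed = 0

def counting(n):
    global consumed
    for i in range(n):
        consumed += 1
        yield i

with ThreadPoolExecutor(max_workers=2) as pool:
    results = pool.map(str, counting(100_000))
    first = next(results)

print(first)     # '0'
print(consumed)  # 100000 on an unpatched interpreter: everything was submitted
```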
Using "chunks" here would be more precise than "tasks".
Author: The documentation for chunksize uses the phrasing "this method chops iterables into a number of chunks which it submits to the pool as separate tasks", and since not all executors even use chunks (ThreadPoolExecutor ignores the argument), I figured I'd stick with "tasks". It does leave out a term for a single work item; the docs use chunks and tasks as synonyms, with no word for an individual work item.