Uh oh!
There was an error while loading. Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork 34k
bpo-29842: Make Executor.map less eager so it handles large/unbounded…#18566
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Uh oh!
There was an error while loading. Please reload this page.
Changes from all commits
fbdb56c62813798394e34634a3de01528313c38fabd32de88f3a3791eb8d26a9234a4874ec669b618b7f13df4cdbf36a8d5cd250f3cc0b4ed33dca0f058908File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading. Please reload this page.
Jump to
Uh oh!
There was an error while loading. Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -4,6 +4,7 @@ | ||
| __author__ = 'Brian Quinlan ([email protected])' | ||
| import collections | ||
| import itertools | ||
| import logging | ||
| import threading | ||
| import time | ||
| @@ -568,7 +569,7 @@ def submit(self, fn, /, *args, **kwargs): | ||
| """ | ||
| raise NotImplementedError() | ||
| def map(self, fn, *iterables, timeout=None, chunksize=1): | ||
| def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): | ||
| """Returns an iterator equivalent to map(fn, iter). | ||
| Args: | ||
| @@ -580,6 +581,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): | ||
| before being passed to a child process. This argument is only | ||
| used by ProcessPoolExecutor; it is ignored by | ||
| ThreadPoolExecutor. | ||
| prefetch: The number of chunks to queue beyond the number of | ||
| workers on the executor. If None, a reasonable default is used. | ||
| Returns: | ||
| An iterator equivalent to: map(func, *iterables) but the calls may | ||
| @@ -592,21 +595,38 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): | ||
| """ | ||
| if timeout is not None: | ||
| end_time = timeout + time.monotonic() | ||
| if prefetch is None: | ||
| prefetch = self._max_workers | ||
| if prefetch < 0: | ||
| raise ValueError("prefetch count may not be negative") | ||
| fs = [self.submit(fn, *args) for args in zip(*iterables)] | ||
| argsiter = zip(*iterables) | ||
| initialargs = itertools.islice(argsiter, self._max_workers + prefetch) | ||
| fs = collections.deque(self.submit(fn, *args) for args in initialargs) | ||
| # Yield must be hidden in closure so that the futures are submitted | ||
| # before the first iterator value is required. | ||
| def result_iterator(): | ||
| nonlocal argsiter | ||
| try: | ||
| # reverse to keep finishing order | ||
| fs.reverse() | ||
| while fs: | ||
| # Careful not to keep a reference to the popped future | ||
| if timeout is None: | ||
| yield fs.pop().result() | ||
| res = fs.popleft().result() | ||
| else: | ||
| yield fs.pop().result(end_time - time.monotonic()) | ||
ContributorAuthor
| ||
| res = fs.popleft().result(end_time - time.monotonic()) | ||
| # Dispatch next task before yielding to keep | ||
| # pipeline full | ||
| if argsiter: | ||
| try: | ||
| args = next(argsiter) | ||
| except StopIteration: | ||
| argsiter = None | ||
| else: | ||
| fs.append(self.submit(fn, *args)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the executor has been shut down, this will raise: ContributorAuthor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rdarder | ||
| yield res | ||
| finally: | ||
| for future in fs: | ||
| future.cancel() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| :meth:`concurrent.futures.Executor.map` no longer eagerly creates all futures prior to yielding any | ||
| results. This allows it to work with huge or infinite :term:`iterable` without | ||
| consuming excessive resources or crashing, making it more suitable as a drop | ||
| in replacement for the built-in :func:`map`. Patch by Josh Rosenberg. |
Uh oh!
There was an error while loading. Please reload this page.