Uh oh!
There was an error while loading. Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork 33.9k
gh-107219: Fix concurrent.futures terminate_broken()#109244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Uh oh!
There was an error while loading. Please reload this page.
Conversation
vstinner commented Sep 10, 2023 • edited by bedevere-bot
Loading Uh oh!
There was an error while loading. Please reload this page.
edited by bedevere-bot
Uh oh!
There was an error while loading. Please reload this page.
vstinner commented Sep 10, 2023
@serhiy-storchaka@methane@ambv@gpshead@pitrou: Would you mind to have a look? I would like to merge this fix as soon as possible since the bug #107219 is affecting very badly the Python workflow. The CI failure rate is very high because of this For now, I prefer to use |
vstinner commented Sep 10, 2023
With this change, I can no longer reproduce bug. On my Windows VM which has 2 CPUs, I can easily reproduce the hang in around 30 seconds on the Python main branch:
I stressed the test with:
In 8 minutes, I failed to reproduce the bug anymore with this change. Bonus: Moreover, I can no longer hang the test when I interrupt it with CTRL+C. |
vstinner commented Sep 10, 2023
Oh! For the first time in like 2 weeks, Note: There are only these two unrelated failures: These 2 tests passed when re-run in verbose mode (Result: FAILURE then SUCCESS). |
| ov=self._send_ov | ||
| ifovisnotNone: | ||
| # Interrupt WaitForMultipleObjects() in _send_bytes() | ||
| ov.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
asyncio uses a similar code in ProactorEventLoop:
cpython/Lib/asyncio/windows_events.py
Lines 67 to 81 in 1ec4537
| def_cancel_overlapped(self): | |
| ifself._ovisNone: | |
| return | |
| try: | |
| self._ov.cancel() | |
| exceptOSErrorasexc: | |
| context={ | |
| 'message': 'Cancelling an overlapped future failed', | |
| 'exception': exc, | |
| 'future': self, | |
| } | |
| ifself._source_traceback: | |
| context['source_traceback'] =self._source_traceback | |
| self._loop.call_exception_handler(context) | |
| self._ov=None |
asyncio uses more advanced code around to handle more cases. For example, in asyncio, the cancel() API is part of the public API.
Here the cancellation is a standard action in the Windows Overlapped API. The cancellation is synchronous, it's easy!
Hopefully, we are not in the very complicated RegisterWaitWithQueue() case! This case requires an asynchronous cancellation which is really complicated to handle: the completion of the cancellation should be awaited!? See this horror story: https://vstinner.github.io/asyncio-proactor-cancellation-from-hell.html
| # close() was called by another thread while | ||
| # WaitForMultipleObjects() was waiting for the overlapped | ||
| # operation. | ||
| raiseOSError(errno.EPIPE, "handle is closed") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose to raise a BrokenPipeError exception here, since Queue._feed() has a special code path for that to ignore EPIPE errors silently:
cpython/Lib/multiprocessing/queues.py
Lines 255 to 257 in 1ec4537
| exceptExceptionase: | |
| ifignore_epipeandgetattr(e, 'errno', 0) ==errno.EPIPE: | |
| return |
And concurrent.futures uses this code path for its "call queue" which is causing troubles here:
cpython/Lib/concurrent/futures/process.py
Lines 724 to 732 in 1ec4537
| self._call_queue=_SafeQueue( | |
| max_size=queue_size, ctx=self._mp_context, | |
| pending_work_items=self._pending_work_items, | |
| shutdown_lock=self._shutdown_lock, | |
| thread_wakeup=self._executor_manager_thread_wakeup) | |
| # Killed worker processes can produce spurious "broken pipe" | |
| # tracebacks in the queue's own worker thread. But we detect killed | |
| # processes anyway, so silence the tracebacks. | |
| self._call_queue._ignore_epipe=True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds like we got lucky that callers were handling one thing we could raise! :)
vstinnerSep 12, 2023 • edited
Loading Uh oh!
There was an error while loading. Please reload this page.
edited
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the beginning, I started by adding a new exception. But I chose to reuse the existing code instead. IMO BrokenPipeError perfectly makes sense for a PipeConnection.
serhiy-storchaka left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
But I have one suggestion and one question/suggestion.
| finally: | ||
| self._send_ov=None | ||
| nwritten, err=ov.GetOverlappedResult(True) | ||
| iferr==WSA_OPERATION_ABORTED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What other value can it be? There is assert err == 0 below, so I guess that any error was unexpected.
Could we simply check that err is not zero here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose to write a minimalist change: change at least code as possible. I introduce one new error, I added a check for this error, and that's all. I don't know the code enough to answer to your question. I'm not a multiprocessing or Windows API expert at all :-(
Uh oh!
There was an error while loading. Please reload this page.
Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed. Changes: * _ExecutorManagerThread.terminate_broken() now closes call_queue._writer. * multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation.
Address Serhiy's review.
9987dc7 to 069fbfaCompareUh oh!
There was an error while loading. Please reload this page.
| BUFSIZE=8192 | ||
| # A very generous timeout when it comes to local connections... | ||
| CONNECTION_TIMEOUT=20. | ||
| WSA_OPERATION_ABORTED=995 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is the same as _winapi.ERROR_OPERATION_ABORTED.
vstinnerSep 12, 2023 • edited
Loading Uh oh!
There was an error while loading. Please reload this page.
edited
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I'm confused. I don't recall which doc I was looking to. WriteFile() is documented to return ERROR_OPERATION_ABORTED when it's canceled: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefile
miss-islington commented Sep 11, 2023
Thanks @vstinner for the PR 🌮🎉.. I'm working now to backport this PR to: 3.11, 3.12. |
bedevere-bot commented Sep 11, 2023
There's a new commit after the PR has been approved. @serhiy-storchaka: please review the changes made to this pull request. |
…109244) Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed. Changes: * _ExecutorManagerThread.terminate_broken() now closes call_queue._writer. * multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation. (cherry picked from commit a9b1f84) Co-authored-by: Victor Stinner <vstinner@python.org>
bedevere-bot commented Sep 11, 2023
GH-109254 is a backport of this pull request to the 3.12 branch. |
bedevere-bot commented Sep 11, 2023
GH-109255 is a backport of this pull request to the 3.11 branch. |
vstinner commented Sep 11, 2023
PR merged, thanks for the review @serhiy-storchaka. I wanted to merge this fix ASAP since it prevented to merge others PRs. |
…109244) Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed. Changes: * _ExecutorManagerThread.terminate_broken() now closes call_queue._writer. * multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation. (cherry picked from commit a9b1f84) Co-authored-by: Victor Stinner <vstinner@python.org>
serhiy-storchaka left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the sources of GetOverlappedResult() in _winapi.c, the only value of err can be ERROR_SUCCESS (0), ERROR_MORE_DATA, ERROR_OPERATION_ABORTED, ERROR_IO_INCOMPLETE.
serhiy-storchaka commented Sep 11, 2023
Great work, @vstinner! |
… (#109255) gh-107219: Fix concurrent.futures terminate_broken() (GH-109244) Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed. Changes: * _ExecutorManagerThread.terminate_broken() now closes call_queue._writer. * multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation. (cherry picked from commit a9b1f84) Co-authored-by: Victor Stinner <vstinner@python.org>
vstinner commented Sep 11, 2023
Well, if you're confident, you can modify the By the way, having
Thanks. |
… (#109254) gh-107219: Fix concurrent.futures terminate_broken() (GH-109244) Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed. Changes: * _ExecutorManagerThread.terminate_broken() now closes call_queue._writer. * multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation. (cherry picked from commit a9b1f84) Co-authored-by: Victor Stinner <vstinner@python.org>
Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed.
Changes: