Skip to content

Commit 2bac166

Browse files
elpransaaliddell
andcommitted
Add a workaround for bpo-37658
`asyncio.wait_for()` currently has a bug where it raises a `CancelledError` even when the wrapped awaitable has completed. The upstream fix is in python/cpython#21894. This adds a workaround until the aforementioned PR is merged, backported and released. Co-authored-by: Adam Liddell <[email protected]> Fixes: #467Fixes: #547 Related: #468 Supersedes: #548
1 parent c05d726 commit 2bac166

File tree

4 files changed

+41
-18
lines changed

4 files changed

+41
-18
lines changed

‎asyncpg/compat.py‎

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,19 @@ async def wait_closed(stream):
9090
# On Windows wait_closed() sometimes propagates
9191
# ConnectionResetError which is totally unnecessary.
9292
pass
93+
94+
95+
# Workaround for https://bugs.python.org/issue37658
96+
asyncdefwait_for(fut, timeout):
97+
iftimeoutisNone:
98+
returnawaitfut
99+
100+
fut=asyncio.ensure_future(fut)
101+
102+
try:
103+
returnawaitasyncio.wait_for(fut, timeout)
104+
exceptasyncio.CancelledError:
105+
iffut.done():
106+
returnfut.result()
107+
else:
108+
raise

‎asyncpg/connect_utils.py‎

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -636,18 +636,13 @@ async def _connect_addr(
636636

637637
connector=asyncio.ensure_future(connector)
638638
before=time.monotonic()
639-
try:
640-
tr, pr=awaitasyncio.wait_for(
641-
connector, timeout=timeout)
642-
exceptasyncio.CancelledError:
643-
connector.add_done_callback(_close_leaked_connection)
644-
raise
639+
tr, pr=awaitcompat.wait_for(connector, timeout=timeout)
645640
timeout-=time.monotonic() -before
646641

647642
try:
648643
iftimeout<=0:
649644
raiseasyncio.TimeoutError
650-
awaitasyncio.wait_for(connected, timeout=timeout)
645+
awaitcompat.wait_for(connected, timeout=timeout)
651646
except (Exception, asyncio.CancelledError):
652647
tr.close()
653648
raise
@@ -745,12 +740,3 @@ def _create_future(loop):
745740
returnasyncio.Future(loop=loop)
746741
else:
747742
returncreate_future()
748-
749-
750-
def_close_leaked_connection(fut):
751-
try:
752-
tr, pr=fut.result()
753-
iftr:
754-
tr.close()
755-
exceptasyncio.CancelledError:
756-
pass# hide the exception

‎asyncpg/pool.py‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
importtime
1313
importwarnings
1414

15+
from . importcompat
1516
from . importconnection
1617
from . importconnect_utils
1718
from . importexceptions
@@ -198,7 +199,7 @@ async def release(self, timeout):
198199
# If the connection is in cancellation state,
199200
# wait for the cancellation
200201
started=time.monotonic()
201-
awaitasyncio.wait_for(
202+
awaitcompat.wait_for(
202203
self._con._protocol._wait_for_cancellation(),
203204
budget)
204205
ifbudgetisnotNone:
@@ -623,7 +624,7 @@ async def _acquire_impl():
623624
iftimeoutisNone:
624625
returnawait_acquire_impl()
625626
else:
626-
returnawaitasyncio.wait_for(
627+
returnawaitcompat.wait_for(
627628
_acquire_impl(), timeout=timeout)
628629

629630
asyncdefrelease(self, connection, *, timeout=None):

‎tests/test_pool.py‎

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,26 @@ async def worker():
379379
self.cluster.trust_local_connections()
380380
self.cluster.reload()
381381

382+
asyncdeftest_pool_handles_task_cancel_in_acquire_with_timeout(self):
383+
# See https://github.com/MagicStack/asyncpg/issues/547
384+
pool=awaitself.create_pool(database='postgres',
385+
min_size=1, max_size=1)
386+
387+
asyncdefworker():
388+
asyncwithpool.acquire(timeout=100):
389+
pass
390+
391+
# Schedule task
392+
task=self.loop.create_task(worker())
393+
# Yield to task, but cancel almost immediately
394+
awaitasyncio.sleep(0.00000000001)
395+
# Cancel the worker.
396+
task.cancel()
397+
# Wait to make sure the cleanup has completed.
398+
awaitasyncio.sleep(0.4)
399+
# Check that the connection has been returned to the pool.
400+
self.assertEqual(pool._queue.qsize(), 1)
401+
382402
asyncdeftest_pool_handles_task_cancel_in_release(self):
383403
# Use SlowResetConnectionPool to simulate
384404
# the Task.cancel() and __aexit__ race.

0 commit comments

Comments
(0)