Skip to content

Commit f4aa9bf

Browse files
un-defelprans
authored andcommitted
Fix _StatementCache.clear() PS memory leak
See MagicStack#416
1 parent e91e491 commit f4aa9bf

File tree

2 files changed

+85
-3
lines changed

2 files changed

+85
-3
lines changed

‎asyncpg/connection.py‎

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1792,13 +1792,18 @@ def iter_statements(self):
17921792
return (e._statementforeinself._entries.values())
17931793

17941794
defclear(self):
1795-
# First, make sure that we cancel all scheduled callbacks.
1796-
forentryinself._entries.values():
1797-
self._clear_entry_callback(entry)
1795+
# Store entries for later.
1796+
entries=tuple(self._entries.values())
17981797

17991798
# Clear the entries dict.
18001799
self._entries.clear()
18011800

1801+
# Make sure that we cancel all scheduled callbacks
1802+
# and call on_remove callback for each entry.
1803+
forentryinentries:
1804+
self._clear_entry_callback(entry)
1805+
self._on_remove(entry._statement)
1806+
18021807
def_set_entry_timeout(self, entry):
18031808
# Clear the existing timeout.
18041809
self._clear_entry_callback(entry)

‎tests/test_cache_invalidation.py‎

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,20 @@
1313

1414

1515
classTestCacheInvalidation(tb.ConnectedTestCase):
16+
17+
def_get_cached_statements(self, connection=None):
18+
ifconnectionisNone:
19+
connection=self.con
20+
returnlist(connection._stmt_cache.iter_statements())
21+
22+
def_check_statements_are_not_closed(self, statements):
23+
self.assertGreater(len(statements), 0)
24+
self.assertTrue(all(nots.closedforsinstatements))
25+
26+
def_check_statements_are_closed(self, statements):
27+
self.assertGreater(len(statements), 0)
28+
self.assertTrue(all(s.closedforsinstatements))
29+
1630
asyncdeftest_prepare_cache_invalidation_silent(self):
1731
awaitself.con.execute('CREATE TABLE tab1(a int, b int)')
1832

@@ -21,11 +35,16 @@ async def test_prepare_cache_invalidation_silent(self):
2135
result=awaitself.con.fetchrow('SELECT * FROM tab1')
2236
self.assertEqual(result, (1, 2))
2337

38+
statements=self._get_cached_statements()
39+
self._check_statements_are_not_closed(statements)
40+
2441
awaitself.con.execute(
2542
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
2643

2744
result=awaitself.con.fetchrow('SELECT * FROM tab1')
2845
self.assertEqual(result, (1, '2'))
46+
47+
self._check_statements_are_closed(statements)
2948
finally:
3049
awaitself.con.execute('DROP TABLE tab1')
3150

@@ -37,6 +56,9 @@ async def test_prepare_cache_invalidation_in_transaction(self):
3756
result=awaitself.con.fetchrow('SELECT * FROM tab1')
3857
self.assertEqual(result, (1, 2))
3958

59+
statements=self._get_cached_statements()
60+
self._check_statements_are_not_closed(statements)
61+
4062
awaitself.con.execute(
4163
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
4264

@@ -45,6 +67,8 @@ async def test_prepare_cache_invalidation_in_transaction(self):
4567
asyncwithself.con.transaction():
4668
result=awaitself.con.fetchrow('SELECT * FROM tab1')
4769

70+
self._check_statements_are_closed(statements)
71+
4872
# This is now OK,
4973
result=awaitself.con.fetchrow('SELECT * FROM tab1')
5074
self.assertEqual(result, (1, '2'))
@@ -69,6 +93,12 @@ async def test_prepare_cache_invalidation_in_pool(self):
6993
result=awaitcon2.fetchrow('SELECT * FROM tab1')
7094
self.assertEqual(result, (1, 2))
7195

96+
statements1=self._get_cached_statements(con1)
97+
self._check_statements_are_not_closed(statements1)
98+
99+
statements2=self._get_cached_statements(con2)
100+
self._check_statements_are_not_closed(statements2)
101+
72102
awaitself.con.execute(
73103
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
74104

@@ -77,6 +107,9 @@ async def test_prepare_cache_invalidation_in_pool(self):
77107
result=awaitcon1.fetchrow('SELECT * FROM tab1')
78108
self.assertEqual(result, (1, '2'))
79109

110+
self._check_statements_are_closed(statements1)
111+
self._check_statements_are_closed(statements2)
112+
80113
asyncwithcon2.transaction():
81114
# This should work, as con1 should have invalidated
82115
# the plan cache.
@@ -98,11 +131,17 @@ async def test_type_cache_invalidation_in_transaction(self):
98131
result=awaitself.con.fetchrow('SELECT * FROM tab1')
99132
self.assertEqual(result, (1, (2, 3)))
100133

134+
statements=self._get_cached_statements()
135+
self._check_statements_are_not_closed(statements)
136+
101137
asyncwithself.con.transaction():
102138
awaitself.con.execute('ALTER TYPE typ1 ADD ATTRIBUTE c text')
103139
withself.assertRaisesRegex(
104140
asyncpg.OutdatedSchemaCacheError, ERRNUM):
105141
awaitself.con.fetchrow('SELECT * FROM tab1')
142+
143+
self._check_statements_are_closed(statements)
144+
106145
# The second request must be correct (cache was dropped):
107146
result=awaitself.con.fetchrow('SELECT * FROM tab1')
108147
self.assertEqual(result, (1, (2, 3, None)))
@@ -123,13 +162,19 @@ async def test_type_cache_invalidation_in_cancelled_transaction(self):
123162
result=awaitself.con.fetchrow('SELECT * FROM tab1')
124163
self.assertEqual(result, (1, (2, 3)))
125164

165+
statements=self._get_cached_statements()
166+
self._check_statements_are_not_closed(statements)
167+
126168
try:
127169
asyncwithself.con.transaction():
128170
awaitself.con.execute(
129171
'ALTER TYPE typ1 ADD ATTRIBUTE c text')
130172
withself.assertRaisesRegex(
131173
asyncpg.OutdatedSchemaCacheError, ERRNUM):
132174
awaitself.con.fetchrow('SELECT * FROM tab1')
175+
176+
self._check_statements_are_closed(statements)
177+
133178
# The second request must be correct (cache was dropped):
134179
result=awaitself.con.fetchrow('SELECT * FROM tab1')
135180
self.assertEqual(result, (1, (2, 3, None)))
@@ -158,13 +203,19 @@ async def test_prepared_type_cache_invalidation(self):
158203
result=awaitprep.fetchrow()
159204
self.assertEqual(result, (1, (2, 3)))
160205

206+
statements=self._get_cached_statements()
207+
self._check_statements_are_not_closed(statements)
208+
161209
try:
162210
asyncwithself.con.transaction():
163211
awaitself.con.execute(
164212
'ALTER TYPE typ1 ADD ATTRIBUTE c text')
165213
withself.assertRaisesRegex(
166214
asyncpg.OutdatedSchemaCacheError, ERRNUM):
167215
awaitprep.fetchrow()
216+
217+
self._check_statements_are_closed(statements)
218+
168219
# PS has its local cache for types codecs, even after the
169220
# cache cleanup it is not possible to use it.
170221
# That's why it is marked as closed.
@@ -206,11 +257,16 @@ async def test_type_cache_invalidation_on_drop_type_attr(self):
206257
result=awaitself.con.fetchrow('SELECT * FROM tab1')
207258
self.assertEqual(result, (1, (2, 3, 'x')))
208259

260+
statements=self._get_cached_statements()
261+
self._check_statements_are_not_closed(statements)
262+
209263
awaitself.con.execute('ALTER TYPE typ1 DROP ATTRIBUTE x')
210264
withself.assertRaisesRegex(
211265
asyncpg.OutdatedSchemaCacheError, ERRNUM):
212266
awaitself.con.fetchrow('SELECT * FROM tab1')
213267

268+
self._check_statements_are_closed(statements)
269+
214270
# This is now OK, the cache is filled after being dropped.
215271
result=awaitself.con.fetchrow('SELECT * FROM tab1')
216272
self.assertEqual(result, (1, (3, 'x')))
@@ -228,6 +284,9 @@ async def test_type_cache_invalidation_on_change_attr(self):
228284
result=awaitself.con.fetchrow('SELECT * FROM tab1')
229285
self.assertEqual(result, (1, (2, 3)))
230286

287+
statements=self._get_cached_statements()
288+
self._check_statements_are_not_closed(statements)
289+
231290
# It is slightly artificial, but can take place in transactional
232291
# schema changing. Nevertheless, if the code checks and raises it
233292
# the most probable reason is a difference with the cache type.
@@ -237,6 +296,8 @@ async def test_type_cache_invalidation_on_change_attr(self):
237296
asyncpg.OutdatedSchemaCacheError, ERRTYP):
238297
awaitself.con.fetchrow('SELECT * FROM tab1')
239298

299+
self._check_statements_are_closed(statements)
300+
240301
# This is now OK, the cache is filled after being dropped.
241302
result=awaitself.con.fetchrow('SELECT * FROM tab1')
242303
self.assertEqual(result, (1, (2, None)))
@@ -265,9 +326,15 @@ async def test_type_cache_invalidation_in_pool(self):
265326
result=awaitcon1.fetchrow('SELECT * FROM tab1')
266327
self.assertEqual(result, (1, (2, 3)))
267328

329+
statements1=self._get_cached_statements(con1)
330+
self._check_statements_are_not_closed(statements1)
331+
268332
result=awaitcon2.fetchrow('SELECT * FROM tab1')
269333
self.assertEqual(result, (1, (2, 3)))
270334

335+
statements2=self._get_cached_statements(con2)
336+
self._check_statements_are_not_closed(statements2)
337+
271338
# Create the same schema in the "testdb", fetch data which caches
272339
# type info.
273340
con_chk=awaitpool_chk.acquire()
@@ -277,6 +344,9 @@ async def test_type_cache_invalidation_in_pool(self):
277344
result=awaitcon_chk.fetchrow('SELECT * FROM tab1')
278345
self.assertEqual(result, (1, (2, 3)))
279346

347+
statements_chk=self._get_cached_statements(con_chk)
348+
self._check_statements_are_not_closed(statements_chk)
349+
280350
# Change schema in the databases.
281351
awaitself.con.execute('ALTER TYPE typ1 ADD ATTRIBUTE c text')
282352
awaitcon_chk.execute('ALTER TYPE typ1 ADD ATTRIBUTE c text')
@@ -287,6 +357,9 @@ async def test_type_cache_invalidation_in_pool(self):
287357
asyncpg.OutdatedSchemaCacheError, ERRNUM):
288358
awaitcon1.fetchrow('SELECT * FROM tab1')
289359

360+
self._check_statements_are_closed(statements1)
361+
self._check_statements_are_closed(statements2)
362+
290363
asyncwithcon2.transaction():
291364
# This should work, as con1 should have invalidated all caches.
292365
result=awaitcon2.fetchrow('SELECT * FROM tab1')
@@ -298,10 +371,14 @@ async def test_type_cache_invalidation_in_pool(self):
298371

299372
# Check the invalidation is database-specific, i.e. cache entries
300373
# for pool_chk/con_chk was not dropped via pool/con1.
374+
375+
self._check_statements_are_not_closed(statements_chk)
376+
301377
withself.assertRaisesRegex(
302378
asyncpg.OutdatedSchemaCacheError, ERRNUM):
303379
awaitcon_chk.fetchrow('SELECT * FROM tab1')
304380

381+
self._check_statements_are_closed(statements_chk)
305382
finally:
306383
awaitself.con.execute('DROP TABLE tab1')
307384
awaitself.con.execute('DROP TYPE typ1')

0 commit comments

Comments
(0)