Skip to content

Commit 4393a15

Browse files
committed
Properly cleanup connections closed by remote
When a connection is terminated by the remote peer, asyncpg must not forget to perform all the necessary client-side cleanup procedures. Fixes: #385.
1 parent 787317f commit 4393a15

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

asyncpg/protocol/protocol.pyx

+9-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,10 @@ cdef class BaseProtocol(CoreProtocol):
111111
self.conref = weakref.ref(connection)
112112

113113
cdef get_connection(self):
114-
return self.conref()
114+
if self.conref is not None:
115+
return self.conref()
116+
else:
117+
return None
115118

116119
def get_server_pid(self):
117120
return self.backend_pid
@@ -867,6 +870,11 @@ cdef class BaseProtocol(CoreProtocol):
867870
# terminated or due to another error;
868871
# Throw an error in any awaiting waiter.
869872
self.closing = True
873+
# Cleanup the connection resources, including, possibly,
874+
# releasing the pool holder.
875+
con = self.get_connection()
876+
if con is not None:
877+
con._cleanup()
870878
self._handle_waiter_on_connection_lost(exc)
871879

872880
cdef _write(self, buf):

tests/test_pool.py

+25
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,31 @@ async def test_pool_init_and_use_race(self):
881881
await pool_task
882882
await pool.close()
883883

884+
async def test_pool_remote_close(self):
885+
pool = await self.create_pool(min_size=1, max_size=1)
886+
backend_pid_fut = self.loop.create_future()
887+
888+
async def worker():
889+
async with pool.acquire() as conn:
890+
pool_backend_pid = await conn.fetchval(
891+
'SELECT pg_backend_pid()')
892+
backend_pid_fut.set_result(pool_backend_pid)
893+
await asyncio.sleep(0.2, loop=self.loop)
894+
895+
task = self.loop.create_task(worker())
896+
try:
897+
conn = await self.connect()
898+
backend_pid = await backend_pid_fut
899+
await conn.execute('SELECT pg_terminate_backend($1)', backend_pid)
900+
finally:
901+
await conn.close()
902+
903+
await task
904+
905+
# Check that connection_lost has released the pool holder.
906+
conn = await pool.acquire(timeout=0.1)
907+
await pool.release(conn)
908+
884909

885910
@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
886911
class TestHotStandby(tb.ClusterTestCase):

0 commit comments

Comments
 (0)