Skip to content

Commit 216407e

Browse files
committed
PYTHON-5212 - Do not hold Topology lock while resetting pool
1 parent 412d000 commit 216407e

File tree

4 files changed

+54
-24
lines changed

4 files changed

+54
-24
lines changed

pymongo/asynchronous/pool.py

+22-6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
import asyncio
1718
import collections
1819
import contextlib
1920
import logging
@@ -860,8 +861,13 @@ async def _reset(
860861
# PoolClosedEvent but that reset() SHOULD close sockets *after*
861862
# publishing the PoolClearedEvent.
862863
if close:
863-
for conn in sockets:
864-
await conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
864+
if not _IS_SYNC:
865+
await asyncio.gather(
866+
*[conn._close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets]
867+
)
868+
else:
869+
for conn in sockets:
870+
await conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
865871
if self.enabled_for_cmap:
866872
assert listeners is not None
867873
listeners.publish_pool_closed(self.address)
@@ -891,8 +897,13 @@ async def _reset(
891897
serverPort=self.address[1],
892898
serviceId=service_id,
893899
)
894-
for conn in sockets:
895-
await conn.close_conn(ConnectionClosedReason.STALE)
900+
if not _IS_SYNC:
901+
await asyncio.gather(
902+
*[conn._close_conn(ConnectionClosedReason.STALE) for conn in sockets]
903+
)
904+
else:
905+
for conn in sockets:
906+
await conn.close_conn(ConnectionClosedReason.STALE)
896907

897908
async def update_is_writable(self, is_writable: Optional[bool]) -> None:
898909
"""Updates the is_writable attribute on all sockets currently in the
@@ -938,8 +949,13 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
938949
and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds
939950
):
940951
close_conns.append(self.conns.pop())
941-
for conn in close_conns:
942-
await conn.close_conn(ConnectionClosedReason.IDLE)
952+
if not _IS_SYNC:
953+
await asyncio.gather(
954+
*[conn._close_conn(ConnectionClosedReason.IDLE) for conn in close_conns]
955+
)
956+
else:
957+
for conn in close_conns:
958+
await conn.close_conn(ConnectionClosedReason.IDLE)
943959

944960
while True:
945961
async with self.size_cond:

pymongo/asynchronous/topology.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -529,12 +529,6 @@ async def _process_change(
529529
if not _IS_SYNC:
530530
self._monitor_tasks.append(self._srv_monitor)
531531

532-
# Clear the pool from a failed heartbeat.
533-
if reset_pool:
534-
server = self._servers.get(server_description.address)
535-
if server:
536-
await server.pool.reset(interrupt_connections=interrupt_connections)
537-
538532
# Wake anything waiting in select_servers().
539533
self._condition.notify_all()
540534

@@ -557,6 +551,11 @@ async def on_change(
557551
# that didn't include this server.
558552
if self._opened and self._description.has_server(server_description.address):
559553
await self._process_change(server_description, reset_pool, interrupt_connections)
554+
# Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close.
555+
if self._opened and self._description.has_server(server_description.address) and reset_pool:
556+
server = self._servers.get(server_description.address)
557+
if server:
558+
await server.pool.reset(interrupt_connections=interrupt_connections)
560559

561560
async def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
562561
"""Process a new seedlist on an opened topology.

pymongo/synchronous/pool.py

+22-6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
import asyncio
1718
import collections
1819
import contextlib
1920
import logging
@@ -858,8 +859,13 @@ def _reset(
858859
# PoolClosedEvent but that reset() SHOULD close sockets *after*
859860
# publishing the PoolClearedEvent.
860861
if close:
861-
for conn in sockets:
862-
conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
862+
if not _IS_SYNC:
863+
asyncio.gather(
864+
*[conn._close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets]
865+
)
866+
else:
867+
for conn in sockets:
868+
conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
863869
if self.enabled_for_cmap:
864870
assert listeners is not None
865871
listeners.publish_pool_closed(self.address)
@@ -889,8 +895,13 @@ def _reset(
889895
serverPort=self.address[1],
890896
serviceId=service_id,
891897
)
892-
for conn in sockets:
893-
conn.close_conn(ConnectionClosedReason.STALE)
898+
if not _IS_SYNC:
899+
asyncio.gather(
900+
*[conn._close_conn(ConnectionClosedReason.STALE) for conn in sockets]
901+
)
902+
else:
903+
for conn in sockets:
904+
conn.close_conn(ConnectionClosedReason.STALE)
894905

895906
def update_is_writable(self, is_writable: Optional[bool]) -> None:
896907
"""Updates the is_writable attribute on all sockets currently in the
@@ -934,8 +945,13 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
934945
and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds
935946
):
936947
close_conns.append(self.conns.pop())
937-
for conn in close_conns:
938-
conn.close_conn(ConnectionClosedReason.IDLE)
948+
if not _IS_SYNC:
949+
asyncio.gather(
950+
*[conn._close_conn(ConnectionClosedReason.IDLE) for conn in close_conns]
951+
)
952+
else:
953+
for conn in close_conns:
954+
conn.close_conn(ConnectionClosedReason.IDLE)
939955

940956
while True:
941957
with self.size_cond:

pymongo/synchronous/topology.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -529,12 +529,6 @@ def _process_change(
529529
if not _IS_SYNC:
530530
self._monitor_tasks.append(self._srv_monitor)
531531

532-
# Clear the pool from a failed heartbeat.
533-
if reset_pool:
534-
server = self._servers.get(server_description.address)
535-
if server:
536-
server.pool.reset(interrupt_connections=interrupt_connections)
537-
538532
# Wake anything waiting in select_servers().
539533
self._condition.notify_all()
540534

@@ -557,6 +551,11 @@ def on_change(
557551
# that didn't include this server.
558552
if self._opened and self._description.has_server(server_description.address):
559553
self._process_change(server_description, reset_pool, interrupt_connections)
554+
# Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close.
555+
if self._opened and self._description.has_server(server_description.address) and reset_pool:
556+
server = self._servers.get(server_description.address)
557+
if server:
558+
server.pool.reset(interrupt_connections=interrupt_connections)
560559

561560
def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
562561
"""Process a new seedlist on an opened topology.

0 commit comments

Comments
 (0)