Skip to content

Commit da9f5e8

Browse files
committed
use finer-grained locks
1 parent 0d84260 commit da9f5e8

File tree

2 files changed

+72
-68
lines changed

2 files changed

+72
-68
lines changed

pymongo/asynchronous/pool.py

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,9 @@ def __init__(
732732
# and returned to pool from the left side. Stale sockets removed
733733
# from the right side.
734734
self.conns: collections.deque[AsyncConnection] = collections.deque()
735+
self._conns_lock = _async_create_lock()
735736
self.active_contexts: set[_CancellationContext] = set()
737+
self._active_contexts_lock = _async_create_lock()
736738
# The main lock for the pool. The lock should only be used to protect
737739
# updating attributes.
738740
# If possible, avoid any additional work while holding the lock.
@@ -795,6 +797,7 @@ def __init__(
795797
)
796798
# Similar to active_sockets but includes threads in the wait queue.
797799
self.operation_count: int = 0
800+
self._operation_count_lock = _async_create_lock()
798801
# Retain references to pinned connections to prevent the CPython GC
799802
# from thinking that a cursor's pinned connection can be GC'd when the
800803
# cursor is GC'd (see PYTHON-2751).
@@ -837,23 +840,23 @@ async def _reset(
837840
old_state = self.state
838841
if self.closed:
839842
return
840-
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
841-
async with self.lock:
842-
old_state, self.state = self.state, PoolState.PAUSED
843843

844844
with self.lock:
845+
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
846+
old_state, self.state = self.state, PoolState.PAUSED
845847
self.gen.inc(service_id)
846-
newpid = os.getpid()
847-
if self.pid != newpid:
848-
self.pid = newpid
849-
with self.lock:
848+
newpid = os.getpid()
849+
850+
if self.pid != newpid:
851+
self.pid = newpid
852+
850853
self.active_sockets = 0
854+
with self._conns_lock:
855+
if service_id is None:
856+
sockets, self.conns = self.conns, collections.deque()
857+
with self._operation_count_lock:
851858
self.operation_count = 0
852-
if service_id is None:
853-
new_conns = collections.deque()
854-
with self.lock:
855-
sockets, self.conns = self.conns, new_conns
856-
else:
859+
if service_id is not None:
857860
discard: collections.deque = collections.deque() # type: ignore[type-arg]
858861
keep: collections.deque = collections.deque() # type: ignore[type-arg]
859862
for conn in self.conns.copy():
@@ -862,7 +865,7 @@ async def _reset(
862865
else:
863866
keep.append(conn)
864867
sockets = discard
865-
with self.lock:
868+
with self._conns_lock:
866869
self.conns = keep
867870

868871
if close:
@@ -1000,8 +1003,9 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
10001003
if self.gen.get_overall() != reference_generation:
10011004
close_conn = True
10021005
if not close_conn:
1003-
async with self.lock:
1006+
async with self._conns_lock:
10041007
self.conns.appendleft(conn)
1008+
async with self._active_contexts_lock:
10051009
self.active_contexts.discard(conn.cancel_context)
10061010
if close_conn:
10071011
await conn.close_conn(ConnectionClosedReason.STALE)
@@ -1028,7 +1032,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10281032
# Use a temporary context so that interrupt_connections can cancel creating the socket.
10291033
tmp_context = _CancellationContext()
10301034
conn_id = self.next_connection_id
1031-
async with self.lock:
1035+
async with self._active_contexts_lock:
10321036
self.next_connection_id += 1
10331037
self.active_contexts.add(tmp_context)
10341038

@@ -1050,7 +1054,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10501054
networking_interface = await _configured_protocol_interface(self.address, self.opts)
10511055
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
10521056
except BaseException as error:
1053-
async with self.lock:
1057+
async with self._active_contexts_lock:
10541058
self.active_contexts.discard(tmp_context)
10551059
if self.enabled_for_cmap:
10561060
assert listeners is not None
@@ -1075,7 +1079,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10751079
raise
10761080

10771081
conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
1078-
async with self.lock:
1082+
async with self._active_contexts_lock:
10791083
self.active_contexts.add(conn.cancel_context)
10801084
self.active_contexts.discard(tmp_context)
10811085
if tmp_context.cancelled:
@@ -1090,7 +1094,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10901094
await conn.authenticate()
10911095
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
10921096
except BaseException:
1093-
async with self.lock:
1097+
async with self._active_contexts_lock:
10941098
self.active_contexts.discard(conn.cancel_context)
10951099
await conn.close_conn(ConnectionClosedReason.ERROR)
10961100
raise
@@ -1150,7 +1154,7 @@ async def checkout(
11501154
durationMS=duration,
11511155
)
11521156
try:
1153-
async with self.lock:
1157+
async with self._active_contexts_lock:
11541158
self.active_contexts.add(conn.cancel_context)
11551159
yield conn
11561160
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
@@ -1169,11 +1173,11 @@ async def checkout(
11691173
await self.checkin(conn)
11701174
raise
11711175
if conn.pinned_txn:
1172-
async with self.lock:
1176+
async with self._active_contexts_lock:
11731177
self.__pinned_sockets.add(conn)
11741178
self.ntxns += 1
11751179
elif conn.pinned_cursor:
1176-
async with self.lock:
1180+
async with self._active_contexts_lock:
11771181
self.__pinned_sockets.add(conn)
11781182
self.ncursors += 1
11791183
elif conn.active:
@@ -1237,7 +1241,7 @@ async def _get_conn(
12371241
"Attempted to check out a connection from closed connection pool"
12381242
)
12391243

1240-
async with self.lock:
1244+
async with self._operation_count_lock:
12411245
self.operation_count += 1
12421246

12431247
# Get a free socket or create one.
@@ -1286,7 +1290,7 @@ async def _get_conn(
12861290
self._raise_if_not_ready(checkout_started_time, emit_event=False)
12871291

12881292
try:
1289-
async with self.lock:
1293+
async with self._conns_lock:
12901294
conn = self.conns.popleft()
12911295
except IndexError:
12921296
self._pending += 1
@@ -1346,10 +1350,9 @@ async def checkin(self, conn: AsyncConnection) -> None:
13461350
conn.active = False
13471351
conn.pinned_txn = False
13481352
conn.pinned_cursor = False
1349-
async with self.lock:
1350-
self.__pinned_sockets.discard(conn)
13511353
listeners = self.opts._event_listeners
1352-
async with self.lock:
1354+
async with self._active_contexts_lock:
1355+
self.__pinned_sockets.discard(conn)
13531356
self.active_contexts.discard(conn.cancel_context)
13541357
if self.enabled_for_cmap:
13551358
assert listeners is not None
@@ -1393,25 +1396,24 @@ async def checkin(self, conn: AsyncConnection) -> None:
13931396
if self.stale_generation(conn.generation, conn.service_id):
13941397
close_conn = True
13951398
else:
1396-
with self.lock:
1399+
with self._conns_lock:
13971400
self.conns.appendleft(conn)
13981401
with self._max_connecting_cond:
13991402
# Notify any threads waiting to create a connection.
14001403
self._max_connecting_cond.notify()
14011404
if close_conn:
14021405
await conn.close_conn(ConnectionClosedReason.STALE)
14031406

1404-
async with self.lock:
1407+
async with self._active_contexts_lock:
14051408
self.active_sockets -= 1
1406-
self.operation_count -= 1
1407-
1408-
if txn:
1409-
async with self.lock:
1409+
if txn:
14101410
self.ntxns -= 1
1411-
elif cursor:
1412-
async with self.lock:
1411+
elif cursor:
14131412
self.ncursors -= 1
14141413

1414+
async with self._operation_count_lock:
1415+
self.operation_count -= 1
1416+
14151417
async with self.size_cond:
14161418
self.requests -= 1
14171419
self.size_cond.notify()

pymongo/synchronous/pool.py

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,9 @@ def __init__(
730730
# and returned to pool from the left side. Stale sockets removed
731731
# from the right side.
732732
self.conns: collections.deque[Connection] = collections.deque()
733+
self._conns_lock = _create_lock()
733734
self.active_contexts: set[_CancellationContext] = set()
735+
self._active_contexts_lock = _create_lock()
734736
# The main lock for the pool. The lock should only be used to protect
735737
# updating attributes.
736738
# If possible, avoid any additional work while holding the lock.
@@ -793,6 +795,7 @@ def __init__(
793795
)
794796
# Similar to active_sockets but includes threads in the wait queue.
795797
self.operation_count: int = 0
798+
self._operation_count_lock = _create_lock()
796799
# Retain references to pinned connections to prevent the CPython GC
797800
# from thinking that a cursor's pinned connection can be GC'd when the
798801
# cursor is GC'd (see PYTHON-2751).
@@ -835,23 +838,23 @@ def _reset(
835838
old_state = self.state
836839
if self.closed:
837840
return
838-
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
839-
with self.lock:
840-
old_state, self.state = self.state, PoolState.PAUSED
841841

842842
with self.lock:
843+
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
844+
old_state, self.state = self.state, PoolState.PAUSED
843845
self.gen.inc(service_id)
844-
newpid = os.getpid()
845-
if self.pid != newpid:
846-
self.pid = newpid
847-
with self.lock:
846+
newpid = os.getpid()
847+
848+
if self.pid != newpid:
849+
self.pid = newpid
850+
848851
self.active_sockets = 0
852+
with self._conns_lock:
853+
if service_id is None:
854+
sockets, self.conns = self.conns, collections.deque()
855+
with self._operation_count_lock:
849856
self.operation_count = 0
850-
if service_id is None:
851-
new_conns = collections.deque()
852-
with self.lock:
853-
sockets, self.conns = self.conns, new_conns
854-
else:
857+
if service_id is not None:
855858
discard: collections.deque = collections.deque() # type: ignore[type-arg]
856859
keep: collections.deque = collections.deque() # type: ignore[type-arg]
857860
for conn in self.conns.copy():
@@ -860,7 +863,7 @@ def _reset(
860863
else:
861864
keep.append(conn)
862865
sockets = discard
863-
with self.lock:
866+
with self._conns_lock:
864867
self.conns = keep
865868

866869
if close:
@@ -996,8 +999,9 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
996999
if self.gen.get_overall() != reference_generation:
9971000
close_conn = True
9981001
if not close_conn:
999-
with self.lock:
1002+
with self._conns_lock:
10001003
self.conns.appendleft(conn)
1004+
with self._active_contexts_lock:
10011005
self.active_contexts.discard(conn.cancel_context)
10021006
if close_conn:
10031007
conn.close_conn(ConnectionClosedReason.STALE)
@@ -1024,7 +1028,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
10241028
# Use a temporary context so that interrupt_connections can cancel creating the socket.
10251029
tmp_context = _CancellationContext()
10261030
conn_id = self.next_connection_id
1027-
with self.lock:
1031+
with self._active_contexts_lock:
10281032
self.next_connection_id += 1
10291033
self.active_contexts.add(tmp_context)
10301034

@@ -1046,7 +1050,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
10461050
networking_interface = _configured_socket_interface(self.address, self.opts)
10471051
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
10481052
except BaseException as error:
1049-
with self.lock:
1053+
with self._active_contexts_lock:
10501054
self.active_contexts.discard(tmp_context)
10511055
if self.enabled_for_cmap:
10521056
assert listeners is not None
@@ -1071,7 +1075,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
10711075
raise
10721076

10731077
conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
1074-
with self.lock:
1078+
with self._active_contexts_lock:
10751079
self.active_contexts.add(conn.cancel_context)
10761080
self.active_contexts.discard(tmp_context)
10771081
if tmp_context.cancelled:
@@ -1086,7 +1090,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
10861090
conn.authenticate()
10871091
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
10881092
except BaseException:
1089-
with self.lock:
1093+
with self._active_contexts_lock:
10901094
self.active_contexts.discard(conn.cancel_context)
10911095
conn.close_conn(ConnectionClosedReason.ERROR)
10921096
raise
@@ -1146,7 +1150,7 @@ def checkout(
11461150
durationMS=duration,
11471151
)
11481152
try:
1149-
with self.lock:
1153+
with self._active_contexts_lock:
11501154
self.active_contexts.add(conn.cancel_context)
11511155
yield conn
11521156
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
@@ -1165,11 +1169,11 @@ def checkout(
11651169
self.checkin(conn)
11661170
raise
11671171
if conn.pinned_txn:
1168-
with self.lock:
1172+
with self._active_contexts_lock:
11691173
self.__pinned_sockets.add(conn)
11701174
self.ntxns += 1
11711175
elif conn.pinned_cursor:
1172-
with self.lock:
1176+
with self._active_contexts_lock:
11731177
self.__pinned_sockets.add(conn)
11741178
self.ncursors += 1
11751179
elif conn.active:
@@ -1233,7 +1237,7 @@ def _get_conn(
12331237
"Attempted to check out a connection from closed connection pool"
12341238
)
12351239

1236-
with self.lock:
1240+
with self._operation_count_lock:
12371241
self.operation_count += 1
12381242

12391243
# Get a free socket or create one.
@@ -1282,7 +1286,7 @@ def _get_conn(
12821286
self._raise_if_not_ready(checkout_started_time, emit_event=False)
12831287

12841288
try:
1285-
with self.lock:
1289+
with self._conns_lock:
12861290
conn = self.conns.popleft()
12871291
except IndexError:
12881292
self._pending += 1
@@ -1342,10 +1346,9 @@ def checkin(self, conn: Connection) -> None:
13421346
conn.active = False
13431347
conn.pinned_txn = False
13441348
conn.pinned_cursor = False
1345-
with self.lock:
1346-
self.__pinned_sockets.discard(conn)
13471349
listeners = self.opts._event_listeners
1348-
with self.lock:
1350+
with self._active_contexts_lock:
1351+
self.__pinned_sockets.discard(conn)
13491352
self.active_contexts.discard(conn.cancel_context)
13501353
if self.enabled_for_cmap:
13511354
assert listeners is not None
@@ -1389,25 +1392,24 @@ def checkin(self, conn: Connection) -> None:
13891392
if self.stale_generation(conn.generation, conn.service_id):
13901393
close_conn = True
13911394
else:
1392-
with self.lock:
1395+
with self._conns_lock:
13931396
self.conns.appendleft(conn)
13941397
with self._max_connecting_cond:
13951398
# Notify any threads waiting to create a connection.
13961399
self._max_connecting_cond.notify()
13971400
if close_conn:
13981401
conn.close_conn(ConnectionClosedReason.STALE)
13991402

1400-
with self.lock:
1403+
with self._active_contexts_lock:
14011404
self.active_sockets -= 1
1402-
self.operation_count -= 1
1403-
1404-
if txn:
1405-
with self.lock:
1405+
if txn:
14061406
self.ntxns -= 1
1407-
elif cursor:
1408-
with self.lock:
1407+
elif cursor:
14091408
self.ncursors -= 1
14101409

1410+
with self._operation_count_lock:
1411+
self.operation_count -= 1
1412+
14111413
with self.size_cond:
14121414
self.requests -= 1
14131415
self.size_cond.notify()

0 commit comments

Comments
 (0)