Skip to content

Commit 3565ef8

Browse files
committed
Raise a clear error if there's a race in pool intialization.
Raise an InterfaceError if: * Two or more tasks try to initialize the same Pool object concurrently; * A pool method, such as fetchval(), is called while the Pool is being initialized. See also issue #320.
1 parent d9a236e commit 3565ef8

File tree

2 files changed

+48
-5
lines changed

2 files changed

+48
-5
lines changed

asyncpg/pool.py

+19-5
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,8 @@ class Pool:
305305
__slots__ = ('_queue', '_loop', '_minsize', '_maxsize',
306306
'_init', '_connect_args', '_connect_kwargs',
307307
'_working_addr', '_working_config', '_working_params',
308-
'_holders', '_initialized', '_closing', '_closed',
309-
'_connection_class', '_generation')
308+
'_holders', '_initialized', '_initializing', '_closing',
309+
'_closed', '_connection_class', '_generation')
310310

311311
def __init__(self, *connect_args,
312312
min_size,
@@ -359,6 +359,7 @@ def __init__(self, *connect_args,
359359

360360
self._holders = []
361361
self._initialized = False
362+
self._initializing = False
362363
self._queue = asyncio.LifoQueue(maxsize=self._maxsize, loop=self._loop)
363364

364365
self._working_addr = None
@@ -387,9 +388,20 @@ def __init__(self, *connect_args,
387388
async def _async__init__(self):
388389
if self._initialized:
389390
return
391+
if self._initializing:
392+
raise exceptions.InterfaceError(
393+
'pool is being initialized in another task')
390394
if self._closed:
391395
raise exceptions.InterfaceError('pool is closed')
396+
self._initializing = True
397+
try:
398+
await self._initialize()
399+
return self
400+
finally:
401+
self._initializing = False
402+
self._initialized = True
392403

404+
async def _initialize(self):
393405
if self._minsize:
394406
# Since we use a LIFO queue, the first items in the queue will be
395407
# the last ones in `self._holders`. We want to pre-connect the
@@ -412,9 +424,6 @@ async def _async__init__(self):
412424

413425
await asyncio.gather(*connect_tasks, loop=self._loop)
414426

415-
self._initialized = True
416-
return self
417-
418427
def set_connect_args(self, dsn=None, **connect_kwargs):
419428
r"""Set the new connection arguments for this pool.
420429
@@ -703,6 +712,11 @@ async def expire_connections(self):
703712

704713
def _check_init(self):
705714
if not self._initialized:
715+
if self._initializing:
716+
raise exceptions.InterfaceError(
717+
'pool is being initialized, but not yet ready: '
718+
'likely there is a race between creating a pool and '
719+
'using it')
706720
raise exceptions.InterfaceError('pool is not initialized')
707721
if self._closed:
708722
raise exceptions.InterfaceError('pool is closed')

tests/test_pool.py

+29
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,35 @@ async def test_pool_set_connection_args(self):
852852
self.assertEqual(con.get_settings().application_name,
853853
'set_conn_args_test_2')
854854

855+
async def test_pool_init_race(self):
856+
pool = self.create_pool(database='postgres', min_size=1, max_size=1)
857+
858+
t1 = asyncio.ensure_future(pool, loop=self.loop)
859+
t2 = asyncio.ensure_future(pool, loop=self.loop)
860+
861+
await t1
862+
with self.assertRaisesRegex(
863+
asyncpg.InterfaceError,
864+
r'pool is being initialized in another task'):
865+
await t2
866+
867+
await pool.close()
868+
869+
async def test_pool_init_and_use_race(self):
870+
pool = self.create_pool(database='postgres', min_size=1, max_size=1)
871+
872+
pool_task = asyncio.ensure_future(pool, loop=self.loop)
873+
await asyncio.sleep(0, loop=self.loop)
874+
875+
with self.assertRaisesRegex(
876+
asyncpg.InterfaceError,
877+
r'being initialized, but not yet ready'):
878+
879+
await pool.fetchval('SELECT 1')
880+
881+
await pool_task
882+
await pool.close()
883+
855884

856885
@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
857886
class TestHotStandby(tb.ClusterTestCase):

0 commit comments

Comments
 (0)