Skip to content

Commit a77cbb2

Browse files
committed
Adopt new patterns up to some tests passing
1 parent c643578 commit a77cbb2

File tree

9 files changed

+104
-71
lines changed

9 files changed

+104
-71
lines changed

dispatcher/brokers/base.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from abc import abstractmethod
2+
from typing import Optional
23

34

45
class BaseBroker:
@@ -9,7 +10,7 @@ async def connect(self): ...
910
async def aprocess_notify(self, connected_callback=None): ...
1011

1112
@abstractmethod
12-
async def apublish_message(self, channel, payload=None) -> None: ...
13+
async def apublish_message(self, channel: Optional[str] = None, message: str = '') -> None: ...
1314

1415
@abstractmethod
1516
async def aclose(self) -> None: ...

dispatcher/brokers/pg_notify.py

+45-26
Original file line numberDiff line numberDiff line change
@@ -107,43 +107,24 @@ async def aprocess_notify(self, connected_callback=None):
107107
async for notify in connection.notifies():
108108
yield notify.channel, notify.payload
109109

110-
async def apublish_message(self, channel: Optional[str] = None, payload=None) -> None:
110+
async def apublish_message(self, channel: Optional[str] = None, message: str = '') -> None:
111111
connection = await self.get_connection()
112112
channel = self.get_publish_channel(channel)
113113

114114
async with connection.cursor() as cur:
115-
if not payload:
115+
if not message:
116116
await cur.execute(f'NOTIFY {channel};')
117117
else:
118-
await cur.execute(f"NOTIFY {channel}, '{payload}';")
118+
await cur.execute(f"NOTIFY {channel}, '{message}';")
119+
120+
logger.debug(f'Sent pg_notify message of {len(message)} chars to {channel}')
119121

120122
async def aclose(self) -> None:
121123
if self._connection:
122124
await self._connection.close()
123125
self._connection = None
124126

125127

126-
class ConnectionSaver:
127-
def __init__(self):
128-
self._connection = None
129-
130-
131-
connection_save = ConnectionSaver()
132-
133-
134-
def connection_saver(**config):
135-
"""
136-
This mimics the behavior of Django for tests and demos
137-
Philosophically, this is used by an application that uses an ORM,
138-
or otherwise has its own connection management logic.
139-
Dispatcher does not manage connections, so this a simulation of that.
140-
"""
141-
if connection_save._connection is None:
142-
config['autocommit'] = True
143-
connection_save._connection = SyncBroker.create_connection(**config)
144-
return connection_save._connection
145-
146-
147128
class SyncBroker(PGNotifyBase):
148129
def __init__(
149130
self,
@@ -177,11 +158,49 @@ def publish_message(self, channel: Optional[str] = None, message: str = '') -> N
177158
channel = self.get_publish_channel(channel)
178159

179160
with connection.cursor() as cur:
180-
cur.execute('SELECT pg_notify(%s, %s);', (channel, message))
161+
if message:
162+
cur.execute('SELECT pg_notify(%s, %s);', (channel, message))
163+
else:
164+
cur.execute(f'NOTIFY {channel};')
181165

182-
logger.debug(f'Sent pg_notify message to {channel}')
166+
logger.debug(f'Sent pg_notify message of {len(message)} chars to {channel}')
183167

184168
def close(self) -> None:
185169
if self._connection:
186170
self._connection.close()
187171
self._connection = None
172+
173+
174+
class ConnectionSaver:
175+
def __init__(self) -> None:
176+
self._connection: Optional[psycopg.Connection] = None
177+
self._async_connection: Optional[psycopg.AsyncConnection] = None
178+
179+
180+
connection_save = ConnectionSaver()
181+
182+
183+
def connection_saver(**config) -> psycopg.Connection:
184+
"""
185+
This mimics the behavior of Django for tests and demos
186+
Philosophically, this is used by an application that uses an ORM,
187+
or otherwise has its own connection management logic.
188+
Dispatcher does not manage connections, so this a simulation of that.
189+
"""
190+
if connection_save._connection is None:
191+
config['autocommit'] = True
192+
connection_save._connection = SyncBroker.create_connection(**config)
193+
return connection_save._connection
194+
195+
196+
async def async_connection_saver(**config) -> psycopg.AsyncConnection:
197+
"""
198+
This mimics the behavior of Django for tests and demos
199+
Philosophically, this is used by an application that uses an ORM,
200+
or otherwise has its own connection management logic.
201+
Dispatcher does not manage connections, so this a simulation of that.
202+
"""
203+
if connection_save._async_connection is None:
204+
config['autocommit'] = True
205+
connection_save._async_connection = await AsyncBroker.create_connection(**config)
206+
return connection_save._async_connection

dispatcher/config.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
class DispatcherSettings:
99
def __init__(self, config: dict) -> None:
10-
self.brokers: dict = config.get('brokers', [])
11-
self.producers: dict = config.get('producers', [])
10+
self.brokers: dict = config.get('brokers', {})
11+
self.producers: dict = config.get('producers', {})
1212
self.service: dict = config.get('service', {'max_workers': 3})
1313
self.publish: dict = config.get('publish', {})
1414
# TODO: firmly planned sections of config for later

dispatcher/control.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def __init__(self, queuename, send_data, expected_replies):
2929
def _create_events(self):
3030
return SimpleNamespace(exit_event=asyncio.Event())
3131

32-
async def process_message(self, payload, broker=None, channel=None):
32+
async def process_message(self, payload, producer=None, channel=None):
3333
self.received_replies.append(payload)
3434
if self.expected_replies and (len(self.received_replies) >= self.expected_replies):
3535
self.events.exit_event.set()

dispatcher/factories.py

+11-7
Original file line numberDiff line numberDiff line change
@@ -68,20 +68,24 @@ def get_sync_broker(broker_name, broker_config) -> BaseBroker:
6868
return broker_module.SyncBroker(**broker_config)
6969

7070

71-
def get_sync_publisher_from_settings(publish_broker: Optional[str] = None, settings: LazySettings = global_settings) -> BaseBroker:
71+
def _get_publisher_broker_name(publish_broker: Optional[str] = None, settings: LazySettings = global_settings) -> str:
7272
if publish_broker:
73-
pass
73+
return publish_broker
7474
elif len(settings.brokers) == 1:
75-
publish_broker = list(settings.brokers.keys())[0]
75+
return list(settings.brokers.keys())[0]
7676
elif 'default_broker' in settings.publish:
77-
publish_broker = settings.publish['default_broker']
77+
return settings.publish['default_broker']
7878
else:
7979
raise RuntimeError(f'Could not determine which broker to publish with between options {list(settings.brokers.keys())}')
8080

81-
return get_sync_broker(publish_broker, settings.brokers[publish_broker])
8281

82+
def get_sync_publisher_from_settings(publish_broker: Optional[str] = None, settings: LazySettings = global_settings, **overrides) -> BaseBroker:
83+
publish_broker = _get_publisher_broker_name(publish_broker=publish_broker, settings=settings)
8384

84-
def get_async_publisher_from_settings(settings: LazySettings = global_settings, **overrides) -> BaseBroker:
85+
return get_sync_broker(publish_broker, settings.brokers[publish_broker], **overrides)
86+
87+
88+
def get_async_publisher_from_settings(publish_broker: Optional[str] = None, settings: LazySettings = global_settings, **overrides) -> BaseBroker:
8589
"""
8690
An asynchronous publisher is the ideal choice for submitting control-and-reply actions.
8791
This returns an asyncio broker of the default publisher type.
@@ -90,5 +94,5 @@ def get_async_publisher_from_settings(settings: LazySettings = global_settings,
9094
For control-and-reply, this will contain only the reply_to channel, to not receive
9195
unrelated traffic.
9296
"""
93-
publish_broker = settings.publish['default_broker']
97+
publish_broker = _get_publisher_broker_name(publish_broker=publish_broker, settings=settings)
9498
return get_async_broker(publish_broker, settings.brokers[publish_broker], **overrides)

dispatcher/producers/brokered.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ async def produce_forever(self, dispatcher) -> None:
3636
self.dispatcher = dispatcher
3737
async for channel, payload in self.broker.aprocess_notify(connected_callback=self.connected_callback):
3838
self.produced_count += 1
39-
await dispatcher.process_message(payload, broker=self, channel=channel)
39+
await dispatcher.process_message(payload, producer=self, channel=channel)
4040

41-
async def notify(self, channel: str, payload: Optional[str] = None) -> None:
42-
await self.broker.apublish_message(channel, payload=payload)
41+
async def notify(self, channel: str, payload: str = '') -> None:
42+
await self.broker.apublish_message(channel=channel, message=payload)
4343

4444
async def shutdown(self) -> None:
4545
if self.production_task:

tests/conftest.py

+32-23
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
from dispatcher.brokers.pg_notify import SyncBroker, AsyncBroker
1515
from dispatcher.registry import DispatcherMethodRegistry
16+
from dispatcher.config import temporary_settings
17+
from dispatcher.factories import from_settings
1618

1719

1820
# List of channels to listen on
@@ -24,7 +26,10 @@
2426
BASIC_CONFIG = {
2527
"brokers": {
2628
"pg_notify": {
27-
"channels": CHANNELS
29+
"channels": CHANNELS,
30+
"config": {'conninfo': CONNECTION_STRING},
31+
"sync_connection_factory": "dispatcher.brokers.pg_notify.connection_saver",
32+
# "async_connection_factory": "dispatcher.brokers.pg_notify.async_connection_saver",
2833
}
2934
},
3035
"pool": {
@@ -37,7 +42,7 @@
3742
async def aconnection_for_test():
3843
conn = None
3944
try:
40-
conn = await AsyncBroker.create_connection({'conninfo': CONNECTION_STRING})
45+
conn = await AsyncBroker.create_connection(conninfo=CONNECTION_STRING, autocommit=True)
4146

4247
# Make sure database is running to avoid deadlocks which can come
4348
# from using the loop provided by pytest asyncio
@@ -58,27 +63,31 @@ def conn_config():
5863

5964
@pytest.fixture
6065
def pg_dispatcher() -> DispatcherMain:
61-
return DispatcherMain(BASIC_CONFIG)
66+
# We can not reuse the connection between tests
67+
config = BASIC_CONFIG.copy()
68+
config['brokers']['pg_notify'].pop('async_connection_factory')
69+
return DispatcherMain(config)
70+
71+
72+
@pytest.fixture
73+
def test_setup():
74+
with temporary_settings(BASIC_CONFIG):
75+
yield
6276

6377

6478
@pytest_asyncio.fixture(loop_scope="function", scope="function")
65-
async def apg_dispatcher(conn_config) -> AsyncIterator[DispatcherMain]:
66-
# need to make a new connection because it can not be same as publisher
67-
async with aconnection_for_test() as conn:
68-
config = BASIC_CONFIG.copy()
69-
config['producers']['BrokeredProducer']['connection'] = conn
70-
# We have to fill in the config so that replies can still be sent in workers
71-
# the workers may establish a new psycopg connection
72-
config['producers']['BrokeredProducer']['config'] = conn_config
73-
try:
74-
dispatcher = DispatcherMain(config)
75-
76-
await dispatcher.connect_signals()
77-
await dispatcher.start_working()
78-
await dispatcher.wait_for_producers_ready()
79-
80-
yield dispatcher
81-
finally:
79+
async def apg_dispatcher(test_setup) -> AsyncIterator[DispatcherMain]:
80+
dispatcher = None
81+
try:
82+
dispatcher = from_settings()
83+
84+
await dispatcher.connect_signals()
85+
await dispatcher.start_working()
86+
await dispatcher.wait_for_producers_ready()
87+
88+
yield dispatcher
89+
finally:
90+
if dispatcher:
8291
await dispatcher.shutdown()
8392
await dispatcher.cancel_tasks()
8493

@@ -87,18 +96,18 @@ async def apg_dispatcher(conn_config) -> AsyncIterator[DispatcherMain]:
8796
async def pg_message(psycopg_conn) -> Callable:
8897
async def _rf(message, channel='test_channel'):
8998
broker = AsyncBroker(connection=psycopg_conn)
90-
await broker.apublish_message(channel, message)
99+
await broker.apublish_message(channel=channel, message=message)
91100
return _rf
92101

93102

94103
@pytest_asyncio.fixture(loop_scope="function", scope="function")
95-
async def pg_control() -> AsyncIterator[Control]:
104+
async def pg_control(test_setup) -> AsyncIterator[Control]:
96105
"""This has to use a different connection from dispatcher itself
97106
98107
because psycopg will pool async connections, meaning that submission
99108
for the control task would be blocked by the listening query of the dispatcher itself"""
100109
async with aconnection_for_test() as conn:
101-
yield Control('test_channel', async_connection=conn)
110+
yield Control(queue='test_channel')
102111

103112

104113
@pytest_asyncio.fixture(loop_scope="function", scope="function")

tests/integration/publish/__init__.py

Whitespace-only changes.

tests/integration/test_main.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async def wait_to_receive(dispatcher, ct, timeout=5.0, interval=0.05):
2424
async def test_run_lambda_function(apg_dispatcher, pg_message):
2525
assert apg_dispatcher.pool.finished_count == 0
2626

27-
clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait())
27+
clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait(), name='test_lambda_clear_wait')
2828
await pg_message('lambda: "This worked!"')
2929
await asyncio.wait_for(clearing_task, timeout=3)
3030

@@ -36,7 +36,7 @@ async def test_run_decorated_function(apg_dispatcher, conn_config):
3636
assert apg_dispatcher.pool.finished_count == 0
3737

3838
clearing_task = asyncio.create_task(apg_dispatcher.pool.events.work_cleared.wait())
39-
test_methods.print_hello.apply_async(config=conn_config)
39+
test_methods.print_hello.apply_async()
4040
await asyncio.wait_for(clearing_task, timeout=3)
4141

4242
assert apg_dispatcher.pool.finished_count == 1
@@ -186,11 +186,11 @@ async def test_task_discard(apg_dispatcher, pg_message):
186186

187187

188188
@pytest.mark.asyncio
189-
async def test_task_discard_in_task_definition(apg_dispatcher, conn_config):
189+
async def test_task_discard_in_task_definition(apg_dispatcher):
190190
assert apg_dispatcher.pool.finished_count == 0
191191

192192
for i in range(10):
193-
test_methods.sleep_discard.apply_async(args=[2], config=conn_config)
193+
test_methods.sleep_discard.apply_async(args=[2])
194194

195195
await wait_to_receive(apg_dispatcher, 10)
196196

@@ -199,11 +199,11 @@ async def test_task_discard_in_task_definition(apg_dispatcher, conn_config):
199199

200200

201201
@pytest.mark.asyncio
202-
async def test_tasks_in_serial(apg_dispatcher, conn_config):
202+
async def test_tasks_in_serial(apg_dispatcher):
203203
assert apg_dispatcher.pool.finished_count == 0
204204

205205
for i in range(10):
206-
test_methods.sleep_serial.apply_async(args=[2], config=conn_config)
206+
test_methods.sleep_serial.apply_async(args=[2])
207207

208208
await wait_to_receive(apg_dispatcher, 10)
209209

@@ -212,11 +212,11 @@ async def test_tasks_in_serial(apg_dispatcher, conn_config):
212212

213213

214214
@pytest.mark.asyncio
215-
async def test_tasks_queue_one(apg_dispatcher, conn_config):
215+
async def test_tasks_queue_one(apg_dispatcher):
216216
assert apg_dispatcher.pool.finished_count == 0
217217

218218
for i in range(10):
219-
test_methods.sleep_queue_one.apply_async(args=[2], config=conn_config)
219+
test_methods.sleep_queue_one.apply_async(args=[2])
220220

221221
await wait_to_receive(apg_dispatcher, 10)
222222

0 commit comments

Comments
 (0)