5
5
6
6
import pytest
7
7
8
- from dispatcher .brokers .pg_notify import get_connection
9
- from dispatcher .main import DispatcherMain
8
+ from dispatcher .brokers .pg_notify import create_connection
9
+ from dispatcher .config import DispatcherSettings
10
+ from dispatcher .factories import from_settings
10
11
11
12
12
13
class PoolServer :
@@ -17,6 +18,9 @@ class PoolServer:
17
18
which will run (and stop) the relevant dispatcher code in a background process.
18
19
"""
19
20
21
+ def __init__ (self , config ):
22
+ self .config = config
23
+
20
24
def run_benchmark_test (self , queue_in , queue_out , times ):
21
25
print (f'submitting message to pool server { times } ' )
22
26
queue_in .put (str (times ))
@@ -27,8 +31,10 @@ def run_benchmark_test(self, queue_in, queue_out, times):
27
31
raise Exception ('Test subprocess runner exception, look back in logs' )
28
32
29
33
@classmethod
30
- async def run_pool (cls , queue_in , queue_out , workers , function = 'lambda: __import__("time").sleep(0.01)' ):
31
- dispatcher = DispatcherMain ({"producers" : {"brokers" : {}}, "pool" : {"max_workers" : workers }})
34
+ async def run_pool (cls , config , queue_in , queue_out , workers , function = 'lambda: __import__("time").sleep(0.01)' ):
35
+ this_config = config .copy ()
36
+ this_config ['service' ]['pool_kwargs' ]['max_workers' ] = workers
37
+ dispatcher = from_settings (DispatcherSettings (this_config ))
32
38
pool = dispatcher .pool
33
39
await pool .start_working (dispatcher )
34
40
queue_out .put ('ready' )
@@ -58,10 +64,10 @@ async def run_pool(cls, queue_in, queue_out, workers, function='lambda: __import
58
64
print ('exited forever loop of pool server' )
59
65
60
66
@classmethod
61
- def run_pool_loop (cls , queue_in , queue_out , workers , ** kwargs ):
67
+ def run_pool_loop (cls , config , queue_in , queue_out , workers , ** kwargs ):
62
68
loop = asyncio .get_event_loop ()
63
69
try :
64
- loop .run_until_complete (cls .run_pool (queue_in , queue_out , workers , ** kwargs ))
70
+ loop .run_until_complete (cls .run_pool (config , queue_in , queue_out , workers , ** kwargs ))
65
71
except Exception :
66
72
import traceback
67
73
@@ -79,7 +85,7 @@ def run_pool_loop(cls, queue_in, queue_out, workers, **kwargs):
79
85
def start_server (self , workers , ** kwargs ):
80
86
self .queue_in = multiprocessing .Queue ()
81
87
self .queue_out = multiprocessing .Queue ()
82
- process = multiprocessing .Process (target = self .run_pool_loop , args = (self .queue_in , self .queue_out , workers ), kwargs = kwargs )
88
+ process = multiprocessing .Process (target = self .run_pool_loop , args = (self .config , self . queue_in , self .queue_out , workers ), kwargs = kwargs )
83
89
process .start ()
84
90
return process
85
91
@@ -118,7 +124,7 @@ def run_benchmark_test(self, queue_in, queue_out, times):
118
124
queue_in .put ('wake' )
119
125
print ('sending pg_notify messages' )
120
126
function = 'lambda: __import__("time").sleep(0.01)'
121
- conn = get_connection ( {"conninfo" : CONNECTION_STRING })
127
+ conn = create_connection ( ** {"conninfo" : CONNECTION_STRING })
122
128
with conn .cursor () as cur :
123
129
for i in range (times ):
124
130
cur .execute (f"SELECT pg_notify('test_channel', '{ function } ');" )
@@ -127,10 +133,10 @@ def run_benchmark_test(self, queue_in, queue_out, times):
127
133
print (f'finished running round with { times } messages, got: { message_in } ' )
128
134
129
135
@classmethod
130
- async def run_pool (cls , queue_in , queue_out , workers ):
131
- dispatcher = DispatcherMain (
132
- { "producers" : { "brokers" : { "pg_notify" : { "conninfo" : CONNECTION_STRING }, "channels" : CHANNELS }}, "pool" : { " max_workers" : workers }}
133
- )
136
+ async def run_pool (cls , config , queue_in , queue_out , workers ):
137
+ this_config = config . copy ()
138
+ this_config [ 'service' ][ 'pool_kwargs' ][ ' max_workers' ] = workers
139
+ dispatcher = from_settings ( DispatcherSettings ( this_config ) )
134
140
await dispatcher .start_working ()
135
141
# Make sure the dispatcher is listening before starting the tests which will submit messages
136
142
for producer in dispatcher .producers :
@@ -157,12 +163,12 @@ async def run_pool(cls, queue_in, queue_out, workers):
157
163
158
164
159
165
@pytest .fixture
160
- def with_pool_server ():
161
- server_thing = PoolServer ()
166
+ def with_pool_server (test_settings ):
167
+ server_thing = PoolServer (test_settings . serialize () )
162
168
return server_thing .with_server
163
169
164
170
165
171
@pytest .fixture
166
- def with_full_server ():
167
- server_thing = FullServer ()
172
+ def with_full_server (test_settings ):
173
+ server_thing = FullServer (test_settings . serialize () )
168
174
return server_thing .with_server
0 commit comments