@@ -31,6 +31,7 @@ async def run_pool(cls, queue_in, queue_out, workers, function='lambda: __import
31
31
dispatcher = DispatcherMain ({"producers" : {"brokers" : {}}, "pool" : {"max_workers" : workers }})
32
32
pool = dispatcher .pool
33
33
await pool .start_working (dispatcher )
34
+ queue_out .put ('ready' )
34
35
35
36
print ('waiting for message to start test' )
36
37
loop = asyncio .get_event_loop ()
@@ -85,6 +86,9 @@ def start_server(self, workers, **kwargs):
85
86
@contextlib .contextmanager
86
87
def with_server (self , * args , ** kwargs ):
87
88
process = self .start_server (* args , ** kwargs )
89
+ msg = self .queue_out .get ()
90
+ if msg != 'ready' :
91
+ raise RuntimeError ('never got ready message from subprocess' )
88
92
try :
89
93
yield self
90
94
finally :
@@ -128,6 +132,10 @@ async def run_pool(cls, queue_in, queue_out, workers):
128
132
{"producers" : {"brokers" : {"pg_notify" : {"conninfo" : CONNECTION_STRING }, "channels" : CHANNELS }}, "pool" : {"max_workers" : workers }}
129
133
)
130
134
await dispatcher .start_working ()
135
+ # Make sure the dispatcher is listening before starting the tests which will submit messages
136
+ for producer in dispatcher .producers :
137
+ await producer .events .ready_event .wait ()
138
+ queue_out .put ('ready' )
131
139
132
140
print ('waiting for message to start test' )
133
141
loop = asyncio .get_event_loop ()
0 commit comments