|
| 1 | +import asyncio |
| 2 | +import contextlib |
| 3 | +import multiprocessing |
| 4 | +import time |
| 5 | +from copy import deepcopy |
| 6 | + |
| 7 | +import pytest |
| 8 | + |
| 9 | +from dispatcher.brokers.pg_notify import create_connection |
| 10 | +from dispatcher.config import DispatcherSettings |
| 11 | +from dispatcher.factories import from_settings |
| 12 | + |
| 13 | + |
| 14 | +class PoolServer: |
| 15 | + """Before you read more, know there are 3 contexts involved. |
| 16 | +
|
| 17 | + This produces a method to be passed to pytest-benchmark. |
| 18 | + That method has to be ran inside a context manager, |
| 19 | + which will run (and stop) the relevant dispatcher code in a background process. |
| 20 | + """ |
| 21 | + |
| 22 | + def __init__(self, config): |
| 23 | + self.config = config |
| 24 | + |
| 25 | + def run_benchmark_test(self, queue_in, queue_out, times): |
| 26 | + print(f'submitting message to pool server {times}') |
| 27 | + queue_in.put(str(times)) |
| 28 | + print('waiting for reply message from pool server') |
| 29 | + message_in = queue_out.get() |
| 30 | + print(f'finished running round with {times} messages, got: {message_in}') |
| 31 | + if message_in == 'error': |
| 32 | + raise Exception('Test subprocess runner exception, look back in logs') |
| 33 | + |
| 34 | + @classmethod |
| 35 | + async def run_pool(cls, config, queue_in, queue_out, workers, function='lambda: __import__("time").sleep(0.01)'): |
| 36 | + this_config = config.copy() |
| 37 | + this_config['service']['pool_kwargs']['max_workers'] = workers |
| 38 | + dispatcher = from_settings(DispatcherSettings(this_config)) |
| 39 | + pool = dispatcher.pool |
| 40 | + await pool.start_working(dispatcher) |
| 41 | + queue_out.put('ready') |
| 42 | + |
| 43 | + print('waiting for message to start test') |
| 44 | + loop = asyncio.get_event_loop() |
| 45 | + while True: |
| 46 | + print('pool server listening on queue_in') |
| 47 | + message = await loop.run_in_executor(None, queue_in.get) |
| 48 | + print(f'pool server got message {message}') |
| 49 | + if message == 'stop': |
| 50 | + print('shutting down pool server') |
| 51 | + pool.shutdown() |
| 52 | + break |
| 53 | + else: |
| 54 | + times = int(message.strip()) |
| 55 | + print('creating cleared event task') |
| 56 | + cleared_event = asyncio.create_task(pool.events.work_cleared.wait()) |
| 57 | + print('creating tasks for submissions') |
| 58 | + submissions = [pool.dispatch_task({'task': function, 'uuid': str(i)}) for i in range(times)] |
| 59 | + print('awaiting submission task') |
| 60 | + await asyncio.gather(*submissions) |
| 61 | + print('waiting for cleared event') |
| 62 | + await cleared_event |
| 63 | + pool.events.work_cleared.clear() |
| 64 | + await loop.run_in_executor(None, queue_out.put, 'done') |
| 65 | + print('exited forever loop of pool server') |
| 66 | + |
| 67 | + @classmethod |
| 68 | + def run_pool_loop(cls, config, queue_in, queue_out, workers, **kwargs): |
| 69 | + loop = asyncio.get_event_loop() |
| 70 | + try: |
| 71 | + loop.run_until_complete(cls.run_pool(config, queue_in, queue_out, workers, **kwargs)) |
| 72 | + except Exception: |
| 73 | + import traceback |
| 74 | + |
| 75 | + traceback.print_exc() |
| 76 | + # We are in a subprocess here, so even if we handle the exception |
| 77 | + # the main process will not know and still wait forever |
| 78 | + # so give them a kick on our way out |
| 79 | + print('sending error message after error') |
| 80 | + queue_out.put('error') |
| 81 | + finally: |
| 82 | + print('closing asyncio loop') |
| 83 | + loop.close() |
| 84 | + print('finished closing async loop') |
| 85 | + |
| 86 | + def start_server(self, workers, **kwargs): |
| 87 | + self.queue_in = multiprocessing.Queue() |
| 88 | + self.queue_out = multiprocessing.Queue() |
| 89 | + process = multiprocessing.Process(target=self.run_pool_loop, args=(self.config, self.queue_in, self.queue_out, workers), kwargs=kwargs) |
| 90 | + process.start() |
| 91 | + return process |
| 92 | + |
| 93 | + @contextlib.contextmanager |
| 94 | + def with_server(self, *args, **kwargs): |
| 95 | + process = self.start_server(*args, **kwargs) |
| 96 | + msg = self.queue_out.get() |
| 97 | + if msg != 'ready': |
| 98 | + raise RuntimeError('never got ready message from subprocess') |
| 99 | + try: |
| 100 | + yield self |
| 101 | + finally: |
| 102 | + self.queue_in.put('stop') |
| 103 | + process.terminate() # SIGTERM |
| 104 | + # Poll to close process resources, due to race condition where it is not still running |
| 105 | + for i in range(3): |
| 106 | + time.sleep(0.1) |
| 107 | + try: |
| 108 | + process.close() |
| 109 | + break |
| 110 | + except Exception: |
| 111 | + if i == 2: |
| 112 | + raise |
| 113 | + |
| 114 | + |
| 115 | +class FullServer(PoolServer): |
| 116 | + def run_benchmark_test(self, queue_in, queue_out, times): |
| 117 | + print('sending wakeup message to set new clear event') |
| 118 | + queue_in.put('wake') |
| 119 | + print('sending pg_notify messages') |
| 120 | + function = 'lambda: __import__("time").sleep(0.01)' |
| 121 | + conn = create_connection(**self.config['brokers']['pg_notify']['config']) |
| 122 | + with conn.cursor() as cur: |
| 123 | + for i in range(times): |
| 124 | + cur.execute(f"SELECT pg_notify('test_channel', '{function}');") |
| 125 | + print('waiting for reply message from pool server') |
| 126 | + message_in = queue_out.get() |
| 127 | + print(f'finished running round with {times} messages, got: {message_in}') |
| 128 | + |
| 129 | + @classmethod |
| 130 | + async def run_pool(cls, config, queue_in, queue_out, workers): |
| 131 | + this_config = config.copy() |
| 132 | + this_config['service']['pool_kwargs']['max_workers'] = workers |
| 133 | + this_config['service']['pool_kwargs']['min_workers'] = workers |
| 134 | + dispatcher = from_settings(DispatcherSettings(this_config)) |
| 135 | + await dispatcher.start_working() |
| 136 | + # Make sure the dispatcher is listening before starting the tests which will submit messages |
| 137 | + for producer in dispatcher.producers: |
| 138 | + await producer.events.ready_event.wait() |
| 139 | + queue_out.put('ready') |
| 140 | + |
| 141 | + print('waiting for message to start test') |
| 142 | + loop = asyncio.get_event_loop() |
| 143 | + while True: |
| 144 | + print('pool server listening on queue_in') |
| 145 | + message = await loop.run_in_executor(None, queue_in.get) |
| 146 | + print(f'pool server got message {message}') |
| 147 | + if message == 'stop': |
| 148 | + print('shutting down server') |
| 149 | + dispatcher.shutdown() |
| 150 | + break |
| 151 | + print('creating cleared event task') |
| 152 | + cleared_event = asyncio.create_task(dispatcher.pool.events.queue_cleared.wait()) |
| 153 | + print('waiting for cleared event') |
| 154 | + await cleared_event |
| 155 | + dispatcher.pool.events.queue_cleared.clear() |
| 156 | + await loop.run_in_executor(None, queue_out.put, 'done') |
| 157 | + print('exited forever loop of pool server') |
| 158 | + |
| 159 | + |
| 160 | +@pytest.fixture |
| 161 | +def benchmark_config(test_config): |
| 162 | + config = deepcopy(test_config) |
| 163 | + config['service']['main_kwargs']['node_id'] = 'benchmark-server' |
| 164 | + return config |
| 165 | + |
| 166 | + |
| 167 | +@pytest.fixture |
| 168 | +def benchmark_settings(benchmark_config): |
| 169 | + return DispatcherSettings(benchmark_config) |
| 170 | + |
| 171 | + |
| 172 | +@pytest.fixture |
| 173 | +def with_pool_server(benchmark_config): |
| 174 | + server_thing = PoolServer(benchmark_config) |
| 175 | + return server_thing.with_server |
| 176 | + |
| 177 | + |
| 178 | +@pytest.fixture |
| 179 | +def with_full_server(benchmark_config): |
| 180 | + server_thing = FullServer(benchmark_config) |
| 181 | + return server_thing.with_server |
0 commit comments