From fa1c222aee6c77481c10f047886772d6a7649cdf Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Sat, 8 Mar 2025 08:26:23 -0500 Subject: [PATCH 1/4] Add socket broker Protocol the brokers Run demo dispatcherctl over socket Add socket broker unit tests Add socket broker usage integration tests Work out issues with test scope and server not opening client connections Finish type hinting Fix contract problem Incremental JSON parse Run linters --- dispatcher.yml | 3 + dispatcherd/brokers/pg_notify.py | 9 +- dispatcherd/brokers/socket.py | 223 +++++++++++++++++++++++ dispatcherd/control.py | 8 +- dispatcherd/factories.py | 10 +- dispatcherd/producers/brokered.py | 13 +- dispatcherd/protocols.py | 8 +- dispatcherd/service/main.py | 9 +- schema.json | 11 +- tests/conftest.py | 57 +++--- tests/integration/brokers/test_socket.py | 75 ++++++++ tests/integration/test_socket_use.py | 86 +++++++++ 12 files changed, 464 insertions(+), 48 deletions(-) create mode 100644 dispatcherd/brokers/socket.py create mode 100644 tests/integration/brokers/test_socket.py create mode 100644 tests/integration/test_socket_use.py diff --git a/dispatcher.yml b/dispatcher.yml index ab22d20..d44ec63 100644 --- a/dispatcher.yml +++ b/dispatcher.yml @@ -21,6 +21,8 @@ brokers: default_publish_channel: test_channel max_connection_idle_seconds: 5 max_self_check_message_age_seconds: 2 + socket: + socket_path: demo_dispatcher.sock producers: ScheduledProducer: task_schedule: @@ -32,4 +34,5 @@ producers: task_list: 'lambda: print("This task runs on startup")': {} publish: + default_control_broker: socket default_broker: pg_notify diff --git a/dispatcherd/brokers/pg_notify.py b/dispatcherd/brokers/pg_notify.py index cbaa28e..a403c6a 100644 --- a/dispatcherd/brokers/pg_notify.py +++ b/dispatcherd/brokers/pg_notify.py @@ -10,6 +10,8 @@ from ..protocols import BrokerSelfCheckStatus from ..utils import resolve_callable +from ..protocols import Broker as BrokerProtocol + logger = logging.getLogger(__name__) @@ -36,7 +38,7 @@ def create_connection(**config) -> psycopg.Connection: # type: ignore[no-untype return connection -class Broker: +class Broker(BrokerProtocol): NOTIFY_QUERY_TEMPLATE = 'SELECT pg_notify(%s, %s);' def __init__( @@ -130,6 +132,9 @@ def get_publish_channel(self, channel: Optional[str] = None) -> str: raise ValueError('Could not determine a channel to use publish to from settings or PGNotify config') + def __str__(self) -> str: + return 'pg_notify-broker' + # --- asyncio connection methods --- async def aget_connection(self) -> psycopg.AsyncConnection: @@ -227,7 +232,7 @@ async def apublish_message_from_cursor(self, cursor: psycopg.AsyncCursor, channe """The inner logic of async message publishing where we already have a cursor""" await cursor.execute(self.NOTIFY_QUERY_TEMPLATE, (channel, message)) - async def apublish_message(self, channel: Optional[str] = None, message: str = '') -> None: # public + async def apublish_message(self, channel: Optional[str] = None, origin: Union[str, int, None] = '', message: str = '') -> None: # public """asyncio way to publish a message, used to send control in control-and-reply Not strictly necessary for the service itself if it sends replies in the workers, diff --git a/dispatcherd/brokers/socket.py b/dispatcherd/brokers/socket.py new file mode 100644 index 0000000..a30958a --- /dev/null +++ b/dispatcherd/brokers/socket.py @@ -0,0 +1,223 @@ +import asyncio +import json +import logging +import os +import socket +from typing import Any, AsyncGenerator, Callable, Coroutine, Iterator, Optional, Union + +from ..protocols import Broker as BrokerProtocol + +logger = logging.getLogger(__name__) + + +class Client: + def __init__(self, client_id: int, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + self.client_id = client_id + self.reader = reader + self.writer = writer + self.listen_loop_active = False + # This is needed for task management betewen the client tasks and the main aprocess_notify + # if the client task starts listening, then we can not send replies + # so this waits for the caller method to add replies to stack before continuing + self.yield_clear = asyncio.Event() + self.replies_to_send: list = [] + + def write(self, message: str, /) -> None: + self.writer.write((message + '\n').encode()) + + def queue_reply(self, reply: str, /) -> None: + self.replies_to_send.append(reply) + + async def send_replies(self) -> None: + for reply in self.replies_to_send.copy(): + logger.info(f'Sending reply to client_id={self.client_id} len={len(reply)}') + self.write(reply) + else: + logger.info(f'No replies to send to client_id={self.client_id}') + await self.writer.drain() + self.replies_to_send = [] + + +def extract_json(message: str) -> Iterator[str]: + """With message that may be an incomplete JSON string, yield JSON-complete strings and leftover""" + decoder = json.JSONDecoder() + pos = 0 + length = len(message) + while pos < length: + try: + _, index = decoder.raw_decode(message, pos) + json_msg = message[pos:index] + yield json_msg + pos = index + except json.JSONDecodeError: + break + + +class Broker(BrokerProtocol): + """A Unix socket client for dispatcher as simple as possible + + Because we want to be as simple as possible we do not maintain persistent connections. + So every control-and-reply command will connect and disconnect. + + Intended use is for dispatcherctl, so that we may bypass any flake related to pg_notify + for debugging information. + """ + + def __init__(self, socket_path: str) -> None: + self.socket_path = socket_path + self.client_ct = 0 + self.clients: dict[int, Client] = {} + self.sock: Optional[socket.socket] = None # for synchronous clients + self.incoming_queue: asyncio.Queue = asyncio.Queue() + + def __str__(self) -> str: + return f'socket-broker-{self.socket_path}' + + async def _add_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + client = Client(self.client_ct, reader, writer) + self.clients[self.client_ct] = client + self.client_ct += 1 + logger.info(f'Socket client_id={client.client_id} is connected') + + try: + client.listen_loop_active = True + while True: + line = await client.reader.readline() + if not line: + break # disconnect + message = line.decode().strip() + await self.incoming_queue.put((client.client_id, message)) + # Wait for caller to potentially fill a reply queue + # this should realistically never take more than a trivial amount of time + await asyncio.wait_for(client.yield_clear.wait(), timeout=2) + client.yield_clear.clear() + await client.send_replies() + except asyncio.TimeoutError: + logger.error(f'Unexpected asyncio task management bug for client_id={client.client_id}, exiting') + except asyncio.CancelledError: + logger.debug(f'Ack that reader task for client_id={client.client_id} has been canceled') + except Exception: + logger.exception(f'Exception from reader task for client_id={client.client_id}') + finally: + del self.clients[client.client_id] + client.writer.close() + await client.writer.wait_closed() + logger.info(f'Socket client_id={client.client_id} is disconnected') + + async def aprocess_notify( + self, connected_callback: Optional[Callable[[], Coroutine[Any, Any, None]]] = None + ) -> AsyncGenerator[tuple[Union[int, str], str], None]: + if os.path.exists(self.socket_path): + logger.debug(f'Deleted pre-existing {self.socket_path}') + os.remove(self.socket_path) + + aserver = None + try: + aserver = await asyncio.start_unix_server(self._add_client, self.socket_path) + logger.info(f'Set up socket server on {self.socket_path}') + + if connected_callback: + await connected_callback() + + while True: + client_id, message = await self.incoming_queue.get() + if (client_id == -1) and (message == 'stop'): + return # internal exit signaling from aclose + + yield client_id, message + # trigger reply messages if applicable + client = self.clients.get(client_id) + if client: + logger.info(f'Yield complete for client_id={client_id}') + client.yield_clear.set() + + except asyncio.CancelledError: + logger.debug('Ack that general socket server task has been canceled') + finally: + if aserver: + aserver.close() + await aserver.wait_closed() + + for client in self.clients.values(): + client.writer.close() + await client.writer.wait_closed() + self.clients = {} + + if os.path.exists(self.socket_path): + os.remove(self.socket_path) + + async def aclose(self) -> None: + """Send an internal message to the async generator, which will cause it to close the server""" + await self.incoming_queue.put((-1, 'stop')) + + async def apublish_message(self, channel: Optional[str] = '', origin: Union[int, str, None] = None, message: str = "") -> None: + if isinstance(origin, int) and origin >= 0: + client = self.clients.get(int(origin)) + if client: + if client.listen_loop_active: + logger.info(f'Queued message len={len(message)} for client_id={origin}') + client.queue_reply(message) + else: + logger.warning(f'Not currently listening to client_id={origin}, attempting reply len={len(message)}, but might be dropped') + client.write(message) + await client.writer.drain() + else: + logger.error(f'Client_id={origin} is not currently connected') + else: + # Acting as a client in this case, mostly for tests + logger.info(f'Publishing async socket message len={len(message)} with new connection') + writer = None + try: + _, writer = await asyncio.open_unix_connection(self.socket_path) + writer.write((message + '\n').encode()) + await writer.drain() + finally: + if writer: + writer.close() + await writer.wait_closed() + + def process_notify( + self, connected_callback: Optional[Callable] = None, timeout: float = 5.0, max_messages: int = 1 + ) -> Iterator[tuple[Union[int, str], str]]: + try: + with socket.socket(socket.AF_UNIX) as sock: + self.sock = sock + sock.settimeout(timeout) + sock.connect(self.socket_path) + + if connected_callback: + connected_callback() + + received_ct = 0 + buffer = '' + while True: + response = sock.recv(1024).decode().strip() + + current_message = buffer + response + yielded_chars = 0 + for complete_msg in extract_json(current_message): + received_ct += 1 + yield (0, complete_msg) + if received_ct >= max_messages: + return + yielded_chars += len(complete_msg) + else: + buffer = current_message[yielded_chars:] + logger.info(f'Received incomplete message len={len(buffer)}, adding to buffer') + + finally: + self.sock = None + + def _publish_from_sock(self, sock: socket.socket, message: str) -> None: + sock.sendall((message + "\n").encode()) + + def publish_message(self, channel: Optional[str] = None, message: Optional[str] = None) -> None: + assert isinstance(message, str) + if self.sock: + logger.info(f'Publishing socket message len={len(message)} via existing connection') + self._publish_from_sock(self.sock, message) + else: + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: + sock.connect(self.socket_path) + logger.info(f'Publishing socket message len={len(message)} over new connection') + self._publish_from_sock(sock, message) diff --git a/dispatcherd/control.py b/dispatcherd/control.py index 7091503..56d0232 100644 --- a/dispatcherd/control.py +++ b/dispatcherd/control.py @@ -21,7 +21,7 @@ def __init__(self, queuename: Optional[str], broker: Broker, send_message: str, self.expected_replies = expected_replies async def connected_callback(self) -> None: - await self.broker.apublish_message(self.queuename, self.send_message) + await self.broker.apublish_message(channel=self.queuename, message=self.send_message) async def listen_for_replies(self) -> None: """Listen to the reply channel until we get the expected number of messages. @@ -94,12 +94,14 @@ async def acontrol(self, command: str, data: Optional[dict] = None) -> None: await broker.aclose() def control_with_reply(self, command: str, expected_replies: int = 1, timeout: float = 1.0, data: Optional[dict] = None) -> list[dict]: - logger.info(f'control-and-reply {command} to {self.queuename}') start = time.time() reply_queue = Control.generate_reply_queue_name() send_message = self.create_message(command=command, reply_to=reply_queue, send_data=data) - broker = get_broker(self.broker_name, self.broker_config, channels=[reply_queue]) + try: + broker = get_broker(self.broker_name, self.broker_config, channels=[reply_queue]) + except TypeError: + broker = get_broker(self.broker_name, self.broker_config) def connected_callback() -> None: broker.publish_message(channel=self.queuename, message=send_message) diff --git a/dispatcherd/factories.py b/dispatcherd/factories.py index a0a56bc..30f5b8e 100644 --- a/dispatcherd/factories.py +++ b/dispatcherd/factories.py @@ -91,10 +91,14 @@ def get_publisher_from_settings(publish_broker: Optional[str] = None, settings: def get_control_from_settings(publish_broker: Optional[str] = None, settings: LazySettings = global_settings, **overrides): - publish_broker = _get_publisher_broker_name(publish_broker=publish_broker, settings=settings) - broker_options = settings.brokers[publish_broker].copy() + """Returns a Control instance based on the values in settings""" + if 'default_control_broker' in settings.publish: + result_publish_broker = settings.publish['default_control_broker'] + else: + result_publish_broker = _get_publisher_broker_name(publish_broker=publish_broker, settings=settings) + broker_options = settings.brokers[result_publish_broker].copy() broker_options.update(overrides) - return Control(publish_broker, broker_options) + return Control(result_publish_broker, broker_options) # ---- Schema generation ---- diff --git a/dispatcherd/producers/brokered.py b/dispatcherd/producers/brokered.py index 37e478d..5674378 100644 --- a/dispatcherd/producers/brokered.py +++ b/dispatcherd/producers/brokered.py @@ -1,6 +1,6 @@ import asyncio import logging -from typing import Iterable, Optional +from typing import Iterable, Optional, Union from ..protocols import Broker, DispatcherMain from .base import BaseProducer @@ -28,6 +28,9 @@ async def recycle(self) -> None: assert self.dispatcher await self.start_producing(self.dispatcher) + def __str__(self) -> str: + return f'brokered-producer-{self.broker}' + async def start_producing(self, dispatcher: DispatcherMain) -> None: self.production_task = asyncio.create_task(self.produce_forever(dispatcher), name=f'{self.broker.__module__}_production') @@ -46,12 +49,12 @@ async def produce_forever(self, dispatcher: DispatcherMain) -> None: self.dispatcher = dispatcher async for channel, payload in self.broker.aprocess_notify(connected_callback=self.connected_callback): self.produced_count += 1 - reply_to, reply_payload = await dispatcher.process_message(payload, producer=self, channel=channel) + reply_to, reply_payload = await dispatcher.process_message(payload, producer=self, channel=str(channel)) if reply_to and reply_payload: - await self.notify(channel=reply_to, message=reply_payload) + await self.notify(channel=reply_to, origin=channel, message=reply_payload) - async def notify(self, channel: Optional[str] = None, message: str = '') -> None: - await self.broker.apublish_message(channel=channel, message=message) + async def notify(self, channel: Optional[str] = None, origin: Optional[Union[int, str]] = None, message: str = '') -> None: + await self.broker.apublish_message(channel=channel, origin=origin, message=message) async def shutdown(self) -> None: if self.production_task: diff --git a/dispatcherd/protocols.py b/dispatcherd/protocols.py index fcd973e..953cbd4 100644 --- a/dispatcherd/protocols.py +++ b/dispatcherd/protocols.py @@ -20,7 +20,7 @@ class Broker(Protocol): async def aprocess_notify( self, connected_callback: Optional[Optional[Callable[[], Coroutine[Any, Any, None]]]] = None - ) -> AsyncGenerator[tuple[str, str], None]: + ) -> AsyncGenerator[tuple[Union[int, str], str], None]: """The generator of messages from the broker for the dispatcherd service The producer iterates this to produce tasks. @@ -28,7 +28,7 @@ async def aprocess_notify( """ yield ('', '') # yield affects CPython type https://github.com/python/mypy/pull/18422 - async def apublish_message(self, channel: Optional[str] = None, message: str = '') -> None: + async def apublish_message(self, channel: Optional[str] = None, origin: Union[int, str, None] = None, message: str = '') -> None: """Asynchronously send a message to the broker, used by dispatcherd service for reply messages""" ... @@ -36,7 +36,9 @@ async def aclose(self) -> None: """Close the asynchronous connection, used by service, and optionally by publishers""" ... - def process_notify(self, connected_callback: Optional[Callable] = None, timeout: float = 5.0, max_messages: int = 1) -> Iterator[tuple[str, str]]: + def process_notify( + self, connected_callback: Optional[Callable] = None, timeout: float = 5.0, max_messages: int = 1 + ) -> Iterator[tuple[Union[int, str], str]]: """Synchronous method to generate messages from broker, used for synchronous control-and-reply""" ... diff --git a/dispatcherd/service/main.py b/dispatcherd/service/main.py index 8797587..854a41a 100644 --- a/dispatcherd/service/main.py +++ b/dispatcherd/service/main.py @@ -184,10 +184,11 @@ async def run_control_action(self, action: str, control_data: Optional[dict] = N # Give Nones for no reply, or the reply if reply_to: - logger.info(f"Control action {action} returned {return_data}, sending back reply") - return (reply_to, json.dumps(return_data)) + reply_msg = json.dumps(return_data) + logger.info(f"Control action {action} returned message len={len(reply_msg)}, sending back reply") + return (reply_to, reply_msg) else: - logger.info(f"Control action {action} returned {return_data}, done") + logger.info(f"Control action {action} returned {type(return_data)}, done") return (None, None) async def process_message_internal(self, message: dict, producer: Optional[Producer] = None) -> tuple[Optional[str], Optional[str]]: @@ -206,9 +207,9 @@ async def start_working(self) -> None: logger.exception(f'Pool {self.pool} failed to start working') self.events.exit_event.set() - logger.debug('Starting task production') async with self.fd_lock: # lots of connecting going on here for producer in self.producers: + logger.debug(f'Starting task production from {producer}') try: await producer.start_producing(self) except Exception: diff --git a/schema.json b/schema.json index 5fdfc6b..bba07e4 100644 --- a/schema.json +++ b/schema.json @@ -9,6 +9,9 @@ "default_publish_channel": "typing.Optional[str]", "max_connection_idle_seconds": "typing.Optional[int]", "max_self_check_message_age_seconds": "typing.Optional[int]" + }, + "socket": { + "socket_path": "" } }, "producers": { @@ -28,13 +31,13 @@ "worker_stop_wait": "", "worker_removal_wait": "" }, + "main_kwargs": { + "node_id": "typing.Optional[str]" + }, "process_manager_kwargs": { "preload_modules": "typing.Optional[list[str]]" }, - "process_manager_cls": "typing.Literal['ProcessManager', 'ForkServerManager']", - "main_kwargs": { - "node_id": "typing.Optional[str]" - } + "process_manager_cls": "typing.Literal['ProcessManager', 'ForkServerManager']" }, "publish": { "default_broker": "str" diff --git a/tests/conftest.py b/tests/conftest.py index f76c0e5..07df3b6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -88,37 +88,46 @@ def test_settings(): return DispatcherSettings(BASIC_CONFIG) +@pytest.fixture +def adispatcher_factory(): + @contextlib.asynccontextmanager + async def _rf(config): + dispatcher = None + try: + settings = DispatcherSettings(config) + dispatcher = from_settings(settings=settings) + + await dispatcher.connect_signals() + await dispatcher.start_working() + await dispatcher.wait_for_producers_ready() + await dispatcher.pool.events.workers_ready.wait() + + assert dispatcher.pool.finished_count == 0 # sanity + assert dispatcher.control_count == 0 + + yield dispatcher + finally: + if dispatcher: + try: + await dispatcher.shutdown() + await dispatcher.cancel_tasks() + except Exception: + logger.exception('shutdown had error') + return _rf + + @pytest_asyncio.fixture( loop_scope="function", scope="function", params=['ProcessManager', 'ForkServerManager'], ids=["fork", "forkserver"], ) -async def apg_dispatcher(request) -> AsyncIterator[DispatcherMain]: - dispatcher = None - try: - this_test_config = BASIC_CONFIG.copy() - this_test_config.setdefault('service', {}) - this_test_config['service']['process_manager_cls'] = request.param - this_settings = DispatcherSettings(this_test_config) - dispatcher = from_settings(settings=this_settings) - - await dispatcher.connect_signals() - await dispatcher.start_working() - await dispatcher.wait_for_producers_ready() - await dispatcher.pool.events.workers_ready.wait() - - assert dispatcher.pool.finished_count == 0 # sanity - assert dispatcher.control_count == 0 - +async def apg_dispatcher(request, adispatcher_factory) -> AsyncIterator[DispatcherMain]: + this_test_config = BASIC_CONFIG.copy() + this_test_config.setdefault('service', {}) + this_test_config['service']['process_manager_cls'] = request.param + async with adispatcher_factory(this_test_config) as dispatcher: yield dispatcher - finally: - if dispatcher: - try: - await dispatcher.shutdown() - await dispatcher.cancel_tasks() - except Exception: - logger.exception('shutdown had error') @pytest_asyncio.fixture(loop_scope="function", scope="function") diff --git a/tests/integration/brokers/test_socket.py b/tests/integration/brokers/test_socket.py new file mode 100644 index 0000000..e999509 --- /dev/null +++ b/tests/integration/brokers/test_socket.py @@ -0,0 +1,75 @@ +import asyncio +import socket +import time +import threading + +import pytest + +from dispatcher.brokers.socket import Broker + + +@pytest.fixture +def socket_path(tmp_path): + return str(tmp_path / 'test_sock.sock') + + +@pytest.mark.asyncio +async def test_basic_receive(socket_path): + server_broker = Broker(socket_path=socket_path) + client_broker = Broker(socket_path=socket_path) + + server_is_ready = asyncio.Event() + + async def on_connect(): + server_is_ready.set() + + received = [] + async def save_local(): + async for client_id, msg in server_broker.aprocess_notify(connected_callback=on_connect): + received.append((client_id, msg)) + + asyncio.create_task(save_local()) + + await asyncio.wait_for(server_is_ready.wait(), timeout=2) + + for msg in ('test1', 'test2'): + await client_broker.apublish_message(message=msg) + + for i in range(20): + if len(received) >= 2: + break + await asyncio.sleep(0.01) + else: + assert 'Failed to receive expected 2 messages' + + await server_broker.aclose() + + assert received == [(0, 'test1'), (1, 'test2')] + + +@pytest.mark.asyncio +async def test_synchronous_listen_timeout(socket_path): + client_broker = Broker(socket_path=socket_path) + + server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + server.bind(socket_path) + + def run_server(): + server.listen(1) + conn, addr = server.accept() + conn.recv(1024) + print('got message, exiting thread') + + server_thread = threading.Thread(target=run_server) + server_thread.start() + + start = time.monotonic() + received = None + with pytest.raises(TimeoutError): + received = [msg for _, msg in client_broker.process_notify(timeout=0.01)] + delta = time.monotonic() - start + assert delta >= 0.01 + + assert received is None + server_thread.join() + assert not server_thread.is_alive() # should have exited after getting message diff --git a/tests/integration/test_socket_use.py b/tests/integration/test_socket_use.py new file mode 100644 index 0000000..bdbc5ca --- /dev/null +++ b/tests/integration/test_socket_use.py @@ -0,0 +1,86 @@ +import asyncio +import logging +import os +from typing import AsyncIterator, Callable + +import pytest +import pytest_asyncio + +from dispatcher.config import DispatcherSettings +from dispatcher.control import Control +from dispatcher.factories import from_settings, get_control_from_settings, get_publisher_from_settings +from dispatcher.protocols import DispatcherMain + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope='session') +def sock_path(tmp_path_factory): + return str(tmp_path_factory.mktemp("socket") / 'test.sock') + + +@pytest.fixture(scope='session') +def socket_config(sock_path): + return {"version": 2, "brokers": {"socket": {"socket_path": sock_path}}, "service": {"main_kwargs": {"node_id": "socket-test-server"}}} + + +@pytest.fixture(scope='session') +def socket_settings(socket_config): + return DispatcherSettings(socket_config) + + +@pytest_asyncio.fixture +async def asock_dispatcher(socket_config, adispatcher_factory) -> AsyncIterator[DispatcherMain]: + async with adispatcher_factory(socket_config) as dispatcher: + yield dispatcher + + +@pytest_asyncio.fixture +async def sock_control(socket_settings) -> AsyncIterator[Control]: + return get_control_from_settings(settings=socket_settings) + + +@pytest_asyncio.fixture +async def sock_broker(socket_settings) -> Callable: + broker = get_publisher_from_settings(settings=socket_settings) + assert not broker.clients # make sure this is new for client, not the server + return broker + + +@pytest.mark.asyncio +async def test_run_lambda_function_socket(asock_dispatcher, sock_broker): + starting_ct = asock_dispatcher.pool.finished_count + clearing_task = asyncio.create_task(asock_dispatcher.pool.events.work_cleared.wait(), name='test_lambda_clear_wait') + + assert sock_broker.sock is None # again, confirm this is a distinct client broker + await sock_broker.apublish_message(message='lambda: "This worked!"') + + await asyncio.wait_for(clearing_task, timeout=1) + + assert asock_dispatcher.pool.finished_count == starting_ct + 1 + + +@pytest.mark.asyncio +async def test_run_lambda_function_socket_sync_client(asock_dispatcher, sock_broker): + starting_ct = asock_dispatcher.pool.finished_count + clearing_task = asyncio.create_task(asock_dispatcher.pool.events.work_cleared.wait(), name='test_lambda_clear_wait') + + sock_broker.publish_message(message='lambda: "This worked!"') + + await asyncio.wait_for(clearing_task, timeout=1) + + assert asock_dispatcher.pool.finished_count == starting_ct + 1 + + +@pytest.mark.asyncio +async def test_simple_control_and_reply(asock_dispatcher, sock_control): + loop = asyncio.get_event_loop() + + def alive_cmd(): + return sock_control.control_with_reply('alive') + + alive = await loop.run_in_executor(None, alive_cmd) + assert len(alive) == 1 + data = alive[0] + + assert data['node_id'] == 'socket-test-server' From 99991e4c47439aa1ec4525bb4ed88ad01c11b108 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 24 Mar 2025 09:41:27 -0400 Subject: [PATCH 2/4] Clean empty directories with the clean target as well --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 8af7880..c7ec2c3 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,7 @@ clean: find . -type d -name "__pycache__" -delete rm -rf dispatcherd.egg-info/ rm -rf dist/ + find . -mindepth 1 -type d -empty -delete linters: black dispatcherd/ From 89623b7c082726eff03ca3d5d63528cfecf7a9a8 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 24 Mar 2025 09:42:16 -0400 Subject: [PATCH 3/4] Run isort --- dispatcherd/brokers/pg_notify.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dispatcherd/brokers/pg_notify.py b/dispatcherd/brokers/pg_notify.py index a403c6a..66bacfc 100644 --- a/dispatcherd/brokers/pg_notify.py +++ b/dispatcherd/brokers/pg_notify.py @@ -7,11 +7,10 @@ import psycopg +from ..protocols import Broker as BrokerProtocol from ..protocols import BrokerSelfCheckStatus from ..utils import resolve_callable -from ..protocols import Broker as BrokerProtocol - logger = logging.getLogger(__name__) From 7710c117e6d633566b78e36ba7a5ad5eaf03b401 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 24 Mar 2025 10:57:37 -0400 Subject: [PATCH 4/4] Update import path --- tests/integration/brokers/test_socket.py | 2 +- tests/integration/test_socket_use.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/integration/brokers/test_socket.py b/tests/integration/brokers/test_socket.py index e999509..5d49a4c 100644 --- a/tests/integration/brokers/test_socket.py +++ b/tests/integration/brokers/test_socket.py @@ -5,7 +5,7 @@ import pytest -from dispatcher.brokers.socket import Broker +from dispatcherd.brokers.socket import Broker @pytest.fixture diff --git a/tests/integration/test_socket_use.py b/tests/integration/test_socket_use.py index bdbc5ca..eacf094 100644 --- a/tests/integration/test_socket_use.py +++ b/tests/integration/test_socket_use.py @@ -6,10 +6,10 @@ import pytest import pytest_asyncio -from dispatcher.config import DispatcherSettings -from dispatcher.control import Control -from dispatcher.factories import from_settings, get_control_from_settings, get_publisher_from_settings -from dispatcher.protocols import DispatcherMain +from dispatcherd.config import DispatcherSettings +from dispatcherd.control import Control +from dispatcherd.factories import from_settings, get_control_from_settings, get_publisher_from_settings +from dispatcherd.protocols import DispatcherMain logger = logging.getLogger(__name__)