diff --git a/Makefile b/Makefile index 8af7880..c7ec2c3 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,7 @@ clean: find . -type d -name "__pycache__" -delete rm -rf dispatcherd.egg-info/ rm -rf dist/ + find . -mindepth 1 -type d -empty -delete linters: black dispatcherd/ diff --git a/dispatcher.yml b/dispatcher.yml index ab22d20..d44ec63 100644 --- a/dispatcher.yml +++ b/dispatcher.yml @@ -21,6 +21,8 @@ brokers: default_publish_channel: test_channel max_connection_idle_seconds: 5 max_self_check_message_age_seconds: 2 + socket: + socket_path: demo_dispatcher.sock producers: ScheduledProducer: task_schedule: @@ -32,4 +34,5 @@ producers: task_list: 'lambda: print("This task runs on startup")': {} publish: + default_control_broker: socket default_broker: pg_notify diff --git a/dispatcherd/brokers/pg_notify.py b/dispatcherd/brokers/pg_notify.py index cbaa28e..66bacfc 100644 --- a/dispatcherd/brokers/pg_notify.py +++ b/dispatcherd/brokers/pg_notify.py @@ -7,6 +7,7 @@ import psycopg +from ..protocols import Broker as BrokerProtocol from ..protocols import BrokerSelfCheckStatus from ..utils import resolve_callable @@ -36,7 +37,7 @@ def create_connection(**config) -> psycopg.Connection: # type: ignore[no-untype return connection -class Broker: +class Broker(BrokerProtocol): NOTIFY_QUERY_TEMPLATE = 'SELECT pg_notify(%s, %s);' def __init__( @@ -130,6 +131,9 @@ def get_publish_channel(self, channel: Optional[str] = None) -> str: raise ValueError('Could not determine a channel to use publish to from settings or PGNotify config') + def __str__(self) -> str: + return 'pg_notify-broker' + # --- asyncio connection methods --- async def aget_connection(self) -> psycopg.AsyncConnection: @@ -227,7 +231,7 @@ async def apublish_message_from_cursor(self, cursor: psycopg.AsyncCursor, channe """The inner logic of async message publishing where we already have a cursor""" await cursor.execute(self.NOTIFY_QUERY_TEMPLATE, (channel, message)) - async def apublish_message(self, channel: Optional[str] = None, message: str = '') -> None: # public + async def apublish_message(self, channel: Optional[str] = None, origin: Union[str, int, None] = '', message: str = '') -> None: # public """asyncio way to publish a message, used to send control in control-and-reply Not strictly necessary for the service itself if it sends replies in the workers, diff --git a/dispatcherd/brokers/socket.py b/dispatcherd/brokers/socket.py new file mode 100644 index 0000000..a30958a --- /dev/null +++ b/dispatcherd/brokers/socket.py @@ -0,0 +1,223 @@ +import asyncio +import json +import logging +import os +import socket +from typing import Any, AsyncGenerator, Callable, Coroutine, Iterator, Optional, Union + +from ..protocols import Broker as BrokerProtocol + +logger = logging.getLogger(__name__) + + +class Client: + def __init__(self, client_id: int, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + self.client_id = client_id + self.reader = reader + self.writer = writer + self.listen_loop_active = False + # This is needed for task management betewen the client tasks and the main aprocess_notify + # if the client task starts listening, then we can not send replies + # so this waits for the caller method to add replies to stack before continuing + self.yield_clear = asyncio.Event() + self.replies_to_send: list = [] + + def write(self, message: str, /) -> None: + self.writer.write((message + '\n').encode()) + + def queue_reply(self, reply: str, /) -> None: + self.replies_to_send.append(reply) + + async def send_replies(self) -> None: + for reply in self.replies_to_send.copy(): + logger.info(f'Sending reply to client_id={self.client_id} len={len(reply)}') + self.write(reply) + else: + logger.info(f'No replies to send to client_id={self.client_id}') + await self.writer.drain() + self.replies_to_send = [] + + +def extract_json(message: str) -> Iterator[str]: + """With message that may be an incomplete JSON string, yield JSON-complete strings and leftover""" + decoder = json.JSONDecoder() + pos = 0 + length = len(message) + while pos < length: + try: + _, index = decoder.raw_decode(message, pos) + json_msg = message[pos:index] + yield json_msg + pos = index + except json.JSONDecodeError: + break + + +class Broker(BrokerProtocol): + """A Unix socket client for dispatcher as simple as possible + + Because we want to be as simple as possible we do not maintain persistent connections. + So every control-and-reply command will connect and disconnect. + + Intended use is for dispatcherctl, so that we may bypass any flake related to pg_notify + for debugging information. + """ + + def __init__(self, socket_path: str) -> None: + self.socket_path = socket_path + self.client_ct = 0 + self.clients: dict[int, Client] = {} + self.sock: Optional[socket.socket] = None # for synchronous clients + self.incoming_queue: asyncio.Queue = asyncio.Queue() + + def __str__(self) -> str: + return f'socket-broker-{self.socket_path}' + + async def _add_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + client = Client(self.client_ct, reader, writer) + self.clients[self.client_ct] = client + self.client_ct += 1 + logger.info(f'Socket client_id={client.client_id} is connected') + + try: + client.listen_loop_active = True + while True: + line = await client.reader.readline() + if not line: + break # disconnect + message = line.decode().strip() + await self.incoming_queue.put((client.client_id, message)) + # Wait for caller to potentially fill a reply queue + # this should realistically never take more than a trivial amount of time + await asyncio.wait_for(client.yield_clear.wait(), timeout=2) + client.yield_clear.clear() + await client.send_replies() + except asyncio.TimeoutError: + logger.error(f'Unexpected asyncio task management bug for client_id={client.client_id}, exiting') + except asyncio.CancelledError: + logger.debug(f'Ack that reader task for client_id={client.client_id} has been canceled') + except Exception: + logger.exception(f'Exception from reader task for client_id={client.client_id}') + finally: + del self.clients[client.client_id] + client.writer.close() + await client.writer.wait_closed() + logger.info(f'Socket client_id={client.client_id} is disconnected') + + async def aprocess_notify( + self, connected_callback: Optional[Callable[[], Coroutine[Any, Any, None]]] = None + ) -> AsyncGenerator[tuple[Union[int, str], str], None]: + if os.path.exists(self.socket_path): + logger.debug(f'Deleted pre-existing {self.socket_path}') + os.remove(self.socket_path) + + aserver = None + try: + aserver = await asyncio.start_unix_server(self._add_client, self.socket_path) + logger.info(f'Set up socket server on {self.socket_path}') + + if connected_callback: + await connected_callback() + + while True: + client_id, message = await self.incoming_queue.get() + if (client_id == -1) and (message == 'stop'): + return # internal exit signaling from aclose + + yield client_id, message + # trigger reply messages if applicable + client = self.clients.get(client_id) + if client: + logger.info(f'Yield complete for client_id={client_id}') + client.yield_clear.set() + + except asyncio.CancelledError: + logger.debug('Ack that general socket server task has been canceled') + finally: + if aserver: + aserver.close() + await aserver.wait_closed() + + for client in self.clients.values(): + client.writer.close() + await client.writer.wait_closed() + self.clients = {} + + if os.path.exists(self.socket_path): + os.remove(self.socket_path) + + async def aclose(self) -> None: + """Send an internal message to the async generator, which will cause it to close the server""" + await self.incoming_queue.put((-1, 'stop')) + + async def apublish_message(self, channel: Optional[str] = '', origin: Union[int, str, None] = None, message: str = "") -> None: + if isinstance(origin, int) and origin >= 0: + client = self.clients.get(int(origin)) + if client: + if client.listen_loop_active: + logger.info(f'Queued message len={len(message)} for client_id={origin}') + client.queue_reply(message) + else: + logger.warning(f'Not currently listening to client_id={origin}, attempting reply len={len(message)}, but might be dropped') + client.write(message) + await client.writer.drain() + else: + logger.error(f'Client_id={origin} is not currently connected') + else: + # Acting as a client in this case, mostly for tests + logger.info(f'Publishing async socket message len={len(message)} with new connection') + writer = None + try: + _, writer = await asyncio.open_unix_connection(self.socket_path) + writer.write((message + '\n').encode()) + await writer.drain() + finally: + if writer: + writer.close() + await writer.wait_closed() + + def process_notify( + self, connected_callback: Optional[Callable] = None, timeout: float = 5.0, max_messages: int = 1 + ) -> Iterator[tuple[Union[int, str], str]]: + try: + with socket.socket(socket.AF_UNIX) as sock: + self.sock = sock + sock.settimeout(timeout) + sock.connect(self.socket_path) + + if connected_callback: + connected_callback() + + received_ct = 0 + buffer = '' + while True: + response = sock.recv(1024).decode().strip() + + current_message = buffer + response + yielded_chars = 0 + for complete_msg in extract_json(current_message): + received_ct += 1 + yield (0, complete_msg) + if received_ct >= max_messages: + return + yielded_chars += len(complete_msg) + else: + buffer = current_message[yielded_chars:] + logger.info(f'Received incomplete message len={len(buffer)}, adding to buffer') + + finally: + self.sock = None + + def _publish_from_sock(self, sock: socket.socket, message: str) -> None: + sock.sendall((message + "\n").encode()) + + def publish_message(self, channel: Optional[str] = None, message: Optional[str] = None) -> None: + assert isinstance(message, str) + if self.sock: + logger.info(f'Publishing socket message len={len(message)} via existing connection') + self._publish_from_sock(self.sock, message) + else: + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: + sock.connect(self.socket_path) + logger.info(f'Publishing socket message len={len(message)} over new connection') + self._publish_from_sock(sock, message) diff --git a/dispatcherd/control.py b/dispatcherd/control.py index 7091503..56d0232 100644 --- a/dispatcherd/control.py +++ b/dispatcherd/control.py @@ -21,7 +21,7 @@ def __init__(self, queuename: Optional[str], broker: Broker, send_message: str, self.expected_replies = expected_replies async def connected_callback(self) -> None: - await self.broker.apublish_message(self.queuename, self.send_message) + await self.broker.apublish_message(channel=self.queuename, message=self.send_message) async def listen_for_replies(self) -> None: """Listen to the reply channel until we get the expected number of messages. @@ -94,12 +94,14 @@ async def acontrol(self, command: str, data: Optional[dict] = None) -> None: await broker.aclose() def control_with_reply(self, command: str, expected_replies: int = 1, timeout: float = 1.0, data: Optional[dict] = None) -> list[dict]: - logger.info(f'control-and-reply {command} to {self.queuename}') start = time.time() reply_queue = Control.generate_reply_queue_name() send_message = self.create_message(command=command, reply_to=reply_queue, send_data=data) - broker = get_broker(self.broker_name, self.broker_config, channels=[reply_queue]) + try: + broker = get_broker(self.broker_name, self.broker_config, channels=[reply_queue]) + except TypeError: + broker = get_broker(self.broker_name, self.broker_config) def connected_callback() -> None: broker.publish_message(channel=self.queuename, message=send_message) diff --git a/dispatcherd/factories.py b/dispatcherd/factories.py index a0a56bc..30f5b8e 100644 --- a/dispatcherd/factories.py +++ b/dispatcherd/factories.py @@ -91,10 +91,14 @@ def get_publisher_from_settings(publish_broker: Optional[str] = None, settings: def get_control_from_settings(publish_broker: Optional[str] = None, settings: LazySettings = global_settings, **overrides): - publish_broker = _get_publisher_broker_name(publish_broker=publish_broker, settings=settings) - broker_options = settings.brokers[publish_broker].copy() + """Returns a Control instance based on the values in settings""" + if 'default_control_broker' in settings.publish: + result_publish_broker = settings.publish['default_control_broker'] + else: + result_publish_broker = _get_publisher_broker_name(publish_broker=publish_broker, settings=settings) + broker_options = settings.brokers[result_publish_broker].copy() broker_options.update(overrides) - return Control(publish_broker, broker_options) + return Control(result_publish_broker, broker_options) # ---- Schema generation ---- diff --git a/dispatcherd/producers/brokered.py b/dispatcherd/producers/brokered.py index 37e478d..5674378 100644 --- a/dispatcherd/producers/brokered.py +++ b/dispatcherd/producers/brokered.py @@ -1,6 +1,6 @@ import asyncio import logging -from typing import Iterable, Optional +from typing import Iterable, Optional, Union from ..protocols import Broker, DispatcherMain from .base import BaseProducer @@ -28,6 +28,9 @@ async def recycle(self) -> None: assert self.dispatcher await self.start_producing(self.dispatcher) + def __str__(self) -> str: + return f'brokered-producer-{self.broker}' + async def start_producing(self, dispatcher: DispatcherMain) -> None: self.production_task = asyncio.create_task(self.produce_forever(dispatcher), name=f'{self.broker.__module__}_production') @@ -46,12 +49,12 @@ async def produce_forever(self, dispatcher: DispatcherMain) -> None: self.dispatcher = dispatcher async for channel, payload in self.broker.aprocess_notify(connected_callback=self.connected_callback): self.produced_count += 1 - reply_to, reply_payload = await dispatcher.process_message(payload, producer=self, channel=channel) + reply_to, reply_payload = await dispatcher.process_message(payload, producer=self, channel=str(channel)) if reply_to and reply_payload: - await self.notify(channel=reply_to, message=reply_payload) + await self.notify(channel=reply_to, origin=channel, message=reply_payload) - async def notify(self, channel: Optional[str] = None, message: str = '') -> None: - await self.broker.apublish_message(channel=channel, message=message) + async def notify(self, channel: Optional[str] = None, origin: Optional[Union[int, str]] = None, message: str = '') -> None: + await self.broker.apublish_message(channel=channel, origin=origin, message=message) async def shutdown(self) -> None: if self.production_task: diff --git a/dispatcherd/protocols.py b/dispatcherd/protocols.py index fcd973e..953cbd4 100644 --- a/dispatcherd/protocols.py +++ b/dispatcherd/protocols.py @@ -20,7 +20,7 @@ class Broker(Protocol): async def aprocess_notify( self, connected_callback: Optional[Optional[Callable[[], Coroutine[Any, Any, None]]]] = None - ) -> AsyncGenerator[tuple[str, str], None]: + ) -> AsyncGenerator[tuple[Union[int, str], str], None]: """The generator of messages from the broker for the dispatcherd service The producer iterates this to produce tasks. @@ -28,7 +28,7 @@ async def aprocess_notify( """ yield ('', '') # yield affects CPython type https://github.com/python/mypy/pull/18422 - async def apublish_message(self, channel: Optional[str] = None, message: str = '') -> None: + async def apublish_message(self, channel: Optional[str] = None, origin: Union[int, str, None] = None, message: str = '') -> None: """Asynchronously send a message to the broker, used by dispatcherd service for reply messages""" ... @@ -36,7 +36,9 @@ async def aclose(self) -> None: """Close the asynchronous connection, used by service, and optionally by publishers""" ... - def process_notify(self, connected_callback: Optional[Callable] = None, timeout: float = 5.0, max_messages: int = 1) -> Iterator[tuple[str, str]]: + def process_notify( + self, connected_callback: Optional[Callable] = None, timeout: float = 5.0, max_messages: int = 1 + ) -> Iterator[tuple[Union[int, str], str]]: """Synchronous method to generate messages from broker, used for synchronous control-and-reply""" ... diff --git a/dispatcherd/service/main.py b/dispatcherd/service/main.py index 8797587..854a41a 100644 --- a/dispatcherd/service/main.py +++ b/dispatcherd/service/main.py @@ -184,10 +184,11 @@ async def run_control_action(self, action: str, control_data: Optional[dict] = N # Give Nones for no reply, or the reply if reply_to: - logger.info(f"Control action {action} returned {return_data}, sending back reply") - return (reply_to, json.dumps(return_data)) + reply_msg = json.dumps(return_data) + logger.info(f"Control action {action} returned message len={len(reply_msg)}, sending back reply") + return (reply_to, reply_msg) else: - logger.info(f"Control action {action} returned {return_data}, done") + logger.info(f"Control action {action} returned {type(return_data)}, done") return (None, None) async def process_message_internal(self, message: dict, producer: Optional[Producer] = None) -> tuple[Optional[str], Optional[str]]: @@ -206,9 +207,9 @@ async def start_working(self) -> None: logger.exception(f'Pool {self.pool} failed to start working') self.events.exit_event.set() - logger.debug('Starting task production') async with self.fd_lock: # lots of connecting going on here for producer in self.producers: + logger.debug(f'Starting task production from {producer}') try: await producer.start_producing(self) except Exception: diff --git a/schema.json b/schema.json index 5fdfc6b..bba07e4 100644 --- a/schema.json +++ b/schema.json @@ -9,6 +9,9 @@ "default_publish_channel": "typing.Optional[str]", "max_connection_idle_seconds": "typing.Optional[int]", "max_self_check_message_age_seconds": "typing.Optional[int]" + }, + "socket": { + "socket_path": "" } }, "producers": { @@ -28,13 +31,13 @@ "worker_stop_wait": "", "worker_removal_wait": "" }, + "main_kwargs": { + "node_id": "typing.Optional[str]" + }, "process_manager_kwargs": { "preload_modules": "typing.Optional[list[str]]" }, - "process_manager_cls": "typing.Literal['ProcessManager', 'ForkServerManager']", - "main_kwargs": { - "node_id": "typing.Optional[str]" - } + "process_manager_cls": "typing.Literal['ProcessManager', 'ForkServerManager']" }, "publish": { "default_broker": "str" diff --git a/tests/conftest.py b/tests/conftest.py index f76c0e5..07df3b6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -88,37 +88,46 @@ def test_settings(): return DispatcherSettings(BASIC_CONFIG) +@pytest.fixture +def adispatcher_factory(): + @contextlib.asynccontextmanager + async def _rf(config): + dispatcher = None + try: + settings = DispatcherSettings(config) + dispatcher = from_settings(settings=settings) + + await dispatcher.connect_signals() + await dispatcher.start_working() + await dispatcher.wait_for_producers_ready() + await dispatcher.pool.events.workers_ready.wait() + + assert dispatcher.pool.finished_count == 0 # sanity + assert dispatcher.control_count == 0 + + yield dispatcher + finally: + if dispatcher: + try: + await dispatcher.shutdown() + await dispatcher.cancel_tasks() + except Exception: + logger.exception('shutdown had error') + return _rf + + @pytest_asyncio.fixture( loop_scope="function", scope="function", params=['ProcessManager', 'ForkServerManager'], ids=["fork", "forkserver"], ) -async def apg_dispatcher(request) -> AsyncIterator[DispatcherMain]: - dispatcher = None - try: - this_test_config = BASIC_CONFIG.copy() - this_test_config.setdefault('service', {}) - this_test_config['service']['process_manager_cls'] = request.param - this_settings = DispatcherSettings(this_test_config) - dispatcher = from_settings(settings=this_settings) - - await dispatcher.connect_signals() - await dispatcher.start_working() - await dispatcher.wait_for_producers_ready() - await dispatcher.pool.events.workers_ready.wait() - - assert dispatcher.pool.finished_count == 0 # sanity - assert dispatcher.control_count == 0 - +async def apg_dispatcher(request, adispatcher_factory) -> AsyncIterator[DispatcherMain]: + this_test_config = BASIC_CONFIG.copy() + this_test_config.setdefault('service', {}) + this_test_config['service']['process_manager_cls'] = request.param + async with adispatcher_factory(this_test_config) as dispatcher: yield dispatcher - finally: - if dispatcher: - try: - await dispatcher.shutdown() - await dispatcher.cancel_tasks() - except Exception: - logger.exception('shutdown had error') @pytest_asyncio.fixture(loop_scope="function", scope="function") diff --git a/tests/integration/brokers/test_socket.py b/tests/integration/brokers/test_socket.py new file mode 100644 index 0000000..5d49a4c --- /dev/null +++ b/tests/integration/brokers/test_socket.py @@ -0,0 +1,75 @@ +import asyncio +import socket +import time +import threading + +import pytest + +from dispatcherd.brokers.socket import Broker + + +@pytest.fixture +def socket_path(tmp_path): + return str(tmp_path / 'test_sock.sock') + + +@pytest.mark.asyncio +async def test_basic_receive(socket_path): + server_broker = Broker(socket_path=socket_path) + client_broker = Broker(socket_path=socket_path) + + server_is_ready = asyncio.Event() + + async def on_connect(): + server_is_ready.set() + + received = [] + async def save_local(): + async for client_id, msg in server_broker.aprocess_notify(connected_callback=on_connect): + received.append((client_id, msg)) + + asyncio.create_task(save_local()) + + await asyncio.wait_for(server_is_ready.wait(), timeout=2) + + for msg in ('test1', 'test2'): + await client_broker.apublish_message(message=msg) + + for i in range(20): + if len(received) >= 2: + break + await asyncio.sleep(0.01) + else: + assert 'Failed to receive expected 2 messages' + + await server_broker.aclose() + + assert received == [(0, 'test1'), (1, 'test2')] + + +@pytest.mark.asyncio +async def test_synchronous_listen_timeout(socket_path): + client_broker = Broker(socket_path=socket_path) + + server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + server.bind(socket_path) + + def run_server(): + server.listen(1) + conn, addr = server.accept() + conn.recv(1024) + print('got message, exiting thread') + + server_thread = threading.Thread(target=run_server) + server_thread.start() + + start = time.monotonic() + received = None + with pytest.raises(TimeoutError): + received = [msg for _, msg in client_broker.process_notify(timeout=0.01)] + delta = time.monotonic() - start + assert delta >= 0.01 + + assert received is None + server_thread.join() + assert not server_thread.is_alive() # should have exited after getting message diff --git a/tests/integration/test_socket_use.py b/tests/integration/test_socket_use.py new file mode 100644 index 0000000..eacf094 --- /dev/null +++ b/tests/integration/test_socket_use.py @@ -0,0 +1,86 @@ +import asyncio +import logging +import os +from typing import AsyncIterator, Callable + +import pytest +import pytest_asyncio + +from dispatcherd.config import DispatcherSettings +from dispatcherd.control import Control +from dispatcherd.factories import from_settings, get_control_from_settings, get_publisher_from_settings +from dispatcherd.protocols import DispatcherMain + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope='session') +def sock_path(tmp_path_factory): + return str(tmp_path_factory.mktemp("socket") / 'test.sock') + + +@pytest.fixture(scope='session') +def socket_config(sock_path): + return {"version": 2, "brokers": {"socket": {"socket_path": sock_path}}, "service": {"main_kwargs": {"node_id": "socket-test-server"}}} + + +@pytest.fixture(scope='session') +def socket_settings(socket_config): + return DispatcherSettings(socket_config) + + +@pytest_asyncio.fixture +async def asock_dispatcher(socket_config, adispatcher_factory) -> AsyncIterator[DispatcherMain]: + async with adispatcher_factory(socket_config) as dispatcher: + yield dispatcher + + +@pytest_asyncio.fixture +async def sock_control(socket_settings) -> AsyncIterator[Control]: + return get_control_from_settings(settings=socket_settings) + + +@pytest_asyncio.fixture +async def sock_broker(socket_settings) -> Callable: + broker = get_publisher_from_settings(settings=socket_settings) + assert not broker.clients # make sure this is new for client, not the server + return broker + + +@pytest.mark.asyncio +async def test_run_lambda_function_socket(asock_dispatcher, sock_broker): + starting_ct = asock_dispatcher.pool.finished_count + clearing_task = asyncio.create_task(asock_dispatcher.pool.events.work_cleared.wait(), name='test_lambda_clear_wait') + + assert sock_broker.sock is None # again, confirm this is a distinct client broker + await sock_broker.apublish_message(message='lambda: "This worked!"') + + await asyncio.wait_for(clearing_task, timeout=1) + + assert asock_dispatcher.pool.finished_count == starting_ct + 1 + + +@pytest.mark.asyncio +async def test_run_lambda_function_socket_sync_client(asock_dispatcher, sock_broker): + starting_ct = asock_dispatcher.pool.finished_count + clearing_task = asyncio.create_task(asock_dispatcher.pool.events.work_cleared.wait(), name='test_lambda_clear_wait') + + sock_broker.publish_message(message='lambda: "This worked!"') + + await asyncio.wait_for(clearing_task, timeout=1) + + assert asock_dispatcher.pool.finished_count == starting_ct + 1 + + +@pytest.mark.asyncio +async def test_simple_control_and_reply(asock_dispatcher, sock_control): + loop = asyncio.get_event_loop() + + def alive_cmd(): + return sock_control.control_with_reply('alive') + + alive = await loop.run_in_executor(None, alive_cmd) + assert len(alive) == 1 + data = alive[0] + + assert data['node_id'] == 'socket-test-server'