Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: detect broken connection and reconnect broker #126

Closed
wants to merge 3 commits into from
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions dispatcher/brokers/pg_notify.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json
import logging
from typing import Any, AsyncGenerator, Callable, Coroutine, Iterator, Optional, Union

import psycopg
from psycopg.pq import PGconn, ConnStatus

from dispatcher.protocols import BrokerSelfCheckResult
from dispatcher.utils import resolve_callable

logger = logging.getLogger(__name__)
@@ -33,6 +36,7 @@ def create_connection(**config) -> psycopg.Connection:

class Broker:
NOTIFY_QUERY_TEMPLATE = 'SELECT pg_notify(%s, %s);'
SELF_CHECK_CHANNEL = 'pgnotify_self_check'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would result in all nodes sending their health check messages to all connected nodes. This isn't a good idea for scaling, but also wouldn't work unless there was some other identifying information in the message it used to verify that it came from itself. So this could be addressed by adding a uuid to it, and adding some expected uuid to the message might be a good idea anyway to rule out any possibility of getting wires crossed.


def __init__(
self,
@@ -73,6 +77,10 @@ def __init__(
else:
self._config = {}

# Make sure that the self check channel is always present
if not self.SELF_CHECK_CHANNEL in channels:
channels.append(self.SELF_CHECK_CHANNEL)

self.channels = channels
self.default_publish_channel = default_publish_channel

@@ -82,6 +90,10 @@ def __init__(
self.notify_loop_active: bool = False
self.notify_queue: list = []

# Set the initial value of the self check result to UNDECIDED to indicate
# that a self check needs to be initiated
self.self_check_result = BrokerSelfCheckResult.UNDECIDED

def get_publish_channel(self, channel: Optional[str] = None) -> str:
"Handle default for the publishing channel for calls to publish_message, shared sync and async"
if channel is not None:
@@ -94,6 +106,25 @@ def get_publish_channel(self, channel: Optional[str] = None) -> str:

raise ValueError('Could not determine a channel to use publish to from settings or PGNotify config')

async def initiate_self_check(self, node_id: str) -> None:
    """Publish a self-check message and record whether publishing worked.

    A JSON payload carrying ``node_id`` inside its ``task`` string is sent
    to ``SELF_CHECK_CHANNEL`` through ``self.publish_message``.

    Args:
        node_id: identifier embedded in the published payload.

    Side effects:
        Sets ``self.self_check_result`` to ``IN_PROGRESS`` when the publish
        call returns, or to ``FAILURE`` (after logging the error) when it
        raises any ``Exception``.
    """
    # NOTE: declared async, but nothing is awaited here; publish_message is
    # invoked as a plain synchronous call.
    try:
        check_message = {'self_check': True, 'task': f'lambda: "{node_id}"'}
        self.publish_message(self.SELF_CHECK_CHANNEL, json.dumps(check_message))
    except Exception as err:
        logger.error(f'publish message failed with: {err} ')
        self.self_check_result = BrokerSelfCheckResult.FAILURE
    else:
        # The message is out; the result stays pending until it is read back.
        self.self_check_result = BrokerSelfCheckResult.IN_PROGRESS

async def get_self_check_result(self, node_id: str) -> BrokerSelfCheckResult:
    """Look for this node's self-check message and return the current result.

    Iterates over ``self.process_notify()``. A notification that arrived on
    ``SELF_CHECK_CHANNEL`` and whose payload contains ``node_id`` marks the
    self check as ``SUCCESS``. Any ``Exception`` raised while reading is
    logged and marks the self check as ``FAILURE``. If neither happens the
    previously stored result is left untouched.

    Args:
        node_id: identifier searched for (substring match) in the payload.

    Returns:
        The value of ``self.self_check_result`` after the scan.
    """
    # NOTE(review): process_notify is called synchronously from a coroutine
    # and with its default arguments -- confirm this cannot stall the loop.
    try:
        for notify_channel, notify_payload in self.process_notify():
            if notify_channel != self.SELF_CHECK_CHANNEL:
                continue
            if node_id not in notify_payload:
                continue
            self.self_check_result = BrokerSelfCheckResult.SUCCESS
    except Exception as err:
        logger.error(f'get self check result failed with: {err} ')
        self.self_check_result = BrokerSelfCheckResult.FAILURE

    return self.self_check_result

# --- asyncio connection methods ---

async def aget_connection(self) -> psycopg.AsyncConnection:
@@ -229,6 +260,11 @@ def close(self) -> None:
self._sync_connection.close()
self._sync_connection = None

def reconnect(self) -> None:
    """Drop the current connection, open a new one, and reset the self check.

    Calls ``self.close()`` followed by ``self.get_connection()`` and then
    sets ``self.self_check_result`` back to ``UNDECIDED``.
    """
    self.close()
    self.get_connection()
    # Reset only after get_connection() has returned: if it raises, the
    # previously recorded self-check result is left in place.
    self.self_check_result = BrokerSelfCheckResult.UNDECIDED


class ConnectionSaver:
def __init__(self) -> None:
@@ -248,6 +284,9 @@ def connection_saver(**config) -> psycopg.Connection:
"""
if connection_save._connection is None:
connection_save._connection = create_connection(**config)
if connection_save._connection.closed or connection_save._connection.broken:
connection_save._connection.close()
connection_save._connection = create_connection(**config)
return connection_save._connection


21 changes: 21 additions & 0 deletions dispatcher/protocols.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import asyncio
from enum import Enum
from typing import Any, AsyncGenerator, Callable, Coroutine, Iterable, Iterator, Optional, Protocol, Union


class BrokerSelfCheckResult(Enum):
    """This enum represents the result of a broker self-check.

    All members carry plain integer values. Stray trailing commas previously
    turned the first two values into one-element tuples, which made the
    member values inconsistent with each other.
    """

    UNDECIDED = 1  # self-check hasn't run yet
    SUCCESS = 2  # the last self-check was successful
    FAILURE = 3  # the last self-check failed
    IN_PROGRESS = 4  # self check in progress


class Broker(Protocol):
async def aprocess_notify(
self, connected_callback: Optional[Optional[Callable[[], Coroutine[Any, Any, None]]]] = None
@@ -21,6 +31,14 @@ async def aclose(self) -> None:
"""Close the asynchronous connection, used by service, and optionally by publishers"""
...

async def initiate_self_check(self, node_id: str) -> None:
"""Start a self check of the broker connection, used by service"""
...

async def get_self_check_result(self, node_id: str) -> BrokerSelfCheckResult:
"""Get the last self check result"""
...

def process_notify(self, connected_callback: Optional[Callable] = None, timeout: float = 5.0, max_messages: int = 1) -> Iterator[tuple[str, str]]:
"""Synchronous method to generate messages from broker, used for synchronous control-and-reply"""
...
@@ -33,6 +51,9 @@ def close(self):
"""Close the sychronous connection"""
...

def reconnect(self) -> None:
    """Close and reconnect the synchronous connection"""
    # Explicit stub body and return annotation, matching the other
    # protocol method declarations.
    ...


class ProducerEvents(Protocol):
ready_event: asyncio.Event
30 changes: 28 additions & 2 deletions dispatcher/service/main.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,9 @@
from typing import Iterable, Optional, Union
from uuid import uuid4

from ..protocols import Producer
from ..brokers import get_broker
from ..producers import BrokeredProducer
from ..protocols import Producer, BrokerSelfCheckResult
from . import control_tasks
from .asyncio_tasks import ensure_fatal
from .next_wakeup_runner import HasWakeup, NextWakeupRunner
@@ -234,7 +236,31 @@ async def main(self) -> None:
await self.start_working()

logger.info(f'Dispatcher node_id={self.node_id} running forever, or until shutdown command')
await self.events.exit_event.wait()

while True:
try:
await asyncio.wait_for(self.events.exit_event.wait(), 5.0)

# exit_event has fired, process should exit
break
except asyncio.TimeoutError:
logger.info(f'initiating broker self check for node-id {self.node_id}')
for producer in self.producers:
if isinstance(producer, BrokeredProducer):
result = await producer.broker.get_self_check_result(self.node_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's actually super non-obvious is that it might not be okay to run 2 listening tasks at the same time. There are actually a lot of hazards with connection management in async code, and every time I've fought this I've given up and gone back to always, forever, only having a single task interact with psycopg.

Separate from that, this wouldn't have a timeout anyway, which is the failure scenario.


if result == BrokerSelfCheckResult.IN_PROGRESS:
# the last self check still hasn't finished - we treat that as a connection failure
result = BrokerSelfCheckResult.FAILURE

match result:
case BrokerSelfCheckResult.UNDECIDED | BrokerSelfCheckResult.SUCCESS:
asyncio.create_task(producer.broker.initiate_self_check(self.node_id))
continue
case BrokerSelfCheckResult.FAILURE:
logger.error(f'broker self check failed for node-id {self.node_id}')
producer.broker.reconnect()

finally:
await self.shutdown()

27 changes: 27 additions & 0 deletions run_dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python3

import json
import logging
import sys

from dispatcher import run_service
from dispatcher.factories import get_publisher_from_settings, get_control_from_settings
from dispatcher.utils import MODULE_METHOD_DELIMITER
from dispatcher.config import setup

from time import sleep

from tests.data.methods import sleep_function, sleep_discard, task_has_timeout, hello_world_binder


# Setup the global config from the settings file shared with the service
setup(file_path='dispatcher.yml')

broker = get_publisher_from_settings()

def main():
    """Entry point of this script: hand control to ``run_service()``."""
    run_service()

if __name__ == "__main__":
logging.basicConfig(level='DEBUG', stream=sys.stdout)
main()