Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: detect broken connection and reconnect broker #126

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

pb82
Copy link
Collaborator

@pb82 pb82 commented Mar 14, 2025

Fixes #2

This PR implements periodic broker self checks. If a broken pg_notify connection is detected, it will attempt to reconnect.

TODO: tests, sanity checking

@pb82 pb82 requested a review from AlanCoding as a code owner March 14, 2025 15:46
@pb82 pb82 requested a review from art-tapin March 14, 2025 16:05
logger.info(f'initiating broker self check for node-id {self.node_id}')
for producer in self.producers:
if isinstance(producer, BrokeredProducer):
result = await producer.broker.get_self_check_result(self.node_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's actually super non-obvious is that it might not be okay to run 2 listening tasks at the same time. There are actually a lot of hazards with connection management in async code, and every I've fought this I've given up and went back to always forever only having a single task interact with psycopg.

Separate from that, this wouldn't have a timeout anyway, which is the failure scenario.

@@ -33,6 +36,7 @@ def create_connection(**config) -> psycopg.Connection:

class Broker:
NOTIFY_QUERY_TEMPLATE = 'SELECT pg_notify(%s, %s);'
SELF_CHECK_CHANNEL = 'pgnotify_self_check'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would result in all nodes sending their health check messages to all connected nodes. This isn't a good idea for scaling, but also wouldn't work unless there was some other identifying information in the message it used to verify that it came from itself. So this could be addressed by adding a uuid to it, and adding some expected uuid to the message might be a good idea anyway to rule out any possibility of getting wires crossed.

@AlanCoding
Copy link
Member

I may have failed to fully digest the consequences of psycopg/psycopg#340 myself, and this requires some changes to the methodology to solve this problem.

From some light research, we must have the wakeup for sending the message handled by the broker. Only viable option may be the timeout kwarg on the async version of .notifies from psycopg.

https://www.psycopg.org/psycopg3/docs/api/connections.html#psycopg.AsyncConnection.notifies

To set the stage, our next obvious alternative would involve manually calling .__anext__. That would give more options to weave together timing of the main loop with the broker listening loop. I don't like it, but the choice is still unsettled.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Broker self-checks
2 participants