Send replies to Broker.aprocess_notify #128

Open
3 tasks done
AlanCoding opened this issue Mar 18, 2025 · 1 comment
Labels
refactoring code build-out and clean-up

Comments

@AlanCoding
Member

Please confirm the following

  • I agree to follow this project's code of conduct.
  • I have checked the current issues for duplicates.
  • I understand that dispatcher is open source software provided for free and that I might not receive a timely response.

Feature type

New Feature

Feature Summary

Currently, we send reply messages (for control-and-reply commands) up the call stack, but this ends at the producer level. The stack goes main->producer->broker. The broker has to break out of its loop due to technical reasons, and because of this, the broker stashes reply messages in an internal data structure and then checks that before continuing the loop.

async def produce_forever(self, dispatcher: DispatcherMain) -> None:
    self.dispatcher = dispatcher
    async for channel, payload in self.broker.aprocess_notify(connected_callback=self.connected_callback):
        self.produced_count += 1
        reply_to, reply_payload = await dispatcher.process_message(payload, producer=self, channel=channel)
        if reply_to and reply_payload:
            await self.notify(channel=reply_to, message=reply_payload)

That's the producer code. The broker code is so complicated that I don't want to paste it.

This issue proposes a new call pattern.

notifier = self.broker.aprocess_notify(connected_callback=self.connected_callback)
async for channel, payload in notifier:
    self.produced_count += 1
    reply_to, reply_payload = await dispatcher.process_message(payload, producer=self, channel=channel)
    if reply_to and reply_payload:
        await notifier.asend((reply_to, reply_payload))

I didn't actually know we could do this before, so that's what changed here.
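To make the mechanics concrete, here is a minimal, self-contained sketch of the generator side of this pattern. `ToyBroker`, its `incoming` and `sent` attributes, and the `produce` helper are hypothetical stand-ins, not dispatcher code. One detail worth noting: `asend()` resumes the generator and returns whatever it yields next, so in this sketch the generator yields a `None` placeholder after handling a reply. Without that, the `asend()` call would swallow the next real message.

```python
import asyncio
from typing import AsyncGenerator, Optional

Message = tuple[str, str]  # (channel, payload)
Reply = tuple[str, str]    # (reply_to, reply_payload)


class ToyBroker:
    """Hypothetical stand-in for the real broker, for illustration only."""

    def __init__(self, incoming: list[Message]) -> None:
        self.incoming = list(incoming)
        self.sent: list[Reply] = []  # replies "published" by the broker

    async def aprocess_notify(self) -> AsyncGenerator[Optional[Message], Optional[Reply]]:
        for message in self.incoming:
            # A plain `async for` step resumes us with None;
            # asend(value) resumes us with that value.
            reply = yield message
            while reply is not None:
                # Handle the reply here, with the listening context still in scope.
                self.sent.append(reply)
                # Yield a placeholder so asend() returns without consuming
                # the next real message.
                reply = yield None


async def produce(broker: ToyBroker) -> list[Message]:
    seen: list[Message] = []
    notifier = broker.aprocess_notify()
    async for item in notifier:
        assert item is not None  # placeholders only go to asend() callers
        channel, payload = item
        seen.append((channel, payload))
        if payload.startswith("ping"):
            await notifier.asend((f"reply_to_{channel}", "pong"))
    return seen


if __name__ == "__main__":
    demo = ToyBroker([("chan_a", "ping-1"), ("chan_b", "work")])
    print(asyncio.run(produce(demo)), demo.sent)
```

Because the reply is handled inside the generator, the channel the message arrived on is still a local variable there, which is the context that is currently lost.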

Steps to reproduce

refactoring

Current results

No response

Suggested feature result

Why?

If you look at #125, it has to add origin, which indicates where the message "came from", such as the pg_notify channel it was received on. This is different from the specified channel that we will reply to.

This would not be necessary if we did not have this extra step where we lose the current context.

This will allow us to remove a few methods from the common contracts.

  • BrokeredProducer.notify will no longer be needed
  • apublish_message can be deleted, if other testing updates are also done

Additional information

No response

@AlanCoding AlanCoding added the refactoring code build-out and clean-up label Mar 18, 2025
@AlanCoding
Member Author

I was talking with @pb82 about this, as it may be relevant for broker recycling, but only inasmuch as it sets the expectation that the service must create "its own" connection for the listener and consequently must be free to close it (that is, to manage the connection).

This refactor, I'm fairly sure, will bleed into these topics, and I would like to do them all at once:

  • Somehow rename pg_notify Broker _async_connection to indicate client async connection (again, assuming we keep this at all)
  • Save a 3rd connection on the Broker to explicitly indicate it is the connection for the service
  • Tests (and control module) must change somehow to not conflate client & server methods. I see multiple options

I'm struggling to make a decision on the last point. If we added new methods, these would need to be used by the control module. That means they would need to be part of the broker protocol, and I don't want to do this.

If public async client methods are not introduced, then that may result in deleting acontrol_with_reply and acontrol methods on the control objects. That might be alright in my book, since the synchronous versions have now matured. One sub-option here is that I could basically move those methods into test fixtures to have nice clean tests. This might be accomplished by having some Broker utility methods take the async connection as an argument. This seems sensible.
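As a rough illustration of that last option, a utility could accept the connection explicitly instead of reading it from the broker. Everything named here (`AsyncExecutor`, `apublish`, `RecordingConnection`) is hypothetical, and the sketch assumes only that the connection exposes an awaitable `execute(query, params)` method, which may not match what the broker actually holds.

```python
import asyncio
from typing import Any, Protocol, Sequence


class AsyncExecutor(Protocol):
    """Assumed minimal shape of an async connection (hypothetical)."""

    async def execute(self, query: str, params: Sequence[Any]) -> Any: ...


async def apublish(connection: AsyncExecutor, channel: str, payload: str) -> None:
    # The caller decides which connection is used, so client and
    # service connections are never conflated inside the broker.
    await connection.execute("SELECT pg_notify(%s, %s);", (channel, payload))


class RecordingConnection:
    """Test double that records calls instead of talking to a database."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, tuple[Any, ...]]] = []

    async def execute(self, query: str, params: Sequence[Any]) -> None:
        self.calls.append((query, tuple(params)))


if __name__ == "__main__":
    conn = RecordingConnection()
    asyncio.run(apublish(conn, "reply_chan", "pong"))
    print(conn.calls)
```

A test fixture could then own the connection and pass it in, without the method having to be part of the broker protocol.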
