-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathbrokered.py
56 lines (43 loc) · 2.14 KB
/
brokered.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import asyncio
import logging
from typing import Iterable, Optional, Union
from ..protocols import Broker, DispatcherMain
from .base import BaseProducer
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class BrokeredProducer(BaseProducer):
    """Producer that feeds messages received from a broker into a dispatcher.

    Messages are read from ``broker.aprocess_notify`` in a background asyncio
    task; each one is handed to ``dispatcher.process_message`` and any reply
    it yields is published back through the same broker.
    """

    def __init__(self, broker: Broker) -> None:
        """Store the broker; no task is started until ``start_producing``.

        Args:
            broker: Object used to receive messages (``aprocess_notify``),
                publish messages (``apublish_message``) and which is closed
                (``aclose``) on shutdown.
        """
        # Task running ``produce_forever``; None while not producing.
        self.production_task: Optional[asyncio.Task] = None
        self.broker = broker
        # Set by ``produce_forever`` so ``connected_callback`` can reach it.
        self.dispatcher: Optional[DispatcherMain] = None
        # NOTE(review): ``self.events`` and ``self.produced_count`` are used
        # below but never assigned in this class - presumably they come from
        # BaseProducer.__init__; confirm against the base class.
        super().__init__()

    def __str__(self) -> str:
        """Return a label built from the broker's string form."""
        return f'brokered-producer-{self.broker}'

    async def start_producing(self, dispatcher: DispatcherMain) -> None:
        """Schedule ``produce_forever`` as a background task.

        The task is named after the broker's module and kept in
        ``self.production_task`` so ``shutdown`` can cancel it.

        Args:
            dispatcher: Passed through to ``produce_forever``.
        """
        self.production_task = asyncio.create_task(
            self.produce_forever(dispatcher),
            name=f'{self.broker.__module__}_production',
        )

    def all_tasks(self) -> Iterable[asyncio.Task]:
        """Return the production task in a list, or an empty list if unset."""
        if self.production_task:
            return [self.production_task]
        return []

    async def connected_callback(self) -> None:
        """Callback handed to the broker as ``connected_callback``.

        Sets ``events.ready_event`` when ``self.events`` is truthy, then
        forwards the notification to the dispatcher if one has been recorded.
        """
        if self.events:
            self.events.ready_event.set()
        if self.dispatcher:
            await self.dispatcher.connected_callback(self)

    async def produce_forever(self, dispatcher: DispatcherMain) -> None:
        """Consume broker messages and pass each one to the dispatcher.

        Runs until the broker's async iterator ends or the surrounding task
        is cancelled.

        Args:
            dispatcher: Receives every payload via ``process_message`` and is
                remembered on ``self.dispatcher``.
        """
        self.dispatcher = dispatcher
        async for channel, payload in self.broker.aprocess_notify(connected_callback=self.connected_callback):
            self.produced_count += 1
            reply_to, reply_payload = await dispatcher.process_message(payload, producer=self, channel=str(channel))
            # A reply is only published when both a destination and a body
            # were returned.
            if reply_to and reply_payload:
                await self.notify(channel=reply_to, origin=channel, message=reply_payload)

    async def notify(self, channel: Optional[str] = None, origin: Optional[Union[int, str]] = None, message: str = '') -> None:
        """Publish ``message`` through the broker.

        All three arguments are forwarded unchanged, as keywords, to
        ``broker.apublish_message``.
        """
        await self.broker.apublish_message(channel=channel, origin=origin, message=message)

    async def shutdown(self) -> None:
        """Cancel the production task (if any) and close the broker.

        Awaiting the task re-raises whatever it finished with. A
        ``CancelledError`` is the expected outcome and is only logged; any
        other exception still propagates to the caller, but the ``finally``
        clauses guarantee that the task reference is cleared and the broker
        is closed first, so a failed production task cannot leak the broker.
        """
        try:
            if self.production_task:
                self.production_task.cancel()
                try:
                    await self.production_task
                except asyncio.CancelledError:
                    logger.info(f'Successfully canceled production from {self.broker}')
                finally:
                    # The task is finished either way; drop the reference so
                    # all_tasks() stops reporting it and shutdown can be
                    # called again safely.
                    self.production_task = None
        finally:
            await self.broker.aclose()