Skip to content

Commit 7c33517

Browse files
committed
broker: add topic exchange
1 parent 86eafcd commit 7c33517

26 files changed

+134
-64
lines changed

examples/push_agent.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# mypy: ignore-errors
2+
import asyncio
3+
import logging
4+
import os
5+
import sys
6+
7+
from microagent import MicroAgent, consumer, load_stuff, on, periodic
8+
from microagent.tools import amqp, redis
9+
10+
logging.basicConfig(format=(
11+
'%(levelname)-8s [pid#%(process)d] %(asctime)s %(name)s '
12+
'%(filename)s:%(lineno)d %(message)s'
13+
), stream=sys.stdout, level=logging.INFO)
14+
15+
cur_dir = os.path.dirname(os.path.realpath(__file__))
16+
signals, queues = load_stuff('file://' + os.path.join(cur_dir, 'signals.json'))
17+
18+
19+
class UserAgent(MicroAgent):
20+
@on('pre_start')
21+
def setup(self):
22+
self.log.info('Run ...\n %s', self.info())
23+
24+
@periodic(period=15, timeout=10, start_after=3)
25+
async def example_periodic_task_send_push(self):
26+
self.log.info('Run periodic task')
27+
await self.broker.push3.send({'text': 'informer text'}, topic='msg.ios')
28+
await self.broker.push.send({'text': 'informer 1'})
29+
30+
@consumer(queues.android)
31+
async def example_read_queue_android(self, **kwargs):
32+
self.log.info('Catch android %s', kwargs)
33+
34+
@consumer(queues.ios)
35+
async def example_read_queue_ios(self, **kwargs):
36+
self.log.info('Catch ios %s', kwargs)
37+
38+
@consumer(queues.android_a)
39+
async def example_read_queue_android_a(self, **kwargs):
40+
self.log.info('Catch android_a %s', kwargs)
41+
42+
@consumer(queues.ios_a)
43+
async def example_read_queue_ios_a(self, **kwargs):
44+
self.log.info('Catch ios_a %s', kwargs)
45+
46+
47+
async def main():
48+
bus = redis.RedisSignalBus('redis://localhost/7')
49+
broker = amqp.AMQPBroker('amqp://guest:guest@localhost:5672/')
50+
await broker.push3.send({'q':1}, topic='msg.android')
51+
52+
agent = UserAgent(bus=bus, broker=broker)
53+
await agent.start()
54+
55+
while True:
56+
await asyncio.sleep(1)
57+
58+
59+
asyncio.run(main())

examples/signals.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
}}
1212
],
1313
"queues": [
14-
{"name": "mailer"}
14+
{"name": "mailer"},
15+
{"name": "android", "exchange": "push3", "topics": ["*.android"]},
16+
{"name": "ios", "exchange": "push3", "topics": ["*.ios"]},
17+
{"name": "android_a", "exchange": "push"},
18+
{"name": "ios_a", "exchange": "push"}
1519
]
1620
}

microagent/__init__.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import importlib
44
import json
55
import urllib.request
6-
76
from collections import abc
87
from typing import Any, NamedTuple
98

@@ -16,7 +15,6 @@
1615
from .timer import CRONArgs, CRONTask, PeriodicArgs, PeriodicTask, cron_parser
1716
from .utils import make_bound_key
1817

19-
2018
__all__ = ['Signal', 'Queue', 'MicroAgent', 'ServerInterrupt', 'receiver', 'consumer',
2119
'periodic', 'cron', 'on', 'load_stuff', 'load_signals', 'load_queues']
2220

@@ -58,10 +56,18 @@ def load_stuff(source: str) -> tuple[Any, Any]:
5856
type_map = {name: get_types(_type) for name, _type in providing_args.items()}
5957
providing_args = list(providing_args)
6058

61-
Signal(name=_data['name'], providing_args=providing_args, type_map=type_map)
59+
Signal(
60+
name=_data['name'],
61+
providing_args=providing_args,
62+
type_map=type_map
63+
)
6264

6365
for _data in data.get('queues', []):
64-
Queue(name=_data['name'], exchange=_data.get('exchange', ''))
66+
Queue(
67+
name=_data['name'],
68+
exchange=_data.get('exchange', ''),
69+
topics=_data.get('topics', [])
70+
)
6571

6672
if data.get('jsonlib'):
6773
jsonlib = importlib.import_module(data['jsonlib']) # type: ignore

microagent/abc.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
import logging
2-
32
from collections.abc import Awaitable, Callable
43
from typing import Any, Literal, Protocol
54

6-
75
HookLabel = Literal['server', 'pre_start', 'post_start', 'pre_stop']
86
HookFunc = Callable[[Any], Awaitable[None]]
97
PeriodicFunc = Callable[[Any], Awaitable[None]]

microagent/agent.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ async def setup(self):
7777

7878
import asyncio
7979
import logging
80-
8180
from collections.abc import Callable, Iterable
8281
from dataclasses import dataclass, field
8382
from datetime import datetime, timedelta, timezone
@@ -90,7 +89,6 @@ async def setup(self):
9089
from .signal import Receiver
9190
from .timer import CRONTask, PeriodicTask
9291

93-
9492
HandlerTypes = TypeVar('HandlerTypes', Receiver, Consumer, PeriodicTask, CRONTask, Hook)
9593

9694

microagent/broker.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ async def example_read_queue(self, **kwargs):
4747
'''
4848
import logging
4949
import uuid
50-
5150
from abc import abstractmethod
5251
from dataclasses import dataclass, field
5352
from typing import Any

microagent/bus.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ async def example(self, user_id, **kwargs):
4949
import logging
5050
import time
5151
import uuid
52-
5352
from abc import abstractmethod
5453
from collections import abc, defaultdict
5554
from dataclasses import dataclass, field

microagent/hooks.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,12 @@
2424
'''
2525

2626
import inspect
27-
2827
from collections import abc, defaultdict
2928
from dataclasses import dataclass
3029
from typing import TYPE_CHECKING, ClassVar, TypedDict
3130

3231
from .abc import BoundKey, HookFunc
3332

34-
3533
if TYPE_CHECKING:
3634
from .agent import MicroAgent
3735

microagent/launcher.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,11 @@
2323
import os
2424
import signal
2525
import time
26-
2726
from collections.abc import Iterator
2827
from functools import partial
2928
from itertools import chain
3029
from typing import TYPE_CHECKING, Any
3130

32-
3331
if TYPE_CHECKING:
3432
from .agent import MicroAgent
3533
from .broker import AbstractQueueBroker

microagent/queue.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
import json
2-
3-
from dataclasses import dataclass
2+
from dataclasses import dataclass, field
43
from types import ModuleType
54
from typing import TYPE_CHECKING, ClassVar, TypedDict
65

76
from .abc import BoundKey, ConsumerFunc
87

9-
108
if TYPE_CHECKING:
119
from .agent import MicroAgent
1210

@@ -41,9 +39,10 @@ class Queue:
4139
4240
{
4341
"queues": [
44-
{"name": "mailer"},
45-
{"name": "pusher", "exchange": "events"},
42+
{"name": "mailer"}, # simple
43+
{"name": "pusher", "exchange": "events"}, # fanout
4644
{"name": "logger", "exchange": "events"},
45+
{"name": "save", "exchange": "msg", "topics": "m.*"}, # topics
4746
]
4847
}
4948
@@ -57,6 +56,7 @@ class Queue:
5756
'''
5857
name: str
5958
exchange: str = ''
59+
topics: list[str] = field(default_factory=list)
6060

6161
_queues: ClassVar[dict[str, 'Queue']] = {}
6262
_exchanges: ClassVar[dict[str, 'Queue']] = {}

0 commit comments

Comments
 (0)