Skip to content

Commit 33d4256

Browse files
authored
A global config and setup method (#64)
Use full class names and kwargs in config file for simplicity Update the demo script to use the setup method Move object initialization into a factory module Implement review comment to consolidate factory handling Add settings serialization and pass into workers Add docs on the config Implement change described in issue comment, cls and kwargs patterns * Convert broker base to protocol * Refactor into single broker class * Produce a reference json-ish schema * Add Schema re-generation docs * Add yield to keep protocol method async
1 parent 752c0c7 commit 33d4256

33 files changed

+1034
-335
lines changed

README.md

+35-21
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
<!-- License Badge -->
22
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://github.com/ansible/dispatcher/blob/main/LICENSE)
33

4-
Working space for dispatcher prototyping
5-
6-
This is firstly intended to be a code split of:
4+
This is intended to be a working space for prototyping a code split of:
75

86
<https://github.com/ansible/awx/tree/devel/awx/main/dispatch>
97

108
As a part of doing the split, we also want to resolve a number of
119
long-standing design and sustainability issues, thus, asyncio.
1210

11+
The philosophy of the dispatcher is to have a limited scope
12+
as a "local" runner of background tasks, but to be composable
13+
so that it can be "wrapped" easily to enable clustering and
14+
distributed task management by apps using it.
15+
1316
Licensed under [Apache Software License 2.0](LICENSE)
1417

1518
### Usage
@@ -37,20 +40,11 @@ def print_hello():
3740
print('hello world!!')
3841
```
3942

40-
#### Dispatcher service
41-
42-
The dispatcher service needs to be running before you submit tasks.
43-
This does not make any attempts at message durability or confirmation.
44-
If you submit a task in an outage of the service, it will be dropped.
45-
46-
There are 2 ways to run the dispatcher service:
47-
48-
- Importing and running (code snippet below)
49-
- A CLI entrypoint `dispatcher-standalone` for demo purposes
43+
Additionally, you need to configure dispatcher somewhere in your import path.
44+
This tells dispatcher how to submit tasks to be ran.
5045

5146
```python
52-
from dispatcher.main import DispatcherMain
53-
import asyncio
47+
from dispatcher.config import setup
5448

5549
config = {
5650
"producers": {
@@ -63,13 +57,29 @@ config = {
6357
},
6458
"pool": {"max_workers": 4},
6559
}
66-
loop = asyncio.get_event_loop()
67-
dispatcher = DispatcherMain(config)
60+
setup(config)
61+
```
62+
63+
For more on how to set up and the allowed options in the config,
64+
see the section [config](docs/config.md) docs.
65+
66+
#### Dispatcher service
67+
68+
The dispatcher service needs to be running before you submit tasks.
69+
This does not make any attempts at message durability or confirmation.
70+
If you submit a task in an outage of the service, it will be dropped.
71+
72+
There are 2 ways to run the dispatcher service:
6873

69-
try:
70-
loop.run_until_complete(dispatcher.main())
71-
finally:
72-
loop.close()
74+
- Importing and running (code snippet below)
75+
- A CLI entrypoint `dispatcher-standalone` for demo purposes
76+
77+
```python
78+
from dispatcher import run_service
79+
80+
# After the setup() method has been called
81+
82+
run_service()
7383
```
7484

7585
Configuration tells how to connect to postgres, and what channel(s) to listen to.
@@ -88,6 +98,8 @@ The following code will submit `print_hello` to run in the background dispatcher
8898
```python
8999
from test_methods import print_hello
90100

101+
# After the setup() method has been called
102+
91103
print_hello.delay()
92104
```
93105

@@ -96,6 +108,8 @@ Also valid:
96108
```python
97109
from test_methods import print_hello
98110

111+
# After the setup() method has been called
112+
99113
print_hello.apply_async(args=[], kwargs={})
100114
```
101115

dispatcher.yml

+19-14
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,25 @@
11
# Demo config
22
---
3-
pool:
4-
max_workers: 3
5-
producers:
6-
brokers:
7-
# List of channels to listen on
3+
version: 2
4+
service:
5+
pool_kwargs:
6+
max_workers: 4
7+
brokers:
8+
pg_notify:
9+
config:
10+
conninfo: dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777
11+
sync_connection_factory: dispatcher.brokers.pg_notify.connection_saver
812
channels:
913
- test_channel
1014
- test_channel2
1115
- test_channel3
12-
pg_notify:
13-
# Database connection details
14-
conninfo: dbname=dispatch_db user=dispatch password=dispatching host=localhost
15-
port=55777
16-
scheduled:
17-
'lambda: __import__("time").sleep(1)':
18-
schedule: 3
19-
'lambda: __import__("time").sleep(2)':
20-
schedule: 3
16+
default_publish_channel: test_channel
17+
producers:
18+
ScheduledProducer:
19+
task_schedule:
20+
'lambda: __import__("time").sleep(1)':
21+
schedule: 3
22+
'lambda: __import__("time").sleep(2)':
23+
schedule: 3
24+
publish:
25+
default_broker: pg_notify

dispatcher/__init__.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import asyncio
2+
import logging
3+
4+
from dispatcher.factories import from_settings
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
def run_service() -> None:
10+
"""
11+
Runs dispatcher task service (runs tasks due to messages from brokers and other local producers)
12+
Before calling this you need to configure by calling dispatcher.config.setup
13+
"""
14+
loop = asyncio.get_event_loop()
15+
dispatcher = from_settings()
16+
try:
17+
loop.run_until_complete(dispatcher.main())
18+
except KeyboardInterrupt:
19+
logger.info('Dispatcher stopped by KeyboardInterrupt')
20+
finally:
21+
loop.close()

dispatcher/brokers/__init__.py

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import importlib
2+
from types import ModuleType
3+
4+
from .base import BaseBroker
5+
6+
7+
def get_broker_module(broker_name) -> ModuleType:
8+
"Static method to alias import_module so we use a consistent import path"
9+
return importlib.import_module(f'dispatcher.brokers.{broker_name}')
10+
11+
12+
def get_broker(broker_name: str, broker_config: dict, **overrides) -> BaseBroker:
13+
"""
14+
Given the name of the broker in the settings, and the data under that entry in settings,
15+
return the broker object.
16+
"""
17+
broker_module = get_broker_module(broker_name)
18+
kwargs = broker_config.copy()
19+
kwargs.update(overrides)
20+
return broker_module.Broker(**kwargs)

dispatcher/brokers/base.py

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from typing import AsyncGenerator, Optional, Protocol
2+
3+
4+
class BaseBroker(Protocol):
5+
async def aprocess_notify(self, connected_callback=None) -> AsyncGenerator[tuple[str, str], None]:
6+
yield ('', '') # yield affects CPython type https://github.com/python/mypy/pull/18422
7+
8+
async def apublish_message(self, channel: Optional[str] = None, message: str = '') -> None: ...
9+
10+
async def aclose(self) -> None: ...
11+
12+
def publish_message(self, channel=None, message=None): ...
13+
14+
def close(self): ...

0 commit comments

Comments
 (0)