Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A global config and setup method #64

Merged
merged 19 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
<!-- License Badge -->
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://github.com/ansible/dispatcher/blob/main/LICENSE)

Working space for dispatcher prototyping

This is firstly intended to be a code split of:
This is intended to be a working space for prototyping a code split of:

<https://github.com/ansible/awx/tree/devel/awx/main/dispatch>

As a part of doing the split, we also want to resolve a number of
long-standing design and sustainability issues, thus, asyncio.

The philosophy of the dispatcher is to have a limited scope
as a "local" runner of background tasks, but to be composable
so that it can be "wrapped" easily to enable clustering and
distributed task management by apps using it.

Licensed under [Apache Software License 2.0](LICENSE)

### Usage
Expand Down Expand Up @@ -37,20 +40,11 @@ def print_hello():
print('hello world!!')
```

#### Dispatcher service

The dispatcher service needs to be running before you submit tasks.
This does not make any attempts at message durability or confirmation.
If you submit a task in an outage of the service, it will be dropped.

There are 2 ways to run the dispatcher service:

- Importing and running (code snippet below)
- A CLI entrypoint `dispatcher-standalone` for demo purposes
Additionally, you need to configure dispatcher somewhere in your import path.
This tells dispatcher how to submit tasks to be ran.

```python
from dispatcher.main import DispatcherMain
import asyncio
from dispatcher.config import setup

config = {
"producers": {
Expand All @@ -63,13 +57,29 @@ config = {
},
"pool": {"max_workers": 4},
}
loop = asyncio.get_event_loop()
dispatcher = DispatcherMain(config)
setup(config)
```

For more on how to set up and the allowed options in the config,
see the section [config](docs/config.md) docs.

#### Dispatcher service

The dispatcher service needs to be running before you submit tasks.
This does not make any attempts at message durability or confirmation.
If you submit a task in an outage of the service, it will be dropped.

There are 2 ways to run the dispatcher service:

try:
loop.run_until_complete(dispatcher.main())
finally:
loop.close()
- Importing and running (code snippet below)
- A CLI entrypoint `dispatcher-standalone` for demo purposes

```python
from dispatcher import run_service

# After the setup() method has been called

run_service()
```

Configuration tells how to connect to postgres, and what channel(s) to listen to.
Expand All @@ -88,6 +98,8 @@ The following code will submit `print_hello` to run in the background dispatcher
```python
from test_methods import print_hello

# After the setup() method has been called

print_hello.delay()
```

Expand All @@ -96,6 +108,8 @@ Also valid:
```python
from test_methods import print_hello

# After the setup() method has been called

print_hello.apply_async(args=[], kwargs={})
```

Expand Down
33 changes: 19 additions & 14 deletions dispatcher.yml
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
# Demo config
---
pool:
max_workers: 3
producers:
brokers:
# List of channels to listen on
version: 2
service:
pool_kwargs:
max_workers: 4
brokers:
pg_notify:
config:
conninfo: dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777
sync_connection_factory: dispatcher.brokers.pg_notify.connection_saver
channels:
- test_channel
- test_channel2
- test_channel3
pg_notify:
# Database connection details
conninfo: dbname=dispatch_db user=dispatch password=dispatching host=localhost
port=55777
scheduled:
'lambda: __import__("time").sleep(1)':
schedule: 3
'lambda: __import__("time").sleep(2)':
schedule: 3
default_publish_channel: test_channel
producers:
ScheduledProducer:
task_schedule:
'lambda: __import__("time").sleep(1)':
schedule: 3
'lambda: __import__("time").sleep(2)':
schedule: 3
publish:
default_broker: pg_notify
21 changes: 21 additions & 0 deletions dispatcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import asyncio
import logging

from dispatcher.factories import from_settings

logger = logging.getLogger(__name__)


def run_service() -> None:
"""
Runs dispatcher task service (runs tasks due to messages from brokers and other local producers)
Before calling this you need to configure by calling dispatcher.config.setup
"""
loop = asyncio.get_event_loop()
dispatcher = from_settings()
try:
loop.run_until_complete(dispatcher.main())
except KeyboardInterrupt:
logger.info('Dispatcher stopped by KeyboardInterrupt')
finally:
loop.close()
20 changes: 20 additions & 0 deletions dispatcher/brokers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import importlib
from types import ModuleType

from .base import BaseBroker


def get_broker_module(broker_name) -> ModuleType:
"Static method to alias import_module so we use a consistent import path"
return importlib.import_module(f'dispatcher.brokers.{broker_name}')


def get_broker(broker_name: str, broker_config: dict, **overrides) -> BaseBroker:
"""
Given the name of the broker in the settings, and the data under that entry in settings,
return the broker object.
"""
broker_module = get_broker_module(broker_name)
kwargs = broker_config.copy()
kwargs.update(overrides)
return broker_module.Broker(**kwargs)
14 changes: 14 additions & 0 deletions dispatcher/brokers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from typing import AsyncGenerator, Optional, Protocol


class BaseBroker(Protocol):
async def aprocess_notify(self, connected_callback=None) -> AsyncGenerator[tuple[str, str], None]:
yield ('', '') # yield affects CPython type https://github.com/python/mypy/pull/18422

async def apublish_message(self, channel: Optional[str] = None, message: str = '') -> None: ...

async def aclose(self) -> None: ...

def publish_message(self, channel=None, message=None): ...

def close(self): ...
Loading