Skip to content

Commit 01d6e8f

Browse files
committedOct 21, 2024
Initial commit, demo functionality
0 parents  commit 01d6e8f

25 files changed

+1436
-0
lines changed
 

‎.flake8

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[flake8]
2+
max-line-length = 160
3+
extend-ignore = E203
4+
exclude = .git,tools
5+
max-complexity = 10

‎.gitignore

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Coppied from django-ansible-base
2+
3+
# User level pre-commit hooks
4+
pre-commit-user
5+
6+
# Make target touch files
7+
.docker-compose-built
8+
9+
# Python & setuptools
10+
__pycache__
11+
/build
12+
/deb-build
13+
/reprepro
14+
/rpm-build
15+
/tar-build
16+
/setup-bundle-build
17+
/dist
18+
/*.egg-info
19+
*.py[c,o]
20+
/.eggs
21+
.coverage*
22+
coverage.xml
23+
coverage.json
24+
django-ansible-base-test-results.xml
25+
htmlcov
26+
*.tox
27+
venv/
28+
.venv/
29+
30+
# Mac OS X
31+
*.DS_Store
32+
33+
# VSCode
34+
.vscode/
35+
36+
# Editors
37+
*.sw[poj]
38+
*~
39+
40+
# SQLite
41+
*.sqlite3
42+
*.sqlite3_gw*
43+
*.sqlite3-journal
44+
45+
# Container customizations
46+
container-startup.yml
47+
tools/generated/*
48+
49+
# Gets created when testing sonar-scanner locally
50+
.scannerwork

‎Makefile

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
DOCKER_COMPOSE ?= docker compose
2+
3+
4+
# Mostly copied from DAB
5+
postgres:
6+
docker start dispatch_postgres || $(DOCKER_COMPOSE) up -d msg_postgres --quiet-pull
7+
8+
## Stops the postgres container started with 'make postgres'
9+
stop-postgres:
10+
echo "Killing dispatch_postgres container"
11+
$(DOCKER_COMPOSE) rm -fsv msg_postgres
12+
13+
clean:
14+
find . -type f -regex ".*\.py[co]$$" -delete
15+
find . -type d -name "__pycache__" -delete
16+
rm -rf dispatcher.egg-info/

‎README.md

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# dispatcher
2+
Working space for dispatcher prototyping
3+
4+
This is firstly intended to be a code split of:
5+
6+
https://github.com/ansible/awx/tree/devel/awx/main/dispatch
7+
8+
As a part of doing the split, we also want to resolve a number of
9+
long-standing design and sustainability issues, thus, asyncio.
10+
11+
### Manual Demo
12+
13+
You need to have 2 terminal tabs open to run this.
14+
15+
```
16+
# tab 1
17+
make postgres
18+
dispatcher-standalone
19+
# tab 2
20+
python tools/write_messages.py
21+
```
22+
23+
This will run the dispatcher with schedules, and process a burst of messages
24+
that give instructions to run tasks.
25+
26+
### Running Tests
27+
28+
A structure has been set up for integration tests.
29+
The word "integration" only means that postgres must be running.
30+
31+
```
32+
pip install -r requirements_dev.txt
33+
make postgres
34+
py.test tests/
35+
```
36+
37+
This accomplishes the most basic of starting and shutting down.
38+
With no tasks submitted, it should record running 0 tasks,
39+
and with a task submitted, it records running 1 task.

‎dispatcher/__init__.py

Whitespace-only changes.

‎dispatcher/brokers/pg_notify.py

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import logging
2+
3+
import psycopg
4+
5+
logger = logging.getLogger(__name__)
6+
7+
8+
"""This module exists under the theory that dispatcher messaging should be swappable
9+
10+
to different message busses eventually.
11+
That means that the main code should never import psycopg.
12+
Thus, all psycopg-lib-specific actions must happen here.
13+
"""
14+
15+
16+
# TODO: get database data from settings
17+
# # As Django settings, may not use
18+
# DATABASES = {
19+
# "default": {
20+
# "ENGINE": "django.db.backends.postgresql",
21+
# "HOST": os.getenv("DB_HOST", "127.0.0.1"),
22+
# "PORT": os.getenv("DB_PORT", 55777),
23+
# "USER": os.getenv("DB_USER", "dispatch"),
24+
# "PASSWORD": os.getenv("DB_PASSWORD", "dispatching"),
25+
# "NAME": os.getenv("DB_NAME", "dispatch_db"),
26+
# }
27+
# }
28+
29+
30+
async def aget_connection(config):
31+
return await psycopg.AsyncConnection.connect(**config, autocommit=True)
32+
33+
34+
def get_connection(config):
35+
return psycopg.Connection.connect(**config, autocommit=True)
36+
37+
38+
async def aprocess_notify(connection, channels):
39+
async with connection.cursor() as cur:
40+
for channel in channels:
41+
await cur.execute(f"LISTEN {channel};")
42+
logger.info(f"Set up pg_notify listening on channel '{channel}'")
43+
44+
while True:
45+
logger.debug('Starting listening for pg_notify notifications')
46+
async for notify in connection.notifies():
47+
logger.debug(f"Received notification: {notify.channel} - {notify.payload}")
48+
yield notify.channel, notify.payload
49+
50+
51+
def get_django_connection():
52+
try:
53+
from django.conf import ImproperlyConfigured
54+
from django.db import connection as pg_connection
55+
except ImportError:
56+
return None
57+
else:
58+
try:
59+
if pg_connection.connection is None:
60+
pg_connection.connect()
61+
if pg_connection.connection is None:
62+
raise RuntimeError('Unexpectedly could not connect to postgres for pg_notify actions')
63+
return pg_connection.connection
64+
except ImproperlyConfigured:
65+
return None
66+
67+
68+
def publish_message(queue, message, config=None, new_connection=False):
69+
conn = None
70+
if not new_connection:
71+
conn = get_django_connection()
72+
73+
if not conn:
74+
if config is None:
75+
raise RuntimeError('Could not use Django connection, and no postgres config supplied')
76+
conn = get_connection(config)
77+
78+
with conn.cursor() as cur:
79+
cur.execute('SELECT pg_notify(%s, %s);', (queue, message))
80+
81+
logger.debug(f'Sent pg_notify message to {queue}')
82+
83+
if new_connection:
84+
conn.close()

‎dispatcher/cli.py

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import argparse
2+
import asyncio
3+
import logging
4+
import sys
5+
from datetime import timedelta
6+
7+
from dispatcher.main import DispatcherMain
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
# TODO: obviously stop hard-coding this
13+
CELERYBEAT_SCHEDULE = {
14+
'lambda: __import__("time").sleep(1)': {'schedule': timedelta(seconds=3)},
15+
'lambda: __import__("time").sleep(2)': {'schedule': timedelta(seconds=3)},
16+
}
17+
18+
19+
# List of channels to listen on
20+
CHANNELS = ['test_channel', 'test_channel2', 'test_channel2']
21+
22+
# Database connection details
23+
CONNECTION_STRING = "dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777"
24+
25+
26+
def standalone():
27+
parser = argparse.ArgumentParser(description="CLI entrypoint for dispatcher, mainly intended for testing.")
28+
parser.add_argument(
29+
'--log-level',
30+
type=str,
31+
default='DEBUG',
32+
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
33+
help='Python log level to standard out. If you want to log to file you are in the wrong place.',
34+
)
35+
36+
args = parser.parse_args()
37+
logging.basicConfig(level=getattr(logging, args.log_level), stream=sys.stdout)
38+
39+
logging.debug(f"Configured standard out logging at {args.log_level} level")
40+
41+
config = {
42+
"producers": {"brokers": {"pg_notify": {"conninfo": CONNECTION_STRING}, "channels": CHANNELS}, "scheduled": CELERYBEAT_SCHEDULE},
43+
"pool": {"max_workers": 3},
44+
}
45+
46+
loop = asyncio.get_event_loop()
47+
dispatcher = DispatcherMain(config)
48+
try:
49+
loop.run_until_complete(dispatcher.main())
50+
# asyncio.run(main())
51+
except KeyboardInterrupt:
52+
logger.info('CLI entry point leaving')
53+
finally:
54+
loop.close()

‎dispatcher/main.py

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import asyncio
2+
import logging
3+
import signal
4+
5+
from dispatcher.pool import WorkerPool
6+
from dispatcher.producers.brokered import BrokeredProducer
7+
from dispatcher.producers.scheduled import ScheduledProducer
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class DispatcherMain:
13+
def __init__(self, config):
14+
self.exit_event = asyncio.Event()
15+
num_workers = 3
16+
self.pool = WorkerPool(num_workers)
17+
18+
# Initialize all the producers, this should not start anything, just establishes objects
19+
self.producers = []
20+
if 'producers' in config:
21+
producer_config = config['producers']
22+
if 'brokers' in producer_config:
23+
for broker_name, broker_config in producer_config['brokers'].items():
24+
# TODO: import from the broker module here, some importlib stuff
25+
# TODO: make channels specific to broker, probably
26+
if broker_name != 'pg_notify':
27+
continue
28+
self.producers.append(BrokeredProducer(broker=broker_name, config=broker_config, channels=producer_config['brokers']['channels']))
29+
if 'scheduled' in producer_config:
30+
self.producers.append(ScheduledProducer(producer_config['scheduled']))
31+
32+
async def connect_signals(self):
33+
loop = asyncio.get_event_loop()
34+
for sig in (signal.SIGINT, signal.SIGTERM):
35+
loop.add_signal_handler(sig, lambda: asyncio.create_task(self.shutdown(sig)))
36+
37+
async def shutdown(self, sig=None):
38+
if sig:
39+
logging.info(f"Received exit signal {sig.name}...")
40+
41+
logging.debug(f"Shutting down, starting with producers.")
42+
for producer in self.producers:
43+
await producer.shutdown()
44+
45+
logger.debug('Gracefully shutting down worker pool')
46+
await self.pool.shutdown()
47+
48+
logger.debug('Setting event to exit main loop')
49+
self.exit_event.set()
50+
51+
async def start_working(self):
52+
logger.debug('Filling the worker pool')
53+
await self.pool.start_working()
54+
55+
logger.debug('Starting task production')
56+
for producer in self.producers:
57+
await producer.start_producing(self.pool)
58+
59+
async def main(self):
60+
logger.info('Connecting dispatcher signal handling')
61+
await self.connect_signals()
62+
63+
await self.start_working()
64+
65+
logger.info('Dispatcher running forever, or until shutdown command')
66+
await self.exit_event.wait()
67+
68+
logger.debug('Dispatcher loop fully completed')

‎dispatcher/pool.py

+160
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import asyncio
2+
import json
3+
import logging
4+
import multiprocessing
5+
import os
6+
7+
from dispatcher.worker.task import work_loop
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class PoolWorker:
13+
def __init__(self, worker_id, finished_queue):
14+
self.worker_id = worker_id
15+
# TODO: rename message_queue to call_queue, because this is what cpython ProcessPoolExecutor calls them
16+
self.message_queue = multiprocessing.Queue()
17+
self.process = multiprocessing.Process(target=work_loop, args=(self.worker_id, self.message_queue, finished_queue))
18+
self.current_task = None
19+
self.finished_count = 0
20+
self.status = 'initialized'
21+
22+
def start(self):
23+
self.process.start()
24+
self.status = 'starting'
25+
26+
async def stop(self):
27+
self.status = 'stopping'
28+
self.message_queue.put("stop")
29+
self.process.join()
30+
31+
def mark_finished_task(self):
32+
self.current_task = None
33+
self.finished_count += 1
34+
35+
36+
class WorkerPool:
37+
def __init__(self, num_workers):
38+
self.num_workers = num_workers
39+
self.workers = {}
40+
self.next_worker_id = 0
41+
self.finished_queue = multiprocessing.Queue()
42+
self.queued_messages = [] # TODO: use deque, invent new kinds of message anxiety and panic
43+
self.read_results_task = None
44+
self.shutting_down = False
45+
self.finished_count = 0
46+
self.shutdown_timeout = 3
47+
# TODO: worker management lock
48+
49+
async def start_working(self):
50+
self._spawn_workers()
51+
self.read_results_task = asyncio.create_task(self.read_results_forever())
52+
53+
def _spawn_workers(self):
54+
for i in range(self.num_workers):
55+
worker = PoolWorker(worker_id=self.next_worker_id, finished_queue=self.finished_queue)
56+
worker.start()
57+
self.workers[self.next_worker_id] = worker
58+
self.next_worker_id += 1
59+
60+
async def stop_workers(self):
61+
for worker in self.workers.values():
62+
await worker.stop()
63+
64+
async def force_shutdown(self):
65+
for worker in self.workers.values():
66+
if worker.process.is_alive():
67+
logger.warning(f'Force killing worker {worker.worker_id} pid={worker.process.pid}')
68+
os.kill(worker.process.pid)
69+
70+
self.read_results_task.cancel()
71+
logger.info('Finished watcher had to be canceled, awaiting it a second time')
72+
try:
73+
await self.read_results_task
74+
except asyncio.CancelledError:
75+
pass
76+
77+
async def shutdown(self):
78+
self.shutting_down = True
79+
await self.stop_workers()
80+
if self.read_results_task:
81+
logger.info('Waiting for the finished watcher to return')
82+
try:
83+
await asyncio.wait_for(self.read_results_task, timeout=self.shutdown_timeout)
84+
except asyncio.TimeoutError:
85+
logger.warning(f'The finished task failed to cancel in {self.shutdown_timeout} seconds, will force.')
86+
await self.force_shutdown()
87+
except asyncio.CancelledError:
88+
logger.info('The finished task was canceled, but we are shutting down so that is alright')
89+
logger.info('The finished watcher has returned. Pool is shut down')
90+
91+
async def dispatch_task(self, message):
92+
# TODO: handle this more elegantly, maybe through the DispatcherMain, or tell clients not to do this
93+
if isinstance(message, str):
94+
try:
95+
message = json.loads(message)
96+
except Exception:
97+
message = {'task': message}
98+
99+
for candidate_worker in self.workers.values():
100+
if not candidate_worker.current_task:
101+
worker = candidate_worker
102+
break
103+
else:
104+
# TODO: under certain conditions scale up workers
105+
logger.warning(f'Ran out of available workers, queueing up next task, current queued {len(self.queued_messages)}')
106+
self.queued_messages.append(message)
107+
return
108+
109+
logging.debug(f"Dispatching task to worker {worker.process.pid}: {message}")
110+
111+
# Put the message in the selected worker's queue, NOTE: this marks the worker as busy
112+
worker.current_task = message
113+
114+
# Go ahead and do the put synchronously, because it is just putting it on the queue
115+
worker.message_queue.put(message)
116+
117+
async def process_finished(self, worker, message):
118+
result = message["result"]
119+
logger.debug(f"Task completed by worker {worker.worker_id}: {result}")
120+
121+
# Mark the worker as no longer busy
122+
worker.mark_finished_task()
123+
self.finished_count += 1
124+
125+
async def read_results_forever(self):
126+
"""Perpetual task that continuously waits for task completions."""
127+
loop = asyncio.get_event_loop()
128+
while True:
129+
# Wait for a result from the finished queue (blocking)
130+
# worker_id, finished_message
131+
message = await loop.run_in_executor(None, self.finished_queue.get)
132+
worker_id = message["worker"]
133+
event = message["event"]
134+
worker = self.workers[worker_id]
135+
136+
if event == 'ready':
137+
worker.status = 'ready'
138+
139+
elif event == 'shutdown':
140+
# TODO: remove worker from worker list... but we do not have autoscale pool yet so need that
141+
worker.status = 'exited'
142+
if self.shutting_down:
143+
if all(worker.status == 'exited' for worker in self.workers.values()):
144+
logger.debug(f"Worker {worker_id} exited and that is all, exiting finished monitoring.")
145+
break
146+
else:
147+
logger.debug(f"Worker {worker_id} exited and that is a good thing because we are trying to shut down.")
148+
elif not self.workers:
149+
logger.info('All workers exited, exiting results thread out of abundance of caution')
150+
break
151+
else:
152+
logger.debug(f"Worker {worker_id} finished exiting. The rest of this is not yet coded.")
153+
continue
154+
155+
elif event == 'done':
156+
await self.process_finished(worker, message)
157+
158+
if self.queued_messages and (not self.shutting_down):
159+
requeue_message = self.queued_messages.pop()
160+
await self.dispatch_task(requeue_message)

‎dispatcher/producers/brokered.py

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import asyncio
2+
import logging
3+
4+
from dispatcher.brokers.pg_notify import aget_connection, aprocess_notify
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
class BrokeredProducer:
10+
def __init__(self, broker='pg_notify', config=None, channels=()):
11+
self.production_task = None
12+
self.broker = broker
13+
self.config = config
14+
self.channels = channels
15+
16+
async def start_producing(self, pool):
17+
self.production_task = asyncio.create_task(self.produce_forever(pool))
18+
19+
def all_tasks(self):
20+
if self.production_task:
21+
return [self.production_task]
22+
return []
23+
24+
async def connect(self):
25+
self.connection = await aget_connection(self.config)
26+
27+
async def produce_forever(self, pool):
28+
await self.connect()
29+
30+
async with self.connection:
31+
32+
async for channel, payload in aprocess_notify(self.connection, self.channels):
33+
logger.info(f"Received message from channel '{channel}': {payload}, sending to worker")
34+
await pool.dispatch_task(payload)
35+
36+
async def shutdown(self):
37+
if self.production_task:
38+
self.production_task.cancel()
39+
try:
40+
await self.production_task
41+
except asyncio.CancelledError:
42+
logger.info(f'Successfully canceled production from {self.broker}')
43+
self.production_task = None
44+
if self.connection:
45+
await self.connection.close()
46+
self.connection = None

‎dispatcher/producers/scheduled.py

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import asyncio
2+
import logging
3+
4+
logger = logging.getLogger(__name__)
5+
6+
7+
class ScheduledProducer:
8+
def __init__(self, task_schedule):
9+
self.task_schedule = task_schedule
10+
self.scheduled_tasks = []
11+
12+
async def start_producing(self, pool):
13+
for task_name, options in self.task_schedule.items():
14+
per_seconds = options['schedule'].total_seconds()
15+
self.scheduled_tasks.append(asyncio.create_task(self.run_schedule_forever(task_name, per_seconds, pool)))
16+
17+
def all_tasks(self):
18+
return self.scheduled_tasks
19+
20+
async def run_schedule_forever(self, task_name, per_seconds, pool):
21+
logger.info(f"Starting task runner for {task_name} with interval {per_seconds} seconds")
22+
while True:
23+
await asyncio.sleep(per_seconds)
24+
logger.info(f"Sending scheduled task to worker: {task_name}")
25+
# TODO: this will be JSON data with more supporting stuff
26+
await pool.dispatch_task({"task": task_name})
27+
28+
async def shutdown(self):
29+
logger.info('Stopping scheduled tasks')
30+
for task in self.scheduled_tasks:
31+
task.cancel()
32+
await asyncio.gather(*self.scheduled_tasks, return_exceptions=True)
33+
self.scheduled_tasks = []

‎dispatcher/publish.py

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import inspect
2+
import json
3+
import logging
4+
import time
5+
from uuid import uuid4
6+
7+
from django_guid import get_guid
8+
9+
from . import pg_bus_conn
10+
11+
logger = logging.getLogger('awx.main.dispatch')
12+
13+
14+
def serialize_task(f):
15+
return '.'.join([f.__module__, f.__name__])
16+
17+
18+
class task:
19+
"""
20+
Used to decorate a function or class so that it can be run asynchronously
21+
via the task dispatcher. Tasks can be simple functions:
22+
23+
@task()
24+
def add(a, b):
25+
return a + b
26+
27+
...or classes that define a `run` method:
28+
29+
@task()
30+
class Adder:
31+
def run(self, a, b):
32+
return a + b
33+
34+
# Tasks can be run synchronously...
35+
assert add(1, 1) == 2
36+
assert Adder().run(1, 1) == 2
37+
38+
# ...or published to a queue:
39+
add.apply_async([1, 1])
40+
Adder.apply_async([1, 1])
41+
42+
# Tasks can also define a specific target queue or use the special fan-out queue tower_broadcast:
43+
44+
@task(queue='slow-tasks')
45+
def snooze():
46+
time.sleep(10)
47+
48+
@task(queue='tower_broadcast')
49+
def announce():
50+
print("Run this everywhere!")
51+
52+
# The special parameter bind_kwargs tells the main dispatcher process to add certain kwargs
53+
54+
@task(bind_kwargs=['dispatch_time'])
55+
def print_time(dispatch_time=None):
56+
print(f"Time I was dispatched: {dispatch_time}")
57+
"""
58+
59+
def __init__(self, queue=None, bind_kwargs=None):
60+
self.queue = queue
61+
self.bind_kwargs = bind_kwargs
62+
63+
def __call__(self, fn=None):
64+
queue = self.queue
65+
bind_kwargs = self.bind_kwargs
66+
67+
class PublisherMixin(object):
68+
queue = None
69+
70+
@classmethod
71+
def delay(cls, *args, **kwargs):
72+
return cls.apply_async(args, kwargs)
73+
74+
@classmethod
75+
def get_async_body(cls, args=None, kwargs=None, uuid=None, **kw):
76+
"""
77+
Get the python dict to become JSON data in the pg_notify message
78+
This same message gets passed over the dispatcher IPC queue to workers
79+
If a task is submitted to a multiprocessing pool, skipping pg_notify, this might be used directly
80+
"""
81+
task_id = uuid or str(uuid4())
82+
args = args or []
83+
kwargs = kwargs or {}
84+
obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()}
85+
guid = get_guid()
86+
if guid:
87+
obj['guid'] = guid
88+
if bind_kwargs:
89+
obj['bind_kwargs'] = bind_kwargs
90+
obj.update(**kw)
91+
return obj
92+
93+
@classmethod
94+
def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
95+
queue = queue or getattr(cls.queue, 'im_func', cls.queue)
96+
if not queue:
97+
msg = f'{cls.name}: Queue value required and may not be None'
98+
logger.error(msg)
99+
raise ValueError(msg)
100+
obj = cls.get_async_body(args=args, kwargs=kwargs, uuid=uuid, **kw)
101+
if callable(queue):
102+
queue = queue()
103+
# TODO: before sending, consult an app-specific callback if configured
104+
with pg_bus_conn() as conn:
105+
conn.notify(queue, json.dumps(obj))
106+
return (obj, queue)
107+
108+
# If the object we're wrapping *is* a class (e.g., RunJob), return
109+
# a *new* class that inherits from the wrapped class *and* BaseTask
110+
# In this way, the new class returned by our decorator is the class
111+
# being decorated *plus* PublisherMixin so cls.apply_async() and
112+
# cls.delay() work
113+
bases = []
114+
ns = {'name': serialize_task(fn), 'queue': queue}
115+
if inspect.isclass(fn):
116+
bases = list(fn.__bases__)
117+
ns.update(fn.__dict__)
118+
cls = type(fn.__name__, tuple(bases + [PublisherMixin]), ns)
119+
if inspect.isclass(fn):
120+
return cls
121+
122+
# if the object being decorated is *not* a class (it's a Python
123+
# function), make fn.apply_async and fn.delay proxy through to the
124+
# PublisherMixin we dynamically created above
125+
setattr(fn, 'name', cls.name)
126+
setattr(fn, 'apply_async', cls.apply_async)
127+
setattr(fn, 'delay', cls.delay)
128+
setattr(fn, 'get_async_body', cls.get_async_body)
129+
return fn

‎dispatcher/utils.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import importlib
2+
3+
4+
def resolve_callable(task):
5+
"""
6+
Transform a dotted notation task into an imported, callable function, e.g.,
7+
8+
awx.main.tasks.system.delete_inventory
9+
awx.main.tasks.jobs.RunProjectUpdate
10+
11+
In AWX this also did validation that the method was marked as a task.
12+
That is out of scope of this method now.
13+
This is mainly used by the worker.
14+
"""
15+
if task.startswith('lambda:'):
16+
return eval(task)
17+
18+
module, target = task.rsplit('.', 1)
19+
module = importlib.import_module(module)
20+
_call = None
21+
if hasattr(module, target):
22+
_call = getattr(module, target, None)
23+
24+
return _call
25+
26+
27+
def serialize_task(f) -> str:
28+
"""The reverse of resolve_callable, transform callable into dotted notation"""
29+
return '.'.join([f.__module__, f.__name__])

‎dispatcher/worker/task.py

+208
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
import inspect
2+
import json
3+
import logging
4+
import os
5+
import signal
6+
import sys
7+
import time
8+
import traceback
9+
from queue import Empty as QueueEmpty
10+
11+
from dispatcher.utils import resolve_callable
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
"""This module contains code ran by the worker subprocess"""
17+
18+
19+
class WorkerSignalHandler:
20+
def __init__(self):
21+
self.kill_now = False
22+
signal.signal(signal.SIGTERM, signal.SIG_DFL)
23+
signal.signal(signal.SIGINT, self.exit_gracefully)
24+
25+
def exit_gracefully(self, *args, **kwargs):
26+
logger.info('Received worker process exit signal')
27+
self.kill_now = True
28+
29+
30+
class TaskWorker:
31+
"""
32+
A worker implementation that deserializes task messages and runs native
33+
Python code.
34+
35+
This mainly takes messages from the main process, imports, and calls them.
36+
37+
Original code existed at:
38+
https://github.com/ansible/awx/blob/devel/awx/main/dispatch/worker/task.py
39+
https://github.com/ansible/awx/blob/devel/awx/main/dispatch/worker/base.py
40+
41+
Major change from AWX is adding __init__ which now runs post-fork.
42+
Previously this initialized pre-fork, making init logic unusable.
43+
"""
44+
45+
def __init__(self, worker_id):
46+
self.worker_id = worker_id
47+
self.ppid = os.getppid()
48+
self.pid = os.getpid()
49+
self.signal_handler = WorkerSignalHandler()
50+
51+
def should_exit(self) -> str:
52+
"""Called before continuing the loop, something suspicious, return True, should exit"""
53+
if os.getppid() != self.ppid:
54+
logger.error('My parent PID changed, this process has been orphaned, like segfault or sigkill, exiting')
55+
return True
56+
elif self.signal_handler.kill_now:
57+
logger.error('Exiting main loop of worker process due to interupt signal')
58+
return True
59+
return False
60+
61+
def get_uuid(self, message):
62+
return message.get('uuid', '<unknown>')
63+
64+
def run_callable(self, message):
65+
"""
66+
Given some AMQP message, import the correct Python code and run it.
67+
"""
68+
task = message['task']
69+
args = message.get('args', [])
70+
kwargs = message.get('kwargs', {})
71+
_call = resolve_callable(task)
72+
if inspect.isclass(_call):
73+
# the callable is a class, e.g., RunJob; instantiate and
74+
# return its `run()` method
75+
_call = _call().run
76+
77+
# don't print kwargs, they often contain launch-time secrets
78+
logger.debug(f'task {self.get_uuid(message)} starting {task}(*{args}) on worker {self.worker_id}')
79+
80+
return _call(*args, **kwargs)
81+
82+
def perform_work(self, message):
83+
"""
84+
Import and run code for a task e.g.,
85+
86+
body = {
87+
'args': [8],
88+
'callbacks': [{
89+
'args': [],
90+
'kwargs': {}
91+
'task': u'awx.main.tasks.system.handle_work_success'
92+
}],
93+
'errbacks': [{
94+
'args': [],
95+
'kwargs': {},
96+
'task': 'awx.main.tasks.system.handle_work_error'
97+
}],
98+
'kwargs': {},
99+
'task': u'awx.main.tasks.jobs.RunProjectUpdate'
100+
}
101+
"""
102+
# TODO: callback before starting task, previously ran
103+
# settings.__clean_on_fork__()
104+
result = None
105+
try:
106+
result = self.run_callable(message)
107+
except Exception as exc:
108+
result = exc
109+
110+
try:
111+
if getattr(exc, 'is_awx_task_error', False):
112+
# Error caused by user / tracked in job output
113+
logger.warning("{}".format(exc))
114+
else:
115+
task = message['task']
116+
args = message.get('args', [])
117+
kwargs = message.get('kwargs', {})
118+
logger.exception('Worker failed to run task {}(*{}, **{}'.format(task, args, kwargs))
119+
except Exception:
120+
# It's fairly critical that this code _not_ raise exceptions on logging
121+
# If you configure external logging in a way that _it_ fails, there's
122+
# not a lot we can do here; sys.stderr.write is a final hail mary
123+
_, _, tb = sys.exc_info()
124+
traceback.print_tb(tb)
125+
126+
for callback in message.get('errbacks', []) or []:
127+
callback['uuid'] = self.get_uuid(message)
128+
self.perform_work(callback)
129+
finally:
130+
# TODO: callback after running a task, previously ran
131+
# kube_config._cleanup_temp_files()
132+
pass
133+
134+
for callback in message.get('callbacks', []) or []:
135+
callback['uuid'] = self.get_uuid(message)
136+
self.perform_work(callback)
137+
return result
138+
139+
# NOTE: on_start and on_stop were intentionally removed
140+
# these were used for the consumer classes, but not the worker classes
141+
142+
# TODO: new WorkerTaskCall class to track timings and such
143+
def get_finished_message(self, result, message, time_started):
144+
"""I finished the task in message, giving result. This is what I send back to traffic control."""
145+
return {
146+
"worker": self.worker_id,
147+
"event": "done",
148+
"result": result,
149+
"uuid": self.get_uuid(message),
150+
"time_started": time_started,
151+
"time_finish": time.time(),
152+
}
153+
154+
def get_ready_message(self):
155+
"""Message for traffic control, saying am entering the main work loop and am HOT TO GO"""
156+
return {"worker": self.worker_id, "event": "ready"}
157+
158+
def get_shutdown_message(self):
159+
"""Message for traffic control, do not deliver any more mail to this address"""
160+
return {"worker": self.worker_id, "event": "shutdown"}
161+
162+
163+
def work_loop(worker_id, queue, finished_queue):
164+
"""
165+
Worker function that processes messages from the queue and sends confirmation
166+
to the finished_queue once done.
167+
"""
168+
worker = TaskWorker(worker_id)
169+
# TODO: add an app callback here to set connection name and things like that
170+
171+
finished_queue.put(worker.get_ready_message())
172+
173+
while True:
174+
if worker.should_exit():
175+
break
176+
177+
try:
178+
message = queue.get()
179+
except QueueEmpty:
180+
logger.info(f'Worker {worker_id} Encountered strange QueueEmpty condition')
181+
continue # a race condition that mostly can be ignored
182+
except Exception as exc:
183+
logger.exception(f"Exception on worker {worker_id}, type {type(exc)}, error: {str(exc)}, exiting")
184+
break
185+
186+
if not isinstance(message, dict):
187+
188+
if isinstance(message, str):
189+
if message.lower() == "stop":
190+
logger.warning(f"Worker {worker_id} stopping.")
191+
break
192+
193+
try:
194+
message = json.loads(message)
195+
except Exception as e:
196+
logger.error(f'Worker {worker.worker_id} could not process message {message}, error: {str(e)}')
197+
break
198+
199+
logger.info(f'message to perform_work on {message}')
200+
logger.info(f'the type {type(message)}')
201+
time_started = time.time()
202+
result = worker.perform_work(message)
203+
204+
# Indicate that the task is finished by putting a message in the finished_queue
205+
finished_queue.put(worker.get_finished_message(result, message, time_started))
206+
207+
finished_queue.put(worker.get_shutdown_message())
208+
logger.debug('Informed the pool manager that we have exited')

‎docker-compose.yml

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
---
2+
services:
3+
msg_postgres:
4+
image: "postgres:15"
5+
container_name: dispatch_postgres
6+
environment:
7+
POSTGRES_DB: dispatch_db
8+
POSTGRES_USER: dispatch
9+
POSTGRES_PASSWORD: dispatching
10+
healthcheck:
11+
test: ["CMD", "pg_isready", "-U", "dispatch", "-d", "dispatch_db"]
12+
interval: 10s
13+
timeout: 5s
14+
retries: 5
15+
ports:
16+
- "55777:5432"

‎docs/design_notes.md

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
## Reference Designs
2+
3+
### AWX dispatcher
4+
5+
This is directly taken from the AWX dispatcher.
6+
7+
https://github.com/ansible/awx/tree/devel/awx/main/dispatch
8+
9+
This was introduced in:
10+
11+
https://github.com/ansible/awx/pull/2266
12+
13+
> ...much like the callback receiver implementation in 3.3.0 (on which this code is based), this entry point is a kombu.ConsumerMixin.
14+
15+
### Kombu
16+
17+
Kombu is a sub-package of celery.
18+
19+
https://github.com/celery/kombu
20+
21+
In messaging module, this has a `Producer` and `Consumer` classes.
22+
In mixins it has a `ConsumerMixin`, but no methods seem to have made it into AWX dispatch.
23+
24+
This doesn't deal with worker pool management. It does have examples with `Worker` classes.
25+
These follow a similar contract with `process_task` here.
26+
27+
### AMQP
28+
29+
https://www.rabbitmq.com/tutorials/amqp-concepts
30+
31+
This protcol deals with publishers, exchanges, queues, and consumers.
32+
33+
### ProcessPoolExecutor
34+
35+
The python `ProcessPoolExecutor` uses both a single call queue and a single results queue.
36+
37+
https://github.com/python/cpython/blob/f1d33dbddd3496b062e1fbe024fb6d7b023a35f5/Lib/concurrent/futures/process.py#L217
38+
39+
Some things it does is not applicable to the dispatcher here, because it strives to adhere
40+
to an existing contract around python futures that we do not care about.
41+
42+
The local worker thread has many commonalities to the results thread being used here.
43+
It is most interesting to the the three-fold criteria for wakeups in that thread:
44+
45+
```python
46+
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
47+
```
48+
49+
By comparision, the results thread used here only has 1 condition.
50+
In some shutdown or recycle cases, it may be canceled.
51+
52+
An important similarity is that the manager maintains an internal working queue.
53+
This is also done in this library, and diverges from AWX practice.
54+
In AWX dispatcher, a full queue may put messages into individual worker IPCs.
55+
This caused bad results, like delaying tasks due to long-running jobs,
56+
while the pool had many other workers free up in the mean time.
57+
58+
## Alternative Archectures
59+
60+
This are blue-sky ideas, which may not happen anytime soon,
61+
but they are described to help structure the app today so it can expand
62+
into these potential future roles.
63+
64+
### Singleton task queue
65+
66+
A major pivot from the AWX dispatcher is that we do not use 1 result queue per worker,
67+
but a single result queue for all workers, and each meassage includes a worker id.
68+
69+
If you continue this pattern, then we would no longer have a call queue for each worker,
70+
and workers would just grab messages from the queue as they are available.
71+
72+
The problem you encounter is that you will not know what worker started what task.
73+
If you do any "management" this is a problem. For instance, if you want a task
74+
to have a timeout, you need to know which worker to kill if it goes over its limit.
75+
76+
There is a way to still consolidate the call queue while no losing these other features.
77+
When a worker receives a task, it can submit an ACK to the finished queue telling
78+
the main process that it has started a task, and which task it started.
79+
80+
This isn't ultimately robust, if there is an error between getting the message and ACK,
81+
but this probably isn't a reasonable concern. As of now, this looks viable.
82+
83+
### Persistent work manager
84+
85+
Years ago, when AWX was having trouble with output processing bottlenecks,
86+
we stopped using the main dispatcher process to dispatch job events to workers.
87+
88+
Essentially, any performance-sensitive data volumes should not go through the
89+
pool worker management system where data is passed through IPC queues.
90+
Doing this causes the main process to be a bottleneck.
91+
92+
The solution was to have workers connect to a socket on their own.
93+
94+
Nothing is wrong with this, it's just weird.
95+
None of the written facilities for pool management in dispatcher code is useful.
96+
Because of that, event processing diverged far from the rest of the dispatcher.
97+
98+
Long-term vision here is that:
99+
- a `@task` decorator may mark a task as persistent
100+
- additional messages types will need to be send into the finished queue for
101+
- analytics tracking, like how many messages were processed
102+
- whether a particular resource being monitored has been closed
103+
104+
The idea is that this would integrate what was prototyped in:
105+
106+
https://github.com/AlanCoding/receptor-reporter/tree/devel
107+
108+
That idea involved the main process more than the existing callback receiver.
109+
Because each job has its own socket that has to be read from, so these will come and go.
110+
And a worker may manage more than 1 job at the same time, asynchronously.
111+
112+
This also requires forking from what is now `dispatcher.main`.
113+
We could keep the pool (and add more feature) but this requires
114+
an entirely different main loop.

‎docs/message_formats.md

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
## Message Formats
2+
3+
There are two different types of message formats.
4+
5+
See the main design diagram for reference.
6+
7+
### Broker Message Format
8+
9+
This is the format when a client submits a task to be ran, for example, to pg_notify.
10+
This contains JSON-serialized data.
11+
12+
Example:
13+
14+
```json
15+
{
16+
"uuid": "9760671a-6261-45aa-881a-f66929ff9725",
17+
"args": [4],
18+
"kwargs": {},
19+
"task": "awx.main.tasks.jobs.RunJob",
20+
"time_pub": 1727354869.5126922,
21+
"guid": "8f887a0c51f7450db3542c501ba83756"
22+
}
23+
```
24+
25+
The `"task"` contains an importable task to run.
26+
27+
If you are doing the control-and-reply for something, then the submitted
28+
message will also contain a `"reply_to"` key for the channel to send the reply to.
29+
30+
The message sent to the reply channel will have some other purpose-specific information,
31+
like debug information.
32+
33+
### Internal Worker Pool Format
34+
35+
The main process and workers communicate through conventional IPC queues.
36+
This contains the messages to start running a job, of course.
37+
Ideally, this only contains the bare minimum, because tracking
38+
stats and lifetime are the job of the main process, not the worker.
39+
40+
```json
41+
{
42+
"args": [4],
43+
"kwargs": {},
44+
"task": "awx.main.tasks.jobs.RunJob",
45+
}
46+
```
47+
48+
#### Worker to Main Process
49+
50+
When the worker communicates information back to the main process,
51+
it must identify itself, and identify the event. For example:
52+
53+
```json
54+
{
55+
"worker": 3,
56+
"event": "ready"
57+
}
58+
```

‎docs/roadmap.md

+194
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
2+
## Roadmap, planning
3+
4+
Here, we will maintain a list of features.
5+
These may be converted into issues later.
6+
The main goal is to not forget about them.
7+
8+
I will break this down into 2 categories.
9+
The first category is pre-alpha.
10+
I will assume that all commits will be squashed and this will be
11+
moved to a new repo. At that point it will become public.
12+
Everything before it becomes public are pre-alpha things to do.
13+
14+
### Pre-alpha
15+
16+
#### Track more data with new types
17+
18+
The AWX dispatcher had a model where a task could have certain parameters.
19+
Like, imagine that we throw a generalized task timeout in.
20+
21+
The design of this was a little choppy, because the `@task` decorator
22+
would declare these paramters.
23+
24+
Yet these parameters would be send in the pg_notify JSON data.
25+
26+
This doesn't really fit the model that we want.
27+
We want it to be _impossible_ to run a task with parameters
28+
other than what are declared on `@task`
29+
30+
Think of it this way, there are
31+
- runtime arguments (or parameters)
32+
- configuration parameters
33+
34+
Something like a task timeout is a configuration parameter.
35+
This implies that when we register a method with `@task`
36+
we have to **save it in a registry**.
37+
38+
When the dispatcher gets a message saying to run a task,
39+
the most correct thing to do is look that task up in the registry.
40+
This reduces the JSON data passed, and makes a more consistent
41+
source-of-truth.
42+
43+
But this means that we need a registry, and way before that,
44+
we need to introduce a type for methods that will be called.
45+
46+
Additionally, the next big objective is that we want
47+
detailed tracking timing for every time a task is called.
48+
This goes with the call, not the task.
49+
50+
So the new types we probably want are:
51+
- `Task`
52+
- `Call`
53+
54+
This is in addition to the `PoolWorker` which is the worker that
55+
runs the task.
56+
57+
The `Call` will track the lifecycle of the call.
58+
The `PoolWorker` will reference the `Call` it is running.
59+
The `Call` will reference the `Task` it is a call of.
60+
61+
We don't need/want to pass any of this through the IPC queue
62+
to the worker, we only want it for the main dispatcher.
63+
This will mainly be useful as we ask the dispatcher to respond
64+
with stats about what it has been running.
65+
66+
Also... I see this as a mechanism to write integration tests.
67+
We can submit tasks, wait for a signal they finished,
68+
and then get the work history from the dispatcher as it ran those.
69+
70+
We should look at the `Call` class as corresponding to a log record.
71+
This should have the call details, identifiers, and mostly be a
72+
record of the call lifecycle. This is mostly log-like, and should have
73+
mostly scalar type data of floats, strings, ids, etc.
74+
75+
#### Finish integrating publisher logic
76+
77+
The content existing in `dispatcher.publish` is mostly not connected.
78+
79+
What's interesting here is that `dispatcher.publish` should import
80+
from the broker module.
81+
That gets hard to manage with multiple connections (ala Django).
82+
But some version of it we should do...
83+
84+
#### Finish integrating the worker loop
85+
86+
Overlapping with the publisher stuff, the `dispatcher.publish` should
87+
get the method name, and the args, import the method, and run it.
88+
89+
This requires moving more code in from AWX
90+
91+
https://github.com/ansible/awx/blob/devel/awx/main/dispatch/worker/task.py
92+
93+
That has
94+
- importing logic
95+
- calling logic
96+
- supporting stuff to include timings
97+
- signal handling
98+
- exception handling
99+
100+
### Post-alpha
101+
102+
#### Conditional skipping logic on publishing
103+
104+
AWX uses sqlite3 for unit tests, which would error on async tasks.
105+
Because of this, it did not publish a message if `is_testing` was True.
106+
It's not reasonable for us to implement that same thing here, and
107+
we will likely need some callback approach.
108+
109+
So the ask here is that we have some app-wide configuration,
110+
which can inspect a message _before publishing_ and take some action,
111+
or possibly cancel the NOTIFY.
112+
113+
Probably not good coding practice generally, but probably useful.
114+
115+
#### Feature branch to integrate with AWX
116+
117+
Make AWX run using this library, this should be an early goal in this stage.
118+
119+
#### Worker and Broker Self-Checks
120+
121+
A moderate version of this was proposed in:
122+
123+
https://github.com/ansible/awx/pull/14749
124+
125+
In grand conclusion, there is no way to assure that the LISTSEN connection
126+
is not dropped.
127+
Worse, when it is dropped, we may get no notification.
128+
Astonishingly, there appears to be no way around this.
129+
130+
Because of this, the ultimate option of last-resource must be taken.
131+
That means that we can only assure health of a connection of a worker
132+
by experiential means.
133+
134+
To know if a connection works, you must publish a control message and receive it.
135+
To know if a worker is alive, you must send a message and receive a reply.
136+
137+
Because of this knowledge, the new dispatcher library must just straight to this eventuality.
138+
Implement checks for brokers and workers based on send-and-receive.
139+
This can be done fully with asynio patterns.
140+
141+
For the issues related to AWX 14749, we also need means to recycle connections
142+
in cases where we fail to receive check messages.
143+
144+
#### Worker Allocation Cookbook
145+
146+
Several very practical problems are not intended to ever be solved by the dispatcher.
147+
However, for someone using postgres or any other modern database,
148+
combined with the dispatcher, they have the ability to solve these problems.
149+
150+
https://github.com/ansible/awx/issues/11997
151+
152+
Breakdown of those problems:
153+
1. Have a node in the cluster, any node, process a task
154+
2. Have a periodic task run, anywhere in the cluster, at a certain frequency
155+
156+
The solution for (1) is to add an entry to a table when submitting the task.
157+
Then depending on the use case, there are 2 decent options:
158+
- broadcast a task asking any willing node to run the task, get lock, if lock is taken, bail
159+
- run a periodic task that will use `select_for_update` to get entries and mark as received
160+
161+
The solution for (2) in AWX uses the Solo model to track a `datetime`.
162+
This is self-obviously needed for the feature of _user_ schedules.
163+
164+
#### Task Timeout
165+
166+
When using `@task()` decorator, we add `timeout=5` to timeout in 5 seconds.
167+
168+
A solution was drafted in the branch:
169+
170+
https://github.com/ansible/awx/compare/devel...AlanCoding:awx:dispatcher_timeout
171+
172+
#### Singleton Tasks
173+
174+
AWX commonly used pg locks to prevent multiple workers running the same task,
175+
but a more efficient alternative is to never start those tasks.
176+
177+
This proposes another argument to `@task()` decorator that makes the task exclusive.
178+
When another version of the task is already running, there are 2 sub-options we could do:
179+
- wait for the existing task to finish before running the new task
180+
- discard the new task
181+
182+
The use cases for AWX mainly wand the 2nd one.
183+
Idepotent tasks are used extremely heavily on schedules, meaning that
184+
when the dispatcher receives too many it should simply discard extras.
185+
186+
#### Triggering Tasks from Tasks
187+
188+
For the solution to (2) in the cookbook to be fully functional,
189+
it is best that tasks can directly start other tasks via messaging
190+
internal to the worker pool.
191+
192+
This means passing some kind of object into the task being called
193+
where this object contains callbacks that can be used to
194+
trigger methods in the worker pool's finished watcher.

‎pyproject.toml

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
[project.urls]
2+
Repository = "https://github.com/ansible/dispatcher"
3+
4+
[build-system]
5+
requires = ["setuptools", "wheel"]
6+
build-backend = "setuptools.build_meta"
7+
8+
[project]
9+
name = "dispatcher"
10+
version = "0.1.0"
11+
description = "An asyncio-based dispatcher for tasks in a worker pool."
12+
readme = "README.md"
13+
authors = [
14+
{ name = "Alan Rominger", email = "arominge@redhat.com.com" }
15+
]
16+
license = { text = "MIT" }
17+
keywords = ["asyncio", "multiprocessing", "dispatcher", "pg_notify", "python"]
18+
classifiers = [
19+
"Development Status :: 3 - Alpha",
20+
"Intended Audience :: Developers",
21+
"Programming Language :: Python :: 3",
22+
"Programming Language :: Python :: 3.10",
23+
"Programming Language :: Python :: 3.11",
24+
"Programming Language :: Python :: 3.12",
25+
"Programming Language :: Python :: 3.13",
26+
]
27+
28+
[project.scripts]
29+
dispatcher-standalone = "dispatcher.cli:standalone"
30+
31+
[tool.setuptools.packages.find]
32+
include = ["dispatcher*"]
33+
34+
# You need psycopg, but this will not help you to install it
35+
36+
# Linters coppied from django-ansible-base, exceptions removed
37+
38+
[tool.setuptools_scm]
39+
version_scheme = "calver-by-date"
40+
41+
[tool.black]
42+
line-length = 160
43+
fast = true
44+
skip-string-normalization = true
45+
46+
[tool.isort]
47+
profile = "black"
48+
line_length = 160

‎requirements_dev.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pytest
2+
pytest-asyncio
3+
pytest-benchmark

‎tests/benchmark/__init__.py

Whitespace-only changes.

‎tests/conftest.py

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import pytest
2+
3+
from dispatcher.main import DispatcherMain
4+
5+
6+
# List of channels to listen on
7+
CHANNELS = ['test_channel', 'test_channel2', 'test_channel2']
8+
9+
# Database connection details
10+
CONNECTION_STRING = "dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777"
11+
12+
13+
@pytest.fixture
14+
def pg_dispatcher():
15+
return DispatcherMain({"producers": {"brokers": {"pg_notify": {"conninfo": CONNECTION_STRING}, "channels": CHANNELS}}, "pool": {"max_workers": 3}})

‎tests/integration/test_main.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import asyncio
2+
3+
import pytest
4+
5+
from dispatcher.brokers.pg_notify import publish_message
6+
7+
8+
# List of channels to listen on
9+
CHANNELS = ['test_channel', 'test_channel2', 'test_channel2']
10+
11+
# Database connection details
12+
CONNECTION_STRING = "dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777"
13+
14+
15+
@pytest.mark.asyncio
16+
async def test_run_and_then_shutdown(pg_dispatcher):
17+
await pg_dispatcher.start_working()
18+
await asyncio.sleep(2)
19+
20+
await pg_dispatcher.shutdown()
21+
22+
assert pg_dispatcher.pool.finished_count == 0
23+
24+
25+
@pytest.mark.asyncio
26+
async def test_run_lambda_function(pg_dispatcher):
27+
await pg_dispatcher.start_working()
28+
await asyncio.sleep(1)
29+
30+
# TODO: do config better
31+
publish_message('test_channel', 'lambda: "This worked!"', config={"conninfo": CONNECTION_STRING})
32+
await asyncio.sleep(1)
33+
34+
await pg_dispatcher.shutdown()
35+
36+
assert pg_dispatcher.pool.finished_count == 1

‎tests/unit/test_config.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# this is a good place to create config files, load them, and test that we get the params we expected
2+
# also a good place to take some configs and test initializing dispatcher objects with them
3+
# None of this has been done, but you could do it

‎tools/write_messages.py

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# send_notifications.py
2+
import asyncio
3+
4+
from dispatcher.brokers.pg_notify import publish_message
5+
6+
# Database connection details
7+
CONNECTION_STRING = "dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777"
8+
9+
10+
TEST_MSGS = [
11+
('test_channel', 'lambda: __import__("time").sleep(1)'),
12+
('test_channel2', 'lambda: __import__("time").sleep(1)'),
13+
('test_channel', 'lambda: __import__("time").sleep(1)'),
14+
]
15+
16+
17+
async def main():
18+
for channel, message in TEST_MSGS:
19+
# Send the notification
20+
publish_message(channel, message, config={'conninfo': CONNECTION_STRING})
21+
# await send_notification(channel, message)
22+
# send more than number of workers quickly
23+
for i in range(15):
24+
publish_message('test_channel', f'lambda: {i}', config={'conninfo': CONNECTION_STRING})
25+
26+
27+
if __name__ == "__main__":
28+
asyncio.run(main())

0 commit comments

Comments
 (0)
Please sign in to comment.