Skip to content

Commit 13b35dc

Browse files
authored
Some final preparations (#108)
- **Add method to check if a type is currently being dispatched** - **Make actor_factory function async for more flexibility** - **Add parameter `autostart` to `Dispatcher()` service** - **Add retry delay and auto restart to ActorService** - **Update Docs, Readme, Release Notes**
2 parents df84c5d + d06291e commit 13b35dc

File tree

7 files changed

+416
-151
lines changed

7 files changed

+416
-151
lines changed

README.md

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,47 +21,33 @@ The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-pyth
2121

2222
```python
2323
import os
24-
from frequenz.dispatch import Dispatcher
2524
from unittest.mock import MagicMock
25+
from datetime import timedelta
26+
27+
from frequenz.dispatch import Dispatcher, DispatchInfo, MergeByType
28+
29+
async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
30+
return MagicMock(dispatch=dispatch, receiver=receiver)
2631

2732
async def run():
2833
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
2934
key = os.getenv("DISPATCH_API_KEY", "some-key")
3035

3136
microgrid_id = 1
3237

33-
dispatcher = Dispatcher(
38+
async with Dispatcher(
3439
microgrid_id=microgrid_id,
3540
server_url=url,
36-
key=key
37-
)
38-
await dispatcher.start()
39-
40-
actor = MagicMock() # replace with your actor
41-
42-
changed_running_status_rx = dispatcher.new_running_state_event_receiver("MY_TYPE")
43-
44-
async for dispatch in changed_running_status_rx:
45-
if dispatch.started:
46-
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
47-
if actor.is_running:
48-
actor.reconfigure(
49-
components=dispatch.target,
50-
run_parameters=dispatch.payload, # custom actor parameters
51-
dry_run=dispatch.dry_run,
52-
until=dispatch.until,
53-
) # this will reconfigure the actor
54-
else:
55-
# this will start a new actor with the given components
56-
# and run it for the duration of the dispatch
57-
actor.start(
58-
components=dispatch.target,
59-
run_parameters=dispatch.payload, # custom actor parameters
60-
dry_run=dispatch.dry_run,
61-
until=dispatch.until,
62-
)
63-
else:
64-
actor.stop() # this will stop the actor
41+
key=key,
42+
) as dispatcher:
43+
await dispatcher.start_managing(
44+
dispatch_type="EXAMPLE_TYPE",
45+
actor_factory=create_actor,
46+
merge_strategy=MergeByType(),
47+
retry_interval=timedelta(seconds=10)
48+
)
49+
50+
await dispatcher
6551
```
6652

6753
## Supported Platforms

RELEASE_NOTES.md

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,50 @@ This release introduces a more flexible and powerful mechanism for managing disp
66

77
## Upgrading
88

9+
A new simplified way to manage actors has been introduced:
10+
11+
Change your code from:
12+
```python
13+
dispatcher = Dispatcher(
14+
microgrid_id=microgrid_id,
15+
server_url=url,
16+
key=key
17+
)
18+
dispatcher.start()
19+
20+
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
21+
22+
managing_actor = ActorDispatcher(
23+
actor_factory=MyActor.new_with_dispatch,
24+
running_status_receiver=status_receiver,
25+
)
26+
27+
await run(managing_actor)
28+
```
29+
30+
to
31+
32+
```python
33+
async with Dispatcher(
34+
microgrid_id=microgrid_id,
35+
server_url=url,
36+
key=key
37+
) as dispatcher:
38+
await dispatcher.start_managing(
39+
dispatch_type="EXAMPLE_TYPE",
40+
actor_factory=MyActor.new_with_dispatch, # now async factory!
41+
merge_strategy=MergeByType,
42+
)
43+
await dispatcher
44+
```
45+
46+
Further changes:
47+
948
* `Dispatcher.start` is no longer `async`. Remove `await` when calling it.
1049
* Two properties have been replaced by methods that require a type as parameter.
1150
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
1251
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy)`.
13-
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function.
52+
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new_receiver function.
1453
* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`.
1554
* It's interface has been simplified and now only requires an actor factory and a running status receiver.
1655
* It only starts/stops a single actor at a time now instead of a set of actors.
@@ -22,4 +61,10 @@ This release introduces a more flexible and powerful mechanism for managing disp
2261

2362
* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
2463
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1800.
25-
* Actor management with dispatches has been simplified. Calling `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` will begin managing your actor for the given type and merge strategy. All you need provide is an actor factory. To stop dispatching for your type, call `Dispatcher.stop_dispatching(dispatch_type)`.
64+
* Actor management with dispatches has been simplified:
65+
* `Dispatcher.start_managing(dispatch_type, actor_factory, merge_strategy, retry_interval)` to manage your actor for the given type and merge strategy. All you need provide is an actor factory.
66+
* `Dispatcher.stop_managing(dispatch_type)` to stop dispatching for the given type.
67+
* `Dispatcher.is_managed(dispatch_type)` to check if dispatching is active for the given type.
68+
* Dispatches that failed to start will now be retried after a delay.
69+
* A new method `Dispatcher.wait_for_initialization()` has been added to wait for all actors to be initialized.
70+
* When using `async with Dispatcher(..) as dispatcher`, the dispatcher will first wait for the dispatch service to be initialized before entering the block.

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 103 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
import logging
88
from collections.abc import Callable
99
from dataclasses import dataclass
10-
from typing import Any
10+
from datetime import timedelta
11+
from typing import Any, Awaitable
1112

12-
from frequenz.channels import Broadcast, Receiver
13+
from frequenz.channels import Broadcast, Receiver, select
1314
from frequenz.client.dispatch.types import TargetComponents
1415
from frequenz.sdk.actor import Actor, BackgroundService
1516

@@ -116,29 +117,77 @@ async def main():
116117
117118
microgrid_id = 1
118119
119-
dispatcher = Dispatcher(
120+
async with Dispatcher(
120121
microgrid_id=microgrid_id,
121122
server_url=url,
122123
key=key
123-
)
124-
dispatcher.start()
125-
126-
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
124+
) as dispatcher:
125+
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
127126
128-
managing_actor = ActorDispatcher(
129-
actor_factory=MyActor.new_with_dispatch,
130-
running_status_receiver=status_receiver,
131-
)
127+
managing_actor = ActorDispatcher(
128+
actor_factory=MyActor.new_with_dispatch,
129+
running_status_receiver=status_receiver,
130+
)
132131
133-
await run(managing_actor)
132+
await run(managing_actor)
134133
```
135134
"""
136135

137-
def __init__(
136+
class RetryFailedDispatches:
137+
"""Manages the retry of failed dispatches."""
138+
139+
def __init__(self, retry_interval: timedelta) -> None:
140+
"""Initialize the retry manager.
141+
142+
Args:
143+
retry_interval: The interval between retries.
144+
"""
145+
self._retry_interval = retry_interval
146+
self._channel = Broadcast[Dispatch](name="retry_channel")
147+
self._sender = self._channel.new_sender()
148+
self._tasks: set[asyncio.Task[None]] = set()
149+
150+
def new_receiver(self) -> Receiver[Dispatch]:
151+
"""Create a new receiver for dispatches to retry.
152+
153+
Returns:
154+
The receiver.
155+
"""
156+
return self._channel.new_receiver()
157+
158+
def retry(self, dispatch: Dispatch) -> None:
159+
"""Retry a dispatch.
160+
161+
Args:
162+
dispatch: The dispatch information to retry.
163+
"""
164+
task = asyncio.create_task(self._retry_after_delay(dispatch))
165+
self._tasks.add(task)
166+
task.add_done_callback(self._tasks.remove)
167+
168+
async def _retry_after_delay(self, dispatch: Dispatch) -> None:
169+
"""Retry a dispatch after a delay.
170+
171+
Args:
172+
dispatch: The dispatch information to retry.
173+
"""
174+
_logger.info(
175+
"Will retry dispatch %s after %s",
176+
dispatch.id,
177+
self._retry_interval,
178+
)
179+
await asyncio.sleep(self._retry_interval.total_seconds())
180+
_logger.info("Retrying dispatch %s now", dispatch.id)
181+
await self._sender.send(dispatch)
182+
183+
def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments
138184
self,
139-
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
185+
actor_factory: Callable[
186+
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
187+
],
140188
running_status_receiver: Receiver[Dispatch],
141189
dispatch_identity: Callable[[Dispatch], int] | None = None,
190+
retry_interval: timedelta | None = timedelta(seconds=60),
142191
) -> None:
143192
"""Initialize the dispatch handler.
144193
@@ -148,6 +197,7 @@ def __init__(
148197
running_status_receiver: The receiver for dispatch running status changes.
149198
dispatch_identity: A function to identify to which actor a dispatch refers.
150199
By default, it uses the dispatch ID.
200+
retry_interval: The interval between retries. If `None`, retries are disabled.
151201
"""
152202
super().__init__()
153203
self._dispatch_identity: Callable[[Dispatch], int] = (
@@ -161,6 +211,11 @@ def __init__(
161211
name="dispatch_updates_channel", resend_latest=True
162212
)
163213
self._updates_sender = self._updates_channel.new_sender()
214+
self._retrier = (
215+
ActorDispatcher.RetryFailedDispatches(retry_interval)
216+
if retry_interval
217+
else None
218+
)
164219

165220
def start(self) -> None:
166221
"""Start the background service."""
@@ -174,7 +229,8 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
174229
options=dispatch.payload,
175230
)
176231

177-
actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch))
232+
identity = self._dispatch_identity(dispatch)
233+
actor: Actor | None = self._actors.get(identity)
178234

179235
if actor:
180236
sent_str = ""
@@ -189,21 +245,28 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
189245
else:
190246
try:
191247
_logger.info("Starting actor for dispatch type %r", dispatch.type)
192-
actor = self._actor_factory(
248+
actor = await self._actor_factory(
193249
dispatch_update,
194250
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
195251
)
196-
self._actors[self._dispatch_identity(dispatch)] = actor
197252

198253
actor.start()
199254

200255
except Exception as e: # pylint: disable=broad-except
201256
_logger.error(
202-
"Failed to start actor for dispatch type %r: %s",
257+
"Failed to start actor for dispatch type %r",
203258
dispatch.type,
204-
e,
205-
exc_info=True,
259+
exc_info=e,
206260
)
261+
if self._retrier:
262+
self._retrier.retry(dispatch)
263+
else:
264+
_logger.error(
265+
"No retry mechanism enabled, dispatch %r failed", dispatch
266+
)
267+
else:
268+
# No exception occurred, so we can add the actor to the list
269+
self._actors[identity] = actor
207270

208271
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
209272
"""Stop all actors.
@@ -212,17 +275,33 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
212275
stopping_dispatch: The dispatch that is stopping the actor.
213276
msg: The message to be passed to the actors being stopped.
214277
"""
215-
if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None):
278+
actor: Actor | None = None
279+
identity = self._dispatch_identity(stopping_dispatch)
280+
281+
actor = self._actors.get(identity)
282+
283+
if actor:
216284
await actor.stop(msg)
285+
286+
del self._actors[identity]
217287
else:
218288
_logger.warning(
219289
"Actor for dispatch type %r is not running", stopping_dispatch.type
220290
)
221291

222292
async def _run(self) -> None:
223-
"""Wait for dispatches and handle them."""
224-
async for dispatch in self._dispatch_rx:
225-
await self._handle_dispatch(dispatch=dispatch)
293+
"""Run the background service."""
294+
if not self._retrier:
295+
async for dispatch in self._dispatch_rx:
296+
await self._handle_dispatch(dispatch)
297+
else:
298+
retry_recv = self._retrier.new_receiver()
299+
300+
async for selected in select(retry_recv, self._dispatch_rx):
301+
if retry_recv.triggered(selected):
302+
self._retrier.retry(selected.message)
303+
elif self._dispatch_rx.triggered(selected):
304+
await self._handle_dispatch(selected.message)
226305

227306
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
228307
"""Handle a dispatch.

0 commit comments

Comments
 (0)