Skip to content

Commit f439a11

Browse files
committed
feat: make the observers asynchronous
1 parent eeb6fde commit f439a11

File tree

5 files changed

+150
-21
lines changed

5 files changed

+150
-21
lines changed

async.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package async
22

33
import (
4+
"math"
45
"time"
56
)
67

78
const (
89
// DefaultRoutineSnapshottingInterval defines how often the routine manager checks routine status
910
DefaultRoutineSnapshottingInterval = 30 * time.Second
11+
DefaultObserverTimeout = time.Duration(math.MaxInt64)
1012
)
1113

1214
// A RoutinesObserver is an object that observes the status of the executions of routines.
@@ -29,3 +31,40 @@ type RoutinesObserver interface {
2931
// currently running
3032
RunningRoutineByNameCount(name string, count int)
3133
}
34+
35+
type routineEventType int
36+
37+
const (
38+
routineStarted routineEventType = iota
39+
routineEnded
40+
routineTimeboxExceeded
41+
takeSnapshot
42+
)
43+
44+
type routineEvent struct {
45+
Type routineEventType
46+
routine AsyncRoutine
47+
snapshot Snapshot
48+
}
49+
50+
func newRoutineEvent(eventType routineEventType) routineEvent {
51+
return routineEvent{
52+
Type: eventType,
53+
}
54+
}
55+
56+
func routineStartedEvent() routineEvent {
57+
return newRoutineEvent(routineStarted)
58+
}
59+
60+
func routineFinishedEvent() routineEvent {
61+
return newRoutineEvent(routineEnded)
62+
}
63+
64+
func routineTimeboxExceededEvent() routineEvent {
65+
return newRoutineEvent(routineTimeboxExceeded)
66+
}
67+
68+
func takeSnapshotEvent() routineEvent {
69+
return newRoutineEvent(takeSnapshot)
70+
}

async_routine.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,10 @@ func (r *asyncRoutine) run(manager AsyncRoutineManager) {
145145
r.status = RoutineStatusFinished
146146
}
147147
manager.deregister(r)
148-
manager.notify(func(observer RoutinesObserver) {
149-
observer.RoutineFinished(r)
150-
})
148+
manager.notifyAll(r, routineFinishedEvent())
151149
}
152150

153-
manager.notify(func(observer RoutinesObserver) {
154-
observer.RoutineStarted(r)
155-
})
151+
manager.notifyAll(r, routineStartedEvent())
156152

157153
if r.errGroup != nil {
158154
r.errGroup.Go(func() error {

async_routine_manager.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type AsyncRoutineManager interface {
1616
RemoveObserver(observerId string)
1717
IsEnabled() bool
1818
GetSnapshot() Snapshot
19-
notify(eventSource func(observer RoutinesObserver))
19+
notifyAll(src AsyncRoutine, evt routineEvent)
2020
Monitor() AsyncRoutineMonitor
2121
register(routine AsyncRoutine)
2222
deregister(routine AsyncRoutine)
@@ -30,7 +30,7 @@ type asyncRoutineManager struct {
3030
snapshottingToggle Toggle
3131
snapshottingInterval time.Duration
3232
routines cmap.ConcurrentMap[string, AsyncRoutine]
33-
observers cmap.ConcurrentMap[string, RoutinesObserver]
33+
observers cmap.ConcurrentMap[string, *observerProxy]
3434

3535
monitorLock sync.Mutex // user to sync the `Start` and `Stop` methods that are used to start the
3636
// snapshotting routine
@@ -46,13 +46,27 @@ func (arm *asyncRoutineManager) IsEnabled() bool {
4646
// AddObserver adds a new RoutineObserver to the list of observers.
4747
// Assigns and returns an observer ID to the RoutineObserver
4848
func (arm *asyncRoutineManager) AddObserver(observer RoutinesObserver) string {
49+
return arm.AddObserverWithTimeout(observer, DefaultObserverTimeout)
50+
}
51+
52+
// AddObserverWithTimeout registers a new RoutinesObserver with the asyncRoutineManager,
53+
// associating it with a unique identifier and a specified timeout duration.
54+
// The function returns the unique ID assigned to the observer.
55+
func (arm *asyncRoutineManager) AddObserverWithTimeout(observer RoutinesObserver, timeout time.Duration) string {
4956
uid := uuid.New().String()
50-
arm.observers.Set(uid, observer)
57+
proxy := newObserverProxy(uid, observer, arm, timeout)
58+
arm.observers.Set(uid, proxy)
59+
proxy.startObserving()
5160
return uid
5261
}
5362

5463
// RemoveObserver removes the given RoutineObserver from the list of observers
5564
func (arm *asyncRoutineManager) RemoveObserver(observerId string) {
65+
observer, ok := arm.observers.Get(observerId)
66+
if !ok {
67+
return
68+
}
69+
observer.stopObserving()
5670
arm.observers.Remove(observerId)
5771
}
5872

@@ -64,9 +78,10 @@ func (arm *asyncRoutineManager) GetSnapshot() Snapshot {
6478
return snapshot
6579
}
6680

67-
func (arm *asyncRoutineManager) notify(eventSource func(observer RoutinesObserver)) {
81+
// notifyAll notifies all the observers of the event evt received from the routine src
82+
func (arm *asyncRoutineManager) notifyAll(src AsyncRoutine, evt routineEvent) {
6883
for _, observer := range arm.observers.Items() {
69-
eventSource(observer)
84+
observer.notify(src, evt)
7085
}
7186
}
7287

@@ -113,7 +128,7 @@ var lock sync.RWMutex
113128
func newAsyncRoutineManager(options ...AsyncManagerOption) AsyncRoutineManager {
114129
mgr := &asyncRoutineManager{
115130
routines: cmap.New[AsyncRoutine](),
116-
observers: cmap.New[RoutinesObserver](),
131+
observers: cmap.New[*observerProxy](),
117132
snapshottingInterval: DefaultRoutineSnapshottingInterval,
118133
ctx: context.Background(),
119134
managerToggle: func() bool { return true }, // manager is enabled by default

async_routine_monitor.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,8 @@ func (arm *asyncRoutineManager) snapshot() {
6666
snapshot := arm.GetSnapshot()
6767

6868
for _, r := range snapshot.GetTimedOutRoutines() {
69-
arm.notify(func(observer RoutinesObserver) {
70-
observer.RoutineExceededTimebox(r)
71-
})
69+
arm.notifyAll(r, routineTimeboxExceededEvent())
7270
}
7371

74-
arm.notify(func(observer RoutinesObserver) {
75-
observer.RunningRoutineCount(snapshot.totalRoutineCount)
76-
for _, name := range snapshot.GetRunningRoutinesNames() {
77-
observer.RunningRoutineByNameCount(name, snapshot.GetRunningRoutinesCount(name))
78-
}
79-
})
72+
arm.notifyAll(nil, takeSnapshotEvent())
8073
}

async_routine_observer_proxy.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package async
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// observerProxy acts as an intermediary between the AsyncRoutineManager and a RoutinesObserver.
9+
// It receives routine events via a channel and dispatches them to the observer's callback methods.
10+
// The proxy manages event notification asynchronously and can enforce a timeout on the observer's lifecycle.
11+
type observerProxy struct {
12+
manager AsyncRoutineManager
13+
observerId string
14+
observer RoutinesObserver
15+
channel chan routineEvent
16+
timeout time.Duration
17+
}
18+
19+
// newObserverProxy creates and initializes a new observerProxy instance.
20+
// It sets up an asynchronous routine that listens for routine events on the proxy's channel
21+
// and forwards them to the appropriate methods of the provided RoutinesObserver.
22+
//
23+
// Parameters:
24+
// - observerId: a unique identifier for the observer instance.
25+
// - observer: the RoutinesObserver to be notified of routine events.
26+
// - manager: the AsyncRoutineManager responsible for managing routines.
27+
// - timeout: the duration after which the observer routine is considered 'exceeding the timebox'.
28+
//
29+
// Returns:
30+
// - A pointer to the initialized observerProxy.
31+
func newObserverProxy(observerId string, observer RoutinesObserver, manager AsyncRoutineManager, timeout time.Duration) *observerProxy {
32+
proxy := &observerProxy{
33+
manager: manager,
34+
observerId: observerId,
35+
observer: observer,
36+
channel: make(chan routineEvent),
37+
timeout: timeout,
38+
}
39+
40+
return proxy
41+
}
42+
43+
func (proxy *observerProxy) startObserving() {
44+
NewAsyncRoutine("async-observer-notifier", context.Background(), func() {
45+
for evt := range proxy.channel {
46+
switch evt.Type {
47+
case routineStarted:
48+
proxy.observer.RoutineStarted(evt.routine)
49+
case routineEnded:
50+
proxy.observer.RoutineFinished(evt.routine)
51+
case routineTimeboxExceeded:
52+
proxy.observer.RoutineExceededTimebox(evt.routine)
53+
case takeSnapshot:
54+
proxy.observer.RunningRoutineCount(evt.snapshot.GetTotalRoutineCount())
55+
for _, routineName := range evt.snapshot.GetRunningRoutinesNames() {
56+
proxy.observer.RunningRoutineByNameCount(routineName, evt.snapshot.GetRunningRoutinesCount(routineName))
57+
}
58+
}
59+
}
60+
}).
61+
Timebox(proxy.timeout).
62+
WithData("observer-id", proxy.observerId).
63+
Run()
64+
}
65+
66+
func (proxy *observerProxy) stopObserving() {
67+
close(proxy.channel)
68+
}
69+
70+
// notify sends a routine event to the observerProxy's channel.
71+
// Depending on the event type, it either forwards the routine information
72+
// or triggers a snapshot retrieval from the manager.
73+
func (proxy *observerProxy) notify(routine AsyncRoutine, evt routineEvent) {
74+
switch evt.Type {
75+
case routineStarted, routineEnded, routineTimeboxExceeded:
76+
proxy.channel <- routineEvent{
77+
Type: evt.Type,
78+
routine: routine,
79+
}
80+
case takeSnapshot:
81+
proxy.channel <- routineEvent{
82+
Type: takeSnapshot,
83+
snapshot: proxy.manager.GetSnapshot(),
84+
}
85+
}
86+
}

0 commit comments

Comments
 (0)