-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathcounter.go
106 lines (86 loc) · 2.21 KB
/
counter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package todo
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/modernice/goes/event"
"github.com/modernice/goes/projection"
"github.com/modernice/goes/projection/schedule"
)
// Counter is a read model that provides the number of active, removed, and archived tasks.
type Counter struct {
*projection.Base
sync.RWMutex
active int
removed int
archived int
}
// NewCounter returns a new task counter.
func NewCounter() *Counter {
c := &Counter{Base: projection.New()}
// Register event appliers for each of the projection events.
event.ApplyWith(c, c.taskAdded, TaskAdded)
event.ApplyWith(c, c.taskRemoved, TaskRemoved)
event.ApplyWith(c, c.tasksDone, TasksDone)
return c
}
// Active returns the active tasks.
func (c *Counter) Active() int {
c.RLock()
defer c.RUnlock()
return c.active
}
// Removed returns the removed tasks.
func (c *Counter) Removed() int {
c.RLock()
defer c.RUnlock()
return c.removed
}
// Archived returns the archived tasks.
func (c *Counter) Archived() int {
c.RLock()
defer c.RUnlock()
return c.archived
}
// Project projects the Counter until ctx is canceled. Each time one of
// TaskEvents is published, the counter is updated.
func (c *Counter) Project(
ctx context.Context,
bus event.Bus,
store event.Store,
opts ...schedule.ContinuousOption,
) (<-chan error, error) {
s := schedule.Continuously(bus, store, ListEvents[:], opts...)
errs, err := s.Subscribe(ctx, func(ctx projection.Job) error {
c.print()
defer c.print()
start := time.Now()
log.Printf("[Counter] Applying projection job ...")
defer func() { log.Printf("[Counter] Applied projection job. (%s)", time.Since(start)) }()
c.Lock()
defer c.Unlock()
return ctx.Apply(ctx, c)
})
if err != nil {
return nil, fmt.Errorf("subscribe to projection schedule: %w", err)
}
return errs, nil
}
func (c *Counter) taskAdded(evt event.Of[string]) {
c.active++
}
func (c *Counter) taskRemoved(evt event.Of[TaskRemovedEvent]) {
c.removed++
c.active--
}
func (c *Counter) tasksDone(evt event.Of[[]string]) {
c.archived++
c.active -= len(evt.Data())
}
func (c *Counter) print() {
c.RLock()
defer c.RUnlock()
log.Printf("[Counter] Active: %d, Removed: %d, Archived: %d", c.active, c.removed, c.archived)
}