Skip to content

Commit 4757c37

Browse files
committed
refactor worker
1 parent f8f2317 commit 4757c37

File tree

1 file changed

+59
-41
lines changed

1 file changed

+59
-41
lines changed

worker.go

+59-41
Original file line numberDiff line numberDiff line change
@@ -21,67 +21,85 @@ type worker[Resource any] struct {
2121
saveResource func(context.Context, string, Resource) error
2222
minDuration time.Duration
2323
maxDuration time.Duration
24+
25+
batchByResourceKey map[string]*batch[Resource]
26+
batchByDeadline []*batch[Resource]
2427
}
2528

26-
func (w worker[Resource]) run() {
27-
batchByResourceKey := map[string]*batch[Resource]{}
28-
var batchByDeadline []*batch[Resource]
29+
func (w *worker[Resource]) run() {
30+
w.batchByResourceKey = map[string]*batch[Resource]{}
2931

3032
ticker := time.NewTicker(time.Millisecond)
3133
defer ticker.Stop()
3234

3335
for {
3436
select {
3537
case <-ticker.C:
36-
now := time.Now()
37-
for _, _batch := range batchByDeadline {
38-
if _batch.deadline.Before(now) {
39-
err := w.saveResource(_batch.ctx, _batch.key, _batch.resource)
40-
_batch.publishResult(err)
41-
delete(batchByResourceKey, _batch.key)
42-
batchByDeadline = batchByDeadline[1:]
43-
continue
44-
}
45-
}
38+
w.endBatchesAfterDeadline()
4639
case _operation, ok := <-w.incomingOperations:
4740
if !ok {
48-
for key, _batch := range batchByResourceKey {
49-
err := w.saveResource(_batch.ctx, key, _batch.resource)
50-
_batch.publishResult(err)
51-
continue
52-
}
41+
w.endAllBatches()
5342
return
5443
}
5544

56-
_batch, found := batchByResourceKey[_operation.resourceKey]
57-
if !found {
58-
ctx, cancel := context.WithTimeout(context.Background(), w.maxDuration)
59-
defer cancel()
60-
61-
now := time.Now()
62-
63-
resource, err := w.loadResource(ctx, _operation.resourceKey)
64-
if err != nil {
65-
_operation.result <- err
66-
continue
67-
}
68-
_batch = &batch[Resource]{
69-
ctx: ctx,
70-
key: _operation.resourceKey,
71-
resource: resource,
72-
deadline: now.Add(w.minDuration),
73-
}
74-
batchByResourceKey[_operation.resourceKey] = _batch
75-
batchByDeadline = append(batchByDeadline, _batch)
76-
}
45+
w.runOperation(_operation)
46+
}
47+
}
48+
}
7749

78-
_batch.results = append(_batch.results, _operation.result)
50+
func (w *worker[Resource]) endBatchesAfterDeadline() {
51+
now := time.Now()
7952

80-
_operation.run(_batch.resource)
53+
for _, _batch := range w.batchByDeadline {
54+
if _batch.deadline.After(now) {
55+
return
8156
}
57+
58+
err := w.saveResource(_batch.ctx, _batch.key, _batch.resource)
59+
_batch.publishResult(err)
60+
delete(w.batchByResourceKey, _batch.key)
61+
w.batchByDeadline = w.batchByDeadline[1:]
8262
}
8363
}
8464

65+
func (w *worker[Resource]) endAllBatches() {
66+
for key, _batch := range w.batchByResourceKey {
67+
err := w.saveResource(_batch.ctx, key, _batch.resource)
68+
_batch.publishResult(err)
69+
}
70+
71+
w.batchByResourceKey = map[string]*batch[Resource]{}
72+
w.batchByDeadline = nil
73+
}
74+
75+
func (w *worker[Resource]) runOperation(_operation operation[Resource]) {
76+
_batch, found := w.batchByResourceKey[_operation.resourceKey]
77+
if !found {
78+
ctx, _ := context.WithTimeout(context.Background(), w.maxDuration)
79+
80+
now := time.Now()
81+
82+
resource, err := w.loadResource(ctx, _operation.resourceKey)
83+
if err != nil {
84+
_operation.result <- err
85+
return
86+
}
87+
88+
_batch = &batch[Resource]{
89+
ctx: ctx,
90+
key: _operation.resourceKey,
91+
resource: resource,
92+
deadline: now.Add(w.minDuration),
93+
}
94+
w.batchByResourceKey[_operation.resourceKey] = _batch
95+
w.batchByDeadline = append(w.batchByDeadline, _batch)
96+
}
97+
98+
_batch.results = append(_batch.results, _operation.result)
99+
100+
_operation.run(_batch.resource)
101+
}
102+
85103
type batch[Resource any] struct {
86104
ctx context.Context
87105
key string

0 commit comments

Comments
 (0)