@@ -5,7 +5,6 @@ package batch
5
5
6
6
import (
7
7
"context"
8
- "runtime"
9
8
"sync"
10
9
"time"
11
10
)
@@ -36,19 +35,6 @@ type Options[Resource any] struct {
36
35
//
37
36
// By default, does nothing.
38
37
SaveResource func (_ context.Context , key string , _ Resource ) error
39
- // GoRoutines specifies how many goroutines should be used to run batch operations.
40
- //
41
- // By default, 16 * number of CPUs.
42
- GoRoutines int
43
- // GoRoutineNumberForKey returns go-routine number which will be used to run operation on
44
- // a given resource key. This function is crucial to properly serialize requests.
45
- //
46
- // This function must be deterministic - it should always return the same go-routine number
47
- // for given combination of key and goroutines parameters.
48
- //
49
- // By default, GoroutineNumberForKey function is used. This implementation calculates hash
50
- // on a given key and use modulo to calculate go-routine number.
51
- GoRoutineNumberForKey func (key string , goroutines int ) int
52
38
}
53
39
54
40
// StartProcessor starts batch processor which will run operations in batches.
@@ -58,42 +44,20 @@ type Options[Resource any] struct {
58
44
func StartProcessor [Resource any ](options Options [Resource ]) * Processor [Resource ] {
59
45
options = options .withDefaults ()
60
46
61
- workerChannels := make ([]chan operation [Resource ], options .GoRoutines )
62
-
63
- var workersFinished sync.WaitGroup
64
- workersFinished .Add (options .GoRoutines )
65
-
66
- for i := 0 ; i < options .GoRoutines ; i ++ {
67
- workerChannels [i ] = make (chan operation [Resource ])
68
- _worker := worker [Resource ]{
69
- goRoutineNumber : i ,
70
- incomingOperations : workerChannels [i ],
71
- loadResource : options .LoadResource ,
72
- saveResource : options .SaveResource ,
73
- minDuration : options .MinDuration ,
74
- maxDuration : options .MaxDuration ,
75
- }
76
-
77
- go func () {
78
- _worker .run ()
79
- workersFinished .Done ()
80
- }()
81
- }
82
-
83
47
return & Processor [Resource ]{
84
- options : options ,
85
- stopped : make (chan struct {}),
86
- workerChannels : workerChannels ,
87
- workersFinished : & workersFinished ,
48
+ options : options ,
49
+ stopped : make (chan struct {}),
50
+ batchChannels : map [string ]chan operation [Resource ]{},
88
51
}
89
52
}
90
53
91
54
// Processor represents instance of batch processor which can be used to issue operations which run in a batch manner.
92
55
type Processor [Resource any ] struct {
93
- options Options [Resource ]
94
- stopped chan struct {}
95
- workerChannels []chan operation [Resource ]
96
- workersFinished * sync.WaitGroup
56
+ options Options [Resource ]
57
+ stopped chan struct {}
58
+ allBatchesFinished sync.WaitGroup
59
+ mutex sync.Mutex
60
+ batchChannels map [string ]chan operation [Resource ]
97
61
}
98
62
99
63
func (s Options [Resource ]) withDefaults () Options [Resource ] {
@@ -118,14 +82,6 @@ func (s Options[Resource]) withDefaults() Options[Resource] {
118
82
s .MaxDuration = 2 * s .MinDuration
119
83
}
120
84
121
- if s .GoRoutines == 0 {
122
- s .GoRoutines = 16 * runtime .NumCPU ()
123
- }
124
-
125
- if s .GoRoutineNumberForKey == nil {
126
- s .GoRoutineNumberForKey = GoroutineNumberForKey
127
- }
128
-
129
85
return s
130
86
}
131
87
@@ -152,30 +108,70 @@ func (p *Processor[Resource]) Run(ctx context.Context, key string, _operation fu
152
108
result := make (chan error )
153
109
defer close (result )
154
110
155
- goRoutineNumber := p .options .GoRoutineNumberForKey (key , p .options .GoRoutines )
111
+ operationMessage := operation [Resource ]{
112
+ run : _operation ,
113
+ result : result ,
114
+ }
115
+
116
+ for {
117
+ incomingOperations := p .temporaryBatchChannel (key )
118
+
119
+ select {
120
+ case <- ctx .Done ():
121
+ return OperationCancelled
156
122
157
- o := operation [Resource ]{
158
- resourceKey : key ,
159
- run : _operation ,
160
- result : result ,
123
+ case incomingOperations <- operationMessage :
124
+ return <- result
125
+
126
+ case <- time .After (10 * time .Millisecond ):
127
+ // Timeout waiting to push operation. Possibly batch goroutine was stopped.
128
+ }
161
129
}
162
130
163
- select {
164
- case p .workerChannels [goRoutineNumber ] <- o :
165
- return <- result
166
- case <- ctx .Done ():
167
- return OperationCancelled
131
+ }
132
+
133
+ func (p * Processor [Resource ]) temporaryBatchChannel (key string ) chan <- operation [Resource ] {
134
+ p .mutex .Lock ()
135
+ defer p .mutex .Unlock ()
136
+
137
+ batchChannel , ok := p .batchChannels [key ]
138
+ if ! ok {
139
+ batchChannel = make (chan operation [Resource ])
140
+ p .batchChannels [key ] = batchChannel
141
+
142
+ go p .startBatch (key , batchChannel )
143
+ }
144
+
145
+ return batchChannel
146
+ }
147
+
148
+ func (p * Processor [Resource ]) startBatch (key string , batchChannel chan operation [Resource ]) {
149
+ p .allBatchesFinished .Add (1 )
150
+ defer p .allBatchesFinished .Done ()
151
+
152
+ now := time .Now ()
153
+
154
+ w := & batch [Resource ]{
155
+ Options : p .options ,
156
+ resourceKey : key ,
157
+ incomingOperations : batchChannel ,
158
+ stopped : p .stopped ,
159
+ softDeadline : now .Add (p .options .MinDuration ),
160
+ hardDeadline : now .Add (p .options .MaxDuration ),
168
161
}
162
+ w .process ()
163
+
164
+ p .mutex .Lock ()
165
+ defer p .mutex .Unlock ()
166
+ // Delete the channel even though it is still used by pending Run calls.
167
+ // Those calls should time out and retry on a new channel.
168
+ delete (p .batchChannels , key )
169
169
}
170
170
171
171
// Stop ends all running batches. No new operations will be accepted.
172
172
// Stop blocks until all pending batches are ended and resources saved.
173
173
func (p * Processor [Resource ]) Stop () {
174
174
close (p .stopped )
175
175
176
- for _ , channel := range p .workerChannels {
177
- close (channel )
178
- }
179
-
180
- p .workersFinished .Wait ()
176
+ p .allBatchesFinished .Wait ()
181
177
}
0 commit comments