Skip to content

Commit e8f33f1

Browse files
committed
use cancel context and atomic counter to improve goroutine synchronization
1 parent 3e618bb commit e8f33f1

File tree

1 file changed

+46
-31
lines changed

1 file changed

+46
-31
lines changed

cmd/fluent-bit-watcher/main.go

+46-31
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@ package main
22

33
import (
44
"context"
5-
"github.com/fsnotify/fsnotify"
6-
"github.com/go-kit/kit/log"
7-
"github.com/go-kit/kit/log/level"
8-
"github.com/oklog/run"
95
"math"
106
"os"
117
"os/exec"
128
"sync"
9+
"sync/atomic"
1310
"syscall"
1411
"time"
12+
13+
"github.com/fsnotify/fsnotify"
14+
"github.com/go-kit/kit/log"
15+
"github.com/go-kit/kit/log/level"
16+
"github.com/oklog/run"
1517
)
1618

1719
const (
@@ -26,14 +28,15 @@ var (
2628
logger log.Logger
2729
cmd *exec.Cmd
2830
mutex sync.Mutex
29-
restartTimes int
30-
timer *time.Timer
31+
restartTimes int32
32+
timerCtx context.Context
33+
timerCancel context.CancelFunc
3134
)
3235

3336
func main() {
3437
logger = log.NewLogfmtLogger(os.Stdout)
3538

36-
timer = time.NewTimer(0)
39+
timerCtx, timerCancel = context.WithCancel(context.Background())
3740

3841
var g run.Group
3942
{
@@ -57,6 +60,9 @@ func main() {
5760
start()
5861
// Wait for the fluent bit exit.
5962
wait()
63+
64+
timerCtx, timerCancel = context.WithCancel(context.Background())
65+
6066
// After the fluent bit exit, fluent bit watcher restarts it with an exponential
6167
// back-off delay (1s, 2s, 4s, ...), that is capped at five minutes.
6268
backoff()
@@ -97,11 +103,13 @@ func main() {
97103
continue
98104
}
99105

106+
_ = level.Info(logger).Log("msg", "Config file changed, stopping Fluent Bit")
107+
100108
// After the config file changed, it should stop the fluent bit,
101109
// and resets the restart backoff timer.
102110
stop()
103111
resetTimer()
104-
_ = level.Info(logger).Log("msg", "Config file changed, stop Fluent Bit")
112+
_ = level.Info(logger).Log("msg", "Config file changed, stopped Fluent Bit")
105113
case <-watcher.Errors:
106114
_ = level.Error(logger).Log("msg", "Watcher stopped")
107115
return nil
@@ -124,13 +132,7 @@ func main() {
124132

125133
// Inspired by https://github.com/jimmidyson/configmap-reload
126134
func isValidEvent(event fsnotify.Event) bool {
127-
if event.Op&fsnotify.Create != fsnotify.Create {
128-
return false
129-
}
130-
//if filepath.Base(event.Name) != "..data" {
131-
// return false
132-
//}
133-
return true
135+
return event.Op&fsnotify.Create == fsnotify.Create
134136
}
135137

136138
func start() {
@@ -155,17 +157,19 @@ func start() {
155157
}
156158

157159
func wait() {
158-
160+
mutex.Lock()
159161
if cmd == nil {
162+
mutex.Unlock()
160163
return
161164
}
165+
mutex.Unlock()
162166

163167
startTime := time.Now()
164168
_ = level.Error(logger).Log("msg", "Fluent bit exited", "error", cmd.Wait())
165169
// Once the fluent bit has executed for 10 minutes without any problems,
166170
// it should resets the restart backoff timer.
167-
if time.Now().Sub(startTime) >= ResetTime {
168-
restartTimes = 0
171+
if time.Since(startTime) >= ResetTime {
172+
atomic.StoreInt32(&restartTimes, 0)
169173
}
170174

171175
mutex.Lock()
@@ -175,16 +179,32 @@ func wait() {
175179

176180
func backoff() {
177181

178-
delayTime := time.Duration(math.Pow(2, float64(restartTimes))) * time.Second
182+
delayTime := time.Duration(math.Pow(2, float64(atomic.LoadInt32(&restartTimes)))) * time.Second
179183
if delayTime >= MaxDelayTime {
180184
delayTime = MaxDelayTime
181185
}
182-
timer.Reset(delayTime)
186+
187+
_ = level.Info(logger).Log("msg", "backoff", "delay", delayTime)
183188

184189
startTime := time.Now()
185-
<-timer.C
186-
_ = level.Info(logger).Log("msg", "delay", "actual", time.Now().Sub(startTime), "expected", delayTime)
187-
restartTimes = restartTimes + 1
190+
191+
timer := time.NewTimer(delayTime)
192+
defer timer.Stop()
193+
194+
select {
195+
case <-timerCtx.Done():
196+
_ = level.Info(logger).Log("msg", "context cancel", "actual", time.Since(startTime), "expected", delayTime)
197+
198+
atomic.StoreInt32(&restartTimes, 0)
199+
200+
return
201+
case <-timer.C:
202+
_ = level.Info(logger).Log("msg", "backoff timer done", "actual", time.Since(startTime), "expected", delayTime)
203+
204+
atomic.AddInt32(&restartTimes, 1)
205+
206+
return
207+
}
188208
}
189209

190210
func stop() {
@@ -193,6 +213,7 @@ func stop() {
193213
defer mutex.Unlock()
194214

195215
if cmd == nil || cmd.Process == nil {
216+
_ = level.Info(logger).Log("msg", "Fluent Bit not running. No process to stop.")
196217
return
197218
}
198219

@@ -204,12 +225,6 @@ func stop() {
204225
}
205226

206227
func resetTimer() {
207-
208-
if timer != nil {
209-
if !timer.Stop() {
210-
<-timer.C
211-
}
212-
timer.Reset(0)
213-
}
214-
restartTimes = 0
228+
timerCancel()
229+
atomic.StoreInt32(&restartTimes, 0)
215230
}

0 commit comments

Comments
 (0)