Skip to content

Commit a877a5c

Browse files
authored
Merge pull request #603 from netixx/fix/target-change
Handle target changes in loaders
2 parents 8e5a9f2 + cae1832 commit a877a5c

File tree

7 files changed

+87
-37
lines changed

7 files changed

+87
-37
lines changed

pkg/app/loaders.go

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ START:
4545
}
4646
a.Logger.Printf("starting loader type %q", ldTypeS)
4747
for targetOp := range ld.Start(ctx) {
48+
// do deletes first, since target change equates to delete+add
4849
for _, del := range targetOp.Del {
4950
// not clustered, delete local target
5051
if !a.inCluster() {
@@ -113,6 +114,7 @@ START:
113114
}
114115
a.Logger.Printf("starting loader type %q", ldTypeS)
115116
for targetOp := range ld.Start(ctx) {
117+
// do deletes first since target change is delete+add
116118
for _, del := range targetOp.Del {
117119
// clustered, delete target in all instances of the cluster
118120
a.configLock.Lock()

pkg/loaders/consul_loader/consul_loader.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -434,12 +434,13 @@ func (c *consulLoader) updateTargets(ctx context.Context, srvName string, tcs ma
434434
if _, ok := c.lastTargets[srvName]; !ok {
435435
c.lastTargets[srvName] = make(map[string]*types.TargetConfig)
436436
}
437-
for _, add := range targetOp.Add {
438-
c.lastTargets[srvName][add.Name] = add
439-
}
437+
// do delete first since change is delete+add
440438
for _, del := range targetOp.Del {
441439
delete(c.lastTargets[srvName], del)
442440
}
441+
for _, add := range targetOp.Add {
442+
c.lastTargets[srvName][add.Name] = add
443+
}
443444
c.m.Unlock()
444445

445446
opChan <- targetOp

pkg/loaders/docker_loader/docker_loader.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -520,12 +520,13 @@ func (d *dockerLoader) updateTargets(ctx context.Context, tcs map[string]*types.
520520
return
521521
}
522522
d.m.Lock()
523-
for _, add := range targetOp.Add {
524-
d.lastTargets[add.Name] = add
525-
}
523+
// do deletes first since change is delete+add
526524
for _, del := range targetOp.Del {
527525
delete(d.lastTargets, del)
528526
}
527+
for _, add := range targetOp.Add {
528+
d.lastTargets[add.Name] = add
529+
}
529530
d.m.Unlock()
530531
opChan <- targetOp
531532
}

pkg/loaders/file_loader/file_loader.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -291,12 +291,13 @@ func (f *fileLoader) updateTargets(ctx context.Context, tcs map[string]*types.Ta
291291
return
292292
}
293293
f.m.Lock()
294-
for _, add := range targetOp.Add {
295-
f.lastTargets[add.Name] = add
296-
}
294+
// do delete first since change is delete+add
297295
for _, del := range targetOp.Del {
298296
delete(f.lastTargets, del)
299297
}
298+
for _, add := range targetOp.Add {
299+
f.lastTargets[add.Name] = add
300+
}
300301
f.m.Unlock()
301302
opChan <- targetOp
302303
}

pkg/loaders/http_loader/http_loader.go

+27-19
Original file line numberDiff line numberDiff line change
@@ -329,14 +329,16 @@ func (h *httpLoader) updateTargets(ctx context.Context, tcs map[string]*types.Ta
329329
return
330330
}
331331
h.m.Lock()
332+
// do delete first, since target change
333+
// consists of delete and add
334+
for _, n := range targetOp.Del {
335+
delete(h.lastTargets, n)
336+
}
332337
for n, t := range targetOp.Add {
333338
if _, ok := h.lastTargets[n]; !ok {
334339
h.lastTargets[n] = t
335340
}
336341
}
337-
for _, n := range targetOp.Del {
338-
delete(h.lastTargets, n)
339-
}
340342
h.m.Unlock()
341343
opChan <- targetOp
342344
}
@@ -417,12 +419,29 @@ func (f *httpLoader) runActions(ctx context.Context, tcs map[string]*types.Targe
417419
}
418420
}()
419421
// create waitGroup and add the number of target operations to it
420-
wg := new(sync.WaitGroup)
421-
wg.Add(len(targetOp.Add) + len(targetOp.Del))
422+
wgDelete := new(sync.WaitGroup)
423+
wgDelete.Add(len(targetOp.Del))
424+
// run OnDelete actions first, since change==delete+add
425+
for _, tDel := range targetOp.Del {
426+
go func(name string) {
427+
defer wgDelete.Done()
428+
err := f.runOnDeleteActions(ctx, name, tcs)
429+
if err != nil {
430+
f.logger.Printf("failed running OnDelete actions: %v", err)
431+
return
432+
}
433+
opChan <- &loaders.TargetOperation{Del: []string{name}}
434+
}(tDel)
435+
}
436+
wgDelete.Wait()
437+
438+
wgAdd := new(sync.WaitGroup)
439+
wgAdd.Add(len(targetOp.Add))
440+
422441
// run OnAdd actions
423442
for n, tAdd := range targetOp.Add {
424443
go func(n string, tc *types.TargetConfig) {
425-
defer wg.Done()
444+
defer wgDelete.Done()
426445
err := f.runOnAddActions(ctx, tc.Name, tcs)
427446
if err != nil {
428447
f.logger.Printf("failed running OnAdd actions: %v", err)
@@ -431,19 +450,8 @@ func (f *httpLoader) runActions(ctx context.Context, tcs map[string]*types.Targe
431450
opChan <- &loaders.TargetOperation{Add: map[string]*types.TargetConfig{n: tc}}
432451
}(n, tAdd)
433452
}
434-
// run OnDelete actions
435-
for _, tDel := range targetOp.Del {
436-
go func(name string) {
437-
defer wg.Done()
438-
err := f.runOnDeleteActions(ctx, name, tcs)
439-
if err != nil {
440-
f.logger.Printf("failed running OnDelete actions: %v", err)
441-
return
442-
}
443-
opChan <- &loaders.TargetOperation{Del: []string{name}}
444-
}(tDel)
445-
}
446-
wg.Wait()
453+
454+
wgAdd.Wait()
447455
close(opChan)
448456
<-doneCh //wait for gathering goroutine to finish
449457
return result, nil

pkg/loaders/loaders.go

+28-9
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package loaders
1111
import (
1212
"context"
1313
"log"
14+
"reflect"
1415

1516
"github.com/mitchellh/mapstructure"
1617
"github.com/prometheus/client_golang/prometheus"
@@ -72,30 +73,48 @@ func DecodeConfig(src, dst interface{}) error {
7273
return decoder.Decode(src)
7374
}
7475

75-
func Diff(m1, m2 map[string]*types.TargetConfig) *TargetOperation {
76+
func Diff(currentMap, newMap map[string]*types.TargetConfig) *TargetOperation {
7677
result := &TargetOperation{
7778
Add: make(map[string]*types.TargetConfig, 0),
7879
Del: make([]string, 0),
7980
}
80-
if len(m1) == 0 {
81-
for n, t := range m2 {
81+
// handle removed and added targets
82+
if len(currentMap) == 0 {
83+
for n, t := range newMap {
8284
result.Add[n] = t
8385
}
8486
return result
8587
}
86-
if len(m2) == 0 {
87-
for name := range m1 {
88+
if len(newMap) == 0 {
89+
for name := range currentMap {
8890
result.Del = append(result.Del, name)
8991
}
9092
return result
9193
}
92-
for n, t := range m2 {
93-
if _, ok := m1[n]; !ok {
94+
for n, t := range newMap {
95+
if _, ok := currentMap[n]; !ok {
9496
result.Add[n] = t
9597
}
9698
}
97-
for n := range m1 {
98-
if _, ok := m2[n]; !ok {
99+
for n := range currentMap {
100+
if _, ok := newMap[n]; !ok {
101+
result.Del = append(result.Del, n)
102+
}
103+
}
104+
// handle changes
105+
for n, currentVal := range currentMap {
106+
newVal, ok := newMap[n]
107+
// we don't have the target in the new config,
108+
// already handled above
109+
if !ok {
110+
continue
111+
}
112+
// if any target parameter changes, we need to remove
113+
// and re-add
114+
// the only case I see where we wouldn't necessarily need to restart the actual GRPC connection
115+
// is if Tags and EventTags changed, we could just apply the new tags internally (but right now it's done in the StartCollector phase)
116+
if !reflect.DeepEqual(currentVal, newVal) {
117+
result.Add[n] = newVal
99118
result.Del = append(result.Del, n)
100119
}
101120
}

pkg/loaders/loaders_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,24 @@ var testSet = map[string]struct {
149149
Del: []string{"target1"},
150150
},
151151
},
152+
"t10-target-change": {
153+
m1: map[string]*types.TargetConfig{
154+
"target1": {Address: "ip1"},
155+
"target2": {Address: "ip2"},
156+
},
157+
m2: map[string]*types.TargetConfig{
158+
"target1": {Address: "ip1"},
159+
"target2": {Address: "ip2new"},
160+
},
161+
output: &TargetOperation{
162+
Add: map[string]*types.TargetConfig{
163+
"target2": {
164+
Address: "ip2new",
165+
},
166+
},
167+
Del: []string{"target2"},
168+
},
169+
},
152170
}
153171

154172
func TestGetInstancesTagsMatches(t *testing.T) {

0 commit comments

Comments
 (0)