diff --git a/pkg/app/loaders.go b/pkg/app/loaders.go index 488c0e5b..2b1a7f9b 100644 --- a/pkg/app/loaders.go +++ b/pkg/app/loaders.go @@ -45,6 +45,7 @@ START: } a.Logger.Printf("starting loader type %q", ldTypeS) for targetOp := range ld.Start(ctx) { + // do deletes first, since target change equates to delete+add for _, del := range targetOp.Del { // not clustered, delete local target if !a.inCluster() { @@ -113,6 +114,7 @@ START: } a.Logger.Printf("starting loader type %q", ldTypeS) for targetOp := range ld.Start(ctx) { + // do deletes first since target change is delete+add for _, del := range targetOp.Del { // clustered, delete target in all instances of the cluster a.configLock.Lock() diff --git a/pkg/loaders/consul_loader/consul_loader.go b/pkg/loaders/consul_loader/consul_loader.go index 9617e462..69427cb0 100644 --- a/pkg/loaders/consul_loader/consul_loader.go +++ b/pkg/loaders/consul_loader/consul_loader.go @@ -434,12 +434,13 @@ func (c *consulLoader) updateTargets(ctx context.Context, srvName string, tcs ma if _, ok := c.lastTargets[srvName]; !ok { c.lastTargets[srvName] = make(map[string]*types.TargetConfig) } - for _, add := range targetOp.Add { - c.lastTargets[srvName][add.Name] = add - } + // do delete first since change is delete+add for _, del := range targetOp.Del { delete(c.lastTargets[srvName], del) } + for _, add := range targetOp.Add { + c.lastTargets[srvName][add.Name] = add + } c.m.Unlock() opChan <- targetOp diff --git a/pkg/loaders/docker_loader/docker_loader.go b/pkg/loaders/docker_loader/docker_loader.go index 2596ac08..88e0a5be 100644 --- a/pkg/loaders/docker_loader/docker_loader.go +++ b/pkg/loaders/docker_loader/docker_loader.go @@ -520,12 +520,13 @@ func (d *dockerLoader) updateTargets(ctx context.Context, tcs map[string]*types. return } d.m.Lock() - for _, add := range targetOp.Add { - d.lastTargets[add.Name] = add - } + // do deletes first since change is delete+add for _, del := range targetOp.Del { delete(d.lastTargets, del) } + for _, add := range targetOp.Add { + d.lastTargets[add.Name] = add + } d.m.Unlock() opChan <- targetOp } diff --git a/pkg/loaders/file_loader/file_loader.go b/pkg/loaders/file_loader/file_loader.go index 9f0807e1..0c6aa9ae 100644 --- a/pkg/loaders/file_loader/file_loader.go +++ b/pkg/loaders/file_loader/file_loader.go @@ -291,12 +291,13 @@ func (f *fileLoader) updateTargets(ctx context.Context, tcs map[string]*types.Ta return } f.m.Lock() - for _, add := range targetOp.Add { - f.lastTargets[add.Name] = add - } + // do delete first since change is delete+add for _, del := range targetOp.Del { delete(f.lastTargets, del) } + for _, add := range targetOp.Add { + f.lastTargets[add.Name] = add + } f.m.Unlock() opChan <- targetOp } diff --git a/pkg/loaders/http_loader/http_loader.go b/pkg/loaders/http_loader/http_loader.go index 08767ab2..f3e3d4f8 100644 --- a/pkg/loaders/http_loader/http_loader.go +++ b/pkg/loaders/http_loader/http_loader.go @@ -329,14 +329,16 @@ func (h *httpLoader) updateTargets(ctx context.Context, tcs map[string]*types.Ta return } h.m.Lock() + // do delete first, since target change + // consists of delete and add + for _, n := range targetOp.Del { + delete(h.lastTargets, n) + } for n, t := range targetOp.Add { if _, ok := h.lastTargets[n]; !ok { h.lastTargets[n] = t } } - for _, n := range targetOp.Del { - delete(h.lastTargets, n) - } h.m.Unlock() opChan <- targetOp } @@ -417,12 +419,29 @@ func (f *httpLoader) runActions(ctx context.Context, tcs map[string]*types.Targe } }() // create waitGroup and add the number of target operations to it - wg := new(sync.WaitGroup) - wg.Add(len(targetOp.Add) + len(targetOp.Del)) + wgDelete := new(sync.WaitGroup) + wgDelete.Add(len(targetOp.Del)) + // run OnDelete actions first, since change==delete+add + for _, tDel := range targetOp.Del { + go func(name string) { + defer wgDelete.Done() + err := f.runOnDeleteActions(ctx, name, tcs) + if err != nil { + f.logger.Printf("failed running OnDelete actions: %v", err) + return + } + opChan <- &loaders.TargetOperation{Del: []string{name}} + }(tDel) + } + wgDelete.Wait() + + wgAdd := new(sync.WaitGroup) + wgAdd.Add(len(targetOp.Add)) + // run OnAdd actions for n, tAdd := range targetOp.Add { go func(n string, tc *types.TargetConfig) { - defer wg.Done() + defer wgDelete.Done() err := f.runOnAddActions(ctx, tc.Name, tcs) if err != nil { f.logger.Printf("failed running OnAdd actions: %v", err) @@ -431,19 +450,8 @@ func (f *httpLoader) runActions(ctx context.Context, tcs map[string]*types.Targe opChan <- &loaders.TargetOperation{Add: map[string]*types.TargetConfig{n: tc}} }(n, tAdd) } - // run OnDelete actions - for _, tDel := range targetOp.Del { - go func(name string) { - defer wg.Done() - err := f.runOnDeleteActions(ctx, name, tcs) - if err != nil { - f.logger.Printf("failed running OnDelete actions: %v", err) - return - } - opChan <- &loaders.TargetOperation{Del: []string{name}} - }(tDel) - } - wg.Wait() + + wgAdd.Wait() close(opChan) <-doneCh //wait for gathering goroutine to finish return result, nil diff --git a/pkg/loaders/loaders.go b/pkg/loaders/loaders.go index b50ee643..8efb6c14 100644 --- a/pkg/loaders/loaders.go +++ b/pkg/loaders/loaders.go @@ -11,6 +11,7 @@ package loaders import ( "context" "log" + "reflect" "github.com/mitchellh/mapstructure" "github.com/prometheus/client_golang/prometheus" @@ -72,30 +73,48 @@ func DecodeConfig(src, dst interface{}) error { return decoder.Decode(src) } -func Diff(m1, m2 map[string]*types.TargetConfig) *TargetOperation { +func Diff(currentMap, newMap map[string]*types.TargetConfig) *TargetOperation { result := &TargetOperation{ Add: make(map[string]*types.TargetConfig, 0), Del: make([]string, 0), } - if len(m1) == 0 { - for n, t := range m2 { + // handle removed and added targets + if len(currentMap) == 0 { + for n, t := range newMap { result.Add[n] = t } return result } - if len(m2) == 0 { - for name := range m1 { + if len(newMap) == 0 { + for name := range currentMap { result.Del = append(result.Del, name) } return result } - for n, t := range m2 { - if _, ok := m1[n]; !ok { + for n, t := range newMap { + if _, ok := currentMap[n]; !ok { result.Add[n] = t } } - for n := range m1 { - if _, ok := m2[n]; !ok { + for n := range currentMap { + if _, ok := newMap[n]; !ok { + result.Del = append(result.Del, n) + } + } + // handle changes + for n, currentVal := range currentMap { + newVal, ok := newMap[n] + // we don't have the target in the new config, + // already handled above + if !ok { + continue + } + // if any target parameter changes, we need to remove + // and re-add + // the only case I see where we wouldn't necessarily need to restart the actual GRPC connection + // is if Tags and EventTags changed, we could just apply the new tags internally (but right now it's done in the StartCollector phase) + if !reflect.DeepEqual(currentVal, newVal) { + result.Add[n] = newVal result.Del = append(result.Del, n) } } diff --git a/pkg/loaders/loaders_test.go b/pkg/loaders/loaders_test.go index 0f55d64b..66caee3f 100644 --- a/pkg/loaders/loaders_test.go +++ b/pkg/loaders/loaders_test.go @@ -149,6 +149,24 @@ var testSet = map[string]struct { Del: []string{"target1"}, }, }, + "t10-target-change": { + m1: map[string]*types.TargetConfig{ + "target1": {Address: "ip1"}, + "target2": {Address: "ip2"}, + }, + m2: map[string]*types.TargetConfig{ + "target1": {Address: "ip1"}, + "target2": {Address: "ip2new"}, + }, + output: &TargetOperation{ + Add: map[string]*types.TargetConfig{ + "target2": { + Address: "ip2new", + }, + }, + Del: []string{"target2"}, + }, + }, } func TestGetInstancesTagsMatches(t *testing.T) {