Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle target changes in loaders #603

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/app/loaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ START:
}
a.Logger.Printf("starting loader type %q", ldTypeS)
for targetOp := range ld.Start(ctx) {
// do deletes first, since target change equates to delete+add
for _, del := range targetOp.Del {
// not clustered, delete local target
if !a.inCluster() {
Expand Down Expand Up @@ -113,6 +114,7 @@ START:
}
a.Logger.Printf("starting loader type %q", ldTypeS)
for targetOp := range ld.Start(ctx) {
// do deletes first since target change is delete+add
for _, del := range targetOp.Del {
// clustered, delete target in all instances of the cluster
a.configLock.Lock()
Expand Down
7 changes: 4 additions & 3 deletions pkg/loaders/consul_loader/consul_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,13 @@ func (c *consulLoader) updateTargets(ctx context.Context, srvName string, tcs ma
if _, ok := c.lastTargets[srvName]; !ok {
c.lastTargets[srvName] = make(map[string]*types.TargetConfig)
}
for _, add := range targetOp.Add {
c.lastTargets[srvName][add.Name] = add
}
// do delete first since change is delete+add
for _, del := range targetOp.Del {
delete(c.lastTargets[srvName], del)
}
for _, add := range targetOp.Add {
c.lastTargets[srvName][add.Name] = add
}
c.m.Unlock()

opChan <- targetOp
Expand Down
7 changes: 4 additions & 3 deletions pkg/loaders/docker_loader/docker_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,12 +520,13 @@ func (d *dockerLoader) updateTargets(ctx context.Context, tcs map[string]*types.
return
}
d.m.Lock()
for _, add := range targetOp.Add {
d.lastTargets[add.Name] = add
}
// do deletes first since change is delete+add
for _, del := range targetOp.Del {
delete(d.lastTargets, del)
}
for _, add := range targetOp.Add {
d.lastTargets[add.Name] = add
}
d.m.Unlock()
opChan <- targetOp
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/loaders/file_loader/file_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,13 @@ func (f *fileLoader) updateTargets(ctx context.Context, tcs map[string]*types.Ta
return
}
f.m.Lock()
for _, add := range targetOp.Add {
f.lastTargets[add.Name] = add
}
// do delete first since change is delete+add
for _, del := range targetOp.Del {
delete(f.lastTargets, del)
}
for _, add := range targetOp.Add {
f.lastTargets[add.Name] = add
}
f.m.Unlock()
opChan <- targetOp
}
Expand Down
46 changes: 27 additions & 19 deletions pkg/loaders/http_loader/http_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,16 @@ func (h *httpLoader) updateTargets(ctx context.Context, tcs map[string]*types.Ta
return
}
h.m.Lock()
// do delete first, since target change
// consists of delete and add
for _, n := range targetOp.Del {
delete(h.lastTargets, n)
}
for n, t := range targetOp.Add {
if _, ok := h.lastTargets[n]; !ok {
h.lastTargets[n] = t
}
}
for _, n := range targetOp.Del {
delete(h.lastTargets, n)
}
h.m.Unlock()
opChan <- targetOp
}
Expand Down Expand Up @@ -417,12 +419,29 @@ func (f *httpLoader) runActions(ctx context.Context, tcs map[string]*types.Targe
}
}()
// create waitGroup and add the number of target operations to it
wg := new(sync.WaitGroup)
wg.Add(len(targetOp.Add) + len(targetOp.Del))
wgDelete := new(sync.WaitGroup)
wgDelete.Add(len(targetOp.Del))
// run OnDelete actions first, since change==delete+add
for _, tDel := range targetOp.Del {
go func(name string) {
defer wgDelete.Done()
err := f.runOnDeleteActions(ctx, name, tcs)
if err != nil {
f.logger.Printf("failed running OnDelete actions: %v", err)
return
}
opChan <- &loaders.TargetOperation{Del: []string{name}}
}(tDel)
}
wgDelete.Wait()

wgAdd := new(sync.WaitGroup)
wgAdd.Add(len(targetOp.Add))

// run OnAdd actions
for n, tAdd := range targetOp.Add {
go func(n string, tc *types.TargetConfig) {
defer wg.Done()
defer wgDelete.Done()
err := f.runOnAddActions(ctx, tc.Name, tcs)
if err != nil {
f.logger.Printf("failed running OnAdd actions: %v", err)
Expand All @@ -431,19 +450,8 @@ func (f *httpLoader) runActions(ctx context.Context, tcs map[string]*types.Targe
opChan <- &loaders.TargetOperation{Add: map[string]*types.TargetConfig{n: tc}}
}(n, tAdd)
}
// run OnDelete actions
for _, tDel := range targetOp.Del {
go func(name string) {
defer wg.Done()
err := f.runOnDeleteActions(ctx, name, tcs)
if err != nil {
f.logger.Printf("failed running OnDelete actions: %v", err)
return
}
opChan <- &loaders.TargetOperation{Del: []string{name}}
}(tDel)
}
wg.Wait()

wgAdd.Wait()
close(opChan)
<-doneCh //wait for gathering goroutine to finish
return result, nil
Expand Down
37 changes: 28 additions & 9 deletions pkg/loaders/loaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package loaders
import (
"context"
"log"
"reflect"

"github.com/mitchellh/mapstructure"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -72,30 +73,48 @@ func DecodeConfig(src, dst interface{}) error {
return decoder.Decode(src)
}

func Diff(m1, m2 map[string]*types.TargetConfig) *TargetOperation {
func Diff(currentMap, newMap map[string]*types.TargetConfig) *TargetOperation {
result := &TargetOperation{
Add: make(map[string]*types.TargetConfig, 0),
Del: make([]string, 0),
}
if len(m1) == 0 {
for n, t := range m2 {
// handle removed and added targets
if len(currentMap) == 0 {
for n, t := range newMap {
result.Add[n] = t
}
return result
}
if len(m2) == 0 {
for name := range m1 {
if len(newMap) == 0 {
for name := range currentMap {
result.Del = append(result.Del, name)
}
return result
}
for n, t := range m2 {
if _, ok := m1[n]; !ok {
for n, t := range newMap {
if _, ok := currentMap[n]; !ok {
result.Add[n] = t
}
}
for n := range m1 {
if _, ok := m2[n]; !ok {
for n := range currentMap {
if _, ok := newMap[n]; !ok {
result.Del = append(result.Del, n)
}
}
// handle changes
for n, currentVal := range currentMap {
newVal, ok := newMap[n]
// we don't have the target in the new config,
// already handled above
if !ok {
continue
}
// if any target parameter changes, we need to remove
// and re-add
// the only case I see where we wouldn't necessarily need to restart the actual GRPC connection
// is if Tags and EventTags changed, we could just apply the new tags internally (but right now it's done in the StartCollector phase)
if !reflect.DeepEqual(currentVal, newVal) {
result.Add[n] = newVal
result.Del = append(result.Del, n)
}
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/loaders/loaders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,24 @@ var testSet = map[string]struct {
Del: []string{"target1"},
},
},
"t10-target-change": {
m1: map[string]*types.TargetConfig{
"target1": {Address: "ip1"},
"target2": {Address: "ip2"},
},
m2: map[string]*types.TargetConfig{
"target1": {Address: "ip1"},
"target2": {Address: "ip2new"},
},
output: &TargetOperation{
Add: map[string]*types.TargetConfig{
"target2": {
Address: "ip2new",
},
},
Del: []string{"target2"},
},
},
}

func TestGetInstancesTagsMatches(t *testing.T) {
Expand Down