
Commit 9d0d108

chore(engine): use github.com/grafana/dskit/dns for scheduler discovery (#19835)
Signed-off-by: Robert Fratto <[email protected]>
1 parent 25c8827 commit 9d0d108
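
Background for the change: the previous implementation built a grpcutil DNS resolver and watcher, which Run then had to close by hooking context cancellation. The dskit dns.Provider adopted here follows a simpler resolve-then-read pattern: Resolve performs a lookup for a set of addresses and Addresses returns the most recently resolved set, which the new dnsWatcher diffs into Add/Delete updates on each poll interval. A minimal, hypothetical sketch of that provider pattern follows; the scheduler address is made up, and the nil Registerer mirrors what the commit passes.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/dns"
)

func main() {
	logger := log.NewNopLogger()

	// As in the commit: no metrics registerer, standard-library resolver.
	provider := dns.NewProvider(logger, nil, dns.GolangResolverType)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// "dns+scheduler.example:9095" is a made-up address; the dns+ prefix
	// asks the provider to resolve A/AAAA records while keeping the port.
	if err := provider.Resolve(ctx, []string{"dns+scheduler.example:9095"}); err != nil {
		fmt.Println("resolve failed:", err)
		return
	}

	// Addresses returns the snapshot from the most recent Resolve call;
	// dnsWatcher.Poll turns successive snapshots into Add/Delete updates.
	for _, addr := range provider.Addresses() {
		fmt.Println("discovered scheduler:", addr)
	}
}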

File tree

2 files changed: +179 −130 lines


pkg/engine/internal/worker/scheduler_lookup.go

Lines changed: 125 additions & 68 deletions
@@ -10,18 +10,14 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/dns"
 	"github.com/grafana/dskit/grpcutil"
 )
 
 type schedulerLookup struct {
-	// logger to log messages with.
-	logger log.Logger
-
-	// Watcher emits events when schedulers are found. Each found address must
-	// be an IP address.
-	watcher grpcutil.Watcher
-
-	closeOnce sync.Once
+	logger   log.Logger
+	watcher  *dnsWatcher
+	interval time.Duration
 }
 
 type handleScheduler func(ctx context.Context, addr net.Addr)
@@ -31,29 +27,16 @@ func newSchedulerLookup(logger log.Logger, address string, lookupInterval time.D
 		logger = log.NewNopLogger()
 	}
 
-	resolver, err := grpcutil.NewDNSResolverWithFreq(lookupInterval, logger)
-	if err != nil {
-		return nil, fmt.Errorf("creating DNS resolver: %w", err)
-	}
-
-	watcher, err := resolver.Resolve(address, "")
-	if err != nil {
-		return nil, fmt.Errorf("creating DNS watcher: %w", err)
-	}
+	provider := dns.NewProvider(logger, nil, dns.GolangResolverType)
 
 	return &schedulerLookup{
-		logger:  logger,
-		watcher: watcher,
+		logger:   logger,
+		watcher:  newDNSWatcher(address, provider),
+		interval: lookupInterval,
 	}, nil
 }
 
 func (l *schedulerLookup) Run(ctx context.Context, handlerFunc handleScheduler) error {
-	// Hook into context cancellation to close the watcher. We need to do this
-	// because the watcher doesn't accept a custom context when polling for
-	// changes.
-	stop := context.AfterFunc(ctx, l.closeWatcher)
-	defer stop()
-
 	var handlerWg sync.WaitGroup
 	defer handlerWg.Wait()
 
@@ -67,60 +50,69 @@ func (l *schedulerLookup) Run(ctx context.Context, handlerFunc handleScheduler)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
+	// Initially set our timer with no delay so we process the first update
+	// immediately. We'll give it a real duration after each tick.
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+
 	for {
-		updates, err := l.watcher.Next()
-		if err != nil && ctx.Err() == nil {
-			return fmt.Errorf("finding schedulers: %w", err)
-		} else if ctx.Err() != nil {
-			// The context was canceled, we can exit gracefully.
+		select {
+		case <-ctx.Done():
 			return nil
-		}
 
-		for _, update := range updates {
-			switch update.Op {
-			case grpcutil.Add:
-				if _, exist := handlers[update.Addr]; exist {
-					// Ignore duplicate handlers.
-					level.Warn(l.logger).Log("msg", "ignoring duplicate scheduler", "addr", update.Addr)
-					continue
-				}
+		case <-timer.C:
+			timer.Reset(l.interval)
 
-				addr, err := parseTCPAddr(update.Addr)
-				if err != nil {
-					level.Warn(l.logger).Log("msg", "failed to parse scheduler address", "addr", update.Addr, "err", err)
-					continue
-				}
+			updates, err := l.watcher.Poll(ctx)
+			if err != nil && ctx.Err() == nil {
+				return fmt.Errorf("finding schedulers: %w", err)
+			} else if ctx.Err() != nil {
+				// The context was canceled, we can exit gracefully.
+				return nil
+			}
 
-				var handler handlerContext
-				handler.Context, handler.Cancel = context.WithCancel(ctx)
-				handlers[update.Addr] = handler
-
-				handlerWg.Add(1)
-				go func() {
-					defer handlerWg.Done()
-					handlerFunc(handler.Context, addr)
-				}()
-
-			case grpcutil.Delete:
-				handler, exist := handlers[update.Addr]
-				if !exist {
-					level.Warn(l.logger).Log("msg", "ignoring unrecognized scheduler", "addr", update.Addr)
-					continue
+			for _, update := range updates {
+				switch update.Op {
+				case grpcutil.Add:
+					if _, exist := handlers[update.Addr]; exist {
+						// Ignore duplicate handlers.
+						level.Warn(l.logger).Log("msg", "ignoring duplicate scheduler", "addr", update.Addr)
+						continue
+					}
+
+					addr, err := parseTCPAddr(update.Addr)
+					if err != nil {
+						level.Warn(l.logger).Log("msg", "failed to parse scheduler address", "addr", update.Addr, "err", err)
+						continue
+					}
+
+					var handler handlerContext
+					handler.Context, handler.Cancel = context.WithCancel(ctx)
+					handlers[update.Addr] = handler
+
+					handlerWg.Add(1)
+					go func() {
+						defer handlerWg.Done()
+						handlerFunc(handler.Context, addr)
+					}()
+
+				case grpcutil.Delete:
+					handler, exist := handlers[update.Addr]
+					if !exist {
+						level.Warn(l.logger).Log("msg", "ignoring unrecognized scheduler", "addr", update.Addr)
+						continue
+					}
+					handler.Cancel()
+					delete(handlers, update.Addr)
+
+				default:
+					level.Warn(l.logger).Log("msg", "unknown scheduler update operation", "op", update.Op)
 				}
-				handler.Cancel()
-				delete(handlers, update.Addr)
-
-			default:
-				level.Warn(l.logger).Log("msg", "unknown scheduler update operation", "op", update.Op)
 			}
 		}
 	}
 }
 
-func (l *schedulerLookup) closeWatcher() {
-	l.closeOnce.Do(func() { l.watcher.Close() })
-}
-
 // parseTCPAddr parses a TCP address string into a [net.TCPAddr]. It doesn't do
 // any name resolution: the addr must be a numeric pair of IP and port.
 func parseTCPAddr(addr string) (*net.TCPAddr, error) {
@@ -131,3 +123,68 @@ func parseTCPAddr(addr string) (*net.TCPAddr, error) {
 
 	return net.TCPAddrFromAddrPort(ap), nil
 }
+
+type provider interface {
+	Resolve(ctx context.Context, addrs []string) error
+	Addresses() []string
+}
+
+type dnsWatcher struct {
+	addr     string
+	provider provider
+
+	cached map[string]struct{}
+}
+
+func newDNSWatcher(addr string, provider provider) *dnsWatcher {
+	return &dnsWatcher{
+		addr:     addr,
+		provider: provider,
+
+		cached: make(map[string]struct{}),
+	}
+}
+
+// Poll polls for changes in the DNS records.
+func (w *dnsWatcher) Poll(ctx context.Context) ([]*grpcutil.Update, error) {
+	if err := w.provider.Resolve(ctx, []string{w.addr}); err != nil {
+		return nil, err
+	}
+
+	actual := w.discovered()
+
+	var updates []*grpcutil.Update
+	for addr := range actual {
+		if _, exists := w.cached[addr]; exists {
+			continue
+		}
+
+		w.cached[addr] = struct{}{}
+		updates = append(updates, &grpcutil.Update{
+			Addr: addr,
+			Op:   grpcutil.Add,
+		})
+	}
+
+	for addr := range w.cached {
+		if _, exists := actual[addr]; !exists {
+			delete(w.cached, addr)
+			updates = append(updates, &grpcutil.Update{
+				Addr: addr,
+				Op:   grpcutil.Delete,
+			})
+		}
+	}
+
+	return updates, nil
+}
+
+func (w *dnsWatcher) discovered() map[string]struct{} {
+	slice := w.provider.Addresses()
+
+	res := make(map[string]struct{}, len(slice))
+	for _, addr := range slice {
+		res[addr] = struct{}{}
+	}
+	return res
+}
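
The core of the new watcher is the set difference computed in Poll: addresses seen for the first time become grpcutil.Add updates, and cached addresses missing from the latest resolved snapshot become grpcutil.Delete updates. A standalone, hypothetical distillation of that bookkeeping (plain maps and slices, outside the watcher) for illustration:

package main

import "fmt"

// diffAddrs mirrors the bookkeeping in dnsWatcher.Poll: cached holds the
// previously seen addresses and is mutated in place, actual is the latest
// resolved snapshot. It returns what was added and what was removed.
// (Hypothetical helper for illustration; not part of the commit.)
func diffAddrs(cached map[string]struct{}, actual []string) (added, removed []string) {
	seen := make(map[string]struct{}, len(actual))
	for _, addr := range actual {
		seen[addr] = struct{}{}
		if _, ok := cached[addr]; !ok {
			cached[addr] = struct{}{}
			added = append(added, addr)
		}
	}
	for addr := range cached {
		if _, ok := seen[addr]; !ok {
			delete(cached, addr)
			removed = append(removed, addr)
		}
	}
	return added, removed
}

func main() {
	cached := map[string]struct{}{}

	added, removed := diffAddrs(cached, []string{"127.0.0.1:8080", "127.0.0.2:8080"})
	fmt.Println(added, removed) // [127.0.0.1:8080 127.0.0.2:8080] []

	added, removed = diffAddrs(cached, []string{"127.0.0.2:8080"})
	fmt.Println(added, removed) // [] [127.0.0.1:8080]
}

Because the cached set is mutated in place, repeated calls with successive snapshots yield exactly one Add and one Delete per address transition, which is what Run relies on to start and cancel per-scheduler handlers.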

pkg/engine/internal/worker/scheduler_lookup_test.go

Lines changed: 54 additions & 62 deletions
@@ -2,90 +2,82 @@ package worker
 
 import (
 	"context"
-	"errors"
-	"fmt"
 	"net"
 	"sync"
 	"testing"
+	"testing/synctest"
 	"time"
 
 	"github.com/go-kit/log"
-	"github.com/grafana/dskit/grpcutil"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 )
 
 func Test_schedulerLookup(t *testing.T) {
-	var wg sync.WaitGroup
-	defer wg.Wait()
-
-	fw := &fakeWatcher{
-		ctx: t.Context(),
-		ch:  make(chan *grpcutil.Update, 1),
-	}
+	// NOTE(rfratto): synctest makes it possible to reliably test asynchronous
+	// code with time.Sleep.
+	synctest.Test(t, func(t *testing.T) {
+		var wg sync.WaitGroup
+		defer wg.Wait()
+
+		// Provide 10 addresses to start with.
+		addrs := []string{
+			"127.0.0.1:8080", "127.0.0.2:8080", "127.0.0.3:8080", "127.0.0.4:8080", "127.0.0.5:8080",
+			"127.0.0.6:8080", "127.0.0.7:8080", "127.0.0.8:8080", "127.0.0.9:8080", "127.0.0.10:8080",
+		}
 
-	// Manually create a schedulerLookup so we can hook in a custom
-	// implementation of [grpcutil.Watcher].
-	disc := &schedulerLookup{
-		logger:  log.NewNopLogger(),
-		watcher: fw,
-	}
+		fr := &fakeProvider{
+			resolveFunc: func(_ context.Context, _ []string) ([]string, error) { return addrs, nil },
+		}
 
-	var handlers atomic.Int64
+		// Manually create a schedulerLookup so we can hook in a custom
+		// implementation of [grpcutil.Watcher].
+		disc := &schedulerLookup{
+			logger:   log.NewNopLogger(),
+			watcher:  newDNSWatcher("example.com", fr),
+			interval: 1 * time.Minute,
+		}
 
-	lookupContext, lookupCancel := context.WithCancel(t.Context())
-	defer lookupCancel()
+		var handlers atomic.Int64
 
-	wg.Add(1)
-	go func() {
-		// Decrement the wait group once Run exits. Run won't exit until all
-		// handlers have terminated, so this validates that logic.
-		defer wg.Done()
+		lookupContext, lookupCancel := context.WithCancel(t.Context())
+		defer lookupCancel()
 
-		_ = disc.Run(lookupContext, func(ctx context.Context, _ net.Addr) {
-			context.AfterFunc(ctx, func() { handlers.Dec() })
-			handlers.Inc()
+		wg.Go(func() {
+			_ = disc.Run(lookupContext, func(ctx context.Context, _ net.Addr) {
+				context.AfterFunc(ctx, func() { handlers.Dec() })
+				handlers.Inc()
+			})
 		})
-	}()
-
-	// Emit 10 schedulers, then wait for there to be one handler per
-	// scheduler.
-	for i := range 10 {
-		addr := fmt.Sprintf("127.0.0.%d:8080", i+1)
-		fw.ch <- &grpcutil.Update{Op: grpcutil.Add, Addr: addr}
-	}
-
-	require.Eventually(t, func() bool {
-		return handlers.Load() == 10
-	}, time.Minute, time.Millisecond*10, "should have 10 running handlers, ended with %d", handlers.Load())
-
-	// Delete all the schedulers, then wait for all handlers to terminate (by
-	// context).
-	for i := range 10 {
-		addr := fmt.Sprintf("127.0.0.%d:8080", i+1)
-		fw.ch <- &grpcutil.Update{Op: grpcutil.Delete, Addr: addr}
-	}
 
-	require.Eventually(t, func() bool {
-		return handlers.Load() == 0
-	}, time.Minute, time.Millisecond*10, "should have no handlers running, ended with %d", handlers.Load())
+		// There should immediately be running handlers without needing to wait
+		// for the discovery interval.
+		synctest.Wait()
+		require.Equal(t, int64(10), handlers.Load(), "should have 10 running handlers")
+
+		// Remove all the addresses from discovery; after the next interval, all
+		// handlers should be removed.
+		addrs = addrs[:0]
+		time.Sleep(disc.interval + time.Second)
+		require.Equal(t, int64(0), handlers.Load(), "should have no running handlers")
+	})
 }
 
-type fakeWatcher struct {
-	ctx context.Context
-	ch  chan *grpcutil.Update
+type fakeProvider struct {
+	resolveFunc func(ctx context.Context, addrs []string) ([]string, error)
+
+	cached []string
 }
 
-func (fw fakeWatcher) Next() ([]*grpcutil.Update, error) {
-	select {
-	case <-fw.ctx.Done():
-		return nil, fw.ctx.Err()
-	case update, ok := <-fw.ch:
-		if !ok {
-			return nil, errors.New("closed")
-		}
-		return []*grpcutil.Update{update}, nil
+func (fp *fakeProvider) Resolve(ctx context.Context, addrs []string) error {
+	resolved, err := fp.resolveFunc(ctx, addrs)
+	if err != nil {
+		return err
 	}
+	fp.cached = resolved
+	return nil
 }
 
-func (fw fakeWatcher) Close() { close(fw.ch) }
+func (fp *fakeProvider) Addresses() []string {
+	return fp.cached
+}
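
The rewritten test leans on Go's testing/synctest package (Go 1.25): inside synctest.Test, goroutines run in a "bubble" with a fake clock, time.Sleep advances instantly once every goroutine in the bubble is durably blocked, and synctest.Wait blocks until the bubble quiesces. A small, self-contained sketch of that behavior, independent of this commit (the package and test names below are hypothetical):

package example

import (
	"testing"
	"testing/synctest"
	"time"
)

// TestFakeClock sketches how synctest virtualizes time: inside the bubble,
// time.Sleep returns as soon as every goroutine in the bubble is blocked,
// so interval-based code (like schedulerLookup.Run) can be tested without
// real waiting. Requires Go 1.25+.
func TestFakeClock(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		start := time.Now()

		done := make(chan struct{})
		go func() {
			time.Sleep(time.Hour) // advances the fake clock, no real delay
			close(done)
		}()

		// Wait blocks until all goroutines in the bubble are durably
		// blocked; sleeping past the hour then observes the closed channel.
		synctest.Wait()
		time.Sleep(time.Hour + time.Second)

		select {
		case <-done:
		default:
			t.Fatal("expected done to be closed after the fake hour")
		}

		if elapsed := time.Since(start); elapsed < time.Hour {
			t.Fatalf("fake clock should have advanced at least an hour, got %s", elapsed)
		}
	})
}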
