Skip to content

Commit 0e6a1b1

Browse files
authored
Fixed instancer logic for Consul (#1215)
The instancer loop was never updating the lastIndex but passing it to getInstances which caused Instancer to spam Consul repeatedly and nearly killed our Consul agent pods and time instances were added or removed from Consul for that service.
1 parent 6ed2328 commit 0e6a1b1

File tree

2 files changed

+67
-2
lines changed

2 files changed

+67
-2
lines changed

sd/consul/instancer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ func (s *Instancer) loop(lastIndex uint64) {
6565
instances []string
6666
err error
6767
d time.Duration = 10 * time.Millisecond
68+
index uint64
6869
)
6970
for {
70-
index := lastIndex
7171
instances, index, err = s.getInstances(lastIndex, s.quitc)
7272
switch {
7373
case errors.Is(err, errStopped):
@@ -82,11 +82,12 @@ func (s *Instancer) loop(lastIndex uint64) {
8282
time.Sleep(d)
8383
d = conn.Exponential(d)
8484
case index < lastIndex:
85-
s.logger.Log("err", "index is less than previous; reseting to default")
85+
s.logger.Log("err", "index is less than previous; resetting to default")
8686
lastIndex = defaultIndex
8787
time.Sleep(d)
8888
d = conn.Exponential(d)
8989
default:
90+
lastIndex = index
9091
s.cache.Update(sd.Event{Instances: instances})
9192
d = 10 * time.Millisecond
9293
}

sd/consul/instancer_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package consul
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67
"testing"
78
"time"
@@ -261,3 +262,66 @@ func TestInstancerWithInvalidIndex(t *testing.T) {
261262
t.Error("failed, to receive call in time")
262263
}
263264
}
265+
266+
type indexTestClient struct {
267+
client *testClient
268+
index uint64
269+
errs chan error
270+
}
271+
272+
func newIndexTestClient(c *testClient, errs chan error) *indexTestClient {
273+
return &indexTestClient{
274+
client: c,
275+
index: 0,
276+
errs: errs,
277+
}
278+
}
279+
280+
func (i *indexTestClient) Register(r *consul.AgentServiceRegistration) error {
281+
return i.client.Register(r)
282+
}
283+
284+
func (i *indexTestClient) Deregister(r *consul.AgentServiceRegistration) error {
285+
return i.client.Deregister(r)
286+
}
287+
288+
func (i *indexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
289+
290+
// Assumes this is the first call Service, loop hasn't begun running yet
291+
if i.index == 0 && queryOpts.WaitIndex == 0 {
292+
i.index = 100
293+
entries, meta, err := i.client.Service(service, tag, passingOnly, queryOpts)
294+
meta.LastIndex = i.index
295+
return entries, meta, err
296+
}
297+
298+
if queryOpts.WaitIndex < i.index {
299+
i.errs <- fmt.Errorf("wait index %d is less than or equal to previous value", queryOpts.WaitIndex)
300+
}
301+
302+
entries, meta, err := i.client.Service(service, tag, passingOnly, queryOpts)
303+
i.index++
304+
meta.LastIndex = i.index
305+
return entries, meta, err
306+
}
307+
308+
func TestInstancerLoopIndex(t *testing.T) {
309+
310+
var (
311+
errs = make(chan error, 1)
312+
logger = log.NewNopLogger()
313+
client = newIndexTestClient(newTestClient(consulState), errs)
314+
)
315+
316+
go func() {
317+
for err := range errs {
318+
t.Error(err)
319+
t.FailNow()
320+
}
321+
}()
322+
323+
instancer := NewInstancer(client, logger, "search", []string{"api"}, true)
324+
defer instancer.Stop()
325+
326+
time.Sleep(2 * time.Second)
327+
}

0 commit comments

Comments
 (0)