Skip to content

Commit bd99fa5

Browse files
chore: abstract failover to withRandomShuffle (#19915)
1 parent aafc579 commit bd99fa5

File tree

1 file changed

+21
-9
lines changed

1 file changed

+21
-9
lines changed

pkg/distributor/ingest_limits.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ type ingestLimitsFrontendClient interface {
2020
ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
2121
}
2222

23-
// ingestLimitsFrontendRingClient uses the ring to query ingest-limits frontends.
23+
// ingestLimitsFrontendRingClient uses the ring to discover ingest-limits-frontend
24+
// instances and proxy requests to them.
2425
type ingestLimitsFrontendRingClient struct {
2526
ring ring.ReadRing
2627
pool *ring_client.Pool
@@ -35,21 +36,33 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo
3536

3637
// Implements the ingestLimitsFrontendClient interface.
3738
func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
39+
var resp *proto.ExceedsLimitsResponse
40+
err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error {
41+
var clientErr error
42+
resp, clientErr = client.ExceedsLimits(ctx, req)
43+
return clientErr
44+
})
45+
return resp, err
46+
}
47+
48+
// withRandomShuffle gets all healthy frontends in the ring, randomly shuffles
49+
// them, and then calls f.
50+
func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context, f func(ctx context.Context, client proto.IngestLimitsFrontendClient) error) error {
3851
rs, err := c.ring.GetAllHealthy(limits_frontend_client.LimitsRead)
3952
if err != nil {
40-
return nil, fmt.Errorf("failed to get limits-frontend instances from ring: %w", err)
53+
return fmt.Errorf("failed to get limits-frontend instances from ring: %w", err)
4154
}
4255
// Randomly shuffle instances to evenly distribute requests.
4356
rand.Shuffle(len(rs.Instances), func(i, j int) {
4457
rs.Instances[i], rs.Instances[j] = rs.Instances[j], rs.Instances[i]
4558
})
4659
var lastErr error
47-
// Send the request to the limits-frontend to see if it exceeds the tenant
48-
// limits. If the RPC fails, failover to the next instance in the ring.
60+
// Pass the instance to f. If it fails, failover to the next instance.
61+
// Repeat until there are no more instances.
4962
for _, instance := range rs.Instances {
5063
select {
5164
case <-ctx.Done():
52-
return nil, ctx.Err()
65+
return ctx.Err()
5366
default:
5467
}
5568
c, err := c.pool.GetClientFor(instance.Addr)
@@ -58,14 +71,13 @@ func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req
5871
continue
5972
}
6073
client := c.(proto.IngestLimitsFrontendClient)
61-
resp, err := client.ExceedsLimits(ctx, req)
62-
if err != nil {
74+
if err = f(ctx, client); err != nil {
6375
lastErr = err
6476
continue
6577
}
66-
return resp, nil
78+
return nil
6779
}
68-
return nil, lastErr
80+
return lastErr
6981
}
7082

7183
type ingestLimits struct {

0 commit comments

Comments
 (0)