Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ const (

type usageTrackerGenericClient interface {
services.Service
TrackSeries(ctx context.Context, userID string, series []uint64) ([]uint64, error)
TrackSeries(ctx context.Context, userID string, series []uint64, async bool) ([]uint64, error)
CanTrackAsync(userID string) bool
}

Expand Down Expand Up @@ -1607,7 +1607,7 @@ func (d *Distributor) prePushMaxSeriesLimitMiddleware(next PushFunc) PushFunc {
}

// User is close to limit, track synchronously.
rejectedHashes, err := d.usageTrackerClient.TrackSeries(ctx, userID, seriesHashes)
rejectedHashes, err := d.usageTrackerClient.TrackSeries(ctx, userID, seriesHashes, false)
if err != nil {
return errors.Wrap(err, "failed to enforce max series limit")
}
Expand Down Expand Up @@ -1642,7 +1642,7 @@ func (d *Distributor) parallelUsageTrackerClientTrackSeriesCall(ctx context.Cont
asyncTrackingCtx, cancelAsyncTracking := context.WithCancelCause(ctx)
go func() {
defer close(done)
rejected, err := d.usageTrackerClient.TrackSeries(asyncTrackingCtx, userID, seriesHashes)
rejected, err := d.usageTrackerClient.TrackSeries(asyncTrackingCtx, userID, seriesHashes, true)
if err != nil {
level.Error(d.log).Log("msg", "failed to track series asynchronously", "err", err, "user", userID, "series", len(seriesHashes))
}
Expand Down
31 changes: 30 additions & 1 deletion pkg/usagetracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"flag"
"fmt"
"net/http"
"runtime/trace"
"slices"
"strconv"
"sync"
Expand Down Expand Up @@ -670,13 +671,41 @@ func (t *UsageTracker) stop(_ error) error {

// TrackSeries implements usagetrackerpb.UsageTrackerServer.
func (t *UsageTracker) TrackSeries(_ context.Context, req *usagetrackerpb.TrackSeriesRequest) (*usagetrackerpb.TrackSeriesResponse, error) {

if req.Subrequests != nil {
resps := make([]*usagetrackerpb.TrackSeriesSubresponse, 0, len(req.Subrequests))
now := time.Now()
// a batch request.
level.Info(t.logger).Log("msg", "logging bulk series", "count", len(req.Subrequests))
for _, sr := range req.Subrequests {
p, err := t.runningPartition(sr.Partition)
if err != nil {
return nil, err
}
rej, err := p.store.trackSeries(context.Background(), sr.UserID, sr.SeriesHashes, now)
if err != nil {
return nil, err
}
resp := usagetrackerpb.TrackSeriesSubresponse{}
resp.RejectedSeriesHashes = rej
resps = append(resps, &resp)
}

return &usagetrackerpb.TrackSeriesResponse{
Subresponses: resps,
}, nil
}

partition := req.Partition
p, err := t.runningPartition(partition)
if err != nil {
return nil, err
}

rejected, err := p.store.trackSeries(context.Background(), req.UserID, req.SeriesHashes, time.Now())
var rejected []uint64
trace.WithRegion(context.Background(), "trackSeries", func() {
rejected, err = p.store.trackSeries(context.Background(), req.UserID, req.SeriesHashes, time.Now())
})
if err != nil {
return nil, err
}
Expand Down
26 changes: 17 additions & 9 deletions pkg/usagetracker/usagetrackerclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (c *UsageTrackerClient) stopping(_ error) error {
return nil
}

func (c *UsageTrackerClient) TrackSeries(ctx context.Context, userID string, series []uint64) (_ []uint64, returnErr error) {
func (c *UsageTrackerClient) TrackSeries(ctx context.Context, userID string, series []uint64, async bool) (_ []uint64, returnErr error) {
// Nothing to do if there are no series to track.
if len(series) == 0 {
return nil, nil
Expand Down Expand Up @@ -259,7 +259,7 @@ func (c *UsageTrackerClient) TrackSeries(ctx context.Context, userID string, ser
}

// Track the series for this partition.
partitionRejected, err := c.trackSeriesPerPartition(ctx, userID, int32(partitionID), partitionSeries)
partitionRejected, err := c.trackSeriesPerPartition(ctx, userID, int32(partitionID), partitionSeries, async)
if err != nil {
return errors.Wrapf(err, "partition %d", partitionID)
}
Expand Down Expand Up @@ -295,7 +295,7 @@ func (c *UsageTrackerClient) TrackSeries(ctx context.Context, userID string, ser
return rejectedCopy, nil
}

func (c *UsageTrackerClient) trackSeriesPerPartition(ctx context.Context, userID string, partitionID int32, series []uint64) ([]uint64, error) {
func (c *UsageTrackerClient) trackSeriesPerPartition(ctx context.Context, userID string, partitionID int32, series []uint64, async bool) ([]uint64, error) {
// Get the usage-tracker instances for the input partition.
set, err := c.partitionRing.GetReplicationSetForPartitionAndOperation(partitionID, TrackSeriesOp)
if err != nil {
Expand Down Expand Up @@ -329,13 +329,21 @@ func (c *UsageTrackerClient) trackSeriesPerPartition(ctx context.Context, userID
return nil, errors.Errorf("usage-tracker instance %s (%s)", instance.Id, instance.Addr)
}

trackerClient := poolClient.(usagetrackerpb.UsageTrackerClient)
trackerRes, err := trackerClient.TrackSeries(ctx, req)
if err != nil {
return nil, errors.Wrapf(err, "usage-tracker instance %s (%s)", instance.Id, instance.Addr)
}
trackerClient := poolClient.(*usageTrackerClient)

if async {
trackerClient.AsyncTrackSeries(ctx, req)

return trackerRes.RejectedSeriesHashes, nil
// AsyncTrackSeries is non-blocking and doesn't return rejected series.
// Return empty slice since we're tracking asynchronously.
return nil, nil
} else {
res, err := trackerClient.TrackSeries(ctx, req)
if err != nil {
return nil, err
}
return res.RejectedSeriesHashes, nil
}
}, func(_ []uint64) {
// No cleanup.
})
Expand Down
14 changes: 7 additions & 7 deletions pkg/usagetracker/usagetrackerclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) {
series4Partition2 := uint64(partitions[1].Tokens[0] - 1)
series5Partition2 := uint64(partitions[1].Tokens[1] - 1)

rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2})
rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}, false)
require.NoError(t, err)
require.Empty(t, rejected)

Expand Down Expand Up @@ -238,7 +238,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) {
series2Partition1 := uint64(partitions[0].Tokens[1] - 1)
series3Partition1 := uint64(partitions[0].Tokens[2] - 1)

rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1})
rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1}, false)
require.NoError(t, err)
require.Empty(t, rejected)

Expand Down Expand Up @@ -298,7 +298,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) {
series4Partition2 := uint64(partitions[1].Tokens[0] - 1)
series5Partition2 := uint64(partitions[1].Tokens[1] - 1)

rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2})
rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}, false)
require.NoError(t, err)
require.Empty(t, rejected)

Expand Down Expand Up @@ -376,7 +376,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(ctx, c))
})

rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2})
rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}, false)
require.NoError(t, err)
if returnRejectedSeries {
require.ElementsMatch(t, []uint64{series2Partition1, series4Partition2, series5Partition2}, rejected)
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(ctx, c))
})

rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2})
rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}, false)
require.NoError(t, err)
require.ElementsMatch(t, []uint64{series2Partition1}, rejected)

Expand Down Expand Up @@ -504,7 +504,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(ctx, c))
})

rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{})
rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{}, false)
require.NoError(t, err)
require.Empty(t, rejected)
})
Expand Down Expand Up @@ -556,7 +556,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) {
series5Partition2 := uint64(partitions[1].Tokens[1] - 1)

// Despite all instances failing, the client should not return an error when IgnoreErrors is enabled.
rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2})
rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}, false)
require.NoError(t, err)
require.Empty(t, rejected)

Expand Down
98 changes: 96 additions & 2 deletions pkg/usagetracker/usagetrackerclient/grpc_client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ package usagetrackerclient

import (
"context"
"slices"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/user"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -76,17 +80,34 @@ func dialUsageTracker(clientCfg grpcclient.Config, instance ring.InstanceDesc, r
return nil, errors.Wrapf(err, "failed to dial usage-tracker %s %s", instance.Id, instance.Addr)
}

return &usageTrackerClient{
c := &usageTrackerClient{
UsageTrackerClient: usagetrackerpb.NewUsageTrackerClient(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
conn: conn,
}, nil

logger: logger,

flushDelay: 100 * time.Millisecond,
maxBatchSize: 1000, // Flush when batch reaches this size
}

go c.flusher()

return c, nil
}

// usageTrackerClient wraps a usage-tracker gRPC connection and additionally
// batches asynchronous TrackSeries calls so they can be flushed periodically
// (see flusher/flush) as a single batched request.
type usageTrackerClient struct {
	usagetrackerpb.UsageTrackerClient
	grpc_health_v1.HealthClient
	conn *grpc.ClientConn

	logger log.Logger

	// mu guards pending.
	mu sync.Mutex
	// pending holds queued async subrequests awaiting the next flush.
	pending []*usagetrackerpb.TrackSeriesSubrequest
	// NOTE(review): flushTimer is never referenced in the visible code
	// (flusher uses time.Sleep instead) — appears unused; confirm and remove.
	flushTimer *time.Timer
	// flushDelay is the base interval between periodic flushes (jittered).
	flushDelay time.Duration
	// maxBatchSize triggers an immediate inline flush when pending reaches
	// this many subrequests.
	maxBatchSize int
}

func (c *usageTrackerClient) Close() error {
Expand All @@ -101,6 +122,79 @@ func (c *usageTrackerClient) RemoteAddress() string {
return c.conn.Target()
}

// AsyncTrackSeries queues a TrackSeries request to be batched and sent by the
// periodic flusher. It does not report rejected series back to the caller.
// It is usually non-blocking, but once the queue reaches maxBatchSize the
// batch is flushed inline, which performs a synchronous gRPC call.
// NOTE(review): ctx is currently unused — the eventual flush runs with its
// own context, so caller cancellation does not stop a queued request.
// NOTE(review): req.SeriesHashes is retained without copying; the caller must
// not mutate it afterwards.
func (c *usageTrackerClient) AsyncTrackSeries(ctx context.Context, req *usagetrackerpb.TrackSeriesRequest) {
	// Repackage the request as a subrequest so multiple tenants/partitions
	// can share one batched TrackSeries call.
	queued := &usagetrackerpb.TrackSeriesSubrequest{
		UserID:       req.UserID,
		Partition:    req.Partition,
		SeriesHashes: req.SeriesHashes,
	}

	c.mu.Lock()
	c.pending = append(c.pending, queued)
	full := len(c.pending) >= c.maxBatchSize
	c.mu.Unlock()

	// Flush outside the lock when the batch size threshold is reached.
	if full {
		c.flush()
	}
}

// flusher periodically flushes pending batched requests. It is started as a
// background goroutine by dialUsageTracker and sleeps a jittered flushDelay
// between flushes.
//
// NOTE(review): there is no stop signal — this loop runs forever, even after
// Close, leaking one goroutine per dialed client (the original author's
// comment below admits this). It needs a done channel closed by Close, or a
// context, before this can merge.
func (c *usageTrackerClient) flusher() {
	// Dangles for now. Fix later.

	for {
		time.Sleep(util.DurationWithJitter(c.flushDelay, 0.15))
		c.flush()
	}
}

// flush sends all pending subrequests as a single batched TrackSeries
// request. It is called by the flusher goroutine and inline by
// AsyncTrackSeries when the queue reaches maxBatchSize.
//
// NOTE(review): errors are logged and dropped — queued series are lost on
// failure, with no retry. Confirm this best-effort semantic is intended.
func (c *usageTrackerClient) flush() {
	c.mu.Lock()

	// Nothing queued: avoid sending an empty request.
	if len(c.pending) == 0 {
		c.mu.Unlock()
		return
	}

	// Create a batched request with all pending subrequests.
	// NOTE(review): this Clone is redundant — c.pending is detached and reset
	// below while still under the lock, so the slice is exclusively owned and
	// could be used directly without copying.
	batchReq := &usagetrackerpb.TrackSeriesRequest{
		Subrequests: slices.Clone(c.pending),
	}

	// Clear pending requests and reset timer.
	// NOTE(review): despite the comment above, no timer is reset here (and
	// the struct's flushTimer is never used) — comment or code is stale.
	pending := c.pending
	c.pending = nil

	// Release the lock before making the gRPC call.
	c.mu.Unlock()

	// A fresh background context with a fixed 5s timeout: deliberate, since
	// the originating request contexts may already be gone, but it also means
	// caller cancellation never applies here.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Just trying to make things succeed.
	// NOTE(review): hard-coded tenant "100" is almost certainly wrong — each
	// subrequest carries its own UserID, but the org ID injected here is what
	// auth middleware will see. Must be resolved before merge.
	tctx := user.InjectOrgID(ctx, "100")

	_, err := c.TrackSeries(tctx, batchReq)
	if err != nil {
		level.Warn(c.logger).Log(
			"msg", "failed to send batched TrackSeries request",
			"batch_size", len(pending),
			"err", err,
		)
	} else {
		level.Debug(c.logger).Log(
			"msg", "successfully sent batched TrackSeries request",
			"batch_size", len(pending),
		)
	}
}

// grpcclientInstrument is a copy of grpcclient.Instrument, but it doesn't add the ClientUserHeaderInterceptor for the method that doesn't need auth.
func grpcclientInstrument(requestDuration *prometheus.HistogramVec, instrumentationLabelOptions ...middleware.InstrumentationOption) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
noAuthMethods := map[string]bool{
Expand Down
Loading
Loading