diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 3b8a6e2b4f3..19a24b1827e 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -102,7 +102,7 @@ const ( type usageTrackerGenericClient interface { services.Service - TrackSeries(ctx context.Context, userID string, series []uint64) ([]uint64, error) + TrackSeries(ctx context.Context, userID string, series []uint64, async bool) ([]uint64, error) CanTrackAsync(userID string) bool } @@ -1607,7 +1607,7 @@ func (d *Distributor) prePushMaxSeriesLimitMiddleware(next PushFunc) PushFunc { } // User is close to limit, track synchronously. - rejectedHashes, err := d.usageTrackerClient.TrackSeries(ctx, userID, seriesHashes) + rejectedHashes, err := d.usageTrackerClient.TrackSeries(ctx, userID, seriesHashes, false) if err != nil { return errors.Wrap(err, "failed to enforce max series limit") } @@ -1642,7 +1642,7 @@ func (d *Distributor) parallelUsageTrackerClientTrackSeriesCall(ctx context.Cont asyncTrackingCtx, cancelAsyncTracking := context.WithCancelCause(ctx) go func() { defer close(done) - rejected, err := d.usageTrackerClient.TrackSeries(asyncTrackingCtx, userID, seriesHashes) + rejected, err := d.usageTrackerClient.TrackSeries(asyncTrackingCtx, userID, seriesHashes, true) if err != nil { level.Error(d.log).Log("msg", "failed to track series asynchronously", "err", err, "user", userID, "series", len(seriesHashes)) } diff --git a/pkg/usagetracker/tracker.go b/pkg/usagetracker/tracker.go index 46afd2dde0f..a87a858cc6d 100644 --- a/pkg/usagetracker/tracker.go +++ b/pkg/usagetracker/tracker.go @@ -7,6 +7,7 @@ import ( "flag" "fmt" "net/http" + "runtime/trace" "slices" "strconv" "sync" @@ -670,13 +671,41 @@ func (t *UsageTracker) stop(_ error) error { // TrackSeries implements usagetrackerpb.UsageTrackerServer. func (t *UsageTracker) TrackSeries(_ context.Context, req *usagetrackerpb.TrackSeriesRequest) (*usagetrackerpb.TrackSeriesResponse, error) { + + if req.Subrequests != nil { + resps := make([]*usagetrackerpb.TrackSeriesSubresponse, 0, len(req.Subrequests)) + now := time.Now() + // a batch request. + level.Info(t.logger).Log("msg", "logging bulk series", "count", len(req.Subrequests)) + for _, sr := range req.Subrequests { + p, err := t.runningPartition(sr.Partition) + if err != nil { + return nil, err + } + rej, err := p.store.trackSeries(context.Background(), sr.UserID, sr.SeriesHashes, now) + if err != nil { + return nil, err + } + resp := usagetrackerpb.TrackSeriesSubresponse{} + resp.RejectedSeriesHashes = rej + resps = append(resps, &resp) + } + + return &usagetrackerpb.TrackSeriesResponse{ + Subresponses: resps, + }, nil + } + partition := req.Partition p, err := t.runningPartition(partition) if err != nil { return nil, err } - rejected, err := p.store.trackSeries(context.Background(), req.UserID, req.SeriesHashes, time.Now()) + var rejected []uint64 + trace.WithRegion(context.Background(), "trackSeries", func() { + rejected, err = p.store.trackSeries(context.Background(), req.UserID, req.SeriesHashes, time.Now()) + }) if err != nil { return nil, err } diff --git a/pkg/usagetracker/usagetrackerclient/client.go b/pkg/usagetracker/usagetrackerclient/client.go index c4769790f0f..7501273be28 100644 --- a/pkg/usagetracker/usagetrackerclient/client.go +++ b/pkg/usagetracker/usagetrackerclient/client.go @@ -207,7 +207,7 @@ func (c *UsageTrackerClient) stopping(_ error) error { return nil } -func (c *UsageTrackerClient) TrackSeries(ctx context.Context, userID string, series []uint64) (_ []uint64, returnErr error) { +func (c *UsageTrackerClient) TrackSeries(ctx context.Context, userID string, series []uint64, async bool) (_ []uint64, returnErr error) { // Nothing to do if there are no series to track. if len(series) == 0 { return nil, nil @@ -259,7 +259,7 @@ func (c *UsageTrackerClient) TrackSeries(ctx context.Context, userID string, ser } // Track the series for this partition. - partitionRejected, err := c.trackSeriesPerPartition(ctx, userID, int32(partitionID), partitionSeries) + partitionRejected, err := c.trackSeriesPerPartition(ctx, userID, int32(partitionID), partitionSeries, async) if err != nil { return errors.Wrapf(err, "partition %d", partitionID) } @@ -295,7 +295,7 @@ func (c *UsageTrackerClient) TrackSeries(ctx context.Context, userID string, ser return rejectedCopy, nil } -func (c *UsageTrackerClient) trackSeriesPerPartition(ctx context.Context, userID string, partitionID int32, series []uint64) ([]uint64, error) { +func (c *UsageTrackerClient) trackSeriesPerPartition(ctx context.Context, userID string, partitionID int32, series []uint64, async bool) ([]uint64, error) { // Get the usage-tracker instances for the input partition. set, err := c.partitionRing.GetReplicationSetForPartitionAndOperation(partitionID, TrackSeriesOp) if err != nil { @@ -329,13 +329,21 @@ func (c *UsageTrackerClient) trackSeriesPerPartition(ctx context.Context, userID return nil, errors.Errorf("usage-tracker instance %s (%s)", instance.Id, instance.Addr) } - trackerClient := poolClient.(usagetrackerpb.UsageTrackerClient) - trackerRes, err := trackerClient.TrackSeries(ctx, req) - if err != nil { - return nil, errors.Wrapf(err, "usage-tracker instance %s (%s)", instance.Id, instance.Addr) - } + trackerClient := poolClient.(*usageTrackerClient) + + if async { + trackerClient.AsyncTrackSeries(ctx, req) - return trackerRes.RejectedSeriesHashes, nil + // AsyncTrackSeries is non-blocking and doesn't return rejected series. + // Return empty slice since we're tracking asynchronously. + return nil, nil + } else { + res, err := trackerClient.TrackSeries(ctx, req) + if err != nil { + return nil, err + } + return res.RejectedSeriesHashes, nil + } }, func(_ []uint64) { // No cleanup. }) diff --git a/pkg/usagetracker/usagetrackerclient/client_test.go b/pkg/usagetracker/usagetrackerclient/client_test.go index 7f54b33dea4..763badcc2e1 100644 --- a/pkg/usagetracker/usagetrackerclient/client_test.go +++ b/pkg/usagetracker/usagetrackerclient/client_test.go @@ -176,7 +176,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) { series4Partition2 := uint64(partitions[1].Tokens[0] - 1) series5Partition2 := uint64(partitions[1].Tokens[1] - 1) - rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}) + rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}, false) require.NoError(t, err) require.Empty(t, rejected) @@ -238,7 +238,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) { series2Partition1 := uint64(partitions[0].Tokens[1] - 1) series3Partition1 := uint64(partitions[0].Tokens[2] - 1) - rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1}) + rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1}, false) require.NoError(t, err) require.Empty(t, rejected) @@ -298,7 +298,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) { series4Partition2 := uint64(partitions[1].Tokens[0] - 1) series5Partition2 := uint64(partitions[1].Tokens[1] - 1) - rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}) + rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}, false) require.NoError(t, err) require.Empty(t, rejected) @@ -376,7 +376,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) { require.NoError(t, services.StopAndAwaitTerminated(ctx, c)) }) - rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}) + rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}, false) require.NoError(t, err) if returnRejectedSeries { require.ElementsMatch(t, []uint64{series2Partition1, series4Partition2, series5Partition2}, rejected) @@ -448,7 +448,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) { require.NoError(t, services.StopAndAwaitTerminated(ctx, c)) }) - rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}) + rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}, false) require.NoError(t, err) require.ElementsMatch(t, []uint64{series2Partition1}, rejected) @@ -504,7 +504,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) { require.NoError(t, services.StopAndAwaitTerminated(ctx, c)) }) - rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{}) + rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{}, false) require.NoError(t, err) require.Empty(t, rejected) }) @@ -556,7 +556,7 @@ func TestUsageTrackerClient_TrackSeries(t *testing.T) { series5Partition2 := uint64(partitions[1].Tokens[1] - 1) // Despite all instances failing, the client should not return an error when IgnoreErrors is enabled. - rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}) + rejected, err := c.TrackSeries(user.InjectOrgID(ctx, userID), userID, []uint64{series1Partition1, series2Partition1, series3Partition1, series4Partition2, series5Partition2}, false) require.NoError(t, err) require.Empty(t, rejected) diff --git a/pkg/usagetracker/usagetrackerclient/grpc_client_pool.go b/pkg/usagetracker/usagetrackerclient/grpc_client_pool.go index 2bfd7504c3a..b7eab2499a0 100644 --- a/pkg/usagetracker/usagetrackerclient/grpc_client_pool.go +++ b/pkg/usagetracker/usagetrackerclient/grpc_client_pool.go @@ -4,13 +4,17 @@ package usagetrackerclient import ( "context" + "slices" + "sync" "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/ring/client" + "github.com/grafana/dskit/user" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -76,17 +80,34 @@ func dialUsageTracker(clientCfg grpcclient.Config, instance ring.InstanceDesc, r return nil, errors.Wrapf(err, "failed to dial usage-tracker %s %s", instance.Id, instance.Addr) } - return &usageTrackerClient{ + c := &usageTrackerClient{ UsageTrackerClient: usagetrackerpb.NewUsageTrackerClient(conn), HealthClient: grpc_health_v1.NewHealthClient(conn), conn: conn, - }, nil + + logger: logger, + + flushDelay: 100 * time.Millisecond, + maxBatchSize: 1000, // Flush when batch reaches this size + } + + go c.flusher() + + return c, nil } type usageTrackerClient struct { usagetrackerpb.UsageTrackerClient grpc_health_v1.HealthClient conn *grpc.ClientConn + + logger log.Logger + + mu sync.Mutex + pending []*usagetrackerpb.TrackSeriesSubrequest + flushTimer *time.Timer + flushDelay time.Duration + maxBatchSize int } func (c *usageTrackerClient) Close() error { @@ -101,6 +122,79 @@ func (c *usageTrackerClient) RemoteAddress() string { return c.conn.Target() } +// AsyncTrackSeries queues a TrackSeries request to be batched and flushed on a timer. +// This method is non-blocking and does not return rejected series. +func (c *usageTrackerClient) AsyncTrackSeries(ctx context.Context, req *usagetrackerpb.TrackSeriesRequest) { + // Convert the request to a subrequest for batching. + subreq := &usagetrackerpb.TrackSeriesSubrequest{ + UserID: req.UserID, + Partition: req.Partition, + SeriesHashes: req.SeriesHashes, + } + + c.mu.Lock() + c.pending = append(c.pending, subreq) + + // Flush if batch size is reached. + if len(c.pending) >= c.maxBatchSize { + c.mu.Unlock() + c.flush() + } else { + c.mu.Unlock() + } +} + +func (c *usageTrackerClient) flusher() { + // Dangles for now. Fix later. + + for { + time.Sleep(util.DurationWithJitter(c.flushDelay, 0.15)) + c.flush() + } +} + +// flush sends all pending requests as a single batched request. +func (c *usageTrackerClient) flush() { + c.mu.Lock() + + if len(c.pending) == 0 { + c.mu.Unlock() + return + } + + // Create a batched request with all pending subrequests. + batchReq := &usagetrackerpb.TrackSeriesRequest{ + Subrequests: slices.Clone(c.pending), + } + + // Clear pending requests and reset timer. + pending := c.pending + c.pending = nil + + // Release the lock before making the gRPC call. + c.mu.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Just trying to make things succeed. + tctx := user.InjectOrgID(ctx, "100") + + _, err := c.TrackSeries(tctx, batchReq) + if err != nil { + level.Warn(c.logger).Log( + "msg", "failed to send batched TrackSeries request", + "batch_size", len(pending), + "err", err, + ) + } else { + level.Debug(c.logger).Log( + "msg", "successfully sent batched TrackSeries request", + "batch_size", len(pending), + ) + } +} + // grpcclientInstrument is a copy of grpcclient.Instrument, but it doesn't add the ClientUserHeaderInterceptor for the method that doesn't need auth. func grpcclientInstrument(requestDuration *prometheus.HistogramVec, instrumentationLabelOptions ...middleware.InstrumentationOption) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { noAuthMethods := map[string]bool{ diff --git a/pkg/usagetracker/usagetrackerpb/usagetracker.pb.go b/pkg/usagetracker/usagetrackerpb/usagetracker.pb.go index 7907b481a71..c271e2e9ada 100644 --- a/pkg/usagetracker/usagetrackerpb/usagetracker.pb.go +++ b/pkg/usagetracker/usagetrackerpb/usagetracker.pb.go @@ -7,16 +7,17 @@ import ( bytes "bytes" context "context" fmt "fmt" - _ "github.com/gogo/protobuf/gogoproto" - proto "github.com/gogo/protobuf/proto" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" io "io" math "math" math_bits "math/bits" reflect "reflect" strings "strings" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" ) // Reference imports to suppress errors if they are not otherwise used. @@ -30,7 +31,7 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -type TrackSeriesRequest struct { +type TrackSeriesSubrequest struct { // The tenant owning the series. UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"` // Partition that this series belong to. @@ -39,10 +40,73 @@ type TrackSeriesRequest struct { SeriesHashes []uint64 `protobuf:"varint,3,rep,packed,name=seriesHashes,proto3" json:"seriesHashes,omitempty"` } +func (m *TrackSeriesSubrequest) Reset() { *m = TrackSeriesSubrequest{} } +func (*TrackSeriesSubrequest) ProtoMessage() {} +func (*TrackSeriesSubrequest) Descriptor() ([]byte, []int) { + return fileDescriptor_24aa1621a7eb7fd6, []int{0} +} +func (m *TrackSeriesSubrequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TrackSeriesSubrequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TrackSeriesSubrequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TrackSeriesSubrequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_TrackSeriesSubrequest.Merge(m, src) +} +func (m *TrackSeriesSubrequest) XXX_Size() int { + return m.Size() +} +func (m *TrackSeriesSubrequest) XXX_DiscardUnknown() { + xxx_messageInfo_TrackSeriesSubrequest.DiscardUnknown(m) +} + +var xxx_messageInfo_TrackSeriesSubrequest proto.InternalMessageInfo + +func (m *TrackSeriesSubrequest) GetUserID() string { + if m != nil { + return m.UserID + } + return "" +} + +func (m *TrackSeriesSubrequest) GetPartition() int32 { + if m != nil { + return m.Partition + } + return 0 +} + +func (m *TrackSeriesSubrequest) GetSeriesHashes() []uint64 { + if m != nil { + return m.SeriesHashes + } + return nil +} + +type TrackSeriesRequest struct { + // The tenant owning the series. + UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"` + // Partition that this series belong to. + Partition int32 `protobuf:"varint,2,opt,name=partition,proto3" json:"partition,omitempty"` + // The hashes of the series to track. + SeriesHashes []uint64 `protobuf:"varint,3,rep,packed,name=seriesHashes,proto3" json:"seriesHashes,omitempty"` + Subrequests []*TrackSeriesSubrequest `protobuf:"bytes,4,rep,name=subrequests,proto3" json:"subrequests,omitempty"` +} + func (m *TrackSeriesRequest) Reset() { *m = TrackSeriesRequest{} } func (*TrackSeriesRequest) ProtoMessage() {} func (*TrackSeriesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_24aa1621a7eb7fd6, []int{0} + return fileDescriptor_24aa1621a7eb7fd6, []int{1} } func (m *TrackSeriesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -92,15 +156,67 @@ func (m *TrackSeriesRequest) GetSeriesHashes() []uint64 { return nil } -type TrackSeriesResponse struct { +func (m *TrackSeriesRequest) GetSubrequests() []*TrackSeriesSubrequest { + if m != nil { + return m.Subrequests + } + return nil +} + +type TrackSeriesSubresponse struct { // The hashes of the series that have been rejected because the tenant is over the limit. RejectedSeriesHashes []uint64 `protobuf:"varint,1,rep,packed,name=rejectedSeriesHashes,proto3" json:"rejectedSeriesHashes,omitempty"` } +func (m *TrackSeriesSubresponse) Reset() { *m = TrackSeriesSubresponse{} } +func (*TrackSeriesSubresponse) ProtoMessage() {} +func (*TrackSeriesSubresponse) Descriptor() ([]byte, []int) { + return fileDescriptor_24aa1621a7eb7fd6, []int{2} +} +func (m *TrackSeriesSubresponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TrackSeriesSubresponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TrackSeriesSubresponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TrackSeriesSubresponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_TrackSeriesSubresponse.Merge(m, src) +} +func (m *TrackSeriesSubresponse) XXX_Size() int { + return m.Size() +} +func (m *TrackSeriesSubresponse) XXX_DiscardUnknown() { + xxx_messageInfo_TrackSeriesSubresponse.DiscardUnknown(m) +} + +var xxx_messageInfo_TrackSeriesSubresponse proto.InternalMessageInfo + +func (m *TrackSeriesSubresponse) GetRejectedSeriesHashes() []uint64 { + if m != nil { + return m.RejectedSeriesHashes + } + return nil +} + +type TrackSeriesResponse struct { + // The hashes of the series that have been rejected because the tenant is over the limit. + RejectedSeriesHashes []uint64 `protobuf:"varint,1,rep,packed,name=rejectedSeriesHashes,proto3" json:"rejectedSeriesHashes,omitempty"` + Subresponses []*TrackSeriesSubresponse `protobuf:"bytes,2,rep,name=subresponses,proto3" json:"subresponses,omitempty"` +} + func (m *TrackSeriesResponse) Reset() { *m = TrackSeriesResponse{} } func (*TrackSeriesResponse) ProtoMessage() {} func (*TrackSeriesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_24aa1621a7eb7fd6, []int{1} + return fileDescriptor_24aa1621a7eb7fd6, []int{3} } func (m *TrackSeriesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -136,6 +252,13 @@ func (m *TrackSeriesResponse) GetRejectedSeriesHashes() []uint64 { return nil } +func (m *TrackSeriesResponse) GetSubresponses() []*TrackSeriesSubresponse { + if m != nil { + return m.Subresponses + } + return nil +} + type SeriesCreatedEvent struct { // The tenant owning the series. UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"` @@ -148,7 +271,7 @@ type SeriesCreatedEvent struct { func (m *SeriesCreatedEvent) Reset() { *m = SeriesCreatedEvent{} } func (*SeriesCreatedEvent) ProtoMessage() {} func (*SeriesCreatedEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_24aa1621a7eb7fd6, []int{2} + return fileDescriptor_24aa1621a7eb7fd6, []int{4} } func (m *SeriesCreatedEvent) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -208,7 +331,7 @@ type SnapshotRecord struct { func (m *SnapshotRecord) Reset() { *m = SnapshotRecord{} } func (*SnapshotRecord) ProtoMessage() {} func (*SnapshotRecord) Descriptor() ([]byte, []int) { - return fileDescriptor_24aa1621a7eb7fd6, []int{3} + return fileDescriptor_24aa1621a7eb7fd6, []int{5} } func (m *SnapshotRecord) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -273,7 +396,7 @@ type SnapshotEvent struct { func (m *SnapshotEvent) Reset() { *m = SnapshotEvent{} } func (*SnapshotEvent) ProtoMessage() {} func (*SnapshotEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_24aa1621a7eb7fd6, []int{4} + return fileDescriptor_24aa1621a7eb7fd6, []int{6} } func (m *SnapshotEvent) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -323,7 +446,7 @@ type SnapshotFile struct { func (m *SnapshotFile) Reset() { *m = SnapshotFile{} } func (*SnapshotFile) ProtoMessage() {} func (*SnapshotFile) Descriptor() ([]byte, []int) { - return fileDescriptor_24aa1621a7eb7fd6, []int{5} + return fileDescriptor_24aa1621a7eb7fd6, []int{7} } func (m *SnapshotFile) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -367,7 +490,7 @@ type GetUsersCloseToLimitRequest struct { func (m *GetUsersCloseToLimitRequest) Reset() { *m = GetUsersCloseToLimitRequest{} } func (*GetUsersCloseToLimitRequest) ProtoMessage() {} func (*GetUsersCloseToLimitRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_24aa1621a7eb7fd6, []int{6} + return fileDescriptor_24aa1621a7eb7fd6, []int{8} } func (m *GetUsersCloseToLimitRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -414,7 +537,7 @@ type GetUsersCloseToLimitResponse struct { func (m *GetUsersCloseToLimitResponse) Reset() { *m = GetUsersCloseToLimitResponse{} } func (*GetUsersCloseToLimitResponse) ProtoMessage() {} func (*GetUsersCloseToLimitResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_24aa1621a7eb7fd6, []int{7} + return fileDescriptor_24aa1621a7eb7fd6, []int{9} } func (m *GetUsersCloseToLimitResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -458,7 +581,9 @@ func (m *GetUsersCloseToLimitResponse) GetPartition() int32 { } func init() { + proto.RegisterType((*TrackSeriesSubrequest)(nil), "usagetrackerpb.TrackSeriesSubrequest") proto.RegisterType((*TrackSeriesRequest)(nil), "usagetrackerpb.TrackSeriesRequest") + proto.RegisterType((*TrackSeriesSubresponse)(nil), "usagetrackerpb.TrackSeriesSubresponse") proto.RegisterType((*TrackSeriesResponse)(nil), "usagetrackerpb.TrackSeriesResponse") proto.RegisterType((*SeriesCreatedEvent)(nil), "usagetrackerpb.SeriesCreatedEvent") proto.RegisterType((*SnapshotRecord)(nil), "usagetrackerpb.SnapshotRecord") @@ -471,42 +596,81 @@ func init() { func init() { proto.RegisterFile("usagetracker.proto", fileDescriptor_24aa1621a7eb7fd6) } var fileDescriptor_24aa1621a7eb7fd6 = []byte{ - // 520 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xcf, 0x8e, 0x12, 0x4f, - 0x10, 0x9e, 0x5e, 0xf8, 0x6d, 0x7e, 0x53, 0xb2, 0x6b, 0xd2, 0x12, 0x9d, 0x20, 0xe9, 0x90, 0x31, - 0xd9, 0x90, 0x68, 0x30, 0x59, 0x2f, 0x26, 0xde, 0x76, 0xd7, 0x3f, 0x24, 0x26, 0x9a, 0x81, 0xdd, - 0x83, 0x97, 0x4d, 0xc3, 0x14, 0xd0, 0x0a, 0xd3, 0xb3, 0xdd, 0x8d, 0x67, 0x1f, 0xc1, 0xc7, 0xf0, - 0x51, 0x3c, 0x72, 0x5c, 0x2f, 0x46, 0x86, 0x8b, 0xc7, 0x7d, 0x04, 0x33, 0x3d, 0x20, 0x0c, 0x22, - 0x72, 0xeb, 0xfa, 0xba, 0xea, 0xeb, 0xaf, 0xba, 0xbf, 0x6a, 0xa0, 0x63, 0xcd, 0xfb, 0x68, 0x14, - 0xef, 0x7e, 0x40, 0xd5, 0x88, 0x95, 0x34, 0x92, 0x1e, 0xae, 0x62, 0x71, 0xa7, 0x52, 0xee, 0xcb, - 0xbe, 0xb4, 0x5b, 0x8f, 0xd3, 0x55, 0x96, 0xe5, 0x47, 0x40, 0xdb, 0x69, 0x4a, 0x0b, 0x95, 0x40, - 0x1d, 0xe0, 0xd5, 0x18, 0xb5, 0xa1, 0x77, 0x61, 0x7f, 0xac, 0x51, 0x35, 0xcf, 0x3c, 0x52, 0x23, - 0x75, 0x37, 0x98, 0x47, 0xb4, 0x0a, 0x6e, 0xcc, 0x95, 0x11, 0x46, 0xc8, 0xc8, 0xdb, 0xab, 0x91, - 0xfa, 0x7f, 0xc1, 0x12, 0xa0, 0x3e, 0x94, 0xb4, 0xa5, 0x79, 0xc5, 0xf5, 0x00, 0xb5, 0x57, 0xa8, - 0x15, 0xea, 0xc5, 0x20, 0x87, 0xf9, 0x4d, 0xb8, 0x93, 0x3b, 0x4f, 0xc7, 0x32, 0xd2, 0x48, 0x8f, - 0xa1, 0xac, 0xf0, 0x3d, 0x76, 0x0d, 0x86, 0xad, 0x55, 0x0a, 0x62, 0x29, 0x36, 0xee, 0xa5, 0xd2, - 0xb3, 0xf8, 0x54, 0x21, 0x37, 0x18, 0x3e, 0xff, 0x88, 0xd1, 0x56, 0xe9, 0x46, 0x8c, 0x50, 0x1b, - 0x3e, 0x8a, 0xad, 0xf4, 0x42, 0xb0, 0x04, 0x76, 0x92, 0xfe, 0x9d, 0xc0, 0x61, 0x2b, 0xe2, 0xb1, - 0x1e, 0x48, 0x13, 0x60, 0x57, 0xaa, 0x30, 0x4f, 0x4a, 0xd6, 0x49, 0xab, 0xe0, 0xf6, 0xc4, 0x10, - 0x23, 0x3e, 0x42, 0xed, 0xed, 0xd5, 0x0a, 0x75, 0x37, 0x58, 0x02, 0xf4, 0x02, 0x8e, 0x86, 0x5c, - 0x1b, 0xab, 0xfa, 0x4d, 0xaf, 0xa7, 0xd1, 0xbc, 0x1d, 0x77, 0x86, 0x42, 0x0f, 0x30, 0x3c, 0xc1, - 0x9e, 0x54, 0xb8, 0x38, 0xcb, 0x2b, 0x58, 0xe2, 0x1d, 0xb3, 0xe9, 0x53, 0xb8, 0x97, 0x66, 0x2e, - 0xe2, 0x95, 0x0a, 0xaf, 0x68, 0x89, 0xfe, 0xb6, 0xed, 0x37, 0xe1, 0x20, 0x07, 0xff, 0xa3, 0xbd, - 0x0a, 0xfc, 0xbf, 0xe8, 0xc6, 0x5e, 0xa8, 0x1b, 0xfc, 0x8e, 0x7d, 0x1f, 0x4a, 0x0b, 0xaa, 0x17, - 0x62, 0x88, 0x94, 0x42, 0x31, 0xe4, 0x86, 0xdb, 0xf7, 0x2c, 0x05, 0x76, 0xed, 0x3f, 0x83, 0xfb, - 0x2f, 0xd1, 0x9c, 0x6b, 0x54, 0xfa, 0x74, 0x28, 0x35, 0xb6, 0xe5, 0x6b, 0x31, 0x12, 0x66, 0xe1, - 0xc1, 0x9c, 0xd7, 0xc8, 0x9a, 0xd7, 0xfc, 0x10, 0xaa, 0x9b, 0x8b, 0xe7, 0x86, 0x3a, 0x82, 0xdb, - 0x5a, 0x2a, 0x83, 0xe1, 0x65, 0xfa, 0xfe, 0x97, 0x22, 0xcc, 0xbc, 0xe4, 0x06, 0x07, 0x19, 0x9c, - 0x56, 0x36, 0x43, 0xbd, 0xdd, 0xd1, 0xc7, 0xdf, 0x08, 0x94, 0xce, 0xd3, 0x31, 0x6a, 0x67, 0x63, - 0x44, 0x2f, 0xe0, 0xd6, 0x8a, 0x7d, 0xa9, 0xdf, 0xc8, 0x0f, 0x59, 0xe3, 0xcf, 0x59, 0xaa, 0x3c, - 0xd8, 0x9a, 0x33, 0x97, 0x7b, 0x05, 0xe5, 0x4d, 0xed, 0xd0, 0x87, 0xeb, 0xc5, 0x5b, 0x6e, 0xac, - 0xf2, 0x68, 0xb7, 0xe4, 0xec, 0xc8, 0x93, 0xb3, 0xc9, 0x94, 0x39, 0xd7, 0x53, 0xe6, 0xdc, 0x4c, - 0x19, 0xf9, 0x94, 0x30, 0xf2, 0x25, 0x61, 0xe4, 0x6b, 0xc2, 0xc8, 0x24, 0x61, 0xe4, 0x47, 0xc2, - 0xc8, 0xcf, 0x84, 0x39, 0x37, 0x09, 0x23, 0x9f, 0x67, 0xcc, 0x99, 0xcc, 0x98, 0x73, 0x3d, 0x63, - 0xce, 0xbb, 0xb5, 0x5f, 0xa5, 0xb3, 0x6f, 0xbf, 0x91, 0x27, 0xbf, 0x02, 0x00, 0x00, 0xff, 0xff, - 0xd4, 0x83, 0x94, 0xa7, 0x82, 0x04, 0x00, 0x00, -} + // 585 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0xc1, 0x6e, 0xd3, 0x4c, + 0x10, 0xf6, 0x36, 0xf9, 0xab, 0xdf, 0xd3, 0xb4, 0x48, 0x4b, 0x29, 0x56, 0xa8, 0x56, 0xd1, 0x22, + 0xaa, 0x48, 0xa0, 0x22, 0x95, 0x0b, 0x12, 0xb7, 0xb6, 0x50, 0x8a, 0x2a, 0x81, 0x36, 0x6d, 0x0f, + 0x5c, 0x2a, 0xa7, 0x9e, 0xb4, 0x86, 0xc4, 0xeb, 0xee, 0x6e, 0x38, 0xf3, 0x08, 0x5c, 0x78, 0x07, + 0xde, 0x80, 0x57, 0xe0, 0x98, 0x63, 0xb9, 0x20, 0xe2, 0x5c, 0x38, 0xf6, 0x11, 0x90, 0xd7, 0x49, + 0x63, 0x47, 0x21, 0x8d, 0x90, 0xb8, 0x79, 0x67, 0xbf, 0xf9, 0x66, 0xbe, 0xd9, 0x6f, 0x0c, 0xb4, + 0xab, 0xfd, 0x33, 0x34, 0xca, 0x3f, 0x7d, 0x8f, 0x6a, 0x33, 0x56, 0xd2, 0x48, 0xba, 0x92, 0x8f, + 0xc5, 0xcd, 0xea, 0xea, 0x99, 0x3c, 0x93, 0xf6, 0xea, 0x71, 0xfa, 0x95, 0xa1, 0xf8, 0x05, 0xdc, + 0x39, 0x4c, 0x21, 0x0d, 0x54, 0x21, 0xea, 0x46, 0xb7, 0xa9, 0xf0, 0xa2, 0x8b, 0xda, 0xd0, 0x35, + 0x58, 0xec, 0x6a, 0x54, 0xfb, 0xbb, 0x1e, 0xa9, 0x91, 0xba, 0x2b, 0x86, 0x27, 0xba, 0x0e, 0x6e, + 0xec, 0x2b, 0x13, 0x9a, 0x50, 0x46, 0xde, 0x42, 0x8d, 0xd4, 0xff, 0x13, 0xe3, 0x00, 0xe5, 0x50, + 0xd1, 0x96, 0xe9, 0xa5, 0xaf, 0xcf, 0x51, 0x7b, 0xa5, 0x5a, 0xa9, 0x5e, 0x16, 0x85, 0x18, 0xff, + 0x4a, 0x80, 0xe6, 0x6a, 0x8a, 0x7f, 0x5d, 0x90, 0xee, 0xc1, 0x92, 0xbe, 0x16, 0xa6, 0xbd, 0x72, + 0xad, 0x54, 0x5f, 0xda, 0x7a, 0xb0, 0x59, 0x9c, 0xcf, 0xe6, 0xd4, 0x31, 0x88, 0x7c, 0x26, 0x3f, + 0x80, 0xb5, 0x49, 0x94, 0x8e, 0x65, 0xa4, 0x91, 0x6e, 0xc1, 0xaa, 0xc2, 0x77, 0x78, 0x6a, 0x30, + 0x68, 0xe4, 0xdb, 0x21, 0xb6, 0x9d, 0xa9, 0x77, 0xfc, 0x33, 0x81, 0xdb, 0x85, 0x39, 0xfc, 0x3d, + 0x17, 0x7d, 0x05, 0x15, 0x3d, 0x6e, 0x47, 0x7b, 0x0b, 0x56, 0xe3, 0xc6, 0x4d, 0x1a, 0x33, 0xb8, + 0x28, 0xe4, 0xf2, 0x08, 0x68, 0x06, 0xd9, 0x51, 0xe8, 0x1b, 0x0c, 0x9e, 0x7f, 0xc0, 0x68, 0xe6, + 0xf3, 0x98, 0xb0, 0x83, 0xda, 0xf8, 0x9d, 0xd8, 0x3e, 0x4f, 0x49, 0x8c, 0x03, 0x73, 0xf9, 0xe1, + 0x07, 0x81, 0x95, 0x46, 0xe4, 0xc7, 0xfa, 0x5c, 0x1a, 0x81, 0xa7, 0x52, 0x05, 0x45, 0x52, 0x32, + 0x49, 0xba, 0x0e, 0x6e, 0x2b, 0x6c, 0x63, 0xe4, 0x77, 0x86, 0x4a, 0x5d, 0x31, 0x0e, 0xd0, 0x63, + 0xd8, 0x68, 0xfb, 0xda, 0xd8, 0xae, 0x5f, 0xb7, 0x5a, 0x1a, 0xcd, 0x9b, 0x6e, 0xb3, 0x1d, 0xea, + 0x73, 0x0c, 0xb6, 0xb1, 0x25, 0x15, 0x8e, 0x6a, 0x79, 0x25, 0x4b, 0x3c, 0x27, 0x9a, 0x3e, 0x85, + 0xbb, 0x29, 0x72, 0x74, 0xce, 0x65, 0x78, 0x65, 0x4b, 0xf4, 0xa7, 0x6b, 0xbe, 0x0f, 0xcb, 0x85, + 0xf0, 0x0d, 0xf2, 0xaa, 0xf0, 0xff, 0x48, 0x8d, 0x1d, 0xa8, 0x2b, 0xae, 0xcf, 0x9c, 0x43, 0x65, + 0x44, 0xf5, 0x22, 0x6c, 0x23, 0xa5, 0x50, 0x0e, 0x7c, 0xe3, 0x5b, 0x6f, 0x54, 0x84, 0xfd, 0xe6, + 0xcf, 0xe0, 0xde, 0x1e, 0x9a, 0x23, 0x8d, 0x4a, 0xef, 0xb4, 0xa5, 0xc6, 0x43, 0x79, 0x10, 0x76, + 0x42, 0x33, 0xda, 0xb3, 0xc2, 0x3e, 0x91, 0x89, 0x7d, 0xe2, 0x01, 0xac, 0x4f, 0x4f, 0x1e, 0x9a, + 0x73, 0x03, 0x6e, 0x69, 0xa9, 0x0c, 0x06, 0x27, 0xe9, 0xfb, 0x9f, 0x84, 0x41, 0xe6, 0x4b, 0x57, + 0x2c, 0x67, 0xe1, 0x34, 0x73, 0x3f, 0xd0, 0xb3, 0xb7, 0x76, 0xeb, 0x3b, 0x81, 0xca, 0x51, 0x6a, + 0xcd, 0xc3, 0xcc, 0x9a, 0xf4, 0x18, 0x96, 0x72, 0xde, 0xa4, 0x7c, 0x86, 0x71, 0x87, 0x3a, 0xaa, + 0xf7, 0x67, 0x62, 0x86, 0xed, 0x5e, 0xc0, 0xea, 0x34, 0x39, 0xf4, 0xe1, 0x64, 0xf2, 0x8c, 0x89, + 0x55, 0x1f, 0xcd, 0x07, 0xce, 0x4a, 0x6e, 0xef, 0xf6, 0xfa, 0xcc, 0xb9, 0xec, 0x33, 0xe7, 0xaa, + 0xcf, 0xc8, 0xc7, 0x84, 0x91, 0x2f, 0x09, 0x23, 0xdf, 0x12, 0x46, 0x7a, 0x09, 0x23, 0x3f, 0x13, + 0x46, 0x7e, 0x25, 0xcc, 0xb9, 0x4a, 0x18, 0xf9, 0x34, 0x60, 0x4e, 0x6f, 0xc0, 0x9c, 0xcb, 0x01, + 0x73, 0xde, 0x4e, 0xfc, 0xad, 0x9b, 0x8b, 0xf6, 0xf7, 0xfc, 0xe4, 0x77, 0x00, 0x00, 0x00, 0xff, + 0xff, 0x45, 0x1b, 0xed, 0x96, 0xda, 0x05, 0x00, 0x00, +} + +func (this *TrackSeriesSubrequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + that1, ok := that.(*TrackSeriesSubrequest) + if !ok { + that2, ok := that.(TrackSeriesSubrequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.UserID != that1.UserID { + return false + } + if this.Partition != that1.Partition { + return false + } + if len(this.SeriesHashes) != len(that1.SeriesHashes) { + return false + } + for i := range this.SeriesHashes { + if this.SeriesHashes[i] != that1.SeriesHashes[i] { + return false + } + } + return true +} func (this *TrackSeriesRequest) Equal(that interface{}) bool { if that == nil { return this == nil @@ -540,6 +704,43 @@ func (this *TrackSeriesRequest) Equal(that interface{}) bool { return false } } + if len(this.Subrequests) != len(that1.Subrequests) { + return false + } + for i := range this.Subrequests { + if !this.Subrequests[i].Equal(that1.Subrequests[i]) { + return false + } + } + return true +} +func (this *TrackSeriesSubresponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*TrackSeriesSubresponse) + if !ok { + that2, ok := that.(TrackSeriesSubresponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.RejectedSeriesHashes) != len(that1.RejectedSeriesHashes) { + return false + } + for i := range this.RejectedSeriesHashes { + if this.RejectedSeriesHashes[i] != that1.RejectedSeriesHashes[i] { + return false + } + } return true } func (this *TrackSeriesResponse) Equal(that interface{}) bool { @@ -569,6 +770,14 @@ func (this *TrackSeriesResponse) Equal(that interface{}) bool { return false } } + if len(this.Subresponses) != len(that1.Subresponses) { + return false + } + for i := range this.Subresponses { + if !this.Subresponses[i].Equal(that1.Subresponses[i]) { + return false + } + } return true } func (this *SeriesCreatedEvent) Equal(that interface{}) bool { @@ -756,25 +965,53 @@ func (this *GetUsersCloseToLimitResponse) Equal(that interface{}) bool { } return true } -func (this *TrackSeriesRequest) GoString() string { +func (this *TrackSeriesSubrequest) GoString() string { if this == nil { return "nil" } s := make([]string, 0, 7) + s = append(s, "&usagetrackerpb.TrackSeriesSubrequest{") + s = append(s, "UserID: "+fmt.Sprintf("%#v", this.UserID)+",\n") + s = append(s, "Partition: "+fmt.Sprintf("%#v", this.Partition)+",\n") + s = append(s, "SeriesHashes: "+fmt.Sprintf("%#v", this.SeriesHashes)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *TrackSeriesRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) s = append(s, "&usagetrackerpb.TrackSeriesRequest{") s = append(s, "UserID: "+fmt.Sprintf("%#v", this.UserID)+",\n") s = append(s, "Partition: "+fmt.Sprintf("%#v", this.Partition)+",\n") s = append(s, "SeriesHashes: "+fmt.Sprintf("%#v", this.SeriesHashes)+",\n") + if this.Subrequests != nil { + s = append(s, "Subrequests: "+fmt.Sprintf("%#v", this.Subrequests)+",\n") + } s = append(s, "}") return strings.Join(s, "") } -func (this *TrackSeriesResponse) GoString() string { +func (this *TrackSeriesSubresponse) GoString() string { if this == nil { return "nil" } s := make([]string, 0, 5) + s = append(s, "&usagetrackerpb.TrackSeriesSubresponse{") + s = append(s, "RejectedSeriesHashes: "+fmt.Sprintf("%#v", this.RejectedSeriesHashes)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *TrackSeriesResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) s = append(s, "&usagetrackerpb.TrackSeriesResponse{") s = append(s, "RejectedSeriesHashes: "+fmt.Sprintf("%#v", this.RejectedSeriesHashes)+",\n") + if this.Subresponses != nil { + s = append(s, "Subresponses: "+fmt.Sprintf("%#v", this.Subresponses)+",\n") + } s = append(s, "}") return strings.Join(s, "") } @@ -970,7 +1207,7 @@ var _UsageTracker_serviceDesc = grpc.ServiceDesc{ Metadata: "usagetracker.proto", } -func (m *TrackSeriesRequest) Marshal() (dAtA []byte, err error) { +func (m *TrackSeriesSubrequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -980,12 +1217,12 @@ func (m *TrackSeriesRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *TrackSeriesRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *TrackSeriesSubrequest) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *TrackSeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *TrackSeriesSubrequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int @@ -1023,7 +1260,7 @@ func (m *TrackSeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *TrackSeriesResponse) Marshal() (dAtA []byte, err error) { +func (m *TrackSeriesRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -1033,20 +1270,34 @@ func (m *TrackSeriesResponse) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *TrackSeriesResponse) MarshalTo(dAtA []byte) (int, error) { +func (m *TrackSeriesRequest) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *TrackSeriesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *TrackSeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.RejectedSeriesHashes) > 0 { - dAtA4 := make([]byte, len(m.RejectedSeriesHashes)*10) + if len(m.Subrequests) > 0 { + for iNdEx := len(m.Subrequests) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Subrequests[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintUsagetracker(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + } + if len(m.SeriesHashes) > 0 { + dAtA4 := make([]byte, len(m.SeriesHashes)*10) var j3 int - for _, num := range m.RejectedSeriesHashes { + for _, num := range m.SeriesHashes { for num >= 1<<7 { dAtA4[j3] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 @@ -1059,12 +1310,24 @@ func (m *TrackSeriesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { copy(dAtA[i:], dAtA4[:j3]) i = encodeVarintUsagetracker(dAtA, i, uint64(j3)) i-- + dAtA[i] = 0x1a + } + if m.Partition != 0 { + i = encodeVarintUsagetracker(dAtA, i, uint64(m.Partition)) + i-- + dAtA[i] = 0x10 + } + if len(m.UserID) > 0 { + i -= len(m.UserID) + copy(dAtA[i:], m.UserID) + i = encodeVarintUsagetracker(dAtA, i, uint64(len(m.UserID))) + i-- dAtA[i] = 0xa } return len(dAtA) - i, nil } -func (m *SeriesCreatedEvent) Marshal() (dAtA []byte, err error) { +func (m *TrackSeriesSubresponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -1074,20 +1337,20 @@ func (m *SeriesCreatedEvent) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *SeriesCreatedEvent) MarshalTo(dAtA []byte) (int, error) { +func (m *TrackSeriesSubresponse) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *SeriesCreatedEvent) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *TrackSeriesSubresponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.SeriesHashes) > 0 { - dAtA6 := make([]byte, len(m.SeriesHashes)*10) + if len(m.RejectedSeriesHashes) > 0 { + dAtA6 := make([]byte, len(m.RejectedSeriesHashes)*10) var j5 int - for _, num := range m.SeriesHashes { + for _, num := range m.RejectedSeriesHashes { for num >= 1<<7 { dAtA6[j5] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 @@ -1100,24 +1363,120 @@ func (m *SeriesCreatedEvent) MarshalToSizedBuffer(dAtA []byte) (int, error) { copy(dAtA[i:], dAtA6[:j5]) i = encodeVarintUsagetracker(dAtA, i, uint64(j5)) i-- - dAtA[i] = 0x1a - } - if m.Timestamp != 0 { - i = encodeVarintUsagetracker(dAtA, i, uint64(m.Timestamp)) - i-- - dAtA[i] = 0x10 - } - if len(m.UserID) > 0 { - i -= len(m.UserID) - copy(dAtA[i:], m.UserID) - i = encodeVarintUsagetracker(dAtA, i, uint64(len(m.UserID))) - i-- dAtA[i] = 0xa } return len(dAtA) - i, nil } -func (m *SnapshotRecord) Marshal() (dAtA []byte, err error) { +func (m *TrackSeriesResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TrackSeriesResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TrackSeriesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Subresponses) > 0 { + for iNdEx := len(m.Subresponses) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Subresponses[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintUsagetracker(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if len(m.RejectedSeriesHashes) > 0 { + dAtA8 := make([]byte, len(m.RejectedSeriesHashes)*10) + var j7 int + for _, num := range m.RejectedSeriesHashes { + for num >= 1<<7 { + dAtA8[j7] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j7++ + } + dAtA8[j7] = uint8(num) + j7++ + } + i -= j7 + copy(dAtA[i:], dAtA8[:j7]) + i = encodeVarintUsagetracker(dAtA, i, uint64(j7)) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SeriesCreatedEvent) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SeriesCreatedEvent) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SeriesCreatedEvent) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.SeriesHashes) > 0 { + dAtA10 := make([]byte, len(m.SeriesHashes)*10) + var j9 int + for _, num := range m.SeriesHashes { + for num >= 1<<7 { + dAtA10[j9] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j9++ + } + dAtA10[j9] = uint8(num) + j9++ + } + i -= j9 + copy(dAtA[i:], dAtA10[:j9]) + i = encodeVarintUsagetracker(dAtA, i, uint64(j9)) + i-- + dAtA[i] = 0x1a + } + if m.Timestamp != 0 { + i = encodeVarintUsagetracker(dAtA, i, uint64(m.Timestamp)) + i-- + dAtA[i] = 0x10 + } + if len(m.UserID) > 0 { + i -= len(m.UserID) + copy(dAtA[i:], m.UserID) + i = encodeVarintUsagetracker(dAtA, i, uint64(len(m.UserID))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SnapshotRecord) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -1307,6 +1666,29 @@ func encodeVarintUsagetracker(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } +func (m *TrackSeriesSubrequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.UserID) + if l > 0 { + n += 1 + l + sovUsagetracker(uint64(l)) + } + if m.Partition != 0 { + n += 1 + sovUsagetracker(uint64(m.Partition)) + } + if len(m.SeriesHashes) > 0 { + l = 0 + for _, e := range m.SeriesHashes { + l += sovUsagetracker(uint64(e)) + } + n += 1 + sovUsagetracker(uint64(l)) + l + } + return n +} + func (m *TrackSeriesRequest) Size() (n int) { if m == nil { return 0 @@ -1327,6 +1709,28 @@ func (m *TrackSeriesRequest) Size() (n int) { } n += 1 + sovUsagetracker(uint64(l)) + l } + if len(m.Subrequests) > 0 { + for _, e := range m.Subrequests { + l = e.Size() + n += 1 + l + sovUsagetracker(uint64(l)) + } + } + return n +} + +func (m *TrackSeriesSubresponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.RejectedSeriesHashes) > 0 { + l = 0 + for _, e := range m.RejectedSeriesHashes { + l += sovUsagetracker(uint64(e)) + } + n += 1 + sovUsagetracker(uint64(l)) + l + } return n } @@ -1343,6 +1747,12 @@ func (m *TrackSeriesResponse) Size() (n int) { } n += 1 + sovUsagetracker(uint64(l)) + l } + if len(m.Subresponses) > 0 { + for _, e := range m.Subresponses { + l = e.Size() + n += 1 + l + sovUsagetracker(uint64(l)) + } + } return n } @@ -1460,14 +1870,42 @@ func sovUsagetracker(x uint64) (n int) { func sozUsagetracker(x uint64) (n int) { return sovUsagetracker(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (this *TrackSeriesSubrequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&TrackSeriesSubrequest{`, + `UserID:` + fmt.Sprintf("%v", this.UserID) + `,`, + `Partition:` + fmt.Sprintf("%v", this.Partition) + `,`, + `SeriesHashes:` + fmt.Sprintf("%v", this.SeriesHashes) + `,`, + `}`, + }, "") + return s +} func (this *TrackSeriesRequest) String() string { if this == nil { return "nil" } + repeatedStringForSubrequests := "[]*TrackSeriesSubrequest{" + for _, f := range this.Subrequests { + repeatedStringForSubrequests += strings.Replace(f.String(), "TrackSeriesSubrequest", "TrackSeriesSubrequest", 1) + "," + } + repeatedStringForSubrequests += "}" s := strings.Join([]string{`&TrackSeriesRequest{`, `UserID:` + fmt.Sprintf("%v", this.UserID) + `,`, `Partition:` + fmt.Sprintf("%v", this.Partition) + `,`, `SeriesHashes:` + fmt.Sprintf("%v", this.SeriesHashes) + `,`, + `Subrequests:` + repeatedStringForSubrequests + `,`, + `}`, + }, "") + return s +} +func (this *TrackSeriesSubresponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&TrackSeriesSubresponse{`, + `RejectedSeriesHashes:` + fmt.Sprintf("%v", this.RejectedSeriesHashes) + `,`, `}`, }, "") return s @@ -1476,8 +1914,14 @@ func (this *TrackSeriesResponse) String() string { if this == nil { return "nil" } + repeatedStringForSubresponses := "[]*TrackSeriesSubresponse{" + for _, f := range this.Subresponses { + repeatedStringForSubresponses += strings.Replace(f.String(), "TrackSeriesSubresponse", "TrackSeriesSubresponse", 1) + "," + } + repeatedStringForSubresponses += "}" s := strings.Join([]string{`&TrackSeriesResponse{`, `RejectedSeriesHashes:` + fmt.Sprintf("%v", this.RejectedSeriesHashes) + `,`, + `Subresponses:` + repeatedStringForSubresponses + `,`, `}`, }, "") return s @@ -1557,6 +2001,183 @@ func valueToStringUsagetracker(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } +func (m *TrackSeriesSubrequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TrackSeriesSubrequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TrackSeriesSubrequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UserID", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthUsagetracker + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthUsagetracker + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.UserID = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Partition", wireType) + } + m.Partition = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Partition |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType == 0 { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.SeriesHashes = append(m.SeriesHashes, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthUsagetracker + } + postIndex := iNdEx + packedLen + if postIndex < 0 { + return ErrInvalidLengthUsagetracker + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + var count int + for _, integer := range dAtA[iNdEx:postIndex] { + if integer < 128 { + count++ + } + } + elementCount = count + if elementCount != 0 && len(m.SeriesHashes) == 0 { + m.SeriesHashes = make([]uint64, 0, elementCount) + } + for iNdEx < postIndex { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.SeriesHashes = append(m.SeriesHashes, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field SeriesHashes", wireType) + } + default: + iNdEx = preIndex + skippy, err := skipUsagetracker(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthUsagetracker + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *TrackSeriesRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -1713,6 +2334,166 @@ func (m *TrackSeriesRequest) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field SeriesHashes", wireType) } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Subrequests", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthUsagetracker + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthUsagetracker + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Subrequests = append(m.Subrequests, &TrackSeriesSubrequest{}) + if err := m.Subrequests[len(m.Subrequests)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipUsagetracker(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthUsagetracker + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TrackSeriesSubresponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TrackSeriesSubresponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TrackSeriesSubresponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType == 0 { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.RejectedSeriesHashes = append(m.RejectedSeriesHashes, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthUsagetracker + } + postIndex := iNdEx + packedLen + if postIndex < 0 { + return ErrInvalidLengthUsagetracker + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + var count int + for _, integer := range dAtA[iNdEx:postIndex] { + if integer < 128 { + count++ + } + } + elementCount = count + if elementCount != 0 && len(m.RejectedSeriesHashes) == 0 { + m.RejectedSeriesHashes = make([]uint64, 0, elementCount) + } + for iNdEx < postIndex { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.RejectedSeriesHashes = append(m.RejectedSeriesHashes, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field RejectedSeriesHashes", wireType) + } default: iNdEx = preIndex skippy, err := skipUsagetracker(dAtA[iNdEx:]) @@ -1839,6 +2620,40 @@ func (m *TrackSeriesResponse) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field RejectedSeriesHashes", wireType) } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Subresponses", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowUsagetracker + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthUsagetracker + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthUsagetracker + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Subresponses = append(m.Subresponses, &TrackSeriesSubresponse{}) + if err := m.Subresponses[len(m.Subresponses)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipUsagetracker(dAtA[iNdEx:]) diff --git a/pkg/usagetracker/usagetrackerpb/usagetracker.proto b/pkg/usagetracker/usagetrackerpb/usagetracker.proto index 4eb482522c8..522e1e74b1a 100644 --- a/pkg/usagetracker/usagetrackerpb/usagetracker.proto +++ b/pkg/usagetracker/usagetrackerpb/usagetracker.proto @@ -15,6 +15,17 @@ service UsageTracker { rpc GetUsersCloseToLimit(GetUsersCloseToLimitRequest) returns (GetUsersCloseToLimitResponse); } +message TrackSeriesSubrequest { + // The tenant owning the series. + string userID = 1; + + // Partition that this series belong to. + int32 partition = 2; + + // The hashes of the series to track. + repeated uint64 seriesHashes = 3; +} + message TrackSeriesRequest { // The tenant owning the series. string userID = 1; @@ -24,11 +35,21 @@ message TrackSeriesRequest { // The hashes of the series to track. repeated uint64 seriesHashes = 3; + + repeated TrackSeriesSubrequest subrequests = 4; +} + +message TrackSeriesSubresponse { + // The hashes of the series that have been rejected because the tenant is over the limit. + repeated uint64 rejectedSeriesHashes = 1; } + message TrackSeriesResponse { // The hashes of the series that have been rejected because the tenant is over the limit. repeated uint64 rejectedSeriesHashes = 1; + + repeated TrackSeriesSubresponse subresponses = 2; } message SeriesCreatedEvent {