
Commit 86db298

chore: implement UpdateRates with simple in-memory tracker
This commit implements UpdateRates with a simple in-memory tracker. It does not currently replicate to Kafka because doing so would produce a huge volume of data that we would need to replay on every restart. For now, since rate buckets are refilled very quickly, I propose we keep them in memory without replication while we think about how to solve this.
1 parent e173cf4 commit 86db298

3 files changed: 204 additions & 74 deletions


pkg/limits/service.go

Lines changed: 27 additions & 3 deletions
@@ -190,10 +190,34 @@ func (s *Service) ExceedsLimits(
 
 // UpdateRates implements the [proto.IngestLimitsServer] interface.
 func (s *Service) UpdateRates(
-    _ context.Context,
-    _ *proto.UpdateRatesRequest,
+    ctx context.Context,
+    req *proto.UpdateRatesRequest,
 ) (*proto.UpdateRatesResponse, error) {
-    return &proto.UpdateRatesResponse{}, nil
+    // We do not replicate data to Kafka here as we really need to figure out
+    // how to reduce the volume during replay (the reason we added
+    // LastProducedAt for streams).
+    updated, err := s.usage.UpdateRates(req.Tenant, req.Streams, s.clock.Now())
+    if err != nil {
+        return nil, err
+    }
+    resp := proto.UpdateRatesResponse{
+        Results: make([]*proto.UpdateRatesResult, len(updated)),
+    }
+    for i, stream := range updated {
+        var totalSize uint64
+        for _, bucket := range stream.rateBuckets {
+            totalSize += bucket.size
+        }
+        // The average rate is calculated over the total number of
+        // populated buckets. This allows us to calculate accurate rates
+        // without empty buckets pulling down the average.
+        averageRate := totalSize / (uint64(s.cfg.BucketSize.Seconds()) * uint64(len(stream.rateBuckets)))
+        resp.Results[i] = &proto.UpdateRatesResult{
+            StreamHash: stream.hash,
+            Rate:       averageRate,
+        }
+    }
+    return &resp, nil
 }
 
 func (s *Service) CheckReady(ctx context.Context) error {
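
For intuition, the Rate computed above is the total bytes across a stream's populated rate buckets divided by the time those buckets cover (bucket duration times the number of populated buckets), so empty buckets never dilute the average. A minimal, self-contained sketch of the same arithmetic, using made-up bucket sizes and an assumed 1-minute bucket duration:

package main

import (
    "fmt"
    "time"
)

func main() {
    bucketSize := time.Minute
    // Hypothetical sizes of the populated (active) buckets, in bytes.
    populated := []uint64{12000, 3000, 3000}

    var totalSize uint64
    for _, size := range populated {
        totalSize += size
    }
    // Average over the window covered by populated buckets only.
    averageRate := totalSize / (uint64(bucketSize.Seconds()) * uint64(len(populated)))
    fmt.Println(averageRate) // 18000 / (60 * 3) = 100 bytes per second
}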

pkg/limits/store.go

Lines changed: 120 additions & 14 deletions
@@ -196,6 +196,30 @@ func (s *usageStore) TenantActiveStreams(tenant string) iter.Seq2[string, stream
     }
 }
 
+func (s *usageStore) Get(tenant string, streamHash uint64) (streamUsage, bool) {
+    var (
+        now                = s.clock.Now()
+        withinActiveWindow = s.newActiveWindowFunc(now)
+        withinRateWindow   = s.newRateWindowFunc(now)
+        stream             streamUsage
+        ok                 bool
+    )
+    partition := s.getPartitionForHash(streamHash)
+    s.withRLock(tenant, func(i int) {
+        stream, ok = s.get(i, tenant, partition, streamHash)
+        if ok {
+            ok = withinActiveWindow(stream.lastSeenAt)
+            if ok {
+                stream.rateBuckets = getActiveRateBuckets(
+                    stream.rateBuckets,
+                    withinRateWindow,
+                )
+            }
+        }
+    })
+    return stream, ok
+}
+
 func (s *usageStore) Update(tenant string, metadata *proto.StreamMetadata, seenAt time.Time) error {
     if !s.withinActiveWindow(seenAt.UnixNano()) {
         return errOutsideActiveWindow
@@ -269,6 +293,40 @@ func (s *usageStore) UpdateCond(tenant string, metadata []*proto.StreamMetadata,
     return toProduce, accepted, rejected, nil
 }
 
+func (s *usageStore) UpdateRates(tenant string, metadata []*proto.StreamMetadata, seenAt time.Time) ([]streamUsage, error) {
+    if !s.withinActiveWindow(seenAt.UnixNano()) {
+        return nil, errOutsideActiveWindow
+    }
+    var (
+        updated = make([]streamUsage, 0, len(metadata))
+        cutoff  = seenAt.Add(-s.activeWindow).UnixNano()
+    )
+    s.withLock(tenant, func(i int) {
+        for _, m := range metadata {
+            partition := s.getPartitionForHash(m.StreamHash)
+            s.checkInitMap(i, tenant, partition, noPolicy)
+            streams := s.stripes[i][tenant][partition][noPolicy]
+            stream, ok := streams[m.StreamHash]
+
+            // If the stream does not exist, or exists but has expired,
+            // we need to check if accepting it would exceed the maximum
+            // stream limit.
+            if !ok || stream.lastSeenAt < cutoff {
+                if ok {
+                    // The stream has expired, delete it so it doesn't count
+                    // towards the active streams.
+                    delete(streams, m.StreamHash)
+                }
+            }
+            s.updateWithBuckets(i, tenant, partition, noPolicy, m, seenAt)
+            got, _ := s.get(i, tenant, partition, m.StreamHash)
+            got.rateBuckets = getActiveRateBuckets(got.rateBuckets, s.withinRateWindow)
+            updated = append(updated, got)
+        }
+    })
+    return updated, nil
+}
+
 // Evict evicts all streams that have not been seen within the window.
 func (s *usageStore) Evict() map[string]int {
     cutoff := s.clock.Now().Add(-s.activeWindow).UnixNano()
@@ -362,30 +420,50 @@ func (s *usageStore) update(i int, tenant string, partition int32, policyBucket
         stream.hash = streamHash
         stream.totalSize = 0
         stream.policy = policyBucket
-        // stream.rateBuckets = make([]rateBucket, s.numBuckets)
     }
     seenAtUnixNano := seenAt.UnixNano()
     if stream.lastSeenAt <= seenAtUnixNano {
         stream.lastSeenAt = seenAtUnixNano
     }
-    // TODO(grobinson): As mentioned above, we will come back and implement
-    // rate limits at a later date in the future.
-    // stream.totalSize += totalSize
+    s.stripes[i][tenant][partition][policyBucket][streamHash] = stream
+}
+
+// Duplicate of update but also updates the rate buckets. This allows us to
+// isolate the changes needed to support UpdateRates RPC without affecting
+// the ExceedsLimits RPCs.
+func (s *usageStore) updateWithBuckets(i int, tenant string, partition int32, policyBucket string, metadata *proto.StreamMetadata, seenAt time.Time) {
+    s.checkInitMap(i, tenant, partition, policyBucket)
+    streamHash := metadata.StreamHash
+    // Get the stats for the stream.
+    stream, ok := s.stripes[i][tenant][partition][policyBucket][streamHash]
+    cutoff := seenAt.Add(-s.activeWindow).UnixNano()
+    // If the stream does not exist, or it has expired, reset it.
+    if !ok || stream.lastSeenAt < cutoff {
+        stream.hash = streamHash
+        stream.totalSize = 0
+        stream.policy = policyBucket
+        stream.rateBuckets = make([]rateBucket, s.numBuckets)
+    }
+    seenAtUnixNano := seenAt.UnixNano()
+    if stream.lastSeenAt <= seenAtUnixNano {
+        stream.lastSeenAt = seenAtUnixNano
+    }
+    stream.totalSize += metadata.TotalSize
     // rate buckets are implemented as a circular list. To update a rate
     // bucket we must first calculate the bucket index.
-    // bucketNum := seenAtUnixNano / int64(s.bucketSize)
-    // bucketIdx := int(bucketNum % int64(s.numBuckets))
-    // bucket := stream.rateBuckets[bucketIdx]
+    bucketNum := seenAtUnixNano / int64(s.bucketSize)
+    bucketIdx := int(bucketNum % int64(s.numBuckets))
+    bucket := stream.rateBuckets[bucketIdx]
     // Once we have found the bucket, we then need to check if it is an old
     // bucket outside the rate window. If it is, we must reset it before we
     // can re-use it.
-    // bucketStart := seenAt.Truncate(s.bucketSize).UnixNano()
-    // if bucket.timestamp < bucketStart {
-    //     bucket.timestamp = bucketStart
-    //     bucket.size = 0
-    // }
-    // bucket.size += totalSize
-    // stream.rateBuckets[bucketIdx] = bucket
+    bucketStart := seenAt.Truncate(s.bucketSize).UnixNano()
+    if bucket.timestamp < bucketStart {
+        bucket.timestamp = bucketStart
+        bucket.size = 0
+    }
+    bucket.size += metadata.TotalSize
+    stream.rateBuckets[bucketIdx] = bucket
     s.stripes[i][tenant][partition][policyBucket][streamHash] = stream
 }
 
@@ -503,3 +581,31 @@ func getActiveRateBuckets(buckets []rateBucket, withinRateWindow func(int64) boo
     }
     return result
 }
+
+// Used in tests. Is not goroutine-safe.
+func (s *usageStore) getForTests(tenant string, streamHash uint64) (streamUsage, bool) {
+    partition := s.getPartitionForHash(streamHash)
+    var result streamUsage
+    var ok bool
+    s.withRLock(tenant, func(i int) {
+        result, ok = s.get(i, tenant, partition, streamHash)
+    })
+    return result, ok
+}
+
+func (s *usageStore) get(i int, tenant string, partition int32, streamHash uint64) (stream streamUsage, ok bool) {
+    partitions, ok := s.stripes[i][tenant]
+    if !ok {
+        return
+    }
+    policies, ok := partitions[partition]
+    if !ok {
+        return
+    }
+    streams, ok := policies[noPolicy]
+    if !ok {
+        return
+    }
+    stream, ok = streams[streamHash]
+    return
+}
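
The bucket index arithmetic in updateWithBuckets is the heart of the circular list: a timestamp maps to index (unixNanos / bucketSize) mod numBuckets, and a bucket whose stored timestamp is older than the start of the current interval is reset before being reused. Below is a small, standalone sketch of that behaviour under assumed values (five one-minute buckets, a simplified rateBucket struct), not the store's actual types:

package main

import (
    "fmt"
    "time"
)

// Simplified stand-in for the store's rate bucket.
type rateBucket struct {
    timestamp int64
    size      uint64
}

func main() {
    const numBuckets = 5
    bucketSize := time.Minute
    buckets := make([]rateBucket, numBuckets)

    record := func(seenAt time.Time, size uint64) int {
        // Map the timestamp onto the circular list.
        bucketNum := seenAt.UnixNano() / int64(bucketSize)
        bucketIdx := int(bucketNum % int64(numBuckets))
        // Reset the bucket if it belongs to an older interval before re-using it.
        bucketStart := seenAt.Truncate(bucketSize).UnixNano()
        if buckets[bucketIdx].timestamp < bucketStart {
            buckets[bucketIdx] = rateBucket{timestamp: bucketStart}
        }
        buckets[bucketIdx].size += size
        return bucketIdx
    }

    start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
    fmt.Println(record(start, 100))                    // bucket 0
    fmt.Println(record(start.Add(1*time.Minute), 100)) // bucket 1
    fmt.Println(record(start.Add(5*time.Minute), 100)) // wraps back to bucket 0 and resets it
}

With five one-minute buckets, an update at minute 5 lands back on index 0 (5 mod 5 = 0) and overwrites the bucket written at minute 0, which is exactly the wrap-around behaviour the new test below exercises.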

pkg/limits/store_test.go

Lines changed: 57 additions & 57 deletions
@@ -140,63 +140,63 @@ func TestUsageStore_Update(t *testing.T) {
 // This test asserts that we update the correct rate buckets, and as rate
 // buckets are implemented as a circular list, when we reach the end of
 // list the next bucket is the start of the list.
-// func TestUsageStore_UpdateRateBuckets(t *testing.T) {
-//     s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, &mockLimits{}, prometheus.NewRegistry())
-//     require.NoError(t, err)
-//     clock := quartz.NewMock(t)
-//     s.clock = clock
-//     metadata := &proto.StreamMetadata{
-//         StreamHash: 0x1,
-//         TotalSize:  100,
-//     }
-//     // Metadata at clock.Now() should update the first rate bucket because
-//     // the mocked clock starts at 2024-01-01T00:00:00Z.
-//     time1 := clock.Now()
-//     require.NoError(t, s.Update("tenant", metadata, time1))
-//     stream, ok := s.getForTests("tenant", 0x1)
-//     require.True(t, ok)
-//     expected := newRateBuckets(5*time.Minute, time.Minute)
-//     expected[0].timestamp = time1.UnixNano()
-//     expected[0].size = 100
-//     require.Equal(t, expected, stream.rateBuckets)
-//     // Update the first bucket with the same metadata but 1 second later.
-//     clock.Advance(time.Second)
-//     time2 := clock.Now()
-//     require.NoError(t, s.Update("tenant", metadata, time2))
-//     expected[0].size = 200
-//     require.Equal(t, expected, stream.rateBuckets)
-//     // Advance the clock forward to the next bucket. Should update the second
-//     // bucket and leave the first bucket unmodified.
-//     clock.Advance(time.Minute)
-//     time3 := clock.Now()
-//     require.NoError(t, s.Update("tenant", metadata, time3))
-//     stream, ok = s.getForTests("tenant", 0x1)
-//     require.True(t, ok)
-//     // As the clock is now 1 second ahead of the bucket start time, we must
-//     // truncate the expected time to the start of the bucket.
-//     expected[1].timestamp = time3.Truncate(time.Minute).UnixNano()
-//     expected[1].size = 100
-//     require.Equal(t, expected, stream.rateBuckets)
-//     // Advance the clock to the last bucket.
-//     clock.Advance(3 * time.Minute)
-//     time4 := clock.Now()
-//     require.NoError(t, s.Update("tenant", metadata, time4))
-//     stream, ok = s.getForTests("tenant", 0x1)
-//     require.True(t, ok)
-//     expected[4].timestamp = time4.Truncate(time.Minute).UnixNano()
-//     expected[4].size = 100
-//     require.Equal(t, expected, stream.rateBuckets)
-//     // Advance the clock one last one. It should wrap around to the start of
-//     // the list and replace the original bucket with time1.
-//     clock.Advance(time.Minute)
-//     time5 := clock.Now()
-//     require.NoError(t, s.Update("tenant", metadata, time5))
-//     stream, ok = s.getForTests("tenant", 0x1)
-//     require.True(t, ok)
-//     expected[0].timestamp = time5.Truncate(time.Minute).UnixNano()
-//     expected[0].size = 100
-//     require.Equal(t, expected, stream.rateBuckets)
-// }
+func TestUsageStore_UpdateRates(t *testing.T) {
+    s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, &mockLimits{}, prometheus.NewRegistry())
+    require.NoError(t, err)
+    clock := quartz.NewMock(t)
+    s.clock = clock
+    metadata := []*proto.StreamMetadata{{
+        StreamHash: 0x1,
+        TotalSize:  100,
+    }}
+    // Metadata at clock.Now() should update the first rate bucket because
+    // the mocked clock starts at 2024-01-01T00:00:00Z.
+    time1 := clock.Now()
+    rates, err := s.UpdateRates("tenant", metadata, time1)
+    require.NoError(t, err)
+    expected := make([]rateBucket, 1, 5)
+    expected[0].timestamp = time1.UnixNano()
+    expected[0].size = 100
+    require.Len(t, rates, 1)
+    require.Equal(t, expected, rates[0].rateBuckets)
+    // Update the first bucket with the same metadata but 1 second later.
+    clock.Advance(time.Second)
+    time2 := clock.Now()
+    rates, err = s.UpdateRates("tenant", metadata, time2)
+    require.NoError(t, err)
+    expected[0].size = 200
+    require.Equal(t, expected, rates[0].rateBuckets)
+    // Advance the clock forward to the next bucket. Should update the second
+    // bucket and leave the first bucket unmodified.
+    clock.Advance(time.Minute)
+    time3 := clock.Now()
+    rates, err = s.UpdateRates("tenant", metadata, time3)
+    require.NoError(t, err)
+    // As the clock is now 1 second ahead of the bucket start time, we must
+    // truncate the expected time to the start of the bucket.
+    expected = append(expected, rateBucket{})
+    expected[1].timestamp = time3.Truncate(time.Minute).UnixNano()
+    expected[1].size = 100
+    require.Equal(t, expected, rates[0].rateBuckets)
+    // Advance the clock to the last bucket.
+    clock.Advance(3 * time.Minute)
+    time4 := clock.Now()
+    rates, err = s.UpdateRates("tenant", metadata, time4)
+    require.NoError(t, err)
+    expected = append(expected, rateBucket{})
+    expected[2].timestamp = time4.Truncate(time.Minute).UnixNano()
+    expected[2].size = 100
+    require.Equal(t, expected, rates[0].rateBuckets)
+    // Advance the clock one last one. It should wrap around to the start of
+    // the list and replace the original bucket with time1.
+    clock.Advance(time.Minute)
+    time5 := clock.Now()
+    rates, err = s.UpdateRates("tenant", metadata, time5)
+    require.NoError(t, err)
+    expected[0].timestamp = time5.Truncate(time.Minute).UnixNano()
+    expected[0].size = 100
+    require.Equal(t, expected, rates[0].rateBuckets)
+}
 
 // This test asserts that rate buckets are not updated while the TODOs are
 // in place.
