30 changes: 21 additions & 9 deletions pkg/storage/ingest/pusher.go
@@ -433,11 +433,18 @@ func LabelAdaptersHash(b []byte, ls []mimirpb.LabelAdapter) ([]byte, uint64) {
 // PushToStorageAndReleaseRequest ignores SkipLabelNameValidation because that field is only used in the distributor and not in the ingester.
 // PushToStorageAndReleaseRequest aborts the request if it encounters an error.
 func (p *parallelStorageShards) PushToStorageAndReleaseRequest(ctx context.Context, request *mimirpb.WriteRequest) error {
-	hashBuf := make([]byte, 0, 1024)
+	// Shard series by the hash of their labels. Skip sharding and always append series to
+	// the first shard when there's only one shard.
+	var hashBuf []byte
+	if p.numShards > 1 {
+		hashBuf = make([]byte, 0, 1024)
+	}
 	for i := range request.Timeseries {
-		var shard uint64
-		hashBuf, shard = LabelAdaptersHash(hashBuf, request.Timeseries[i].Labels)
-		shard = shard % uint64(p.numShards)
+		shard := uint64(0)
+		if p.numShards > 1 {
+			hashBuf, shard = LabelAdaptersHash(hashBuf, request.Timeseries[i].Labels)
+			shard = shard % uint64(p.numShards)
+		}
 
 		if err := p.shards[shard].AddToBatch(ctx, request.Source, request.Timeseries[i]); err != nil {
 			return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err)
@@ -450,15 +457,20 @@ func (p *parallelStorageShards) PushToStorageAndReleaseRequest(ctx context.Conte
 	mimirpb.ReuseSliceOnly(request.Timeseries)
 	request.Timeseries = nil
 
-	// Push metadata to every shard in a round-robin fashion.
-	// Start from a random shard to avoid hotspots in the first few shards when there are not many metadata pieces in each request.
-	shard := rand.IntN(p.numShards)
+	// Push metadata to every shard in a round-robin fashion. Start from a random shard to avoid hotspots in the first
+	// few shards when there are not many metadata pieces in each request. Skip the sharding if there's only one shard.
+	shard := 0
+	if p.numShards > 1 {
+		shard = rand.IntN(p.numShards)
+	}
 	for mdIdx := range request.Metadata {
 		if err := p.shards[shard].AddMetadataToBatch(ctx, request.Source, request.Metadata[mdIdx]); err != nil {
 			return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err)
 		}
-		shard++
-		shard %= p.numShards
+		if p.numShards > 1 {
+			shard++
+			shard %= p.numShards
+		}
 	}
 
 	// We might have some data left in some of the queues in the shards, but they will be flushed eventually once Stop is called, and we're certain that no more data is coming.
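The change above avoids hashing labels and drawing a random starting shard when the pusher runs with a single shard, since every series and metadata entry would land on shard 0 anyway. The following standalone sketch is not the Mimir code; hashSeries and pickMetadataStart are hypothetical stand-ins for LabelAdaptersHash and the random round-robin start, shown only to illustrate the same single-shard fast path:

// Minimal sketch of the single-shard fast path; assumes Go 1.22+ for math/rand/v2.
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand/v2"
)

type series struct{ labels string }

// hashSeries is a hypothetical stand-in for LabelAdaptersHash: any stable hash of the labels works here.
func hashSeries(s series) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s.labels))
	return h.Sum64()
}

// shardSeries distributes series across numShards, skipping the hash entirely
// when there is only one shard (every series lands on shard 0 anyway).
func shardSeries(numShards int, batch []series) [][]series {
	shards := make([][]series, numShards)
	for _, s := range batch {
		shard := uint64(0)
		if numShards > 1 {
			shard = hashSeries(s) % uint64(numShards)
		}
		shards[shard] = append(shards[shard], s)
	}
	return shards
}

// pickMetadataStart chooses the first shard for round-robin metadata pushes.
// The random draw avoids hotspots on the first shards, but is skipped when
// there is only one shard.
func pickMetadataStart(numShards int) int {
	if numShards > 1 {
		return rand.IntN(numShards)
	}
	return 0
}

func main() {
	batch := []series{{`up{job="a"}`}, {`up{job="b"}`}, {`up{job="c"}`}}
	fmt.Println(len(shardSeries(1, batch)[0])) // 3: with one shard, everything goes to shard 0
	fmt.Println(pickMetadataStart(1))          // always 0 with a single shard
}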
68 changes: 29 additions & 39 deletions pkg/storage/ingest/pusher_test.go
@@ -1691,20 +1691,7 @@ func BenchmarkPusherConsumer(b *testing.B) {
 }
 
 func BenchmarkPusherConsumer_ParallelPusher_MultiTenant(b *testing.B) {
-	// This benchmark tests PusherConsumer.Consume() with parallelStoragePusher
-	// under the hood, varying the number of unique tenants owning the records.
-	//
-	// Config values matching production settings (CLI flags):
-	// -ingest-storage.kafka.ingestion-concurrency-batch-size=150
-	// -ingest-storage.kafka.ingestion-concurrency-estimated-bytes-per-sample=200
-	// -ingest-storage.kafka.ingestion-concurrency-max=8
-	// -ingest-storage.kafka.ingestion-concurrency-queue-capacity=3
-	// -ingest-storage.kafka.ingestion-concurrency-target-flushes-per-shard=40
-
-	const (
-		totalRecords        = 1000
-		timeseriesPerRecord = 10
-	)
+	const timeseriesPerRecord = 10
 
 	// Create a no-op pusher that just releases the request (shared across all runs).
 	pusher := pusherFunc(func(_ context.Context, request *mimirpb.WriteRequest) error {
@@ -1713,41 +1700,44 @@ func BenchmarkPusherConsumer_ParallelPusher_MultiTenant(b *testing.B) {
 		return nil
 	})
 
-	// Configure KafkaConfig with production-like values (shared across all runs).
+	// Configure the ingester with Kafka settings using the same settings used in Grafana Cloud.
 	kcfg := KafkaConfig{}
 	flagext.DefaultValues(&kcfg)
 	kcfg.IngestionConcurrencyMax = 8
 	kcfg.IngestionConcurrencyBatchSize = 150
 	kcfg.IngestionConcurrencyEstimatedBytesPerSample = 200
 	kcfg.IngestionConcurrencyQueueCapacity = 3
 	kcfg.IngestionConcurrencyTargetFlushesPerShard = 40
+	kcfg.IngestionConcurrencySequentialPusherEnabled = false
 
-	for _, numTenants := range []int{1, 10, 100, 1000} {
-		// Create records upfront, outside b.Run(), so record creation doesn't affect timing.
-		records := make([]*kgo.Record, totalRecords)
-		for i := range records {
-			tenantID := fmt.Sprintf("tenant-%d", i%numTenants)
-			wr := &mimirpb.WriteRequest{Timeseries: make([]mimirpb.PreallocTimeseries, timeseriesPerRecord)}
-			for j := range wr.Timeseries {
-				wr.Timeseries[j] = mockPreallocTimeseries(fmt.Sprintf("series_%d_%d", i, j))
-			}
-			content, err := wr.Marshal()
-			require.NoError(b, err)
+	for _, numRecords := range []int{1, 10, 100, 1000} {
+		for _, numTenants := range []int{1, 10, 100, 1000} {
+			// Create records upfront, outside b.Run(), so record creation doesn't affect timing.
+			records := make([]*kgo.Record, numRecords)
+			for i := range records {
+				tenantID := fmt.Sprintf("tenant-%d", i%numTenants)
+				wr := &mimirpb.WriteRequest{Timeseries: make([]mimirpb.PreallocTimeseries, timeseriesPerRecord)}
+				for j := range wr.Timeseries {
+					wr.Timeseries[j] = mockPreallocTimeseries(fmt.Sprintf("series_%d_%d", i, j))
+				}
+				content, err := wr.Marshal()
+				require.NoError(b, err)
 
-			records[i] = createRecord("test-topic", 1, content, 1)
-			records[i].Key = []byte(tenantID)
-			records[i].Context = context.Background()
-		}
+				records[i] = createRecord("test-topic", 1, content, 1)
+				records[i].Key = []byte(tenantID)
+				records[i].Context = context.Background()
+			}
 
-		b.Run(fmt.Sprintf("tenants=%d", numTenants), func(b *testing.B) {
-			metrics := NewPusherConsumerMetrics(prometheus.NewPedanticRegistry())
-			c := NewPusherConsumer(pusher, kcfg, metrics, log.NewNopLogger())
+			b.Run(fmt.Sprintf("records=%d,tenants=%d", numRecords, numTenants), func(b *testing.B) {
+				metrics := NewPusherConsumerMetrics(prometheus.NewPedanticRegistry())
+				c := NewPusherConsumer(pusher, kcfg, metrics, log.NewNopLogger())
 
-			b.ResetTimer()
-			for range b.N {
-				err := c.Consume(context.Background(), slices.Values(records))
-				require.NoError(b, err)
-			}
-		})
+				b.ResetTimer()
+				for range b.N {
+					err := c.Consume(context.Background(), slices.Values(records))
+					require.NoError(b, err)
+				}
+			})
+		}
 	}
 }
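Assuming the standard Go toolchain and the package path shown in the file header above, the updated benchmark matrix (4 record counts x 4 tenant counts, 16 sub-benchmarks) can be run with something like:

go test -run='^$' -bench='BenchmarkPusherConsumer_ParallelPusher_MultiTenant' -benchmem ./pkg/storage/ingest/

The -run='^$' filter skips the regular tests, and -benchmem reports allocations, which is useful when comparing runs with and without the single-shard fast path introduced in pusher.go.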