diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go
index f04f3c3e356..3bd76116697 100644
--- a/pkg/storage/ingest/pusher.go
+++ b/pkg/storage/ingest/pusher.go
@@ -433,11 +433,18 @@ func LabelAdaptersHash(b []byte, ls []mimirpb.LabelAdapter) ([]byte, uint64) {
 // PushToStorageAndReleaseRequest ignores SkipLabelNameValidation because that field is only used in the distributor and not in the ingester.
 // PushToStorageAndReleaseRequest aborts the request if it encounters an error.
 func (p *parallelStorageShards) PushToStorageAndReleaseRequest(ctx context.Context, request *mimirpb.WriteRequest) error {
-	hashBuf := make([]byte, 0, 1024)
+	// Shard series by the hash of their labels. Skip sharding and always append series to
+	// the first shard when there's only one shard.
+	var hashBuf []byte
+	if p.numShards > 1 {
+		hashBuf = make([]byte, 0, 1024)
+	}
 	for i := range request.Timeseries {
-		var shard uint64
-		hashBuf, shard = LabelAdaptersHash(hashBuf, request.Timeseries[i].Labels)
-		shard = shard % uint64(p.numShards)
+		shard := uint64(0)
+		if p.numShards > 1 {
+			hashBuf, shard = LabelAdaptersHash(hashBuf, request.Timeseries[i].Labels)
+			shard = shard % uint64(p.numShards)
+		}
 
 		if err := p.shards[shard].AddToBatch(ctx, request.Source, request.Timeseries[i]); err != nil {
 			return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err)
@@ -450,15 +457,20 @@ func (p *parallelStorageShards) PushToStorageAndReleaseRequest(ctx context.Conte
 	mimirpb.ReuseSliceOnly(request.Timeseries)
 	request.Timeseries = nil
 
-	// Push metadata to every shard in a round-robin fashion.
-	// Start from a random shard to avoid hotspots in the first few shards when there are not many metadata pieces in each request.
-	shard := rand.IntN(p.numShards)
+	// Push metadata to every shard in a round-robin fashion. Start from a random shard to avoid hotspots in the first
+	// few shards when there are not many metadata pieces in each request. Skip the sharding if there's only one shard.
+	shard := 0
+	if p.numShards > 1 {
+		shard = rand.IntN(p.numShards)
+	}
 	for mdIdx := range request.Metadata {
 		if err := p.shards[shard].AddMetadataToBatch(ctx, request.Source, request.Metadata[mdIdx]); err != nil {
 			return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err)
 		}
-		shard++
-		shard %= p.numShards
+		if p.numShards > 1 {
+			shard++
+			shard %= p.numShards
+		}
 	}
 
 	// We might have some data left in some of the queues in the shards, but they will be flushed eventually once Stop is called, and we're certain that no more data is coming.
diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go
index 7347696ba6b..95ad2dde7ac 100644
--- a/pkg/storage/ingest/pusher_test.go
+++ b/pkg/storage/ingest/pusher_test.go
@@ -1691,20 +1691,7 @@ func BenchmarkPusherConsumer(b *testing.B) {
 }
 
 func BenchmarkPusherConsumer_ParallelPusher_MultiTenant(b *testing.B) {
-	// This benchmark tests PusherConsumer.Consume() with parallelStoragePusher
-	// under the hood, varying the number of unique tenants owning the records.
-	//
-	// Config values matching production settings (CLI flags):
-	//   -ingest-storage.kafka.ingestion-concurrency-batch-size=150
-	//   -ingest-storage.kafka.ingestion-concurrency-estimated-bytes-per-sample=200
-	//   -ingest-storage.kafka.ingestion-concurrency-max=8
-	//   -ingest-storage.kafka.ingestion-concurrency-queue-capacity=3
-	//   -ingest-storage.kafka.ingestion-concurrency-target-flushes-per-shard=40
-
-	const (
-		totalRecords        = 1000
-		timeseriesPerRecord = 10
-	)
+	const timeseriesPerRecord = 10
 
 	// Create a no-op pusher that just releases the request (shared across all runs).
 	pusher := pusherFunc(func(_ context.Context, request *mimirpb.WriteRequest) error {
@@ -1713,7 +1700,7 @@ func BenchmarkPusherConsumer_ParallelPusher_MultiTenant(b *testing.B) {
 		return nil
 	})
 
-	// Configure KafkaConfig with production-like values (shared across all runs).
+	// Configure the ingester's Kafka settings with the same values used in Grafana Cloud.
 	kcfg := KafkaConfig{}
 	flagext.DefaultValues(&kcfg)
 	kcfg.IngestionConcurrencyMax = 8
@@ -1721,33 +1708,36 @@ func BenchmarkPusherConsumer_ParallelPusher_MultiTenant(b *testing.B) {
 	kcfg.IngestionConcurrencyEstimatedBytesPerSample = 200
 	kcfg.IngestionConcurrencyQueueCapacity = 3
 	kcfg.IngestionConcurrencyTargetFlushesPerShard = 40
+	kcfg.IngestionConcurrencySequentialPusherEnabled = false
+
+	for _, numRecords := range []int{1, 10, 100, 1000} {
+		for _, numTenants := range []int{1, 10, 100, 1000} {
+			// Create records upfront, outside b.Run(), so record creation doesn't affect timing.
+			records := make([]*kgo.Record, numRecords)
+			for i := range records {
+				tenantID := fmt.Sprintf("tenant-%d", i%numTenants)
+				wr := &mimirpb.WriteRequest{Timeseries: make([]mimirpb.PreallocTimeseries, timeseriesPerRecord)}
+				for j := range wr.Timeseries {
+					wr.Timeseries[j] = mockPreallocTimeseries(fmt.Sprintf("series_%d_%d", i, j))
+				}
+				content, err := wr.Marshal()
+				require.NoError(b, err)
 
-	for _, numTenants := range []int{1, 10, 100, 1000} {
-		// Create records upfront, outside b.Run(), so record creation doesn't affect timing.
-		records := make([]*kgo.Record, totalRecords)
-		for i := range records {
-			tenantID := fmt.Sprintf("tenant-%d", i%numTenants)
-			wr := &mimirpb.WriteRequest{Timeseries: make([]mimirpb.PreallocTimeseries, timeseriesPerRecord)}
-			for j := range wr.Timeseries {
-				wr.Timeseries[j] = mockPreallocTimeseries(fmt.Sprintf("series_%d_%d", i, j))
+				records[i] = createRecord("test-topic", 1, content, 1)
+				records[i].Key = []byte(tenantID)
+				records[i].Context = context.Background()
 			}
-			content, err := wr.Marshal()
-			require.NoError(b, err)
 
-			records[i] = createRecord("test-topic", 1, content, 1)
-			records[i].Key = []byte(tenantID)
-			records[i].Context = context.Background()
-		}
+			b.Run(fmt.Sprintf("records=%d,tenants=%d", numRecords, numTenants), func(b *testing.B) {
+				metrics := NewPusherConsumerMetrics(prometheus.NewPedanticRegistry())
+				c := NewPusherConsumer(pusher, kcfg, metrics, log.NewNopLogger())
 
-		b.Run(fmt.Sprintf("tenants=%d", numTenants), func(b *testing.B) {
-			metrics := NewPusherConsumerMetrics(prometheus.NewPedanticRegistry())
-			c := NewPusherConsumer(pusher, kcfg, metrics, log.NewNopLogger())
-
-			b.ResetTimer()
-			for range b.N {
-				err := c.Consume(context.Background(), slices.Values(records))
-				require.NoError(b, err)
-			}
-		})
+				b.ResetTimer()
+				for range b.N {
+					err := c.Consume(context.Background(), slices.Values(records))
+					require.NoError(b, err)
+				}
+			})
+		}
 	}
 }