From 5d5dc906a0f6b3502dbfa03e043f3fed0a6197b8 Mon Sep 17 00:00:00 2001
From: Nick Pillitteri
Date: Thu, 25 Sep 2025 15:03:35 -0400
Subject: [PATCH 1/3] store-gateway: use streaming for skipchunks requests

This changes store-gateways to use the streaming code path for requests
where chunks are not sent. This provides the same results to queriers,
since they already know how to parse the streaming response, and allows
us to remove the non-streaming code path in the store-gateway.

Notably, however, this change does not remove any of the non-streaming
code from queriers. This is required to avoid failing requests during a
rollout. When queriers make requests without chunks to new
store-gateways, the streaming code path will be used and queriers will
be able to parse the result. When requests without chunks are made to
older store-gateways, the non-streaming code path will be used, and
queriers are still able to parse that result.

Part of #12673

Signed-off-by: Nick Pillitteri
---
 CHANGELOG.md | 2 +-
 pkg/querier/block.go | 1 +
 pkg/querier/blocks_store_queryable.go | 75 ++++--
 pkg/storegateway/bucket.go | 243 ++++++-------------
 pkg/storegateway/bucket_e2e_test.go | 8 +-
 pkg/storegateway/bucket_store_server_test.go | 15 +-
 pkg/storegateway/bucket_test.go | 15 +-
 pkg/storegateway/storepb/custom.go | 8 -
 8 files changed, 147 insertions(+), 220 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index db7539f1042..bb6e9a2520e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,7 +16,7 @@
 * [CHANGE] Query-frontend: Use the Mimir Query Engine (MQE) by default. #12361
 * [CHANGE] Query-frontend: Remove the CLI flags `-querier.frontend-address`, `-querier.max-outstanding-requests-per-tenant`, and `-query-frontend.querier-forget-delay` and corresponding YAML configurations. This is part of a change that makes the query-scheduler a required component. This removes the ability to run the query-frontend with an embedded query-scheduler. Instead, you must run a dedicated query-scheduler component. #12200
 * [CHANGE] Ingester: Remove deprecated `-ingester.stream-chunks-when-using-blocks` CLI flag and `ingester_stream_chunks_when_using_blocks` runtime configuration option. #12615
-* [CHANGE] Querier: Require non-zero values for `-querier.streaming-chunks-per-ingester-buffer-size` and `-querier.streaming-chunks-per-store-gateway-buffer-size` CLI flags and corresponding YAML configurations. This is part of a change that makes streaming required between queriers and ingesters and store-gateways. Streaming has been the default since Mimir 2.14. #12790
+* [CHANGE] Querier: Require non-zero values for `-querier.streaming-chunks-per-ingester-buffer-size` and `-querier.streaming-chunks-per-store-gateway-buffer-size` CLI flags and corresponding YAML configurations. This is part of a change that makes streaming required between queriers and ingesters and store-gateways. Streaming has been the default since Mimir 2.14. #12790 #12818
 * [CHANGE] Remove support for the experimental read-write deployment mode. #12584
 * [CHANGE] Store-gateway: Update default value of `-store-gateway.dynamic-replication.multiple` to `5` to increase replication of recent blocks. #12433
 * [CHANGE] Cost attribution: Reduce the default maximum per-user cardinality of cost attribution labels to 2000.
#12625 diff --git a/pkg/querier/block.go b/pkg/querier/block.go index 983e3171c28..78a68c3ffbe 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -47,6 +47,7 @@ func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMa return converted } +// TODO: Remove after non-streaming removed from querier // Implementation of storage.SeriesSet, based on individual responses from store client. type blockQuerierSeriesSet struct { series []*storepb.Series diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 4dbc2632392..aaa27f65656 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -767,7 +767,7 @@ func canBlockWithCompactorShardIndexContainQueryShard(queryShardIndex, queryShar // considered serious errors. All other errors are not returned, but they give rise to fetch retrials. // // In case of a successful run, fetchSeriesFromStores returns a startStreamingChunks function to start streaming -// chunks for the fetched series iff it was a streaming call for series+chunks. startStreamingChunks must be called +// chunks for the fetched series if it was a streaming call for series+chunks. startStreamingChunks must be called // before iterating on the series. func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *storage.SelectHints, clients map[BlocksStoreClient][]ulid.ULID, minT int64, maxT int64, tenantID string, convertedMatchers []storepb.LabelMatcher) (_ []storage.SeriesSet, _ []ulid.ULID, _ annotations.Annotations, streamReaders []*storeGatewayStreamReader, estimateChunks func() int, _ error) { var ( @@ -792,9 +792,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor // Concurrently fetch series from all clients. 
for c, blockIDs := range clients { g.Go(func() error { - log, reqCtx := spanlogger.New(reqCtx, spanLog, tracer, "blocksStoreQuerier.fetchSeriesFromStores") - defer log.Finish() - log.SetTag("store_gateway_address", c.RemoteAddress()) + clientSpanLog, clientCtx := spanlogger.New(reqCtx, spanLog, tracer, "blocksStoreQuerier.fetchSeriesFromStores") + defer clientSpanLog.Finish() + clientSpanLog.SetTag("store_gateway_address", c.RemoteAddress()) // See: https://github.com/prometheus/prometheus/pull/8050 // TODO(goutham): we should ideally be passing the hints down to the storage layer @@ -807,7 +807,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor return errors.Wrapf(err, "failed to create series request") } - stream, err := c.Series(reqCtx, req) + stream, err := c.Series(clientCtx, req) if err == nil { mtx.Lock() streams = append(streams, stream) @@ -816,7 +816,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor } if err != nil { if shouldRetry(err) { - level.Warn(log).Log("msg", "failed to fetch series", "remote", c.RemoteAddress(), "err", err) + level.Warn(clientSpanLog).Log("msg", "failed to fetch series", "remote", c.RemoteAddress(), "err", err) return nil } @@ -851,7 +851,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor return err } if shouldRetry { - level.Warn(log).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err) + level.Warn(clientSpanLog).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err) return nil } @@ -883,14 +883,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor reqStats.AddFetchedChunkBytes(uint64(chunkBytes)) reqStats.AddFetchedChunks(uint64(chunksFetched)) - level.Debug(log).Log("msg", "received series from store-gateway", + clientSpanLog.DebugLog( + "msg", "received series from store-gateway", "instance", c.RemoteAddress(), "fetched series", len(mySeries), "fetched chunk bytes", chunkBytes, "fetched chunks", chunksFetched, "fetched index bytes", indexBytesFetched, "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), - "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "), + ) if chunkInfo != nil { for i, s := range mySeries { chunkInfo.StartSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)) @@ -901,18 +903,33 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor } else if len(myStreamingSeriesLabels) > 0 { // FetchedChunks and FetchedChunkBytes are added by the SeriesChunksStreamReader. reqStats.AddFetchedSeries(uint64(len(myStreamingSeriesLabels))) - streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeriesLabels), queryLimiter, memoryTracker, reqStats, q.metrics, q.logger) - level.Debug(log).Log("msg", "received streaming series from store-gateway", + + if !req.SkipChunks { + streamReader = newStoreGatewayStreamReader(clientCtx, stream, len(myStreamingSeriesLabels), queryLimiter, memoryTracker, reqStats, q.metrics, q.logger) + } else { + // If we aren't creating a stream reader for reading chunks, we need to close the stream + // ourselves. It's safe to close the stream multiple times so we don't worry about closing + // all streams below when there's an error. 
+ if err := util.CloseAndExhaust[*storepb.SeriesResponse](stream); err != nil { + level.Warn(clientSpanLog).Log("msg", "closing store-gateway client stream failed", "err", err) + } + } + + clientSpanLog.DebugLog( + "msg", "received streaming series from store-gateway", "instance", c.RemoteAddress(), "fetched series", len(myStreamingSeriesLabels), "fetched index bytes", indexBytesFetched, "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), - "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "), + ) } else { - level.Debug(log).Log("msg", "received no series from store-gateway", + clientSpanLog.DebugLog( + "msg", "received no series from store-gateway", "instance", c.RemoteAddress(), "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), - "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "), + ) } // Store the result. @@ -923,13 +940,23 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor if chunkInfo != nil { chunkInfo.SetMsg("store-gateway streaming") } - seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{ - series: myStreamingSeriesLabels, - streamReader: streamReader, - chunkInfo: chunkInfo, - remoteAddress: c.RemoteAddress(), - }) - streamReaders = append(streamReaders, streamReader) + + if !req.SkipChunks { + seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{ + series: myStreamingSeriesLabels, + streamReader: streamReader, + chunkInfo: chunkInfo, + remoteAddress: c.RemoteAddress(), + }) + streamReaders = append(streamReaders, streamReader) + } else { + noChunkSeries := make([]storage.Series, 0, len(myStreamingSeriesLabels)) + for _, lbls := range myStreamingSeriesLabels { + noChunkSeries = append(noChunkSeries, series.NewConcreteSeries(lbls, nil, nil)) + } + + seriesSets = append(seriesSets, series.NewConcreteSeriesSetFromSortedSeries(noChunkSeries)) + } } warnings.Merge(myWarnings) queriedBlocks = append(queriedBlocks, myQueriedBlocks...) @@ -1236,9 +1263,13 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip } if skipChunks { - // We don't do the streaming call if we are not requesting the chunks. + // We don't do the streaming call if we are not requesting the chunks. Note that setting + // a batch size of 0 is ignored in newer store-gateways as the streaming code path is always + // used. We set this to 0 anyway here so that newer queriers will continue to work with + // older store-gateways that have not been updated to ignore the value 0. streamingBatchSize = 0 } + return &storepb.SeriesRequest{ MinTime: minT, MaxTime: maxT, diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index dd78a97efc0..f45782488ca 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -67,6 +67,7 @@ const ( labelDecode = "decode" targetQueryStreamBatchMessageSize = 1 * 1024 * 1024 + fallbackStreamingBatchSize = 256 ) type BucketStoreStats struct { @@ -555,10 +556,17 @@ type seriesChunks struct { // Series implements the storegatewaypb.StoreGatewayServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) (err error) { - if req.SkipChunks { - // We don't do the streaming call if we are not requesting the chunks. 
- req.StreamingChunksBatchSize = 0 + ctx := srv.Context() + spanLogger := spanlogger.FromContext(ctx, s.logger) + + // Previously, a batch size of 0 was used to indicate this was a Prometheus series request + // that didn't need to load any chunk data (in addition to the SkipChunks field) and used + // the non-streaming path. Now that Prometheus series requests use the streaming path we + // need to make sure that the batch size is always non-zero. + if req.StreamingChunksBatchSize == 0 { + req.StreamingChunksBatchSize = fallbackStreamingBatchSize } + defer func() { err = mapSeriesError(err) }() matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) @@ -572,15 +580,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor return status.Error(codes.InvalidArgument, errors.Wrap(err, "parse query sharding label").Error()) } - var ( - spanLogger = spanlogger.FromContext(srv.Context(), s.logger) - ctx = srv.Context() - stats = newSafeQueryStats() - reqBlockMatchers []*labels.Matcher - ) + stats := newSafeQueryStats() defer s.recordSeriesCallResult(stats) defer s.recordRequestAmbientTime(stats, time.Now()) + var reqBlockMatchers []*labels.Matcher if req.Hints != nil { reqHints := &hintspb.SeriesRequestHints{} if err := types.UnmarshalAny(req.Hints, reqHints); err != nil { @@ -593,21 +597,20 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor } } - logSeriesRequestToSpan(srv.Context(), s.logger, req.MinTime, req.MaxTime, matchers, reqBlockMatchers, shardSelector, req.StreamingChunksBatchSize) + logSeriesRequestToSpan(spanLogger, req.MinTime, req.MaxTime, matchers, reqBlockMatchers, shardSelector, req.StreamingChunksBatchSize) blocks, indexReaders, chunkReaders := s.openBlocksForReading(ctx, req.SkipChunks, req.MinTime, req.MaxTime, reqBlockMatchers, stats) // We must keep the readers open until all their data has been sent. - for _, r := range indexReaders { - defer runutil.CloseWithLogOnErr(s.logger, r, "close block index reader") - } - for _, r := range chunkReaders { - defer runutil.CloseWithLogOnErr(s.logger, r, "close block chunk reader") - } - - var readers *bucketChunkReaders - if !req.SkipChunks { - readers = newChunkReaders(chunkReaders) - } + defer func() { + for _, r := range indexReaders { + runutil.CloseWithLogOnErr(s.logger, r, "close block index reader") + } + }() + defer func() { + for _, r := range chunkReaders { + runutil.CloseWithLogOnErr(s.logger, r, "close block chunk reader") + } + }() // Wait for the query gate only after opening blocks. Opening blocks is usually fast (~1ms), // but sometimes it can take minutes if the block isn't loaded and there is a surge in queries for unloaded blocks. @@ -617,92 +620,76 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor } defer done() + // Send hints about the blocks loaded for this query before sending series or stats. 
+ resHints := buildSeriesResponseHints(blocks) + if err = s.sendHints(srv, resHints); err != nil { + return err + } + var ( - streamingIterators *streamingSeriesIterators - resHints = &hintspb.SeriesResponseHints{} + streamingSeriesCount int + seriesSet storepb.SeriesSet + streamingIterators *streamingSeriesIterators + seriesLoadStart = time.Now() + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) ) - for _, b := range blocks { - resHints.AddQueriedBlock(b.meta.ULID) - if b.meta.Compaction.Level == 1 && b.meta.Thanos.Source == block.ReceiveSource && !b.queried.Load() { - level.Debug(s.logger).Log("msg", "queried non-compacted block", "blockId", b.meta.ULID, "ooo", b.meta.Compaction.FromOutOfOrder()) - } - - b.queried.Store(true) - } - if err := s.sendHints(srv, resHints); err != nil { + seriesSet, streamingIterators, err = s.createIteratorForChunksStreamingLabelsPhase(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats) + if err != nil { return err } - streamingSeriesCount := 0 - if req.StreamingChunksBatchSize > 0 { - var ( - seriesSet storepb.SeriesSet - seriesLoadStart = time.Now() - chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) - seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) - ) - - seriesSet, streamingIterators, err = s.createIteratorForChunksStreamingLabelsPhase(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats) - if err != nil { - return err - } - - streamingSeriesCount, err = s.sendStreamingSeriesLabelsAndStats(req, srv, stats, seriesSet) - if err != nil { - return err - } - spanLogger.DebugLog( - "msg", "sent streaming series", - "num_series", streamingSeriesCount, - "duration", time.Since(seriesLoadStart), - ) + streamingSeriesCount, err = s.sendStreamingSeriesLabelsAndStats(req, srv, stats, seriesSet) + if err != nil { + return err + } + spanLogger.DebugLog( + "msg", "sent streaming series", + "num_series", streamingSeriesCount, + "duration", time.Since(seriesLoadStart), + ) - if streamingSeriesCount == 0 { - // There is no series to send chunks for. - return nil - } + if streamingSeriesCount == 0 { + // There are no series to send chunks for. + return nil } - // We create the limiter twice in the case of streaming so that we don't double count the series - // and hit the limit prematurely. - chunksLimiter := s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) - seriesLimiter := s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) + if !req.SkipChunks { + // We create the limiter twice in the case of streaming so that we don't double count the series + // and hit the limit prematurely. 
+ chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) + + readers := newChunkReaders(chunkReaders) + chunksLoadStart := time.Now() - start := time.Now() - if req.StreamingChunksBatchSize > 0 { seriesChunkIt := s.createIteratorForChunksStreamingChunksPhase(ctx, readers, stats, chunksLimiter, seriesLimiter, streamingIterators) err = s.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount) - } else { - var seriesSet storepb.SeriesSet - seriesSet, err = s.createIteratorForNonChunksStreamingRequest(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats) if err != nil { return err } - err = s.sendSeriesChunks(req, srv, seriesSet, stats) - } - if err != nil { - return - } - numSeries, numChunks := stats.seriesAndChunksCount() - debugMessage := "sent series" - if req.StreamingChunksBatchSize > 0 { - debugMessage = "sent streaming chunks" + numSeries, numChunks := stats.seriesAndChunksCount() + spanLogger.DebugLog( + "msg", "sent streaming chunks", + "num_series", numSeries, + "num_chunks", numChunks, + "duration", time.Since(chunksLoadStart), + ) } - spanLogger.DebugLog( - "msg", debugMessage, - "num_series", numSeries, - "num_chunks", numChunks, - "duration", time.Since(start), - ) - if req.StreamingChunksBatchSize == 0 { - // Stats were not sent before, so send it now. - return s.sendStats(srv, stats) + return nil +} + +func buildSeriesResponseHints(blocks []*bucketBlock) *hintspb.SeriesResponseHints { + resHints := &hintspb.SeriesResponseHints{} + for _, b := range blocks { + resHints.AddQueriedBlock(b.meta.ULID) + b.queried.Store(true) } - return nil + return resHints } func mapSeriesError(err error) error { @@ -918,53 +905,6 @@ func (s *BucketStore) sendStreamingChunks( return it.Err() } -func (s *BucketStore) sendSeriesChunks( - req *storepb.SeriesRequest, - srv storegatewaypb.StoreGateway_SeriesServer, - seriesSet storepb.SeriesSet, - stats *safeQueryStats, -) error { - var ( - encodeDuration time.Duration - sendDuration time.Duration - seriesCount, chunksCount int - ) - - defer stats.update(func(stats *queryStats) { - stats.mergedSeriesCount += seriesCount - stats.mergedChunksCount += chunksCount - - stats.streamingSeriesEncodeResponseDuration += encodeDuration - stats.streamingSeriesSendResponseDuration += sendDuration - }) - - for seriesSet.Next() { - seriesCount++ - // IMPORTANT: do not retain the memory returned by seriesSet.At() beyond this loop cycle - // because the subsequent call to seriesSet.Next() may release it. But it is safe to hold - // onto lset because the labels are not released. - lset, chks := seriesSet.At() - series := storepb.Series{ - Labels: mimirpb.FromLabelsToLabelAdapters(lset), - } - if !req.SkipChunks { - series.Chunks = chks - chunksCount += len(chks) - s.metrics.chunkSizeBytes.Observe(float64(chunksSize(chks))) - } - - err := s.sendMessage("series", srv, storepb.NewSeriesResponse(&series), &encodeDuration, &sendDuration) - if err != nil { - return err - } - } - if seriesSet.Err() != nil { - return errors.Wrap(seriesSet.Err(), "expand series set") - } - - return nil -} - func (s *BucketStore) sendMessage(typ string, srv storegatewaypb.StoreGateway_SeriesServer, msg interface{}, encodeDuration, sendDuration *time.Duration) error { // We encode it ourselves into a PreparedMsg in order to measure the time it takes. 
encodeBegin := time.Now() @@ -1012,8 +952,7 @@ func (s *BucketStore) sendStats(srv storegatewaypb.StoreGateway_SeriesServer, st return nil } -func logSeriesRequestToSpan(ctx context.Context, l log.Logger, minT, maxT int64, matchers, blockMatchers []*labels.Matcher, shardSelector *sharding.ShardSelector, streamingChunksBatchSize uint64) { - spanLogger := spanlogger.FromContext(ctx, l) +func logSeriesRequestToSpan(spanLogger *spanlogger.SpanLogger, minT, maxT int64, matchers, blockMatchers []*labels.Matcher, shardSelector *sharding.ShardSelector, streamingChunksBatchSize uint64) { spanLogger.DebugLog( "msg", "BucketStore.Series", "request min time", time.UnixMilli(minT).UTC().Format(time.RFC3339Nano), @@ -1032,38 +971,6 @@ func chunksSize(chks []storepb.AggrChunk) (size int) { return size } -// createIteratorForNonChunksStreamingRequest is used when the streaming feature is not enabled. -func (s *BucketStore) createIteratorForNonChunksStreamingRequest( - ctx context.Context, - req *storepb.SeriesRequest, - blocks []*bucketBlock, - indexReaders map[ulid.ULID]*bucketIndexReader, - chunkReaders *bucketChunkReaders, - shardSelector *sharding.ShardSelector, - matchers []*labels.Matcher, - chunksLimiter ChunksLimiter, - seriesLimiter SeriesLimiter, - stats *safeQueryStats, -) (storepb.SeriesSet, error) { - strategy := defaultStrategy - if req.SkipChunks { - strategy = noChunkRefs - } - it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, strategy, nil) - if err != nil { - return nil, err - } - - var set storepb.SeriesSet - if !req.SkipChunks { - ss := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats) - set = newSeriesChunksSeriesSet(ss) - } else { - set = newSeriesSetWithoutChunks(ctx, it, stats) - } - return set, nil -} - // createIteratorForChunksStreamingLabelsPhase is used when streaming feature is enabled. // It returns a series set that only contains the series labels without any chunks information. // The streamingSeriesIterators should be re-used when getting chunks to save on computation. diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index ab4cd473d2c..04ccf1af4e7 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -438,8 +438,8 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite, addit }, } for i, tcase := range append(testCases, additionalCases...) 
{ - for _, streamingBatchSize := range []int{0, 1, 5, 256} { - if ok := t.Run(fmt.Sprintf("%d,streamingBatchSize=%d", i, streamingBatchSize), func(t *testing.T) { + for _, streamingBatchSize := range []int{1, 5, 256} { + t.Run(fmt.Sprintf("%d,streamingBatchSize=%d", i, streamingBatchSize), func(t *testing.T) { tcase.req.StreamingChunksBatchSize = uint64(streamingBatchSize) seriesSet, _, _, _, err := srv.Series(context.Background(), tcase.req) require.NoError(t, err) @@ -451,9 +451,7 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite, addit assert.Equal(t, tcase.expectedChunkLen, len(s.Chunks)) } assertQueryStatsMetricsRecorded(t, len(tcase.expected), tcase.expectedChunkLen, s.metricsRegistry) - }); !ok { - return - } + }) } } } diff --git a/pkg/storegateway/bucket_store_server_test.go b/pkg/storegateway/bucket_store_server_test.go index a96def26ef9..d2abb6a7d6a 100644 --- a/pkg/storegateway/bucket_store_server_test.go +++ b/pkg/storegateway/bucket_store_server_test.go @@ -143,11 +143,6 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest } if recvSeries := res.GetStreamingSeries(); recvSeries != nil { - if req.StreamingChunksBatchSize == 0 || req.SkipChunks { - err = errors.New("got a streaming series when streaming was disabled") - return - } - var recvSeriesData []byte // We prefer to stay on the safest side at this stage @@ -172,7 +167,7 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest } } - if req.StreamingChunksBatchSize > 0 && !req.SkipChunks { + if !req.SkipChunks { res, err = stream.Recv() if err != nil { if errors.Is(err, io.EOF) { @@ -250,6 +245,14 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest if errors.Is(err, io.EOF) { err = nil } + } else { + // The store-gateway returns a streaming response even when SkipChunks = true. In that + // case, convert the streaming series responses into the expected series set. + for _, streamingSeries := range streamingSeriesSet { + seriesSet = append(seriesSet, &storepb.Series{ + Labels: streamingSeries.Labels, + }) + } } return diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 3b90fc27fc7..8540651ef8f 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1484,9 +1484,9 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries }) } - streamingBatchSizes := []int{0} + streamingBatchSizes := []int{5} if !t.IsBenchmark() { - streamingBatchSizes = []int{0, 1, 5} + streamingBatchSizes = []int{1, 5} } for _, streamingBatchSize := range streamingBatchSizes { t.Run(fmt.Sprintf("streamingBatchSize=%d", streamingBatchSize), func(t test.TB) { @@ -1672,7 +1672,7 @@ func TestBucketStore_Series_Concurrency(t *testing.T) { // Run the test with different batch sizes. for _, batchSize := range []int{len(expectedSeries) / 100, len(expectedSeries) * 2} { t.Run(fmt.Sprintf("batch size: %d", batchSize), func(t *testing.T) { - for _, streamBatchSize := range []int{0, 10} { + for _, streamBatchSize := range []int{1, 10} { t.Run(fmt.Sprintf("streamBatchSize:%d", streamBatchSize), func(t *testing.T) { // Reset the memory pool tracker. 
seriesChunkRefsSetPool.(*pool.TrackedPool).Reset() @@ -2406,7 +2406,7 @@ func testBucketStoreSeriesBlockWithMultipleChunks( for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - for _, streamingBatchSize := range []int{0, 1, 5} { + for _, streamingBatchSize := range []int{1, 5} { t.Run(fmt.Sprintf("streamingBatchSize=%d", streamingBatchSize), func(t *testing.T) { req := &storepb.SeriesRequest{ MinTime: testData.reqMinTime, @@ -2430,12 +2430,7 @@ func testBucketStoreSeriesBlockWithMultipleChunks( numSamples += decodedChunk.NumSamples() } - if streamingBatchSize == 0 { - require.Zero(t, estimatedChunks) - } else { - require.InDelta(t, len(seriesSet[0].Chunks), estimatedChunks, 0.1, "number of chunks estimations should be within 10% of the actual number of chunks") - } - + require.InDelta(t, len(seriesSet[0].Chunks), estimatedChunks, 0.1, "number of chunks estimations should be within 10% of the actual number of chunks") compareToPromChunks(t, seriesSet[0].Chunks, mimirpb.FromLabelAdaptersToLabels(seriesSet[0].Labels), testData.reqMinTime, testData.reqMaxTime, promBlock) }) } diff --git a/pkg/storegateway/storepb/custom.go b/pkg/storegateway/storepb/custom.go index 2e25b1da9d8..cbcb86839e4 100644 --- a/pkg/storegateway/storepb/custom.go +++ b/pkg/storegateway/storepb/custom.go @@ -16,14 +16,6 @@ import ( "github.com/grafana/mimir/pkg/storage/chunk" ) -func NewSeriesResponse(series *Series) *SeriesResponse { - return &SeriesResponse{ - Result: &SeriesResponse_Series{ - Series: series, - }, - } -} - func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse { return &SeriesResponse{ Result: &SeriesResponse_Hints{ From 356362a5c10c66fa7d6baee27b2ac4ea38bf9ff2 Mon Sep 17 00:00:00 2001 From: Nick Pillitteri <56quarters@users.noreply.github.com> Date: Thu, 2 Oct 2025 14:42:55 -0400 Subject: [PATCH 2/3] Update CHANGELOG.md Co-authored-by: Taylor C <41653732+tacole02@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bb6e9a2520e..0dbf275052a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ * [CHANGE] Query-frontend: Use the Mimir Query Engine (MQE) by default. #12361 * [CHANGE] Query-frontend: Remove the CLI flags `-querier.frontend-address`, `-querier.max-outstanding-requests-per-tenant`, and `-query-frontend.querier-forget-delay` and corresponding YAML configurations. This is part of a change that makes the query-scheduler a required component. This removes the ability to run the query-frontend with an embedded query-scheduler. Instead, you must run a dedicated query-scheduler component. #12200 * [CHANGE] Ingester: Remove deprecated `-ingester.stream-chunks-when-using-blocks` CLI flag and `ingester_stream_chunks_when_using_blocks` runtime configuration option. #12615 -* [CHANGE] Querier: Require non-zero values for `-querier.streaming-chunks-per-ingester-buffer-size` and `-querier.streaming-chunks-per-store-gateway-buffer-size` CLI flags and corresponding YAML configurations. This is part of a change that makes streaming required between queriers and ingesters and store-gateways. Streaming has been the default since Mimir 2.14. #12790 #12818 +* [CHANGE] Querier: Require non-zero values for `-querier.streaming-chunks-per-ingester-buffer-size` and `-querier.streaming-chunks-per-store-gateway-buffer-size` CLI flags and corresponding YAML configurations. This is part of a change that makes streaming required between queriers, ingesters, and store-gateways. 
Streaming has been the default since Mimir 2.14. #12790 #12818 * [CHANGE] Remove support for the experimental read-write deployment mode. #12584 * [CHANGE] Store-gateway: Update default value of `-store-gateway.dynamic-replication.multiple` to `5` to increase replication of recent blocks. #12433 * [CHANGE] Cost attribution: Reduce the default maximum per-user cardinality of cost attribution labels to 2000. #12625 From abfbb24b0109fe62f7f88b8d456fa02347d998bb Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Thu, 2 Oct 2025 15:00:17 -0400 Subject: [PATCH 3/3] code review changes: invert skipchunks conditionals Signed-off-by: Nick Pillitteri --- pkg/querier/blocks_store_queryable.go | 22 +++++----- pkg/storegateway/bucket.go | 45 +++++++++----------- pkg/storegateway/bucket_store_server_test.go | 18 ++++---- 3 files changed, 41 insertions(+), 44 deletions(-) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index aaa27f65656..233ce693e48 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -904,15 +904,15 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor // FetchedChunks and FetchedChunkBytes are added by the SeriesChunksStreamReader. reqStats.AddFetchedSeries(uint64(len(myStreamingSeriesLabels))) - if !req.SkipChunks { - streamReader = newStoreGatewayStreamReader(clientCtx, stream, len(myStreamingSeriesLabels), queryLimiter, memoryTracker, reqStats, q.metrics, q.logger) - } else { + if req.SkipChunks { // If we aren't creating a stream reader for reading chunks, we need to close the stream // ourselves. It's safe to close the stream multiple times so we don't worry about closing // all streams below when there's an error. if err := util.CloseAndExhaust[*storepb.SeriesResponse](stream); err != nil { level.Warn(clientSpanLog).Log("msg", "closing store-gateway client stream failed", "err", err) } + } else { + streamReader = newStoreGatewayStreamReader(clientCtx, stream, len(myStreamingSeriesLabels), queryLimiter, memoryTracker, reqStats, q.metrics, q.logger) } clientSpanLog.DebugLog( @@ -941,7 +941,14 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor chunkInfo.SetMsg("store-gateway streaming") } - if !req.SkipChunks { + if req.SkipChunks { + noChunkSeries := make([]storage.Series, 0, len(myStreamingSeriesLabels)) + for _, lbls := range myStreamingSeriesLabels { + noChunkSeries = append(noChunkSeries, series.NewConcreteSeries(lbls, nil, nil)) + } + + seriesSets = append(seriesSets, series.NewConcreteSeriesSetFromSortedSeries(noChunkSeries)) + } else { seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{ series: myStreamingSeriesLabels, streamReader: streamReader, @@ -949,13 +956,6 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor remoteAddress: c.RemoteAddress(), }) streamReaders = append(streamReaders, streamReader) - } else { - noChunkSeries := make([]storage.Series, 0, len(myStreamingSeriesLabels)) - for _, lbls := range myStreamingSeriesLabels { - noChunkSeries = append(noChunkSeries, series.NewConcreteSeries(lbls, nil, nil)) - } - - seriesSets = append(seriesSets, series.NewConcreteSeriesSetFromSortedSeries(noChunkSeries)) } } warnings.Merge(myWarnings) diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index f45782488ca..8cabc88d15d 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -605,8 +605,7 @@ func (s *BucketStore) Series(req 
*storepb.SeriesRequest, srv storegatewaypb.Stor for _, r := range indexReaders { runutil.CloseWithLogOnErr(s.logger, r, "close block index reader") } - }() - defer func() { + for _, r := range chunkReaders { runutil.CloseWithLogOnErr(s.logger, r, "close block chunk reader") } @@ -650,35 +649,33 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor "duration", time.Since(seriesLoadStart), ) - if streamingSeriesCount == 0 { - // There are no series to send chunks for. + if streamingSeriesCount == 0 || req.SkipChunks { + // There are no series to send chunks for, or we aren't sending chunks. return nil } - if !req.SkipChunks { - // We create the limiter twice in the case of streaming so that we don't double count the series - // and hit the limit prematurely. - chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) - seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) - - readers := newChunkReaders(chunkReaders) - chunksLoadStart := time.Now() + // We create the limiter twice in the case of streaming so that we don't double count the series + // and hit the limit prematurely. + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) - seriesChunkIt := s.createIteratorForChunksStreamingChunksPhase(ctx, readers, stats, chunksLimiter, seriesLimiter, streamingIterators) - err = s.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount) - if err != nil { - return err - } + readers := newChunkReaders(chunkReaders) + chunksLoadStart := time.Now() - numSeries, numChunks := stats.seriesAndChunksCount() - spanLogger.DebugLog( - "msg", "sent streaming chunks", - "num_series", numSeries, - "num_chunks", numChunks, - "duration", time.Since(chunksLoadStart), - ) + seriesChunkIt := s.createIteratorForChunksStreamingChunksPhase(ctx, readers, stats, chunksLimiter, seriesLimiter, streamingIterators) + err = s.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount) + if err != nil { + return err } + numSeries, numChunks := stats.seriesAndChunksCount() + spanLogger.DebugLog( + "msg", "sent streaming chunks", + "num_series", numSeries, + "num_chunks", numChunks, + "duration", time.Since(chunksLoadStart), + ) + return nil } diff --git a/pkg/storegateway/bucket_store_server_test.go b/pkg/storegateway/bucket_store_server_test.go index d2abb6a7d6a..97886c081dd 100644 --- a/pkg/storegateway/bucket_store_server_test.go +++ b/pkg/storegateway/bucket_store_server_test.go @@ -167,7 +167,15 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest } } - if !req.SkipChunks { + if req.SkipChunks { + // The store-gateway returns a streaming response even when SkipChunks = true. In that + // case, convert the streaming series responses into the expected series set. + for _, streamingSeries := range streamingSeriesSet { + seriesSet = append(seriesSet, &storepb.Series{ + Labels: streamingSeries.Labels, + }) + } + } else { res, err = stream.Recv() if err != nil { if errors.Is(err, io.EOF) { @@ -245,14 +253,6 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest if errors.Is(err, io.EOF) { err = nil } - } else { - // The store-gateway returns a streaming response even when SkipChunks = true. In that - // case, convert the streaming series responses into the expected series set. 
- for _, streamingSeries := range streamingSeriesSet { - seriesSet = append(seriesSet, &storepb.Series{ - Labels: streamingSeries.Labels, - }) - } } return