diff --git a/CHANGELOG.md b/CHANGELOG.md index db7539f1042..0dbf275052a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ * [CHANGE] Query-frontend: Use the Mimir Query Engine (MQE) by default. #12361 * [CHANGE] Query-frontend: Remove the CLI flags `-querier.frontend-address`, `-querier.max-outstanding-requests-per-tenant`, and `-query-frontend.querier-forget-delay` and corresponding YAML configurations. This is part of a change that makes the query-scheduler a required component. This removes the ability to run the query-frontend with an embedded query-scheduler. Instead, you must run a dedicated query-scheduler component. #12200 * [CHANGE] Ingester: Remove deprecated `-ingester.stream-chunks-when-using-blocks` CLI flag and `ingester_stream_chunks_when_using_blocks` runtime configuration option. #12615 -* [CHANGE] Querier: Require non-zero values for `-querier.streaming-chunks-per-ingester-buffer-size` and `-querier.streaming-chunks-per-store-gateway-buffer-size` CLI flags and corresponding YAML configurations. This is part of a change that makes streaming required between queriers and ingesters and store-gateways. Streaming has been the default since Mimir 2.14. #12790 +* [CHANGE] Querier: Require non-zero values for `-querier.streaming-chunks-per-ingester-buffer-size` and `-querier.streaming-chunks-per-store-gateway-buffer-size` CLI flags and corresponding YAML configurations. This is part of a change that makes streaming required between queriers, ingesters, and store-gateways. Streaming has been the default since Mimir 2.14. #12790 #12818 * [CHANGE] Remove support for the experimental read-write deployment mode. #12584 * [CHANGE] Store-gateway: Update default value of `-store-gateway.dynamic-replication.multiple` to `5` to increase replication of recent blocks. #12433 * [CHANGE] Cost attribution: Reduce the default maximum per-user cardinality of cost attribution labels to 2000. #12625 diff --git a/pkg/querier/block.go b/pkg/querier/block.go index 983e3171c28..78a68c3ffbe 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -47,6 +47,7 @@ func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMa return converted } +// TODO: Remove after non-streaming removed from querier // Implementation of storage.SeriesSet, based on individual responses from store client. type blockQuerierSeriesSet struct { series []*storepb.Series diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 4dbc2632392..233ce693e48 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -767,7 +767,7 @@ func canBlockWithCompactorShardIndexContainQueryShard(queryShardIndex, queryShar // considered serious errors. All other errors are not returned, but they give rise to fetch retrials. // // In case of a successful run, fetchSeriesFromStores returns a startStreamingChunks function to start streaming -// chunks for the fetched series iff it was a streaming call for series+chunks. startStreamingChunks must be called +// chunks for the fetched series if it was a streaming call for series+chunks. startStreamingChunks must be called // before iterating on the series. 
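//
// A minimal sketch of the calling pattern (illustrative only: limiter, stats,
// and error plumbing are elided, and StartBuffering is assumed to be how each
// returned stream reader is started before the series sets are iterated):
//
//	seriesSets, _, _, streamReaders, _, err := q.fetchSeriesFromStores(ctx, sp, clients, minT, maxT, tenantID, matchers)
//	if err != nil {
//		return err
//	}
//	for _, r := range streamReaders {
//		r.StartBuffering() // start streaming chunks before iterating any series set
//	}
//	for _, set := range seriesSets {
//		for set.Next() {
//			_ = set.At() // labels are available now; chunks arrive via the stream
//		}
//	}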
func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *storage.SelectHints, clients map[BlocksStoreClient][]ulid.ULID, minT int64, maxT int64, tenantID string, convertedMatchers []storepb.LabelMatcher) (_ []storage.SeriesSet, _ []ulid.ULID, _ annotations.Annotations, streamReaders []*storeGatewayStreamReader, estimateChunks func() int, _ error) { var ( @@ -792,9 +792,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor // Concurrently fetch series from all clients. for c, blockIDs := range clients { g.Go(func() error { - log, reqCtx := spanlogger.New(reqCtx, spanLog, tracer, "blocksStoreQuerier.fetchSeriesFromStores") - defer log.Finish() - log.SetTag("store_gateway_address", c.RemoteAddress()) + clientSpanLog, clientCtx := spanlogger.New(reqCtx, spanLog, tracer, "blocksStoreQuerier.fetchSeriesFromStores") + defer clientSpanLog.Finish() + clientSpanLog.SetTag("store_gateway_address", c.RemoteAddress()) // See: https://github.com/prometheus/prometheus/pull/8050 // TODO(goutham): we should ideally be passing the hints down to the storage layer @@ -807,7 +807,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor return errors.Wrapf(err, "failed to create series request") } - stream, err := c.Series(reqCtx, req) + stream, err := c.Series(clientCtx, req) if err == nil { mtx.Lock() streams = append(streams, stream) @@ -816,7 +816,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor } if err != nil { if shouldRetry(err) { - level.Warn(log).Log("msg", "failed to fetch series", "remote", c.RemoteAddress(), "err", err) + level.Warn(clientSpanLog).Log("msg", "failed to fetch series", "remote", c.RemoteAddress(), "err", err) return nil } @@ -851,7 +851,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor return err } if shouldRetry { - level.Warn(log).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err) + level.Warn(clientSpanLog).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err) return nil } @@ -883,14 +883,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor reqStats.AddFetchedChunkBytes(uint64(chunkBytes)) reqStats.AddFetchedChunks(uint64(chunksFetched)) - level.Debug(log).Log("msg", "received series from store-gateway", + clientSpanLog.DebugLog( + "msg", "received series from store-gateway", "instance", c.RemoteAddress(), "fetched series", len(mySeries), "fetched chunk bytes", chunkBytes, "fetched chunks", chunksFetched, "fetched index bytes", indexBytesFetched, "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), - "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "), + ) if chunkInfo != nil { for i, s := range mySeries { chunkInfo.StartSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)) @@ -901,18 +903,33 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor } else if len(myStreamingSeriesLabels) > 0 { // FetchedChunks and FetchedChunkBytes are added by the SeriesChunksStreamReader. 
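			// In other words, for streaming series only the series count is known at this
			// point; chunk counts and byte totals are recorded lazily by the stream reader
			// as chunk batches arrive.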
reqStats.AddFetchedSeries(uint64(len(myStreamingSeriesLabels))) - streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeriesLabels), queryLimiter, memoryTracker, reqStats, q.metrics, q.logger) - level.Debug(log).Log("msg", "received streaming series from store-gateway", + + if req.SkipChunks { + // If we aren't creating a stream reader for reading chunks, we need to close the stream + // ourselves. It's safe to close the stream multiple times so we don't worry about closing + // all streams below when there's an error. + if err := util.CloseAndExhaust[*storepb.SeriesResponse](stream); err != nil { + level.Warn(clientSpanLog).Log("msg", "closing store-gateway client stream failed", "err", err) + } + } else { + streamReader = newStoreGatewayStreamReader(clientCtx, stream, len(myStreamingSeriesLabels), queryLimiter, memoryTracker, reqStats, q.metrics, q.logger) + } + + clientSpanLog.DebugLog( + "msg", "received streaming series from store-gateway", "instance", c.RemoteAddress(), "fetched series", len(myStreamingSeriesLabels), "fetched index bytes", indexBytesFetched, "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), - "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "), + ) } else { - level.Debug(log).Log("msg", "received no series from store-gateway", + clientSpanLog.DebugLog( + "msg", "received no series from store-gateway", "instance", c.RemoteAddress(), "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), - "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) + "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "), + ) } // Store the result. @@ -923,13 +940,23 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor if chunkInfo != nil { chunkInfo.SetMsg("store-gateway streaming") } - seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{ - series: myStreamingSeriesLabels, - streamReader: streamReader, - chunkInfo: chunkInfo, - remoteAddress: c.RemoteAddress(), - }) - streamReaders = append(streamReaders, streamReader) + + if req.SkipChunks { + noChunkSeries := make([]storage.Series, 0, len(myStreamingSeriesLabels)) + for _, lbls := range myStreamingSeriesLabels { + noChunkSeries = append(noChunkSeries, series.NewConcreteSeries(lbls, nil, nil)) + } + + seriesSets = append(seriesSets, series.NewConcreteSeriesSetFromSortedSeries(noChunkSeries)) + } else { + seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{ + series: myStreamingSeriesLabels, + streamReader: streamReader, + chunkInfo: chunkInfo, + remoteAddress: c.RemoteAddress(), + }) + streamReaders = append(streamReaders, streamReader) + } } warnings.Merge(myWarnings) queriedBlocks = append(queriedBlocks, myQueriedBlocks...) @@ -1236,9 +1263,13 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip } if skipChunks { - // We don't do the streaming call if we are not requesting the chunks. + // We don't do the streaming call if we are not requesting the chunks. Note that setting + // a batch size of 0 is ignored in newer store-gateways as the streaming code path is always + // used. We set this to 0 anyway here so that newer queriers will continue to work with + // older store-gateways that have not been updated to ignore the value 0. 
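	// In short: a batch size of 0 with SkipChunks set steers older store-gateways
	// onto their non-streaming path, while updated store-gateways coerce the 0 to
	// a non-zero fallback and serve the request over the streaming path.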
streamingBatchSize = 0 } + return &storepb.SeriesRequest{ MinTime: minT, MaxTime: maxT, diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index dd78a97efc0..8cabc88d15d 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -67,6 +67,7 @@ const ( labelDecode = "decode" targetQueryStreamBatchMessageSize = 1 * 1024 * 1024 + fallbackStreamingBatchSize = 256 ) type BucketStoreStats struct { @@ -555,10 +556,17 @@ type seriesChunks struct { // Series implements the storegatewaypb.StoreGatewayServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) (err error) { - if req.SkipChunks { - // We don't do the streaming call if we are not requesting the chunks. - req.StreamingChunksBatchSize = 0 + ctx := srv.Context() + spanLogger := spanlogger.FromContext(ctx, s.logger) + + // Previously, a batch size of 0 was used to indicate this was a Prometheus series request + // that didn't need to load any chunk data (in addition to the SkipChunks field) and used + // the non-streaming path. Now that Prometheus series requests use the streaming path we + // need to make sure that the batch size is always non-zero. + if req.StreamingChunksBatchSize == 0 { + req.StreamingChunksBatchSize = fallbackStreamingBatchSize } + defer func() { err = mapSeriesError(err) }() matchers, err := storepb.MatchersToPromMatchers(req.Matchers...) @@ -572,15 +580,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor return status.Error(codes.InvalidArgument, errors.Wrap(err, "parse query sharding label").Error()) } - var ( - spanLogger = spanlogger.FromContext(srv.Context(), s.logger) - ctx = srv.Context() - stats = newSafeQueryStats() - reqBlockMatchers []*labels.Matcher - ) + stats := newSafeQueryStats() defer s.recordSeriesCallResult(stats) defer s.recordRequestAmbientTime(stats, time.Now()) + var reqBlockMatchers []*labels.Matcher if req.Hints != nil { reqHints := &hintspb.SeriesRequestHints{} if err := types.UnmarshalAny(req.Hints, reqHints); err != nil { @@ -593,21 +597,19 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor } } - logSeriesRequestToSpan(srv.Context(), s.logger, req.MinTime, req.MaxTime, matchers, reqBlockMatchers, shardSelector, req.StreamingChunksBatchSize) + logSeriesRequestToSpan(spanLogger, req.MinTime, req.MaxTime, matchers, reqBlockMatchers, shardSelector, req.StreamingChunksBatchSize) blocks, indexReaders, chunkReaders := s.openBlocksForReading(ctx, req.SkipChunks, req.MinTime, req.MaxTime, reqBlockMatchers, stats) // We must keep the readers open until all their data has been sent. - for _, r := range indexReaders { - defer runutil.CloseWithLogOnErr(s.logger, r, "close block index reader") - } - for _, r := range chunkReaders { - defer runutil.CloseWithLogOnErr(s.logger, r, "close block chunk reader") - } + defer func() { + for _, r := range indexReaders { + runutil.CloseWithLogOnErr(s.logger, r, "close block index reader") + } - var readers *bucketChunkReaders - if !req.SkipChunks { - readers = newChunkReaders(chunkReaders) - } + for _, r := range chunkReaders { + runutil.CloseWithLogOnErr(s.logger, r, "close block chunk reader") + } + }() // Wait for the query gate only after opening blocks. Opening blocks is usually fast (~1ms), // but sometimes it can take minutes if the block isn't loaded and there is a surge in queries for unloaded blocks. 
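For context, the guard added at the top of BucketStore.Series means the streaming path no longer depends on the client sending a usable batch size. A self-contained sketch of that fallback pattern (the type and function names here are simplified stand-ins; only fallbackStreamingBatchSize mirrors the constant added above):

package main

import "fmt"

const fallbackStreamingBatchSize = 256

type seriesRequest struct {
	SkipChunks               bool
	StreamingChunksBatchSize uint64
}

// normalizeBatchSize mirrors the guard at the top of BucketStore.Series: a zero
// batch size (sent by older queriers, or by queriers targeting older gateways)
// is coerced to a non-zero default so the streaming path can always be used.
func normalizeBatchSize(req *seriesRequest) {
	if req.StreamingChunksBatchSize == 0 {
		req.StreamingChunksBatchSize = fallbackStreamingBatchSize
	}
}

func main() {
	req := &seriesRequest{SkipChunks: true, StreamingChunksBatchSize: 0}
	normalizeBatchSize(req)
	fmt.Println(req.StreamingChunksBatchSize) // 256
}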
@@ -617,92 +619,74 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor } defer done() + // Send hints about the blocks loaded for this query before sending series or stats. + resHints := buildSeriesResponseHints(blocks) + if err = s.sendHints(srv, resHints); err != nil { + return err + } + var ( - streamingIterators *streamingSeriesIterators - resHints = &hintspb.SeriesResponseHints{} + streamingSeriesCount int + seriesSet storepb.SeriesSet + streamingIterators *streamingSeriesIterators + seriesLoadStart = time.Now() + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) ) - for _, b := range blocks { - resHints.AddQueriedBlock(b.meta.ULID) - if b.meta.Compaction.Level == 1 && b.meta.Thanos.Source == block.ReceiveSource && !b.queried.Load() { - level.Debug(s.logger).Log("msg", "queried non-compacted block", "blockId", b.meta.ULID, "ooo", b.meta.Compaction.FromOutOfOrder()) - } - - b.queried.Store(true) - } - if err := s.sendHints(srv, resHints); err != nil { + seriesSet, streamingIterators, err = s.createIteratorForChunksStreamingLabelsPhase(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats) + if err != nil { return err } - streamingSeriesCount := 0 - if req.StreamingChunksBatchSize > 0 { - var ( - seriesSet storepb.SeriesSet - seriesLoadStart = time.Now() - chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) - seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) - ) - - seriesSet, streamingIterators, err = s.createIteratorForChunksStreamingLabelsPhase(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats) - if err != nil { - return err - } - - streamingSeriesCount, err = s.sendStreamingSeriesLabelsAndStats(req, srv, stats, seriesSet) - if err != nil { - return err - } - spanLogger.DebugLog( - "msg", "sent streaming series", - "num_series", streamingSeriesCount, - "duration", time.Since(seriesLoadStart), - ) + streamingSeriesCount, err = s.sendStreamingSeriesLabelsAndStats(req, srv, stats, seriesSet) + if err != nil { + return err + } + spanLogger.DebugLog( + "msg", "sent streaming series", + "num_series", streamingSeriesCount, + "duration", time.Since(seriesLoadStart), + ) - if streamingSeriesCount == 0 { - // There is no series to send chunks for. - return nil - } + if streamingSeriesCount == 0 || req.SkipChunks { + // There are no series to send chunks for, or we aren't sending chunks. + return nil } // We create the limiter twice in the case of streaming so that we don't double count the series // and hit the limit prematurely. 
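	// (The first pair of limiter instances was consumed by the labels phase above;
	// creating fresh instances here lets the chunks phase be metered independently.)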
- chunksLimiter := s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) - seriesLimiter := s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) - start := time.Now() - if req.StreamingChunksBatchSize > 0 { - seriesChunkIt := s.createIteratorForChunksStreamingChunksPhase(ctx, readers, stats, chunksLimiter, seriesLimiter, streamingIterators) - err = s.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount) - } else { - var seriesSet storepb.SeriesSet - seriesSet, err = s.createIteratorForNonChunksStreamingRequest(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats) - if err != nil { - return err - } - err = s.sendSeriesChunks(req, srv, seriesSet, stats) - } + readers := newChunkReaders(chunkReaders) + chunksLoadStart := time.Now() + + seriesChunkIt := s.createIteratorForChunksStreamingChunksPhase(ctx, readers, stats, chunksLimiter, seriesLimiter, streamingIterators) + err = s.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount) if err != nil { - return + return err } numSeries, numChunks := stats.seriesAndChunksCount() - debugMessage := "sent series" - if req.StreamingChunksBatchSize > 0 { - debugMessage = "sent streaming chunks" - } spanLogger.DebugLog( - "msg", debugMessage, + "msg", "sent streaming chunks", "num_series", numSeries, "num_chunks", numChunks, - "duration", time.Since(start), + "duration", time.Since(chunksLoadStart), ) - if req.StreamingChunksBatchSize == 0 { - // Stats were not sent before, so send it now. - return s.sendStats(srv, stats) + return nil +} + +func buildSeriesResponseHints(blocks []*bucketBlock) *hintspb.SeriesResponseHints { + resHints := &hintspb.SeriesResponseHints{} + for _, b := range blocks { + resHints.AddQueriedBlock(b.meta.ULID) + b.queried.Store(true) } - return nil + return resHints } func mapSeriesError(err error) error { @@ -918,53 +902,6 @@ func (s *BucketStore) sendStreamingChunks( return it.Err() } -func (s *BucketStore) sendSeriesChunks( - req *storepb.SeriesRequest, - srv storegatewaypb.StoreGateway_SeriesServer, - seriesSet storepb.SeriesSet, - stats *safeQueryStats, -) error { - var ( - encodeDuration time.Duration - sendDuration time.Duration - seriesCount, chunksCount int - ) - - defer stats.update(func(stats *queryStats) { - stats.mergedSeriesCount += seriesCount - stats.mergedChunksCount += chunksCount - - stats.streamingSeriesEncodeResponseDuration += encodeDuration - stats.streamingSeriesSendResponseDuration += sendDuration - }) - - for seriesSet.Next() { - seriesCount++ - // IMPORTANT: do not retain the memory returned by seriesSet.At() beyond this loop cycle - // because the subsequent call to seriesSet.Next() may release it. But it is safe to hold - // onto lset because the labels are not released. 
- lset, chks := seriesSet.At() - series := storepb.Series{ - Labels: mimirpb.FromLabelsToLabelAdapters(lset), - } - if !req.SkipChunks { - series.Chunks = chks - chunksCount += len(chks) - s.metrics.chunkSizeBytes.Observe(float64(chunksSize(chks))) - } - - err := s.sendMessage("series", srv, storepb.NewSeriesResponse(&series), &encodeDuration, &sendDuration) - if err != nil { - return err - } - } - if seriesSet.Err() != nil { - return errors.Wrap(seriesSet.Err(), "expand series set") - } - - return nil -} - func (s *BucketStore) sendMessage(typ string, srv storegatewaypb.StoreGateway_SeriesServer, msg interface{}, encodeDuration, sendDuration *time.Duration) error { // We encode it ourselves into a PreparedMsg in order to measure the time it takes. encodeBegin := time.Now() @@ -1012,8 +949,7 @@ func (s *BucketStore) sendStats(srv storegatewaypb.StoreGateway_SeriesServer, st return nil } -func logSeriesRequestToSpan(ctx context.Context, l log.Logger, minT, maxT int64, matchers, blockMatchers []*labels.Matcher, shardSelector *sharding.ShardSelector, streamingChunksBatchSize uint64) { - spanLogger := spanlogger.FromContext(ctx, l) +func logSeriesRequestToSpan(spanLogger *spanlogger.SpanLogger, minT, maxT int64, matchers, blockMatchers []*labels.Matcher, shardSelector *sharding.ShardSelector, streamingChunksBatchSize uint64) { spanLogger.DebugLog( "msg", "BucketStore.Series", "request min time", time.UnixMilli(minT).UTC().Format(time.RFC3339Nano), @@ -1032,38 +968,6 @@ func chunksSize(chks []storepb.AggrChunk) (size int) { return size } -// createIteratorForNonChunksStreamingRequest is used when the streaming feature is not enabled. -func (s *BucketStore) createIteratorForNonChunksStreamingRequest( - ctx context.Context, - req *storepb.SeriesRequest, - blocks []*bucketBlock, - indexReaders map[ulid.ULID]*bucketIndexReader, - chunkReaders *bucketChunkReaders, - shardSelector *sharding.ShardSelector, - matchers []*labels.Matcher, - chunksLimiter ChunksLimiter, - seriesLimiter SeriesLimiter, - stats *safeQueryStats, -) (storepb.SeriesSet, error) { - strategy := defaultStrategy - if req.SkipChunks { - strategy = noChunkRefs - } - it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, strategy, nil) - if err != nil { - return nil, err - } - - var set storepb.SeriesSet - if !req.SkipChunks { - ss := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats) - set = newSeriesChunksSeriesSet(ss) - } else { - set = newSeriesSetWithoutChunks(ctx, it, stats) - } - return set, nil -} - // createIteratorForChunksStreamingLabelsPhase is used when streaming feature is enabled. // It returns a series set that only contains the series labels without any chunks information. // The streamingSeriesIterators should be re-used when getting chunks to save on computation. diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index ab4cd473d2c..04ccf1af4e7 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -438,8 +438,8 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite, addit }, } for i, tcase := range append(testCases, additionalCases...) 
{ - for _, streamingBatchSize := range []int{0, 1, 5, 256} { - if ok := t.Run(fmt.Sprintf("%d,streamingBatchSize=%d", i, streamingBatchSize), func(t *testing.T) { + for _, streamingBatchSize := range []int{1, 5, 256} { + t.Run(fmt.Sprintf("%d,streamingBatchSize=%d", i, streamingBatchSize), func(t *testing.T) { tcase.req.StreamingChunksBatchSize = uint64(streamingBatchSize) seriesSet, _, _, _, err := srv.Series(context.Background(), tcase.req) require.NoError(t, err) @@ -451,9 +451,7 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite, addit assert.Equal(t, tcase.expectedChunkLen, len(s.Chunks)) } assertQueryStatsMetricsRecorded(t, len(tcase.expected), tcase.expectedChunkLen, s.metricsRegistry) - }); !ok { - return - } + }) } } } diff --git a/pkg/storegateway/bucket_store_server_test.go b/pkg/storegateway/bucket_store_server_test.go index a96def26ef9..97886c081dd 100644 --- a/pkg/storegateway/bucket_store_server_test.go +++ b/pkg/storegateway/bucket_store_server_test.go @@ -143,11 +143,6 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest } if recvSeries := res.GetStreamingSeries(); recvSeries != nil { - if req.StreamingChunksBatchSize == 0 || req.SkipChunks { - err = errors.New("got a streaming series when streaming was disabled") - return - } - var recvSeriesData []byte // We prefer to stay on the safest side at this stage @@ -172,7 +167,15 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest } } - if req.StreamingChunksBatchSize > 0 && !req.SkipChunks { + if req.SkipChunks { + // The store-gateway returns a streaming response even when SkipChunks = true. In that + // case, convert the streaming series responses into the expected series set. + for _, streamingSeries := range streamingSeriesSet { + seriesSet = append(seriesSet, &storepb.Series{ + Labels: streamingSeries.Labels, + }) + } + } else { res, err = stream.Recv() if err != nil { if errors.Is(err, io.EOF) { diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 3b90fc27fc7..8540651ef8f 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1484,9 +1484,9 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries }) } - streamingBatchSizes := []int{0} + streamingBatchSizes := []int{5} if !t.IsBenchmark() { - streamingBatchSizes = []int{0, 1, 5} + streamingBatchSizes = []int{1, 5} } for _, streamingBatchSize := range streamingBatchSizes { t.Run(fmt.Sprintf("streamingBatchSize=%d", streamingBatchSize), func(t test.TB) { @@ -1672,7 +1672,7 @@ func TestBucketStore_Series_Concurrency(t *testing.T) { // Run the test with different batch sizes. for _, batchSize := range []int{len(expectedSeries) / 100, len(expectedSeries) * 2} { t.Run(fmt.Sprintf("batch size: %d", batchSize), func(t *testing.T) { - for _, streamBatchSize := range []int{0, 10} { + for _, streamBatchSize := range []int{1, 10} { t.Run(fmt.Sprintf("streamBatchSize:%d", streamBatchSize), func(t *testing.T) { // Reset the memory pool tracker. 
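			// (Presumably so per-subtest leak assertions see a clean pool. Note that
			// the streaming batch sizes below now start at 1: a value of 0 is no
			// longer a distinct non-streaming mode, since the store-gateway coerces
			// 0 to a non-zero fallback.)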
seriesChunkRefsSetPool.(*pool.TrackedPool).Reset() @@ -2406,7 +2406,7 @@ func testBucketStoreSeriesBlockWithMultipleChunks( for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - for _, streamingBatchSize := range []int{0, 1, 5} { + for _, streamingBatchSize := range []int{1, 5} { t.Run(fmt.Sprintf("streamingBatchSize=%d", streamingBatchSize), func(t *testing.T) { req := &storepb.SeriesRequest{ MinTime: testData.reqMinTime, @@ -2430,12 +2430,7 @@ func testBucketStoreSeriesBlockWithMultipleChunks( numSamples += decodedChunk.NumSamples() } - if streamingBatchSize == 0 { - require.Zero(t, estimatedChunks) - } else { - require.InDelta(t, len(seriesSet[0].Chunks), estimatedChunks, 0.1, "number of chunks estimations should be within 10% of the actual number of chunks") - } - + require.InDelta(t, len(seriesSet[0].Chunks), estimatedChunks, 0.1, "number of chunks estimations should be within 10% of the actual number of chunks") compareToPromChunks(t, seriesSet[0].Chunks, mimirpb.FromLabelAdaptersToLabels(seriesSet[0].Labels), testData.reqMinTime, testData.reqMaxTime, promBlock) }) } diff --git a/pkg/storegateway/storepb/custom.go b/pkg/storegateway/storepb/custom.go index 2e25b1da9d8..cbcb86839e4 100644 --- a/pkg/storegateway/storepb/custom.go +++ b/pkg/storegateway/storepb/custom.go @@ -16,14 +16,6 @@ import ( "github.com/grafana/mimir/pkg/storage/chunk" ) -func NewSeriesResponse(series *Series) *SeriesResponse { - return &SeriesResponse{ - Result: &SeriesResponse_Series{ - Series: series, - }, - } -} - func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse { return &SeriesResponse{ Result: &SeriesResponse_Hints{
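Taken together with the querier changes above, the SkipChunks flow now reduces to: receive the streamed label sets, close the stream, and wrap the labels in a chunkless series set. A simplified, self-contained sketch of that last step (the real code uses Mimir's series.NewConcreteSeries and series.NewConcreteSeriesSetFromSortedSeries helpers; the type below is a hypothetical stand-in):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// labelsOnlySeriesSet is a simplified stand-in for the chunkless series set the
// querier builds when SkipChunks is set: series identities only, no samples.
type labelsOnlySeriesSet struct {
	series []labels.Labels
	idx    int
}

func (s *labelsOnlySeriesSet) Next() bool       { s.idx++; return s.idx <= len(s.series) }
func (s *labelsOnlySeriesSet) At() labels.Labels { return s.series[s.idx-1] }

func main() {
	// Label sets as they would arrive from the store-gateway's streaming response.
	streamed := []labels.Labels{
		labels.FromStrings("__name__", "up", "job", "a"),
		labels.FromStrings("__name__", "up", "job", "b"),
	}
	set := &labelsOnlySeriesSet{series: streamed}
	for set.Next() {
		fmt.Println(set.At())
	}
}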