Skip to content

Commit ab72005

Browse files
committed
store-gateway: use streaming for skipchunks requests
This changes store-gateways to use the streaming code path for requests where chunks are not sent. This provides the same results to queriers, since they already know how to parse the streaming response, and allows us to remove the non-streaming code path in the store-gateway. Notably, however, this change does not remove any of the non-streaming code from queriers. This is required to avoid failing requests during a rollout. When queriers make requests without chunks to new store-gateways, the streaming code path will be used and queriers will be able to parse the result. When requests without chunks are made to older store-gateways, the non-streaming code path will be used, whose result queriers are still able to parse. Part of #12673 Signed-off-by: Nick Pillitteri <[email protected]>
1 parent 7a38743 commit ab72005

File tree

6 files changed

+87
-175
lines changed

6 files changed

+87
-175
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* [CHANGE] Query-frontend: Use the Mimir Query Engine (MQE) by default. #12361
1717
* [CHANGE] Query-frontend: Remove the CLI flags `-querier.frontend-address`, `-querier.max-outstanding-requests-per-tenant`, and `-query-frontend.querier-forget-delay` and corresponding YAML configurations. This is part of a change that makes the query-scheduler a required component. This removes the ability to run the query-frontend with an embedded query-scheduler. Instead, you must run a dedicated query-scheduler component. #12200
1818
* [CHANGE] Ingester: Remove deprecated `-ingester.stream-chunks-when-using-blocks` CLI flag and `ingester_stream_chunks_when_using_blocks` runtime configuration option. #12615
19-
* [CHANGE] Querier: Require non-zero values for `-querier.streaming-chunks-per-ingester-buffer-size` and `-querier.streaming-chunks-per-store-gateway-buffer-size` CLI flags and corresponding YAML configurations. This is part of a change that makes streaming required between queriers and ingesters and store-gateways. Streaming has been the default since Mimir 2.14. #12790
19+
* [CHANGE] Querier: Require non-zero values for `-querier.streaming-chunks-per-ingester-buffer-size` and `-querier.streaming-chunks-per-store-gateway-buffer-size` CLI flags and corresponding YAML configurations. This is part of a change that makes streaming required between queriers and ingesters and store-gateways. Streaming has been the default since Mimir 2.14. #12790 #12818
2020
* [CHANGE] Remove support for the experimental read-write deployment mode. #12584
2121
* [CHANGE] Store-gateway: Update default value of `-store-gateway.dynamic-replication.multiple` to `5` to increase replication of recent blocks. #12433
2222
* [CHANGE] Cost attribution: Reduce the default maximum per-user cardinality of cost attribution labels to 2000. #12625

pkg/querier/blocks_store_queryable.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1236,9 +1236,13 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip
12361236
}
12371237

12381238
if skipChunks {
1239-
// We don't do the streaming call if we are not requesting the chunks.
1239+
// We don't do the streaming call if we are not requesting the chunks. Note that setting
1240+
// a batch size of 0 is ignored in newer store-gateways as the streaming code path is always
1241+
// used. We set this to 0 anyway here so that newer queriers will continue to work with
1242+
// older store-gateways that have not been updated to ignore the value 0.
12401243
streamingBatchSize = 0
12411244
}
1245+
12421246
return &storepb.SeriesRequest{
12431247
MinTime: minT,
12441248
MaxTime: maxT,

pkg/storegateway/bucket.go

Lines changed: 64 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ const (
6767
labelDecode = "decode"
6868

6969
targetQueryStreamBatchMessageSize = 1 * 1024 * 1024
70+
fallbackStreamingBatchSize = 256
7071
)
7172

7273
type BucketStoreStats struct {
@@ -555,10 +556,17 @@ type seriesChunks struct {
555556

556557
// Series implements the storegatewaypb.StoreGatewayServer interface.
557558
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) (err error) {
558-
if req.SkipChunks {
559-
// We don't do the streaming call if we are not requesting the chunks.
560-
req.StreamingChunksBatchSize = 0
559+
ctx := srv.Context()
560+
spanLogger := spanlogger.FromContext(ctx, s.logger)
561+
562+
// Previously, a batch size of 0 was used to indicate this was a Prometheus series request
563+
// that didn't need to load any chunk data (in addition to the SkipChunks field) and used
564+
// the non-streaming path. Now that Prometheus series requests use the streaming path we
565+
// need to make sure that the batch size is always non-zero.
566+
if req.StreamingChunksBatchSize == 0 {
567+
req.StreamingChunksBatchSize = fallbackStreamingBatchSize
561568
}
569+
562570
defer func() { err = mapSeriesError(err) }()
563571

564572
matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
@@ -572,15 +580,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
572580
return status.Error(codes.InvalidArgument, errors.Wrap(err, "parse query sharding label").Error())
573581
}
574582

575-
var (
576-
spanLogger = spanlogger.FromContext(srv.Context(), s.logger)
577-
ctx = srv.Context()
578-
stats = newSafeQueryStats()
579-
reqBlockMatchers []*labels.Matcher
580-
)
583+
stats := newSafeQueryStats()
581584
defer s.recordSeriesCallResult(stats)
582585
defer s.recordRequestAmbientTime(stats, time.Now())
583586

587+
var reqBlockMatchers []*labels.Matcher
584588
if req.Hints != nil {
585589
reqHints := &hintspb.SeriesRequestHints{}
586590
if err := types.UnmarshalAny(req.Hints, reqHints); err != nil {
@@ -597,17 +601,16 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
597601

598602
blocks, indexReaders, chunkReaders := s.openBlocksForReading(ctx, req.SkipChunks, req.MinTime, req.MaxTime, reqBlockMatchers, stats)
599603
// We must keep the readers open until all their data has been sent.
600-
for _, r := range indexReaders {
601-
defer runutil.CloseWithLogOnErr(s.logger, r, "close block index reader")
602-
}
603-
for _, r := range chunkReaders {
604-
defer runutil.CloseWithLogOnErr(s.logger, r, "close block chunk reader")
605-
}
606-
607-
var readers *bucketChunkReaders
608-
if !req.SkipChunks {
609-
readers = newChunkReaders(chunkReaders)
610-
}
604+
defer func() {
605+
for _, r := range indexReaders {
606+
runutil.CloseWithLogOnErr(s.logger, r, "close block index reader")
607+
}
608+
}()
609+
defer func() {
610+
for _, r := range chunkReaders {
611+
runutil.CloseWithLogOnErr(s.logger, r, "close block chunk reader")
612+
}
613+
}()
611614

612615
// Wait for the query gate only after opening blocks. Opening blocks is usually fast (~1ms),
613616
// but sometimes it can take minutes if the block isn't loaded and there is a surge in queries for unloaded blocks.
@@ -617,92 +620,80 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
617620
}
618621
defer done()
619622

623+
// Send hints about the blocks loaded for this query before sending series or stats.
624+
resHints := buildSeriesResponseHints(blocks)
625+
if err = s.sendHints(srv, resHints); err != nil {
626+
return err
627+
}
628+
629+
streamingSeriesCount := 0
620630
var (
631+
seriesSet storepb.SeriesSet
621632
streamingIterators *streamingSeriesIterators
622-
resHints = &hintspb.SeriesResponseHints{}
633+
seriesLoadStart = time.Now()
634+
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks"))
635+
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
623636
)
624-
for _, b := range blocks {
625-
resHints.AddQueriedBlock(b.meta.ULID)
626-
627-
if b.meta.Compaction.Level == 1 && b.meta.Thanos.Source == block.ReceiveSource && !b.queried.Load() {
628-
level.Debug(s.logger).Log("msg", "queried non-compacted block", "blockId", b.meta.ULID, "ooo", b.meta.Compaction.FromOutOfOrder())
629-
}
630637

631-
b.queried.Store(true)
632-
}
633-
if err := s.sendHints(srv, resHints); err != nil {
638+
seriesSet, streamingIterators, err = s.createIteratorForChunksStreamingLabelsPhase(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
639+
if err != nil {
634640
return err
635641
}
636642

637-
streamingSeriesCount := 0
638-
if req.StreamingChunksBatchSize > 0 {
639-
var (
640-
seriesSet storepb.SeriesSet
641-
seriesLoadStart = time.Now()
642-
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks"))
643-
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
644-
)
645-
646-
seriesSet, streamingIterators, err = s.createIteratorForChunksStreamingLabelsPhase(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
647-
if err != nil {
648-
return err
649-
}
650-
651-
streamingSeriesCount, err = s.sendStreamingSeriesLabelsAndStats(req, srv, stats, seriesSet)
652-
if err != nil {
653-
return err
654-
}
655-
spanLogger.DebugLog(
656-
"msg", "sent streaming series",
657-
"num_series", streamingSeriesCount,
658-
"duration", time.Since(seriesLoadStart),
659-
)
643+
streamingSeriesCount, err = s.sendStreamingSeriesLabelsAndStats(req, srv, stats, seriesSet)
644+
if err != nil {
645+
return err
646+
}
647+
spanLogger.DebugLog(
648+
"msg", "sent streaming series",
649+
"num_series", streamingSeriesCount,
650+
"duration", time.Since(seriesLoadStart),
651+
)
660652

661-
if streamingSeriesCount == 0 {
662-
// There is no series to send chunks for.
663-
return nil
664-
}
653+
if streamingSeriesCount == 0 {
654+
// There is no series to send chunks for.
655+
return nil
665656
}
666657

667658
// We create the limiter twice in the case of streaming so that we don't double count the series
668659
// and hit the limit prematurely.
669-
chunksLimiter := s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks"))
670-
seriesLimiter := s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
660+
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks"))
661+
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
671662

672663
start := time.Now()
673-
if req.StreamingChunksBatchSize > 0 {
664+
if !req.SkipChunks {
665+
readers := newChunkReaders(chunkReaders)
674666
seriesChunkIt := s.createIteratorForChunksStreamingChunksPhase(ctx, readers, stats, chunksLimiter, seriesLimiter, streamingIterators)
675667
err = s.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount)
676-
} else {
677-
var seriesSet storepb.SeriesSet
678-
seriesSet, err = s.createIteratorForNonChunksStreamingRequest(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
679668
if err != nil {
680-
return err
669+
return
681670
}
682-
err = s.sendSeriesChunks(req, srv, seriesSet, stats)
683-
}
684-
if err != nil {
685-
return
686671
}
687672

688673
numSeries, numChunks := stats.seriesAndChunksCount()
689674
debugMessage := "sent series"
690-
if req.StreamingChunksBatchSize > 0 {
675+
if !req.SkipChunks {
691676
debugMessage = "sent streaming chunks"
692677
}
678+
693679
spanLogger.DebugLog(
694680
"msg", debugMessage,
695681
"num_series", numSeries,
696682
"num_chunks", numChunks,
697683
"duration", time.Since(start),
698684
)
699685

700-
if req.StreamingChunksBatchSize == 0 {
701-
// Stats were not sent before, so send it now.
702-
return s.sendStats(srv, stats)
686+
return nil
687+
}
688+
689+
func buildSeriesResponseHints(blocks []*bucketBlock) *hintspb.SeriesResponseHints {
690+
resHints := &hintspb.SeriesResponseHints{}
691+
for _, b := range blocks {
692+
resHints.AddQueriedBlock(b.meta.ULID)
693+
b.queried.Store(true)
703694
}
704695

705-
return nil
696+
return resHints
706697
}
707698

708699
func mapSeriesError(err error) error {
@@ -918,53 +909,6 @@ func (s *BucketStore) sendStreamingChunks(
918909
return it.Err()
919910
}
920911

921-
func (s *BucketStore) sendSeriesChunks(
922-
req *storepb.SeriesRequest,
923-
srv storegatewaypb.StoreGateway_SeriesServer,
924-
seriesSet storepb.SeriesSet,
925-
stats *safeQueryStats,
926-
) error {
927-
var (
928-
encodeDuration time.Duration
929-
sendDuration time.Duration
930-
seriesCount, chunksCount int
931-
)
932-
933-
defer stats.update(func(stats *queryStats) {
934-
stats.mergedSeriesCount += seriesCount
935-
stats.mergedChunksCount += chunksCount
936-
937-
stats.streamingSeriesEncodeResponseDuration += encodeDuration
938-
stats.streamingSeriesSendResponseDuration += sendDuration
939-
})
940-
941-
for seriesSet.Next() {
942-
seriesCount++
943-
// IMPORTANT: do not retain the memory returned by seriesSet.At() beyond this loop cycle
944-
// because the subsequent call to seriesSet.Next() may release it. But it is safe to hold
945-
// onto lset because the labels are not released.
946-
lset, chks := seriesSet.At()
947-
series := storepb.Series{
948-
Labels: mimirpb.FromLabelsToLabelAdapters(lset),
949-
}
950-
if !req.SkipChunks {
951-
series.Chunks = chks
952-
chunksCount += len(chks)
953-
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(chks)))
954-
}
955-
956-
err := s.sendMessage("series", srv, storepb.NewSeriesResponse(&series), &encodeDuration, &sendDuration)
957-
if err != nil {
958-
return err
959-
}
960-
}
961-
if seriesSet.Err() != nil {
962-
return errors.Wrap(seriesSet.Err(), "expand series set")
963-
}
964-
965-
return nil
966-
}
967-
968912
func (s *BucketStore) sendMessage(typ string, srv storegatewaypb.StoreGateway_SeriesServer, msg interface{}, encodeDuration, sendDuration *time.Duration) error {
969913
// We encode it ourselves into a PreparedMsg in order to measure the time it takes.
970914
encodeBegin := time.Now()
@@ -1032,38 +976,6 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {
1032976
return size
1033977
}
1034978

1035-
// createIteratorForNonChunksStreamingRequest is used when the streaming feature is not enabled.
1036-
func (s *BucketStore) createIteratorForNonChunksStreamingRequest(
1037-
ctx context.Context,
1038-
req *storepb.SeriesRequest,
1039-
blocks []*bucketBlock,
1040-
indexReaders map[ulid.ULID]*bucketIndexReader,
1041-
chunkReaders *bucketChunkReaders,
1042-
shardSelector *sharding.ShardSelector,
1043-
matchers []*labels.Matcher,
1044-
chunksLimiter ChunksLimiter,
1045-
seriesLimiter SeriesLimiter,
1046-
stats *safeQueryStats,
1047-
) (storepb.SeriesSet, error) {
1048-
strategy := defaultStrategy
1049-
if req.SkipChunks {
1050-
strategy = noChunkRefs
1051-
}
1052-
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, strategy, nil)
1053-
if err != nil {
1054-
return nil, err
1055-
}
1056-
1057-
var set storepb.SeriesSet
1058-
if !req.SkipChunks {
1059-
ss := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats)
1060-
set = newSeriesChunksSeriesSet(ss)
1061-
} else {
1062-
set = newSeriesSetWithoutChunks(ctx, it, stats)
1063-
}
1064-
return set, nil
1065-
}
1066-
1067979
// createIteratorForChunksStreamingLabelsPhase is used when streaming feature is enabled.
1068980
// It returns a series set that only contains the series labels without any chunks information.
1069981
// The streamingSeriesIterators should be re-used when getting chunks to save on computation.

pkg/storegateway/bucket_e2e_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -438,8 +438,8 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite, addit
438438
},
439439
}
440440
for i, tcase := range append(testCases, additionalCases...) {
441-
for _, streamingBatchSize := range []int{0, 1, 5, 256} {
442-
if ok := t.Run(fmt.Sprintf("%d,streamingBatchSize=%d", i, streamingBatchSize), func(t *testing.T) {
441+
for _, streamingBatchSize := range []int{1, 5, 256} {
442+
t.Run(fmt.Sprintf("%d,streamingBatchSize=%d", i, streamingBatchSize), func(t *testing.T) {
443443
tcase.req.StreamingChunksBatchSize = uint64(streamingBatchSize)
444444
seriesSet, _, _, _, err := srv.Series(context.Background(), tcase.req)
445445
require.NoError(t, err)
@@ -451,9 +451,7 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite, addit
451451
assert.Equal(t, tcase.expectedChunkLen, len(s.Chunks))
452452
}
453453
assertQueryStatsMetricsRecorded(t, len(tcase.expected), tcase.expectedChunkLen, s.metricsRegistry)
454-
}); !ok {
455-
return
456-
}
454+
})
457455
}
458456
}
459457
}

pkg/storegateway/bucket_store_server_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,6 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest
143143
}
144144

145145
if recvSeries := res.GetStreamingSeries(); recvSeries != nil {
146-
if req.StreamingChunksBatchSize == 0 || req.SkipChunks {
147-
err = errors.New("got a streaming series when streaming was disabled")
148-
return
149-
}
150-
151146
var recvSeriesData []byte
152147

153148
// We prefer to stay on the safest side at this stage
@@ -172,7 +167,7 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest
172167
}
173168
}
174169

175-
if req.StreamingChunksBatchSize > 0 && !req.SkipChunks {
170+
if !req.SkipChunks {
176171
res, err = stream.Recv()
177172
if err != nil {
178173
if errors.Is(err, io.EOF) {
@@ -250,6 +245,14 @@ func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest
250245
if errors.Is(err, io.EOF) {
251246
err = nil
252247
}
248+
} else {
249+
// The store-gateway returns a streaming response even when SkipChunks = true. In that
250+
// case, convert the streaming series responses into the expected series set.
251+
for _, streamingSeries := range streamingSeriesSet {
252+
seriesSet = append(seriesSet, &storepb.Series{
253+
Labels: streamingSeries.Labels,
254+
})
255+
}
253256
}
254257

255258
return

0 commit comments

Comments
 (0)