CHANGELOG.md: 2 changes (1 addition, 1 deletion)
@@ -16,7 +16,7 @@
 * [CHANGE] Query-frontend: Use the Mimir Query Engine (MQE) by default. #12361
 * [CHANGE] Query-frontend: Remove the CLI flags `-querier.frontend-address`, `-querier.max-outstanding-requests-per-tenant`, and `-query-frontend.querier-forget-delay` and corresponding YAML configurations. This is part of a change that makes the query-scheduler a required component. This removes the ability to run the query-frontend with an embedded query-scheduler. Instead, you must run a dedicated query-scheduler component. #12200
 * [CHANGE] Ingester: Remove deprecated `-ingester.stream-chunks-when-using-blocks` CLI flag and `ingester_stream_chunks_when_using_blocks` runtime configuration option. #12615
-* [CHANGE] Querier: Require non-zero values for `-querier.streaming-chunks-per-ingester-buffer-size` and `-querier.streaming-chunks-per-store-gateway-buffer-size` CLI flags and corresponding YAML configurations. This is part of a change that makes streaming required between queriers and ingesters and store-gateways. Streaming has been the default since Mimir 2.14. #12790
+* [CHANGE] Querier: Require non-zero values for `-querier.streaming-chunks-per-ingester-buffer-size` and `-querier.streaming-chunks-per-store-gateway-buffer-size` CLI flags and corresponding YAML configurations. This is part of a change that makes streaming required between queriers, ingesters, and store-gateways. Streaming has been the default since Mimir 2.14. #12790 #12818
 * [CHANGE] Remove support for the experimental read-write deployment mode. #12584
 * [CHANGE] Store-gateway: Update default value of `-store-gateway.dynamic-replication.multiple` to `5` to increase replication of recent blocks. #12433
 * [CHANGE] Cost attribution: Reduce the default maximum per-user cardinality of cost attribution labels to 2000. #12625
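For the Querier entry above: streaming is now mandatory, so a zero buffer size becomes a configuration error instead of a way to disable streaming. A minimal sketch of the kind of validation this implies; the types and field names here are hypothetical simplifications, not Mimir's actual configuration code:

```go
package example

import "fmt"

// QuerierConfig is a hypothetical, simplified stand-in for Mimir's querier
// configuration; the field names are illustrative only.
type QuerierConfig struct {
	StreamingChunksPerIngesterBufferSize     int // -querier.streaming-chunks-per-ingester-buffer-size
	StreamingChunksPerStoreGatewayBufferSize int // -querier.streaming-chunks-per-store-gateway-buffer-size
}

// Validate rejects zero buffer sizes: with streaming always on, a zero value
// no longer means "disable streaming" and is treated as a configuration error.
func (c *QuerierConfig) Validate() error {
	if c.StreamingChunksPerIngesterBufferSize <= 0 {
		return fmt.Errorf("-querier.streaming-chunks-per-ingester-buffer-size must be greater than 0")
	}
	if c.StreamingChunksPerStoreGatewayBufferSize <= 0 {
		return fmt.Errorf("-querier.streaming-chunks-per-store-gateway-buffer-size must be greater than 0")
	}
	return nil
}
```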
pkg/querier/block.go: 1 change (1 addition, 0 deletions)
@@ -47,6 +47,7 @@ func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMa
 	return converted
 }
 
+// TODO: Remove after non-streaming removed from querier
 // Implementation of storage.SeriesSet, based on individual responses from store client.
 type blockQuerierSeriesSet struct {
 	series []*storepb.Series
pkg/querier/blocks_store_queryable.go: 75 changes (53 additions, 22 deletions)
@@ -767,7 +767,7 @@ func canBlockWithCompactorShardIndexContainQueryShard(queryShardIndex, queryShar
 // considered serious errors. All other errors are not returned, but they give rise to fetch retrials.
 //
 // In case of a successful run, fetchSeriesFromStores returns a startStreamingChunks function to start streaming
-// chunks for the fetched series iff it was a streaming call for series+chunks. startStreamingChunks must be called
+// chunks for the fetched series if it was a streaming call for series+chunks. startStreamingChunks must be called
 // before iterating on the series.
 func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *storage.SelectHints, clients map[BlocksStoreClient][]ulid.ULID, minT int64, maxT int64, tenantID string, convertedMatchers []storepb.LabelMatcher) (_ []storage.SeriesSet, _ []ulid.ULID, _ annotations.Annotations, streamReaders []*storeGatewayStreamReader, estimateChunks func() int, _ error) {
 	var (
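The ordering contract in this doc comment is the important part: chunk streaming must be started before the returned series sets are iterated. A hedged sketch of the caller side, with an illustrative helper and simplified types; `startStreaming` stands in for the startStreamingChunks step and is not an actual Mimir function:

```go
package example

import "github.com/prometheus/prometheus/storage"

// consumeSeries is illustrative, not Mimir code. It shows the contract from
// the comment above: chunk streaming must be started before any returned
// series set is iterated, otherwise the stream readers have nothing buffered
// when the iterators ask for chunks.
func consumeSeries(seriesSets []storage.SeriesSet, startStreaming func()) error {
	startStreaming() // the "startStreamingChunks" step from the doc comment

	for _, set := range seriesSets {
		for set.Next() {
			s := set.At()
			_ = s.Labels() // chunks arrive as the underlying streams are consumed
		}
		if err := set.Err(); err != nil {
			return err
		}
	}
	return nil
}
```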
@@ -792,9 +792,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 	// Concurrently fetch series from all clients.
 	for c, blockIDs := range clients {
 		g.Go(func() error {
-			log, reqCtx := spanlogger.New(reqCtx, spanLog, tracer, "blocksStoreQuerier.fetchSeriesFromStores")
-			defer log.Finish()
-			log.SetTag("store_gateway_address", c.RemoteAddress())
+			clientSpanLog, clientCtx := spanlogger.New(reqCtx, spanLog, tracer, "blocksStoreQuerier.fetchSeriesFromStores")
+			defer clientSpanLog.Finish()
+			clientSpanLog.SetTag("store_gateway_address", c.RemoteAddress())
 
 			// See: https://github.com/prometheus/prometheus/pull/8050
 			// TODO(goutham): we should ideally be passing the hints down to the storage layer
@@ -807,7 +807,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 				return errors.Wrapf(err, "failed to create series request")
 			}
 
-			stream, err := c.Series(reqCtx, req)
+			stream, err := c.Series(clientCtx, req)
 			if err == nil {
 				mtx.Lock()
 				streams = append(streams, stream)
@@ -816,7 +816,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 			}
 			if err != nil {
 				if shouldRetry(err) {
-					level.Warn(log).Log("msg", "failed to fetch series", "remote", c.RemoteAddress(), "err", err)
+					level.Warn(clientSpanLog).Log("msg", "failed to fetch series", "remote", c.RemoteAddress(), "err", err)
 					return nil
 				}
 
@@ -851,7 +851,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 				return err
 			}
 			if shouldRetry {
-				level.Warn(log).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err)
+				level.Warn(clientSpanLog).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err)
 				return nil
 			}
 
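The log-and-return-nil in these hunks is what turns a retryable failure into a retrial rather than a query failure: returning an error from an errgroup goroutine cancels the whole group, per the doc comment's "all other errors are not returned, but they give rise to fetch retrials". A minimal sketch of the pattern; `doFetch` and `shouldRetry` here are illustrative stand-ins for the real fetch and error-classification logic:

```go
package example

import (
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"golang.org/x/sync/errgroup"
)

// Illustrative stand-ins for the real fetch and error-classification logic.
func doFetch() error         { return nil }
func shouldRetry(error) bool { return true }

// fetchWithRetryableErrors sketches the pattern above: returning a non-nil
// error from an errgroup goroutine cancels the whole group, so retryable
// per-client failures are logged and swallowed (return nil), leaving the
// outer fetch loop free to retry the affected blocks on other clients.
func fetchWithRetryableErrors(logger log.Logger) error {
	g := &errgroup.Group{}
	g.Go(func() error {
		if err := doFetch(); err != nil {
			if shouldRetry(err) {
				level.Warn(logger).Log("msg", "failed to fetch series", "err", err)
				return nil // don't cancel the group; the caller retries
			}
			return err // serious error: fail the whole query
		}
		return nil
	})
	return g.Wait()
}
```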
@@ -883,14 +883,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 				reqStats.AddFetchedChunkBytes(uint64(chunkBytes))
 				reqStats.AddFetchedChunks(uint64(chunksFetched))
 
-				level.Debug(log).Log("msg", "received series from store-gateway",
+				clientSpanLog.DebugLog(
+					"msg", "received series from store-gateway",
 					"instance", c.RemoteAddress(),
 					"fetched series", len(mySeries),
 					"fetched chunk bytes", chunkBytes,
 					"fetched chunks", chunksFetched,
 					"fetched index bytes", indexBytesFetched,
 					"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
-					"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))
+					"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "),
+				)
 				if chunkInfo != nil {
 					for i, s := range mySeries {
 						chunkInfo.StartSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels))
@@ -901,18 +903,33 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 			} else if len(myStreamingSeriesLabels) > 0 {
 				// FetchedChunks and FetchedChunkBytes are added by the SeriesChunksStreamReader.
 				reqStats.AddFetchedSeries(uint64(len(myStreamingSeriesLabels)))
-				streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeriesLabels), queryLimiter, memoryTracker, reqStats, q.metrics, q.logger)
-				level.Debug(log).Log("msg", "received streaming series from store-gateway",
+
+				if req.SkipChunks {
+					// If we aren't creating a stream reader for reading chunks, we need to close the stream
+					// ourselves. It's safe to close the stream multiple times so we don't worry about closing
+					// all streams below when there's an error.
+					if err := util.CloseAndExhaust[*storepb.SeriesResponse](stream); err != nil {
+						level.Warn(clientSpanLog).Log("msg", "closing store-gateway client stream failed", "err", err)
+					}
+				} else {
+					streamReader = newStoreGatewayStreamReader(clientCtx, stream, len(myStreamingSeriesLabels), queryLimiter, memoryTracker, reqStats, q.metrics, q.logger)
+				}
+
+				clientSpanLog.DebugLog(
+					"msg", "received streaming series from store-gateway",
 					"instance", c.RemoteAddress(),
 					"fetched series", len(myStreamingSeriesLabels),
 					"fetched index bytes", indexBytesFetched,
 					"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
-					"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))
+					"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "),
+				)
 			} else {
-				level.Debug(log).Log("msg", "received no series from store-gateway",
+				clientSpanLog.DebugLog(
+					"msg", "received no series from store-gateway",
 					"instance", c.RemoteAddress(),
 					"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
-					"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))
+					"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "),
+				)
 			}
 
 			// Store the result.
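For readers unfamiliar with the `util.CloseAndExhaust` call in this hunk: roughly, such a helper half-closes the stream and then drains it. A simplified sketch under the assumption that it follows the usual gRPC close-and-drain idiom; this is not the exact Mimir implementation, which may add safeguards such as a bounded drain:

```go
package example

// closeAndExhaust sketches what a helper like util.CloseAndExhaust typically
// does: half-close the client side, then drain the remaining responses so the
// gRPC library can release the stream's resources. An undrained stream can
// pin buffers until the whole connection is torn down.
func closeAndExhaust[T any](stream interface {
	CloseSend() error
	Recv() (T, error)
}) error {
	if err := stream.CloseSend(); err != nil {
		return err
	}
	for {
		if _, err := stream.Recv(); err != nil {
			// io.EOF or any terminal error means the stream is fully drained.
			return nil
		}
	}
}
```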
@@ -923,13 +940,23 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 				if chunkInfo != nil {
 					chunkInfo.SetMsg("store-gateway streaming")
 				}
-				seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{
-					series:        myStreamingSeriesLabels,
-					streamReader:  streamReader,
-					chunkInfo:     chunkInfo,
-					remoteAddress: c.RemoteAddress(),
-				})
-				streamReaders = append(streamReaders, streamReader)
+
+				if req.SkipChunks {
+					noChunkSeries := make([]storage.Series, 0, len(myStreamingSeriesLabels))
+					for _, lbls := range myStreamingSeriesLabels {
+						noChunkSeries = append(noChunkSeries, series.NewConcreteSeries(lbls, nil, nil))
+					}
+
+					seriesSets = append(seriesSets, series.NewConcreteSeriesSetFromSortedSeries(noChunkSeries))
+				} else {
+					seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{
+						series:        myStreamingSeriesLabels,
+						streamReader:  streamReader,
+						chunkInfo:     chunkInfo,
+						remoteAddress: c.RemoteAddress(),
+					})
+					streamReaders = append(streamReaders, streamReader)
+				}
 			}
 			warnings.Merge(myWarnings)
 			queriedBlocks = append(queriedBlocks, myQueriedBlocks...)
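The SkipChunks branch above returns labels-only series. As a minimal illustration of why that still satisfies `storage.SeriesSet`, here is a hedged sketch of a labels-only set; it is illustrative and not the `series.NewConcreteSeriesSetFromSortedSeries` implementation used in the diff:

```go
package example

import (
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/util/annotations"
)

// labelsOnlySeriesSet is a minimal sketch of the idea behind the SkipChunks
// branch above: a storage.SeriesSet whose series carry labels but no samples.
type labelsOnlySeriesSet struct {
	labelSets []labels.Labels
	i         int
}

func (s *labelsOnlySeriesSet) Next() bool { s.i++; return s.i <= len(s.labelSets) }

func (s *labelsOnlySeriesSet) At() storage.Series {
	return &storage.SeriesEntry{
		Lset: s.labelSets[s.i-1],
		// Callers that skip chunks only read labels, so an always-empty
		// sample iterator is enough to satisfy the interface.
		SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator {
			return chunkenc.NewNopIterator()
		},
	}
}

func (s *labelsOnlySeriesSet) Err() error                        { return nil }
func (s *labelsOnlySeriesSet) Warnings() annotations.Annotations { return nil }
```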
@@ -1236,9 +1263,13 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip
 	}
 
 	if skipChunks {
-		// We don't do the streaming call if we are not requesting the chunks.
+		// We don't do the streaming call if we are not requesting the chunks. Note that setting
+		// a batch size of 0 is ignored in newer store-gateways as the streaming code path is always
+		// used. We set this to 0 anyway here so that newer queriers will continue to work with
+		// older store-gateways that have not been updated to ignore the value 0.
 		streamingBatchSize = 0
 	}
 
 	return &storepb.SeriesRequest{
 		MinTime: minT,
 		MaxTime: maxT,
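Putting the new comment together with the surrounding function, the construction reduces to roughly the following. This is a condensed sketch with remaining fields elided; the import path and the `StreamingChunksBatchSize` field name are inferred from this hunk's context, so treat them as assumptions rather than confirmed API:

```go
package example

import "github.com/grafana/mimir/pkg/storegateway/storepb"

// buildSeriesRequest condenses the construction above. A batch size of 0 is
// the wire-level value older store-gateways read as "don't stream chunks";
// newer store-gateways always stream and simply ignore the 0.
func buildSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skipChunks bool, streamingBatchSize uint64) *storepb.SeriesRequest {
	if skipChunks {
		// Keep compatibility with older store-gateways, per the comment above.
		streamingBatchSize = 0
	}

	return &storepb.SeriesRequest{
		MinTime:                  minT,
		MaxTime:                  maxT,
		Matchers:                 matchers,
		SkipChunks:               skipChunks,
		StreamingChunksBatchSize: streamingBatchSize,
	}
}
```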