35 changes: 35 additions & 0 deletions pkg/storegateway/bucket.go
@@ -49,6 +49,7 @@ import (
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
"github.com/grafana/mimir/pkg/storage/tsdb/bucketindex"
"github.com/grafana/mimir/pkg/storegateway/hintspb"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
@@ -88,6 +89,7 @@ type BucketStore struct {
logger log.Logger
metrics *BucketStoreMetrics
bkt objstore.InstrumentedBucketReader
bucketIndexMeta BucketIndexMetadataReader
fetcher block.MetadataFetcher
dir string
indexCache indexcache.IndexCache
@@ -203,6 +205,7 @@ func WithLazyLoadingGate(lazyLoadingGate gate.Gate) BucketStoreOption {
func NewBucketStore(
userID string,
bkt objstore.InstrumentedBucketReader,
bucketIndexMeta BucketIndexMetadataReader,
fetcher block.MetadataFetcher,
dir string,
bucketStoreConfig tsdb.BucketStoreConfig,
@@ -217,6 +220,7 @@ func NewBucketStore(
s := &BucketStore{
logger: log.NewNopLogger(),
bkt: bkt,
bucketIndexMeta: bucketIndexMeta,
fetcher: fetcher,
dir: dir,
indexCache: noopCache{},
@@ -600,6 +604,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor

logSeriesRequestToSpan(spanLogger, req.MinTime, req.MaxTime, matchers, reqBlockMatchers, shardSelector, req.StreamingChunksBatchSize)

defer s.recordBucketIndexDiscoveryDiff(ctx)

blocks, indexReaders, chunkReaders := s.openBlocksForReading(ctx, req.SkipChunks, req.MinTime, req.MaxTime, reqBlockMatchers, stats)
// We must keep the readers open until all their data has been sent.
defer func() {
@@ -1205,6 +1211,27 @@ func (s *BucketStore) recordSeriesHashCacheStats(stats *queryStats) {
s.metrics.seriesHashCacheHits.Add(float64(stats.seriesHashCacheHits))
}

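// recordBucketIndexDiscoveryDiff compares this store-gateway's bucket index UpdatedAt
// with the one the querier propagated via gRPC metadata, and records whether ours is
// newer or older.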
func (s *BucketStore) recordBucketIndexDiscoveryDiff(ctx context.Context) {
reqUpdatedAt := getBucketIndexUpdatedAtFromGRPCContext(ctx)
meta := s.bucketIndexMeta.Metadata()

diff := meta.UpdatedAt - reqUpdatedAt

level.Debug(spanlogger.FromContext(ctx, s.logger)).Log("msg", "bucket index versions (updated_at)", "ours", meta.UpdatedAt, "requested", reqUpdatedAt, "diff", diff)

if diff == 0 {
// Skip recording the difference in metrics when versions are the same.
return
}
var group string
if diff > 0 {
group = labelDiscoveryDiffNewer
} else {
group = labelDiscoveryDiffOlder

Contributor:

Our hypothesis is that (typically) there should be zero cases where queriers discover a new version of the bucket index faster than store-gateways do.

If that's the case, it might be cheaper to just log at warning level when it's unexpectedly older, instead of introducing a new metric that is always recorded but should, in theory, fall entirely into the "newer" category.

@narqo (Contributor, Author), Jan 9, 2026:

Now that you mention this, I wonder if we really need this metric at all. How about we start with only a warning (and a debug log for the normal case)? WDYT? Ref 31d7e2d

}
s.metrics.bucketIndexDiscoveryDiffs.WithLabelValues(group).Inc()
}
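
For reference, a minimal sketch of the warning-only variant discussed in the thread above (hypothetical: it reuses names from this diff and is not necessarily what 31d7e2d implements):

// Hypothetical warning-only variant: log instead of incrementing a metric.
func (s *BucketStore) logBucketIndexDiscoveryDiff(ctx context.Context) {
	reqUpdatedAt := getBucketIndexUpdatedAtFromGRPCContext(ctx)
	meta := s.bucketIndexMeta.Metadata()
	diff := meta.UpdatedAt - reqUpdatedAt
	logger := spanlogger.FromContext(ctx, s.logger)
	if diff < 0 {
		// Unexpected: the querier discovered a newer bucket index than this store-gateway.
		level.Warn(logger).Log("msg", "store-gateway bucket index is older than the querier's", "ours", meta.UpdatedAt, "requested", reqUpdatedAt, "diff", diff)
		return
	}
	// Expected case: the store-gateway's bucket index is equal or newer.
	level.Debug(logger).Log("msg", "bucket index versions (updated_at)", "ours", meta.UpdatedAt, "requested", reqUpdatedAt, "diff", diff)
}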

func (s *BucketStore) openBlocksForReading(ctx context.Context, skipChunks bool, minT, maxT int64, blockMatchers []*labels.Matcher, stats *safeQueryStats) ([]*bucketBlock, map[ulid.ULID]*bucketIndexReader, map[ulid.ULID]chunkReader) {
spanCtx, span := tracer.Start(ctx, "bucket_store_open_blocks_for_reading")
defer span.End()
@@ -1270,6 +1297,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
}
}

defer s.recordBucketIndexDiscoveryDiff(ctx)

g, gctx := errgroup.WithContext(ctx)

var setsMtx sync.Mutex
@@ -1463,6 +1492,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
}

defer s.recordBucketIndexDiscoveryDiff(ctx)

var setsMtx sync.Mutex
var sets [][]string
s.blockSet.filter(req.Start, req.End, reqBlockMatchers, func(b *bucketBlock) {
@@ -2112,3 +2143,7 @@ func maybeNilShard(shard *sharding.ShardSelector) sharding.ShardSelector {
}
return *shard
}

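// BucketIndexMetadataReader exposes the metadata of the bucket index currently
// loaded by the store-gateway.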
type BucketIndexMetadataReader interface {
Metadata() *bucketindex.Metadata
}
1 change: 1 addition & 0 deletions pkg/storegateway/bucket_e2e_test.go
@@ -207,6 +207,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS
store, err := NewBucketStore(
userID,
objstore.WithNoopInstr(bkt),
newTestBucketIndexMetadataReader(t, bkt, userID),
metaFetcher,
cfg.tempDir,
cfg.bucketStoreConfig,
21 changes: 21 additions & 0 deletions pkg/storegateway/bucket_index_metadata.go
@@ -193,3 +193,24 @@ func (f *BucketIndexBlockMetadataFetcher) Fetch(ctx context.Context) (metas map[

return metas, nil, nil
}

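// bucketIndexMetadataReader adapts a bucket index provider (anything exposing
// Index() *bucketindex.Index) to the BucketIndexMetadataReader interface.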
type bucketIndexMetadataReader struct {
indexReader interface {
Index() *bucketindex.Index
}
}

// newBucketIndexMetadataReaderFromLoader is an adapter from BucketIndexLoader to BucketIndexMetadataReader.
func newBucketIndexMetadataReaderFromLoader(loader *BucketIndexLoader) BucketIndexMetadataReader {
return &bucketIndexMetadataReader{
indexReader: loader,
}
}

func (r *bucketIndexMetadataReader) Metadata() *bucketindex.Metadata {
idx := r.indexReader.Index()
if idx == nil {
return &bucketindex.Metadata{}
}
return idx.Metadata()
}
41 changes: 27 additions & 14 deletions pkg/storegateway/bucket_store_metrics.go
@@ -17,25 +17,31 @@ import (
"github.com/grafana/mimir/pkg/storage/indexheader"
)

const (
labelDiscoveryDiffOlder = "older"
labelDiscoveryDiffNewer = "newer"
)

// BucketStoreMetrics holds all the metrics tracked by BucketStore. These metrics
// MUST be monotonic (counter, summary, histogram) because a single metrics instance
// can be passed to multiple BucketStore and metrics MUST be correct even after a
// BucketStore is offloaded.
type BucketStoreMetrics struct {
-	blockLoads            prometheus.Counter
-	blockLoadFailures     prometheus.Counter
-	blockDrops            prometheus.Counter
-	blockDropFailures     prometheus.Counter
-	blockDiscoveryLatency prometheus.Histogram
-	seriesDataTouched     *prometheus.SummaryVec
-	seriesDataFetched     *prometheus.SummaryVec
-	seriesDataSizeTouched *prometheus.SummaryVec
-	seriesDataSizeFetched *prometheus.SummaryVec
-	seriesBlocksQueried   *prometheus.SummaryVec
-	resultSeriesCount     prometheus.Summary
-	chunkSizeBytes        prometheus.Histogram
-	queriesDropped        *prometheus.CounterVec
-	seriesRefetches       prometheus.Counter
+	blockLoads                prometheus.Counter
+	blockLoadFailures         prometheus.Counter
+	blockDrops                prometheus.Counter
+	blockDropFailures         prometheus.Counter
+	blockDiscoveryLatency     prometheus.Histogram
+	bucketIndexDiscoveryDiffs *prometheus.CounterVec
+	seriesDataTouched         *prometheus.SummaryVec
+	seriesDataFetched         *prometheus.SummaryVec
+	seriesDataSizeTouched     *prometheus.SummaryVec
+	seriesDataSizeFetched     *prometheus.SummaryVec
+	seriesBlocksQueried       *prometheus.SummaryVec
+	resultSeriesCount         prometheus.Summary
+	chunkSizeBytes            prometheus.Histogram
+	queriesDropped            *prometheus.CounterVec
+	seriesRefetches           prometheus.Counter

// Metrics tracked when streaming store-gateway is enabled.
streamingSeriesRequestDurationByStage *prometheus.HistogramVec
@@ -85,6 +91,13 @@ func NewBucketStoreMetrics(reg prometheus.Registerer) *BucketStoreMetrics {
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
})
m.bucketIndexDiscoveryDiffs = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_bucket_store_bucket_index_discovery_difference_total",
Help: "Total number of requests in which the store-gateway's and the querier's bucket index UpdatedAt differed. diff=older|newer is from the store-gateway's perspective: its bucket index is behind/ahead of the querier's.",
}, []string{"diff"})
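// Pre-create both label values so the counters are exported as 0 before the first increment.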
m.bucketIndexDiscoveryDiffs.WithLabelValues(labelDiscoveryDiffOlder)
m.bucketIndexDiscoveryDiffs.WithLabelValues(labelDiscoveryDiffNewer)

m.seriesDataTouched = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{
Name: "cortex_bucket_store_series_data_touched",
Help: "How many items of a data type in a block were touched for a single Series/LabelValues/LabelNames request.",
12 changes: 12 additions & 0 deletions pkg/storegateway/bucket_stores.go
@@ -10,6 +10,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"sync"
"time"

@@ -538,6 +539,7 @@ func (u *BucketStores) getOrCreateStore(ctx context.Context, userID string) (*Bu
bs, err := NewBucketStore(
userID,
userBkt,
newBucketIndexMetadataReaderFromLoader(loader),
fetcher,
u.syncDirForUser(userID),
u.cfg.BucketStore,
@@ -645,6 +647,16 @@ func getUserIDFromGRPCContext(ctx context.Context) string {
return values[0]
}

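// getBucketIndexUpdatedAtFromGRPCContext returns the bucket index UpdatedAt propagated
// by the querier via gRPC request metadata, or 0 when the value is missing or malformed.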
func getBucketIndexUpdatedAtFromGRPCContext(ctx context.Context) int64 {
values := metadata.ValueFromIncomingContext(ctx, GrpcContextMetadataBucketIndexUpdatedAt)
if len(values) != 1 {
return 0
}
// Ignore any parsing errors because BucketIndexUpdatedAt is best-effort.
value, _ := strconv.ParseInt(values[0], 10, 64)
return value
}

type spanSeriesServer struct {
storegatewaypb.StoreGateway_SeriesServer

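For context, a hypothetical sketch of the querier-side counterpart that would populate this metadata. The querier change is not part of this diff, and the helper name contextWithBucketIndexUpdatedAt is invented for illustration; it assumes the same metadata and strconv imports as bucket_stores.go:

// Hypothetical querier-side helper (not part of this diff): attach the bucket
// index UpdatedAt to outgoing gRPC metadata, where the store-gateway reads it
// back via getBucketIndexUpdatedAtFromGRPCContext.
func contextWithBucketIndexUpdatedAt(ctx context.Context, updatedAt int64) context.Context {
	return metadata.AppendToOutgoingContext(ctx, GrpcContextMetadataBucketIndexUpdatedAt, strconv.FormatInt(updatedAt, 10))
}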
18 changes: 14 additions & 4 deletions pkg/storegateway/bucket_test.go
@@ -1456,6 +1456,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
st, err := NewBucketStore(
"test",
ibkt,
newTestBucketIndexMetadataReader(t, bkt, "test"),
f,
tmpDir,
mimir_tsdb.BucketStoreConfig{
Expand Down Expand Up @@ -1588,6 +1589,7 @@ func TestBucketStore_Series_Concurrency(t *testing.T) {
store, err := NewBucketStore(
"tenant",
instrumentedBucket,
newTestBucketIndexMetadataReader(t, instrumentedBucket, "tenant"),
metaFetcher,
tmpDir,
mimir_tsdb.BucketStoreConfig{
Expand Down Expand Up @@ -1757,10 +1759,11 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
}

store := &BucketStore{
-		userID:     "test",
-		bkt:        objstore.WithNoopInstr(bkt),
-		logger:     logger,
-		indexCache: indexCache,
+		userID:          "test",
+		bkt:             objstore.WithNoopInstr(bkt),
+		bucketIndexMeta: newTestBucketIndexMetadataReader(t, bkt, "test"),
+		logger:          logger,
+		indexCache:      indexCache,
indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), indexheader.Config{
LazyLoadingEnabled: false,
LazyLoadingIdleTimeout: 0,
@@ -1920,6 +1923,7 @@ func TestBucketStore_Series_ErrorUnmarshallingRequestHints(t *testing.T) {
store, err := NewBucketStore(
"test",
instrBkt,
newTestBucketIndexMetadataReader(t, bkt, "test"),
fetcher,
tmpDir,
mimir_tsdb.BucketStoreConfig{
Expand Down Expand Up @@ -1977,6 +1981,7 @@ func TestBucketStore_Series_CanceledRequest(t *testing.T) {
store, err := NewBucketStore(
"test",
instrBkt,
newTestBucketIndexMetadataReader(t, instrBkt, "test"),
fetcher,
tmpDir,
mimir_tsdb.BucketStoreConfig{
Expand Down Expand Up @@ -2049,6 +2054,7 @@ func TestBucketStore_Series_TimeoutGate(t *testing.T) {
store, err := NewBucketStore(
"test",
instrBkt,
newTestBucketIndexMetadataReader(t, instrBkt, "test"),
fetcher,
tmpDir,
mimir_tsdb.BucketStoreConfig{
Expand Down Expand Up @@ -2127,6 +2133,7 @@ func TestBucketStore_Series_InvalidRequest(t *testing.T) {
store, err := NewBucketStore(
"test",
instrBkt,
newTestBucketIndexMetadataReader(t, instrBkt, "test"),
fetcher,
tmpDir,
mimir_tsdb.BucketStoreConfig{
Expand Down Expand Up @@ -2253,6 +2260,7 @@ func testBucketStoreSeriesBlockWithMultipleChunks(
store, err := NewBucketStore(
"tenant",
instrBkt,
newTestBucketIndexMetadataReader(t, instrBkt, "tenant"),
fetcher,
tmpDir,
mimir_tsdb.BucketStoreConfig{
Expand Down Expand Up @@ -2414,6 +2422,7 @@ func TestBucketStore_Series_Limits(t *testing.T) {
store, err := NewBucketStore(
"tenant",
instrBkt,
newTestBucketIndexMetadataReader(t, instrBkt, "tenant"),
fetcher,
tmpDir,
mimir_tsdb.BucketStoreConfig{
Expand Down Expand Up @@ -2533,6 +2542,7 @@ func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketS
store, err := NewBucketStore(
"tenant",
instrBkt,
newTestBucketIndexMetadataReader(t, instrBkt, "tenant"),
fetcher,
tmpDir,
mimir_tsdb.BucketStoreConfig{
19 changes: 19 additions & 0 deletions pkg/storegateway/gateway_test.go
@@ -1675,3 +1675,22 @@ func createBucketIndex(t testing.TB, bkt objstore.Bucket, userID string) *bucket

return idx
}

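// testBucketIndexMetadataReader is a test adapter that (re)builds a bucket index
// from the bucket on every Metadata call.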
type testBucketIndexMetadataReader struct {
t testing.TB
bkt objstore.Bucket
userID string
}

func newTestBucketIndexMetadataReader(t testing.TB, bkt objstore.Bucket, userID string) *testBucketIndexMetadataReader {
return &testBucketIndexMetadataReader{
t: t,
bkt: bkt,
userID: userID,
}
}

func (t testBucketIndexMetadataReader) Metadata() *bucketindex.Metadata {
idx := createBucketIndex(t.t, t.bkt, t.userID)
return idx.Metadata()
}