diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 455e983c1ed..29a52ef5fc8 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -49,6 +49,7 @@ import ( "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" + "github.com/grafana/mimir/pkg/storage/tsdb/bucketindex" "github.com/grafana/mimir/pkg/storegateway/hintspb" "github.com/grafana/mimir/pkg/storegateway/indexcache" "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" @@ -88,6 +89,7 @@ type BucketStore struct { logger log.Logger metrics *BucketStoreMetrics bkt objstore.InstrumentedBucketReader + bucketIndexMeta BucketIndexMetadataReader fetcher block.MetadataFetcher dir string indexCache indexcache.IndexCache @@ -203,6 +205,7 @@ func WithLazyLoadingGate(lazyLoadingGate gate.Gate) BucketStoreOption { func NewBucketStore( userID string, bkt objstore.InstrumentedBucketReader, + bucketIndexMeta BucketIndexMetadataReader, fetcher block.MetadataFetcher, dir string, bucketStoreConfig tsdb.BucketStoreConfig, @@ -217,6 +220,7 @@ func NewBucketStore( s := &BucketStore{ logger: log.NewNopLogger(), bkt: bkt, + bucketIndexMeta: bucketIndexMeta, fetcher: fetcher, dir: dir, indexCache: noopCache{}, @@ -600,6 +604,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor logSeriesRequestToSpan(spanLogger, req.MinTime, req.MaxTime, matchers, reqBlockMatchers, shardSelector, req.StreamingChunksBatchSize) + defer s.recordBucketIndexDiscoveryDiff(ctx) + blocks, indexReaders, chunkReaders := s.openBlocksForReading(ctx, req.SkipChunks, req.MinTime, req.MaxTime, reqBlockMatchers, stats) // We must keep the readers open until all their data has been sent. defer func() { @@ -1205,6 +1211,26 @@ func (s *BucketStore) recordSeriesHashCacheStats(stats *queryStats) { s.metrics.seriesHashCacheHits.Add(float64(stats.seriesHashCacheHits)) } +func (s *BucketStore) recordBucketIndexDiscoveryDiff(ctx context.Context) { + reqUpdatedAt, err := getBucketIndexUpdatedAtFromGRPCContext(ctx) + if err != nil { + // This is not a problem by itself. + level.Warn(spanlogger.FromContext(ctx, s.logger)).Log("msg", "can't get bucket index versions (updated_at) from request", "err", err) + return + } + + meta := s.bucketIndexMeta.Metadata() + diff := meta.UpdatedAt - reqUpdatedAt + + logger := log.With(s.logger, "ours", meta.UpdatedAt, "requested", reqUpdatedAt, "diff", diff) + + if diff < 0 { + level.Warn(spanlogger.FromContext(ctx, logger)).Log("msg", "bucket index version (updated_at) is older than requested") + } else { + level.Debug(spanlogger.FromContext(ctx, logger)).Log("msg", "bucket index versions (updated_at)") + } +} + func (s *BucketStore) openBlocksForReading(ctx context.Context, skipChunks bool, minT, maxT int64, blockMatchers []*labels.Matcher, stats *safeQueryStats) ([]*bucketBlock, map[ulid.ULID]*bucketIndexReader, map[ulid.ULID]chunkReader) { spanCtx, span := tracer.Start(ctx, "bucket_store_open_blocks_for_reading") defer span.End() @@ -1270,6 +1296,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq } } + defer s.recordBucketIndexDiscoveryDiff(ctx) + g, gctx := errgroup.WithContext(ctx) var setsMtx sync.Mutex @@ -1463,6 +1491,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR } } + defer s.recordBucketIndexDiscoveryDiff(ctx) + var setsMtx sync.Mutex var sets [][]string s.blockSet.filter(req.Start, req.End, reqBlockMatchers, func(b *bucketBlock) { @@ -2112,3 +2142,7 @@ func maybeNilShard(shard *sharding.ShardSelector) sharding.ShardSelector { } return *shard } + +type BucketIndexMetadataReader interface { + Metadata() *bucketindex.Metadata +} diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 2753fa9b866..be4b8d3b272 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -207,6 +207,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS store, err := NewBucketStore( userID, objstore.WithNoopInstr(bkt), + newTestBucketIndexMetadataReader(t, bkt, userID), metaFetcher, cfg.tempDir, cfg.bucketStoreConfig, diff --git a/pkg/storegateway/bucket_index_metadata.go b/pkg/storegateway/bucket_index_metadata.go index 8c1994cbe0a..81245fcffe6 100644 --- a/pkg/storegateway/bucket_index_metadata.go +++ b/pkg/storegateway/bucket_index_metadata.go @@ -193,3 +193,24 @@ func (f *BucketIndexBlockMetadataFetcher) Fetch(ctx context.Context) (metas map[ return metas, nil, nil } + +type bucketIndexMetadataReader struct { + indexReader interface { + Index() *bucketindex.Index + } +} + +// newBucketIndexMetadataReaderFromLoader is an adapter from BucketIndexLoader to BucketIndexMetadataReader. +func newBucketIndexMetadataReaderFromLoader(loader *BucketIndexLoader) BucketIndexMetadataReader { + return &bucketIndexMetadataReader{ + indexReader: loader, + } +} + +func (r *bucketIndexMetadataReader) Metadata() *bucketindex.Metadata { + idx := r.indexReader.Index() + if idx == nil { + return &bucketindex.Metadata{} + } + return idx.Metadata() +} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 45063a4eb94..06f62411807 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -10,6 +10,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "sync" "time" @@ -538,6 +539,7 @@ func (u *BucketStores) getOrCreateStore(ctx context.Context, userID string) (*Bu bs, err := NewBucketStore( userID, userBkt, + newBucketIndexMetadataReaderFromLoader(loader), fetcher, u.syncDirForUser(userID), u.cfg.BucketStore, @@ -645,6 +647,18 @@ func getUserIDFromGRPCContext(ctx context.Context) string { return values[0] } +func getBucketIndexUpdatedAtFromGRPCContext(ctx context.Context) (int64, error) { + values := metadata.ValueFromIncomingContext(ctx, GrpcContextMetadataBucketIndexUpdatedAt) + if len(values) != 1 { + return 0, fmt.Errorf("no bucket index updated at") + } + value, err := strconv.ParseInt(values[0], 10, 64) + if err != nil { + return 0, fmt.Errorf("parse bucket index updated at value %q: %w", values[0], err) + } + return value, nil +} + type spanSeriesServer struct { storegatewaypb.StoreGateway_SeriesServer diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 0d57b7a97c3..a7091c167e9 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1456,6 +1456,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries st, err := NewBucketStore( "test", ibkt, + newTestBucketIndexMetadataReader(t, bkt, "test"), f, tmpDir, mimir_tsdb.BucketStoreConfig{ @@ -1588,6 +1589,7 @@ func TestBucketStore_Series_Concurrency(t *testing.T) { store, err := NewBucketStore( "tenant", instrumentedBucket, + newTestBucketIndexMetadataReader(t, instrumentedBucket, "tenant"), metaFetcher, tmpDir, mimir_tsdb.BucketStoreConfig{ @@ -1757,10 +1759,11 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) { } store := &BucketStore{ - userID: "test", - bkt: objstore.WithNoopInstr(bkt), - logger: logger, - indexCache: indexCache, + userID: "test", + bkt: objstore.WithNoopInstr(bkt), + bucketIndexMeta: newTestBucketIndexMetadataReader(t, bkt, "test"), + logger: logger, + indexCache: indexCache, indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), indexheader.Config{ LazyLoadingEnabled: false, LazyLoadingIdleTimeout: 0, @@ -1920,6 +1923,7 @@ func TestBucketStore_Series_ErrorUnmarshallingRequestHints(t *testing.T) { store, err := NewBucketStore( "test", instrBkt, + newTestBucketIndexMetadataReader(t, bkt, "test"), fetcher, tmpDir, mimir_tsdb.BucketStoreConfig{ @@ -1977,6 +1981,7 @@ func TestBucketStore_Series_CanceledRequest(t *testing.T) { store, err := NewBucketStore( "test", instrBkt, + newTestBucketIndexMetadataReader(t, instrBkt, "test"), fetcher, tmpDir, mimir_tsdb.BucketStoreConfig{ @@ -2049,6 +2054,7 @@ func TestBucketStore_Series_TimeoutGate(t *testing.T) { store, err := NewBucketStore( "test", instrBkt, + newTestBucketIndexMetadataReader(t, instrBkt, "test"), fetcher, tmpDir, mimir_tsdb.BucketStoreConfig{ @@ -2127,6 +2133,7 @@ func TestBucketStore_Series_InvalidRequest(t *testing.T) { store, err := NewBucketStore( "test", instrBkt, + newTestBucketIndexMetadataReader(t, instrBkt, "test"), fetcher, tmpDir, mimir_tsdb.BucketStoreConfig{ @@ -2253,6 +2260,7 @@ func testBucketStoreSeriesBlockWithMultipleChunks( store, err := NewBucketStore( "tenant", instrBkt, + newTestBucketIndexMetadataReader(t, instrBkt, "tenant"), fetcher, tmpDir, mimir_tsdb.BucketStoreConfig{ @@ -2414,6 +2422,7 @@ func TestBucketStore_Series_Limits(t *testing.T) { store, err := NewBucketStore( "tenant", instrBkt, + newTestBucketIndexMetadataReader(t, instrBkt, "tenant"), fetcher, tmpDir, mimir_tsdb.BucketStoreConfig{ @@ -2533,6 +2542,7 @@ func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketS store, err := NewBucketStore( "tenant", instrBkt, + newTestBucketIndexMetadataReader(t, instrBkt, "tenant"), fetcher, tmpDir, mimir_tsdb.BucketStoreConfig{ diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index d95c4e7ac87..84377a73f7f 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -1675,3 +1675,22 @@ func createBucketIndex(t testing.TB, bkt objstore.Bucket, userID string) *bucket return idx } + +type testBucketIndexMetadataReader struct { + t testing.TB + bkt objstore.Bucket + userID string +} + +func newTestBucketIndexMetadataReader(t testing.TB, bkt objstore.Bucket, userID string) *testBucketIndexMetadataReader { + return &testBucketIndexMetadataReader{ + t: t, + bkt: bkt, + userID: userID, + } +} + +func (t testBucketIndexMetadataReader) Metadata() *bucketindex.Metadata { + idx := createBucketIndex(t.t, t.bkt, t.userID) + return idx.Metadata() +}