Merged
34 changes: 34 additions & 0 deletions pkg/storegateway/bucket.go
@@ -49,6 +49,7 @@ import (
	"github.com/grafana/mimir/pkg/storage/tsdb"
	"github.com/grafana/mimir/pkg/storage/tsdb/block"
	"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
	"github.com/grafana/mimir/pkg/storage/tsdb/bucketindex"
	"github.com/grafana/mimir/pkg/storegateway/hintspb"
	"github.com/grafana/mimir/pkg/storegateway/indexcache"
	"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
@@ -88,6 +89,7 @@ type BucketStore struct {
	logger          log.Logger
	metrics         *BucketStoreMetrics
	bkt             objstore.InstrumentedBucketReader
	bucketIndexMeta BucketIndexMetadataReader
	fetcher         block.MetadataFetcher
	dir             string
	indexCache      indexcache.IndexCache
@@ -203,6 +205,7 @@ func WithLazyLoadingGate(lazyLoadingGate gate.Gate) BucketStoreOption {
func NewBucketStore(
	userID string,
	bkt objstore.InstrumentedBucketReader,
	bucketIndexMeta BucketIndexMetadataReader,
	fetcher block.MetadataFetcher,
	dir string,
	bucketStoreConfig tsdb.BucketStoreConfig,
@@ -217,6 +220,7 @@ func NewBucketStore(
	s := &BucketStore{
		logger:          log.NewNopLogger(),
		bkt:             bkt,
		bucketIndexMeta: bucketIndexMeta,
		fetcher:         fetcher,
		dir:             dir,
		indexCache:      noopCache{},
@@ -600,6 +604,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor

	logSeriesRequestToSpan(spanLogger, req.MinTime, req.MaxTime, matchers, reqBlockMatchers, shardSelector, req.StreamingChunksBatchSize)

	defer s.recordBucketIndexDiscoveryDiff(ctx)

	blocks, indexReaders, chunkReaders := s.openBlocksForReading(ctx, req.SkipChunks, req.MinTime, req.MaxTime, reqBlockMatchers, stats)
	// We must keep the readers open until all their data has been sent.
	defer func() {
@@ -1205,6 +1211,26 @@ func (s *BucketStore) recordSeriesHashCacheStats(stats *queryStats) {
	s.metrics.seriesHashCacheHits.Add(float64(stats.seriesHashCacheHits))
}

func (s *BucketStore) recordBucketIndexDiscoveryDiff(ctx context.Context) {
	reqUpdatedAt, err := getBucketIndexUpdatedAtFromGRPCContext(ctx)
	if err != nil {
		// This is not a problem by itself.
		level.Warn(spanlogger.FromContext(ctx, s.logger)).Log("msg", "can't get bucket index version (updated_at) from request", "err", err)
		return
	}

	meta := s.bucketIndexMeta.Metadata()
	diff := meta.UpdatedAt - reqUpdatedAt

	logger := log.With(s.logger, "ours", meta.UpdatedAt, "requested", reqUpdatedAt, "diff", diff)

	if diff < 0 {
		level.Warn(spanlogger.FromContext(ctx, logger)).Log("msg", "bucket index version (updated_at) is older than requested")
	} else {
		level.Debug(spanlogger.FromContext(ctx, logger)).Log("msg", "bucket index versions (updated_at)")
	}
}

func (s *BucketStore) openBlocksForReading(ctx context.Context, skipChunks bool, minT, maxT int64, blockMatchers []*labels.Matcher, stats *safeQueryStats) ([]*bucketBlock, map[ulid.ULID]*bucketIndexReader, map[ulid.ULID]chunkReader) {
	spanCtx, span := tracer.Start(ctx, "bucket_store_open_blocks_for_reading")
	defer span.End()
@@ -1270,6 +1296,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
		}
	}

	defer s.recordBucketIndexDiscoveryDiff(ctx)

	g, gctx := errgroup.WithContext(ctx)

	var setsMtx sync.Mutex
@@ -1463,6 +1491,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
		}
	}

	defer s.recordBucketIndexDiscoveryDiff(ctx)

	var setsMtx sync.Mutex
	var sets [][]string
	s.blockSet.filter(req.Start, req.End, reqBlockMatchers, func(b *bucketBlock) {
@@ -2112,3 +2142,7 @@ func maybeNilShard(shard *sharding.ShardSelector) sharding.ShardSelector {
	}
	return *shard
}

type BucketIndexMetadataReader interface {
	Metadata() *bucketindex.Metadata
}
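The comparison in recordBucketIndexDiscoveryDiff only needs the index's updated_at timestamp, so BucketIndexMetadataReader stays a one-method interface and is cheap to stub. As a sketch (not part of this change), a fixed-timestamp implementation could look like this; staticMetadataReader is a hypothetical name:

// staticMetadataReader is a hypothetical BucketIndexMetadataReader that
// always reports the same updated_at timestamp, e.g. for unit tests.
type staticMetadataReader struct {
	updatedAt int64
}

func (r staticMetadataReader) Metadata() *bucketindex.Metadata {
	return &bucketindex.Metadata{UpdatedAt: r.updatedAt}
}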
1 change: 1 addition & 0 deletions pkg/storegateway/bucket_e2e_test.go
@@ -207,6 +207,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS
	store, err := NewBucketStore(
		userID,
		objstore.WithNoopInstr(bkt),
		newTestBucketIndexMetadataReader(t, bkt, userID),
		metaFetcher,
		cfg.tempDir,
		cfg.bucketStoreConfig,
21 changes: 21 additions & 0 deletions pkg/storegateway/bucket_index_metadata.go
@@ -193,3 +193,24 @@ func (f *BucketIndexBlockMetadataFetcher) Fetch(ctx context.Context) (metas map[

	return metas, nil, nil
}

type bucketIndexMetadataReader struct {
	indexReader interface {
		Index() *bucketindex.Index
	}
}

// newBucketIndexMetadataReaderFromLoader is an adapter from BucketIndexLoader to BucketIndexMetadataReader.
func newBucketIndexMetadataReaderFromLoader(loader *BucketIndexLoader) BucketIndexMetadataReader {
	return &bucketIndexMetadataReader{
		indexReader: loader,
	}
}

func (r *bucketIndexMetadataReader) Metadata() *bucketindex.Metadata {
	idx := r.indexReader.Index()
	if idx == nil {
		return &bucketindex.Metadata{}
	}
	return idx.Metadata()
}
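Because indexReader is an anonymous interface, the adapter accepts any value with an Index() *bucketindex.Index method, not only *BucketIndexLoader, and the nil check means callers always get a usable (possibly zero-valued) Metadata. A small sketch under that assumption; stubLoader is hypothetical and not part of this change:

// stubLoader is a hypothetical stand-in for BucketIndexLoader, showing that
// any type with an Index() method satisfies the anonymous interface above.
type stubLoader struct{ idx *bucketindex.Index }

func (s stubLoader) Index() *bucketindex.Index { return s.idx }

// With a nil index (e.g. nothing loaded yet), Metadata() falls back to the
// zero value, so UpdatedAt reads as 0 rather than panicking:
//
//	r := &bucketIndexMetadataReader{indexReader: stubLoader{}}
//	_ = r.Metadata().UpdatedAt // 0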
14 changes: 14 additions & 0 deletions pkg/storegateway/bucket_stores.go
@@ -10,6 +10,7 @@ import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

@@ -538,6 +539,7 @@ func (u *BucketStores) getOrCreateStore(ctx context.Context, userID string) (*Bu
	bs, err := NewBucketStore(
		userID,
		userBkt,
		newBucketIndexMetadataReaderFromLoader(loader),
		fetcher,
		u.syncDirForUser(userID),
		u.cfg.BucketStore,
@@ -645,6 +647,18 @@ func getUserIDFromGRPCContext(ctx context.Context) string {
	return values[0]
}

func getBucketIndexUpdatedAtFromGRPCContext(ctx context.Context) (int64, error) {
	values := metadata.ValueFromIncomingContext(ctx, GrpcContextMetadataBucketIndexUpdatedAt)
	if len(values) != 1 {
		return 0, fmt.Errorf("expected exactly one bucket index updated_at value, got %d", len(values))
	}
	value, err := strconv.ParseInt(values[0], 10, 64)
	if err != nil {
		return 0, fmt.Errorf("parse bucket index updated_at value %q: %w", values[0], err)
	}
	return value, nil
}

type spanSeriesServer struct {
	storegatewaypb.StoreGateway_SeriesServer

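getBucketIndexUpdatedAtFromGRPCContext reads the timestamp from incoming gRPC request metadata, which implies the calling side attaches it before issuing the request; that client-side wiring is outside this diff. A minimal sketch of what it could look like, assuming the caller knows its own bucket index updated_at value (withBucketIndexUpdatedAt is a hypothetical helper):

// withBucketIndexUpdatedAt attaches updated_at to the outgoing context under
// the same metadata key that getBucketIndexUpdatedAtFromGRPCContext reads.
func withBucketIndexUpdatedAt(ctx context.Context, updatedAt int64) context.Context {
	return metadata.AppendToOutgoingContext(ctx,
		GrpcContextMetadataBucketIndexUpdatedAt,
		strconv.FormatInt(updatedAt, 10))
}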
18 changes: 14 additions & 4 deletions pkg/storegateway/bucket_test.go
@@ -1456,6 +1456,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
	st, err := NewBucketStore(
		"test",
		ibkt,
		newTestBucketIndexMetadataReader(t, bkt, "test"),
		f,
		tmpDir,
		mimir_tsdb.BucketStoreConfig{
@@ -1588,6 +1589,7 @@ func TestBucketStore_Series_Concurrency(t *testing.T) {
	store, err := NewBucketStore(
		"tenant",
		instrumentedBucket,
		newTestBucketIndexMetadataReader(t, instrumentedBucket, "tenant"),
		metaFetcher,
		tmpDir,
		mimir_tsdb.BucketStoreConfig{
@@ -1757,10 +1759,11 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
	}

	store := &BucketStore{
-		userID:     "test",
-		bkt:        objstore.WithNoopInstr(bkt),
-		logger:     logger,
-		indexCache: indexCache,
+		userID:          "test",
+		bkt:             objstore.WithNoopInstr(bkt),
+		bucketIndexMeta: newTestBucketIndexMetadataReader(t, bkt, "test"),
+		logger:          logger,
+		indexCache:      indexCache,
		indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), indexheader.Config{
			LazyLoadingEnabled:     false,
			LazyLoadingIdleTimeout: 0,
@@ -1920,6 +1923,7 @@ func TestBucketStore_Series_ErrorUnmarshallingRequestHints(t *testing.T) {
	store, err := NewBucketStore(
		"test",
		instrBkt,
		newTestBucketIndexMetadataReader(t, bkt, "test"),
		fetcher,
		tmpDir,
		mimir_tsdb.BucketStoreConfig{
@@ -1977,6 +1981,7 @@ func TestBucketStore_Series_CanceledRequest(t *testing.T) {
	store, err := NewBucketStore(
		"test",
		instrBkt,
		newTestBucketIndexMetadataReader(t, instrBkt, "test"),
		fetcher,
		tmpDir,
		mimir_tsdb.BucketStoreConfig{
@@ -2049,6 +2054,7 @@ func TestBucketStore_Series_TimeoutGate(t *testing.T) {
	store, err := NewBucketStore(
		"test",
		instrBkt,
		newTestBucketIndexMetadataReader(t, instrBkt, "test"),
		fetcher,
		tmpDir,
		mimir_tsdb.BucketStoreConfig{
@@ -2127,6 +2133,7 @@ func TestBucketStore_Series_InvalidRequest(t *testing.T) {
	store, err := NewBucketStore(
		"test",
		instrBkt,
		newTestBucketIndexMetadataReader(t, instrBkt, "test"),
		fetcher,
		tmpDir,
		mimir_tsdb.BucketStoreConfig{
@@ -2253,6 +2260,7 @@ func testBucketStoreSeriesBlockWithMultipleChunks(
	store, err := NewBucketStore(
		"tenant",
		instrBkt,
		newTestBucketIndexMetadataReader(t, instrBkt, "tenant"),
		fetcher,
		tmpDir,
		mimir_tsdb.BucketStoreConfig{
@@ -2414,6 +2422,7 @@ func TestBucketStore_Series_Limits(t *testing.T) {
	store, err := NewBucketStore(
		"tenant",
		instrBkt,
		newTestBucketIndexMetadataReader(t, instrBkt, "tenant"),
		fetcher,
		tmpDir,
		mimir_tsdb.BucketStoreConfig{
@@ -2533,6 +2542,7 @@ func setupStoreForHintsTest(t *testing.T, maxSeriesPerBatch int, opts ...BucketS
	store, err := NewBucketStore(
		"tenant",
		instrBkt,
		newTestBucketIndexMetadataReader(t, instrBkt, "tenant"),
		fetcher,
		tmpDir,
		mimir_tsdb.BucketStoreConfig{
19 changes: 19 additions & 0 deletions pkg/storegateway/gateway_test.go
@@ -1675,3 +1675,22 @@ func createBucketIndex(t testing.TB, bkt objstore.Bucket, userID string) *bucket

	return idx
}

type testBucketIndexMetadataReader struct {
	t      testing.TB
	bkt    objstore.Bucket
	userID string
}

func newTestBucketIndexMetadataReader(t testing.TB, bkt objstore.Bucket, userID string) *testBucketIndexMetadataReader {
	return &testBucketIndexMetadataReader{
		t:      t,
		bkt:    bkt,
		userID: userID,
	}
}

func (t testBucketIndexMetadataReader) Metadata() *bucketindex.Metadata {
	idx := createBucketIndex(t.t, t.bkt, t.userID)
	return idx.Metadata()
}