Skip to content

Commit e002a6e

Browse files
committed
parquet store gateway poc
Signed-off-by: yeya24 <[email protected]>
1 parent 746b40e commit e002a6e

File tree

11 files changed

+1912
-94
lines changed

11 files changed

+1912
-94
lines changed

integration/parquet_gateway_test.go

Lines changed: 646 additions & 0 deletions
Large diffs are not rendered by default.

pkg/cortex/cortex_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func TestCortex(t *testing.T) {
8686
IndexCache: tsdb.IndexCacheConfig{
8787
Backend: tsdb.IndexCacheBackendInMemory,
8888
},
89+
BucketStoreType: string(tsdb.TSDBBucketStore),
8990
},
9091
UsersScanner: tsdb.UsersScannerConfig{
9192
Strategy: tsdb.UserScanStrategyList,

pkg/storage/tsdb/config.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ var (
6161
ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy")
6262
ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode")
6363
ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio = errors.New("lazy expanded posting group max key series ratio needs to be equal or greater than 0")
64+
ErrInvalidBucketStoreType = errors.New("invalid bucket store type")
6465
)
6566

6667
// BlocksStorageConfig holds the config information for the blocks storage.
@@ -292,6 +293,7 @@ type BucketStoreConfig struct {
292293
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
293294
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
294295
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`
296+
BucketStoreType string `yaml:"bucket_store_type"`
295297

296298
// Chunk pool.
297299
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"`
@@ -378,6 +380,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
378380
f.Float64Var(&cfg.LazyExpandedPostingGroupMaxKeySeriesRatio, "blocks-storage.bucket-store.lazy-expanded-posting-group-max-key-series-ratio", 100, "Mark posting group as lazy if it fetches more keys than R * max series the query should fetch. With R set to 100, a posting group which fetches 100K keys will be marked as lazy if the current query only fetches 1000 series. This config is only valid if lazy expanded posting is enabled. 0 disables the limit.")
379381
f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.")
380382
f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.")
383+
f.StringVar(&cfg.BucketStoreType, "blocks-storage.bucket-store.bucket-store-type", "tsdb", "Type of bucket store to use (tsdb or parquet).")
381384
f.StringVar(&cfg.TokenBucketBytesLimiter.Mode, "blocks-storage.bucket-store.token-bucket-bytes-limiter.mode", string(TokenBucketBytesLimiterDisabled), fmt.Sprintf("Token bucket bytes limiter mode. Supported values are: %s", strings.Join(supportedTokenBucketBytesLimiterModes, ", ")))
382385
f.Int64Var(&cfg.TokenBucketBytesLimiter.InstanceTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size", int64(820*units.Mebibyte), "Instance token bucket size")
383386
f.Int64Var(&cfg.TokenBucketBytesLimiter.UserTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size", int64(615*units.Mebibyte), "User token bucket size")
@@ -415,6 +418,9 @@ func (cfg *BucketStoreConfig) Validate() error {
415418
if !util.StringsContain(supportedTokenBucketBytesLimiterModes, cfg.TokenBucketBytesLimiter.Mode) {
416419
return ErrInvalidTokenBucketBytesLimiterMode
417420
}
421+
if !util.StringsContain(supportedBucketStoreTypes, cfg.BucketStoreType) {
422+
return ErrInvalidBucketStoreType
423+
}
418424
if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 {
419425
return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio
420426
}
@@ -450,6 +456,19 @@ var supportedBlockDiscoveryStrategies = []string{
450456
string(BucketIndexDiscovery),
451457
}
452458

459+
// BucketStoreType represents the type of bucket store
460+
type BucketStoreType string
461+
462+
const (
463+
TSDBBucketStore BucketStoreType = "tsdb"
464+
ParquetBucketStore BucketStoreType = "parquet"
465+
)
466+
467+
var supportedBucketStoreTypes = []string{
468+
string(TSDBBucketStore),
469+
string(ParquetBucketStore),
470+
}
471+
453472
type TokenBucketBytesLimiterMode string
454473

455474
const (

pkg/storegateway/bucket_stores.go

Lines changed: 45 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,15 @@ import (
4444
"github.com/cortexproject/cortex/pkg/util/validation"
4545
)
4646

47-
// BucketStores is a multi-tenant wrapper of Thanos BucketStore.
48-
type BucketStores struct {
47+
// BucketStores defines the methods that any bucket stores implementation must provide
48+
type BucketStores interface {
49+
storepb.StoreServer
50+
SyncBlocks(ctx context.Context) error
51+
InitialSync(ctx context.Context) error
52+
}
53+
54+
// ThanosBucketStores is a multi-tenant wrapper of Thanos BucketStore.
55+
type ThanosBucketStores struct {
4956
logger log.Logger
5057
cfg tsdb.BlocksStorageConfig
5158
limits *validation.Overrides
@@ -74,7 +81,7 @@ type BucketStores struct {
7481
storesMu sync.RWMutex
7582
stores map[string]*store.BucketStore
7683

77-
// Keeps the last sync error for the bucket store for each tenant.
84+
// Keeps the last sync error for the bucket store for each tenant.
7885
storesErrorsMu sync.RWMutex
7986
storesErrors map[string]error
8087

@@ -86,8 +93,7 @@ type BucketStores struct {
8693
userScanner users.Scanner
8794

8895
// Keeps number of inflight requests
89-
inflightRequestCnt int
90-
inflightRequestMu sync.RWMutex
96+
inflightRequests *util.InflightRequestTracker
9197

9298
// Metrics.
9399
syncTimes prometheus.Histogram
@@ -99,7 +105,19 @@ type BucketStores struct {
99105
var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway")
100106

101107
// NewBucketStores makes a new BucketStores.
102-
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
108+
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (BucketStores, error) {
109+
switch cfg.BucketStore.BucketStoreType {
110+
case string(tsdb.ParquetBucketStore):
111+
return newParquetBucketStores(cfg, bucketClient, limits, logger, reg)
112+
case string(tsdb.TSDBBucketStore):
113+
return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg)
114+
default:
115+
return nil, fmt.Errorf("unsupported bucket store type: %s", cfg.BucketStore.BucketStoreType)
116+
}
117+
}
118+
119+
// newThanosBucketStores creates a new TSDB-based bucket stores
120+
func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*ThanosBucketStores, error) {
103121
matchers := tsdb.NewMatchers()
104122
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg)
105123
if err != nil {
@@ -114,7 +132,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
114132
Help: "Number of maximum concurrent queries allowed.",
115133
}).Set(float64(cfg.BucketStore.MaxConcurrent))
116134

117-
u := &BucketStores{
135+
u := &ThanosBucketStores{
118136
logger: logger,
119137
cfg: cfg,
120138
limits: limits,
@@ -128,6 +146,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
128146
queryGate: queryGate,
129147
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
130148
userTokenBuckets: make(map[string]*util.TokenBucket),
149+
inflightRequests: util.NewInflightRequestTracker(),
131150
syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
132151
Name: "cortex_bucket_stores_blocks_sync_seconds",
133152
Help: "The total time it takes to perform a sync stores",
@@ -187,7 +206,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
187206
}
188207

189208
// InitialSync does an initial synchronization of blocks for all users.
190-
func (u *BucketStores) InitialSync(ctx context.Context) error {
209+
func (u *ThanosBucketStores) InitialSync(ctx context.Context) error {
191210
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
192211

193212
if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error {
@@ -202,13 +221,13 @@ func (u *BucketStores) InitialSync(ctx context.Context) error {
202221
}
203222

204223
// SyncBlocks synchronizes the stores state with the Bucket store for every user.
205-
func (u *BucketStores) SyncBlocks(ctx context.Context) error {
224+
func (u *ThanosBucketStores) SyncBlocks(ctx context.Context) error {
206225
return u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error {
207226
return s.SyncBlocks(ctx)
208227
})
209228
}
210229

211-
func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
230+
func (u *ThanosBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
212231
retries := backoff.New(ctx, backoff.Config{
213232
MinBackoff: 1 * time.Second,
214233
MaxBackoff: 10 * time.Second,
@@ -232,7 +251,7 @@ func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(co
232251
return lastErr
233252
}
234253

235-
func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) (returnErr error) {
254+
func (u *ThanosBucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) (returnErr error) {
236255
defer func(start time.Time) {
237256
u.syncTimes.Observe(time.Since(start).Seconds())
238257
if returnErr == nil {
@@ -330,7 +349,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
330349
}
331350

332351
// Series makes a series request to the underlying user bucket store.
333-
func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
352+
func (u *ThanosBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
334353
spanLog, spanCtx := spanlogger.New(srv.Context(), "BucketStores.Series")
335354
defer spanLog.Finish()
336355

@@ -356,12 +375,12 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
356375

357376
maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests
358377
if maxInflightRequests > 0 {
359-
if u.getInflightRequestCnt() >= maxInflightRequests {
378+
if u.inflightRequests.Count() >= maxInflightRequests {
360379
return ErrTooManyInflightRequests
361380
}
362381

363-
u.incrementInflightRequestCnt()
364-
defer u.decrementInflightRequestCnt()
382+
u.inflightRequests.Inc()
383+
defer u.inflightRequests.Dec()
365384
}
366385

367386
err = store.Series(req, spanSeriesServer{
@@ -372,26 +391,8 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
372391
return err
373392
}
374393

375-
func (u *BucketStores) getInflightRequestCnt() int {
376-
u.inflightRequestMu.RLock()
377-
defer u.inflightRequestMu.RUnlock()
378-
return u.inflightRequestCnt
379-
}
380-
381-
func (u *BucketStores) incrementInflightRequestCnt() {
382-
u.inflightRequestMu.Lock()
383-
u.inflightRequestCnt++
384-
u.inflightRequestMu.Unlock()
385-
}
386-
387-
func (u *BucketStores) decrementInflightRequestCnt() {
388-
u.inflightRequestMu.Lock()
389-
u.inflightRequestCnt--
390-
u.inflightRequestMu.Unlock()
391-
}
392-
393394
// LabelNames implements the Storegateway proto service.
394-
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
395+
func (u *ThanosBucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
395396
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames")
396397
defer spanLog.Finish()
397398

@@ -421,7 +422,7 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe
421422
}
422423

423424
// LabelValues implements the Storegateway proto service.
424-
func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
425+
func (u *ThanosBucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
425426
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelValues")
426427
defer spanLog.Finish()
427428

@@ -450,7 +451,7 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues
450451

451452
// scanUsers in the bucket and return the list of found users. It includes active and deleting users
452453
// but not deleted users.
453-
func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) {
454+
func (u *ThanosBucketStores) scanUsers(ctx context.Context) ([]string, error) {
454455
activeUsers, deletingUsers, _, err := u.userScanner.ScanUsers(ctx)
455456
if err != nil {
456457
return nil, err
@@ -477,13 +478,13 @@ func deduplicateUsers(users []string) []string {
477478
return uniqueUsers
478479
}
479480

480-
func (u *BucketStores) getStore(userID string) *store.BucketStore {
481+
func (u *ThanosBucketStores) getStore(userID string) *store.BucketStore {
481482
u.storesMu.RLock()
482483
defer u.storesMu.RUnlock()
483484
return u.stores[userID]
484485
}
485486

486-
func (u *BucketStores) getStoreError(userID string) error {
487+
func (u *ThanosBucketStores) getStoreError(userID string) error {
487488
u.storesErrorsMu.RLock()
488489
defer u.storesErrorsMu.RUnlock()
489490
return u.storesErrors[userID]
@@ -499,7 +500,7 @@ var (
499500
// If bucket store doesn't exist, returns errBucketStoreNotFound.
500501
// If bucket store is not empty, returns errBucketStoreNotEmpty.
501502
// Otherwise returns error from closing the bucket store.
502-
func (u *BucketStores) closeEmptyBucketStore(userID string) error {
503+
func (u *ThanosBucketStores) closeEmptyBucketStore(userID string) error {
503504
u.storesMu.Lock()
504505
unlockInDefer := true
505506
defer func() {
@@ -537,11 +538,11 @@ func isEmptyBucketStore(bs *store.BucketStore) bool {
537538
return min == math.MaxInt64 && max == math.MinInt64
538539
}
539540

540-
func (u *BucketStores) syncDirForUser(userID string) string {
541+
func (u *ThanosBucketStores) syncDirForUser(userID string) string {
541542
return filepath.Join(u.cfg.BucketStore.SyncDir, userID)
542543
}
543544

544-
func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) {
545+
func (u *ThanosBucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) {
545546
// Check if the store already exists.
546547
bs := u.getStore(userID)
547548
if bs != nil {
@@ -721,7 +722,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
721722

722723
// deleteLocalFilesForExcludedTenants removes local "sync" directories for tenants that are not included in the current
723724
// shard.
724-
func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) {
725+
func (u *ThanosBucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) {
725726
files, err := os.ReadDir(u.cfg.BucketStore.SyncDir)
726727
if err != nil {
727728
return
@@ -760,13 +761,13 @@ func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[str
760761
}
761762
}
762763

763-
func (u *BucketStores) getUserTokenBucket(userID string) *util.TokenBucket {
764+
func (u *ThanosBucketStores) getUserTokenBucket(userID string) *util.TokenBucket {
764765
u.userTokenBucketsMu.RLock()
765766
defer u.userTokenBucketsMu.RUnlock()
766767
return u.userTokenBuckets[userID]
767768
}
768769

769-
func (u *BucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 {
770+
func (u *ThanosBucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 {
770771
tokensToRetrieve := float64(tokens)
771772
switch dataType {
772773
case store.PostingsFetched:

0 commit comments

Comments
 (0)