From 8737cf681e039092c632abdfb44843f08a0e982c Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Fri, 13 Jun 2025 21:07:30 +0900
Subject: [PATCH] Introduce a regex tenant resolver

Signed-off-by: SungJin1212
---
 CHANGELOG.md                                  |   3 +
 docs/configuration/config-file-reference.md   |  16 ++
 docs/configuration/v1-guarantees.md           |   3 +
 integration/querier_tenant_federation_test.go | 230 +++++++++++++++
 pkg/cortex/modules.go                         |  31 +++
 .../exemplar_merge_queryable_test.go          | 123 ++++++++
 .../tenantfederation/merge_queryable_test.go  | 263 ++++++++++++------
 .../metadata_merge_querier_test.go            |  90 ++++++
 .../tenantfederation/regex_resolver.go        | 197 +++++++++++++
 .../tenantfederation/regex_resolver_test.go   | 133 +++++++++
 .../tenantfederation/tenant_federation.go     |   7 +
 pkg/tenant/resolver.go                        |   5 +
 pkg/tenant/tenant.go                          |   2 +-
 13 files changed, 1021 insertions(+), 82 deletions(-)
 create mode 100644 pkg/querier/tenantfederation/regex_resolver.go
 create mode 100644 pkg/querier/tenantfederation/regex_resolver_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 28905e2c7a7..c510ebaec71 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,9 @@
 * [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
 * [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
 * [FEATURE] Ruler: Add support for group labels. #6665
+* [FEATURE] Query federation: Introduce a regex tenant resolver to allow a regex in the `X-Scope-OrgID` value. #6713
+  - Add an experimental `tenant-federation.regex-matcher-enabled` flag. If enabled, users can pass a regex as the `X-Scope-OrgID` value, and the matching tenant IDs are automatically included in the query. User discovery is based on scanning block storage, so newly created users become queryable once they have uploaded a block (generally 2h).
+  - Add an experimental `tenant-federation.user-sync-interval` flag that specifies how frequently to scan users. The scanned users are used to calculate the matching tenant IDs.
 * [FEATURE] Experimental Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet and Parquet Queryable. #6716 #6743
 * [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
 * [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 4460ae63021..69ac6bbf15b 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -181,6 +181,22 @@ tenant_federation:
   # CLI flag: -tenant-federation.max-tenant
   [max_tenant: <int> | default = 0]

+  # [Experimental] If enabled, the `X-Scope-OrgID` header value can accept a
+  # regex and the matched tenantIDs are automatically involved. The regex
+  # matching rule follows the Prometheus, see the detail:
+  # https://prometheus.io/docs/prometheus/latest/querying/basics/#regular-expressions.
+  # The user discovery is based on scanning block storage, so new users can get
+  # queries after uploading a block (generally 2h).
+  # CLI flag: -tenant-federation.regex-matcher-enabled
+  [regex_matcher_enabled: <boolean> | default = false]
+
+  # [Experimental] If the regex matcher is enabled, it specifies how frequently
+  # to scan users. The scanned users are used to calculate matched tenantIDs.
+ # The scanning strategy depends on the + # `-blocks-storage.users-scanner.strategy`. + # CLI flag: -tenant-federation.user-sync-interval + [user_sync_interval: | default = 5m] + # The ruler_config configures the Cortex ruler. [ruler: ] diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 2e8b84ec58c..700fbf5beb7 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -66,6 +66,9 @@ Currently experimental features are: - The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporarily and will be removed in future versions - Blocks storage user index - Querier: tenant federation + - `-tenant-federation.enabled` + - `-tenant-federation.regex-matcher-enabled` + - `-tenant-federation.user-sync-interval` - The thanosconvert tool for converting Thanos block metadata to Cortex - HA Tracker: cleanup of old replicas from KV Store. - Instance limits in ingester and distributor diff --git a/integration/querier_tenant_federation_test.go b/integration/querier_tenant_federation_test.go index c2c2ec10a62..5222b1c8e6e 100644 --- a/integration/querier_tenant_federation_test.go +++ b/integration/querier_tenant_federation_test.go @@ -28,18 +28,25 @@ type querierTenantFederationConfig struct { func TestQuerierTenantFederation(t *testing.T) { runQuerierTenantFederationTest(t, querierTenantFederationConfig{}) + runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{}) } func TestQuerierTenantFederationWithQueryScheduler(t *testing.T) { runQuerierTenantFederationTest(t, querierTenantFederationConfig{ querySchedulerEnabled: true, }) + runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{ + querySchedulerEnabled: true, + }) } func TestQuerierTenantFederationWithShuffleSharding(t *testing.T) { runQuerierTenantFederationTest(t, querierTenantFederationConfig{ shuffleShardingEnabled: true, }) + runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{ + shuffleShardingEnabled: true, + }) } func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding(t *testing.T) { @@ -47,6 +54,229 @@ func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding(t *testing. 
querySchedulerEnabled: true, shuffleShardingEnabled: true, }) + runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{ + querySchedulerEnabled: true, + shuffleShardingEnabled: true, + }) +} + +func TestRegexResolver_NewlyCreatedTenant(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-querier.cache-results": "true", + "-querier.split-queries-by-interval": "24h", + "-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range + "-tenant-federation.enabled": "true", + "-tenant-federation.regex-matcher-enabled": "true", + + // to upload block quickly + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + + // store gateway + "-blocks-storage.bucket-store.sync-interval": blockRangePeriod.String(), + "-querier.max-fetched-series-per-query": "1", + }) + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Start ingester and distributor. + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(ingester, distributor)) + + // Wait until distributor have updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Start the query-frontend. + queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + // Start the querier + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + + // Start queriers. 
+ require.NoError(t, s.StartAndWaitReady(querier)) + require.NoError(t, s.WaitReady(queryFrontend)) + + now := time.Now() + series, expectedVector := generateSeries("series_1", now) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + res, err := c.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + result, err := c.Query("series_1", now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + require.Equal(t, expectedVector, result.(model.Vector)) +} + +func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTenantFederationConfig) { + const numUsers = 10 + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + memcached := e2ecache.NewMemcached() + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-querier.cache-results": "true", + "-querier.split-queries-by-interval": "24h", + "-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range + "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-tenant-federation.enabled": "true", + "-tenant-federation.regex-matcher-enabled": "true", + "-tenant-federation.user-sync-interval": "1s", + + // to upload block quickly + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + + // store gateway + "-blocks-storage.bucket-store.sync-interval": blockRangePeriod.String(), + "-querier.max-fetched-series-per-query": "1", + }) + + // Start the query-scheduler if enabled. + var queryScheduler *e2ecortex.CortexService + if cfg.querySchedulerEnabled { + queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "") + require.NoError(t, s.StartAndWaitReady(queryScheduler)) + flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint() + flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint() + } + + if cfg.shuffleShardingEnabled { + // Use only single querier for each user. + flags["-frontend.max-queriers-per-tenant"] = "1" + } + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Start ingester and distributor. + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(ingester, distributor)) + + // Wait until distributor have updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Push a series for each user to Cortex. 
+ now := time.Now() + expectedVectors := make([]model.Vector, numUsers) + tenantIDs := make([]string, numUsers) + + for u := 0; u < numUsers; u++ { + tenantIDs[u] = fmt.Sprintf("user-%d", u) + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", tenantIDs[u]) + require.NoError(t, err) + + var series []prompb.TimeSeries + series, expectedVectors[u] = generateSeries("series_1", now) + // To ship series_1 block + series2, _ := generateSeries("series_2", now.Add(blockRangePeriod*2)) + + res, err := c.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + // Start the query-frontend. + queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + if !cfg.querySchedulerEnabled { + flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint() + } + + // Start the querier and store-gateway + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + + var querier2 *e2ecortex.CortexService + if cfg.shuffleShardingEnabled { + querier2 = e2ecortex.NewQuerier("querier-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + } + + // Start queriers. + require.NoError(t, s.StartAndWaitReady(querier, storeGateway)) + require.NoError(t, s.WaitReady(queryFrontend)) + if cfg.shuffleShardingEnabled { + require.NoError(t, s.StartAndWaitReady(querier2)) + } + + // Wait until the querier and store-gateway have updated ring + require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) + if cfg.shuffleShardingEnabled { + require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) + } + + // wait to upload blocks + require.NoError(t, ingester.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_ingester_shipper_uploads_total"}, e2e.WaitMissingMetrics)) + + // wait to update knownUsers + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics) + if cfg.shuffleShardingEnabled { + require.NoError(t, querier2.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}), e2e.WaitMissingMetrics) + } + + // query all tenants + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-.+") + require.NoError(t, err) + + result, err := c.Query("series_1", now) + require.NoError(t, err) + + assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector)) + + // ensure a push to multiple tenants is failing + series, _ := generateSeries("series_1", now) + res, err := c.Push(series) + require.NoError(t, err) + + require.Equal(t, 500, res.StatusCode) + + // check metric label values for total queries in the query frontend + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-.+"), + labels.MustNewMatcher(labels.MatchEqual, "op", "query")))) + + // check metric label values for query queue length in 
either query frontend or query scheduler + queueComponent := queryFrontend + queueMetricName := "cortex_query_frontend_queue_length" + if cfg.querySchedulerEnabled { + queueComponent = queryScheduler + queueMetricName = "cortex_query_scheduler_queue_length" + } + require.NoError(t, queueComponent.WaitSumMetricsWithOptions(e2e.Equals(0), []string{queueMetricName}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-.+")))) } func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationConfig) { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index c0842a9422e..4bcd88a9413 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -51,6 +51,7 @@ import ( "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storegateway" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/grpcclient" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/modules" @@ -282,10 +283,30 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) { // single tenant. This allows for a less impactful enabling of tenant // federation. byPassForSingleQuerier := true + t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)) t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer) t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer) + + if t.Cfg.TenantFederation.RegexMatcherEnabled { + util_log.WarnExperimentalUse("tenant-federation.regex-matcher-enabled") + + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { + return bucket.NewClient(ctx, t.Cfg.BlocksStorage.Bucket, nil, "regex-resolver", util_log.Logger, prometheus.DefaultRegisterer) + } + + regexResolver, err := tenantfederation.NewRegexResolver(t.Cfg.BlocksStorage.UsersScanner, prometheus.DefaultRegisterer, bucketClientFactory, t.Cfg.TenantFederation.UserSyncInterval, util_log.Logger) + if err != nil { + return nil, fmt.Errorf("failed to initialize regex resolver: %v", err) + } + tenant.WithDefaultResolver(regexResolver) + + return regexResolver, nil + } + + return nil, nil } + return nil, nil } @@ -497,6 +518,11 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) + if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled { + // If regex matcher enabled, we use regex validator to pass regex to the querier + tenant.WithDefaultResolver(tenantfederation.NewRegexValidator()) + } + queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, util_log.Logger, @@ -776,6 +802,11 @@ func (t *Cortex) initTenantDeletionAPI() (services.Service, error) { } func (t *Cortex) initQueryScheduler() (services.Service, error) { + if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled { + // If regex matcher enabled, we use 
regex validator to pass regex to the querier + tenant.WithDefaultResolver(tenantfederation.NewRegexValidator()) + } + s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, errors.Wrap(err, "query-scheduler init") diff --git a/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go b/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go index 309f2dea53a..d58a89d7b02 100644 --- a/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go +++ b/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go @@ -5,16 +5,23 @@ import ( "errors" "strings" "testing" + "time" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/test" ) var ( @@ -311,6 +318,122 @@ func Test_MergeExemplarQuerier_Select(t *testing.T) { } } +func Test_MergeExemplarQuerier_Select_WhenUseRegexResolver(t *testing.T) { + // set a regex tenant resolver + reg := prometheus.NewRegistry() + + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil) + + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { + return bucketClient, nil + } + + usersScannerConfig := cortex_tsdb.UsersScannerConfig{Strategy: cortex_tsdb.UserScanStrategyList} + regexResolver, err := NewRegexResolver(usersScannerConfig, reg, bucketClientFactory, time.Second, log.NewNopLogger()) + require.NoError(t, err) + tenant.WithDefaultResolver(regexResolver) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), regexResolver)) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 && testutil.ToFloat64(regexResolver.discoveredUsers) == 2 + }) + + tests := []struct { + name string + upstream mockExemplarQueryable + matcher [][]*labels.Matcher + orgId string + expectedResult []exemplar.QueryResult + expectedErr error + expectedMetrics string + }{ + { + name: "result labels should contains __tenant_id__ even if one tenant is queried", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()}, + "user-2": &mockExemplarQuerier{res: getFixtureExemplarResult2()}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"), + }}, + orgId: ".+-1", + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", 
"__tenant_id__", "user-1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + }, + expectedMetrics: expectedSingleTenantsExemplarMetrics, + }, + { + name: "two tenants results should be aggregated", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()}, + "user-2": &mockExemplarQuerier{res: getFixtureExemplarResult2()}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"), + }}, + orgId: "user-.+", + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-2"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "456"), + Value: 456, + Ts: 1734942338000, + }, + }, + }, + }, + expectedMetrics: expectedTwoTenantsExemplarMetrics, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + exemplarQueryable := NewExemplarQueryable(&test.upstream, defaultMaxConcurrency, false, reg) + ctx := user.InjectOrgID(context.Background(), test.orgId) + q, err := exemplarQueryable.ExemplarQuerier(ctx) + require.NoError(t, err) + + result, err := q.Select(mint, maxt, test.matcher...) + if test.expectedErr != nil { + require.Error(t, err) + } else { + require.NoError(t, err) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_exemplar_query")) + require.Equal(t, test.expectedResult, result) + } + }) + } +} + func Test_filterAllTenantsAndMatchers(t *testing.T) { idLabelName := defaultTenantLabel diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index 4d76071b024..c69e5f2c3e3 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/prometheus/client_golang/prometheus" @@ -20,11 +21,16 @@ import ( "github.com/prometheus/prometheus/util/annotations" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/querier/series" + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/cortexproject/cortex/pkg/util/test" ) const ( @@ -472,9 +478,6 @@ cortex_querier_federated_tenants_per_query_count 1 ) func TestMergeQueryable_Select(t *testing.T) { - // Set a multi tenant resolver. 
- tenant.WithDefaultResolver(tenant.NewMultiResolver()) - for _, scenario := range []selectScenario{ { mergeQueryableScenario: threeTenantsScenario, @@ -653,51 +656,84 @@ func TestMergeQueryable_Select(t *testing.T) { } { scenario := scenario t.Run(scenario.name, func(t *testing.T) { - for _, tc := range scenario.selectTestCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - querier, reg, err := scenario.init() - require.NoError(t, err) - - // inject tenants into context - ctx := context.Background() - if len(scenario.tenants) > 0 { - ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) - } + for _, useRegexResolver := range []bool{true, false} { + for _, tc := range scenario.selectTestCases { + tc := tc + t.Run(fmt.Sprintf("%s, useRegexResolver: %v", tc.name, useRegexResolver), func(t *testing.T) { + ctx := context.Background() + if useRegexResolver { + reg := prometheus.NewRegistry() + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", scenario.tenants, nil) + bucketClient.MockIter("__markers__", []string{}, nil) + + for _, tenant := range scenario.tenants { + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(tenant), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(tenant), false, nil) + } + + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { + return bucketClient, nil + } + + usersScannerConfig := cortex_tsdb.UsersScannerConfig{Strategy: cortex_tsdb.UserScanStrategyList} + + regexResolver, err := NewRegexResolver(usersScannerConfig, reg, bucketClientFactory, time.Second, log.NewNopLogger()) + require.NoError(t, err) + + // set a regex tenant resolver + tenant.WithDefaultResolver(regexResolver) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), regexResolver)) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 && testutil.ToFloat64(regexResolver.discoveredUsers) == float64(len(scenario.tenants)) + }) + + ctx = user.InjectOrgID(ctx, "team-.+") + } else { + // Set a multi tenant resolver. + tenant.WithDefaultResolver(tenant.NewMultiResolver()) + + // inject tenants into context + if len(scenario.tenants) > 0 { + ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) + } + } - seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt}, tc.matchers...) + querier, reg, err := scenario.init() + require.NoError(t, err) - if tc.expectedQueryErr != nil { - require.EqualError(t, seriesSet.Err(), tc.expectedQueryErr.Error()) - } else { - require.NoError(t, seriesSet.Err()) - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics), "cortex_querier_federated_tenants_per_query")) - assertEqualWarnings(t, tc.expectedWarnings, seriesSet.Warnings()) - } + seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt}, tc.matchers...) 
- if tc.expectedLabels != nil { - require.Equal(t, len(tc.expectedLabels), tc.expectedSeriesCount) - } + if tc.expectedQueryErr != nil { + require.EqualError(t, seriesSet.Err(), tc.expectedQueryErr.Error()) + } else { + require.NoError(t, seriesSet.Err()) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics), "cortex_querier_federated_tenants_per_query")) + assertEqualWarnings(t, tc.expectedWarnings, seriesSet.Warnings()) + } - count := 0 - for i := 0; seriesSet.Next(); i++ { - count++ if tc.expectedLabels != nil { - require.Equal(t, tc.expectedLabels[i], seriesSet.At().Labels(), fmt.Sprintf("labels index: %d", i)) + require.Equal(t, len(tc.expectedLabels), tc.expectedSeriesCount) } - } - require.Equal(t, tc.expectedSeriesCount, count) - }) + + count := 0 + for i := 0; seriesSet.Next(); i++ { + count++ + if tc.expectedLabels != nil { + require.Equal(t, tc.expectedLabels[i], seriesSet.At().Labels(), fmt.Sprintf("labels index: %d", i)) + } + } + require.Equal(t, tc.expectedSeriesCount, count) + }) + } } }) } } func TestMergeQueryable_LabelNames(t *testing.T) { - // set a multi tenant resolver - tenant.WithDefaultResolver(tenant.NewMultiResolver()) - for _, scenario := range []labelNamesScenario{ { mergeQueryableScenario: singleTenantScenario, @@ -822,38 +858,69 @@ func TestMergeQueryable_LabelNames(t *testing.T) { }, } { scenario := scenario - t.Run(scenario.mergeQueryableScenario.name, func(t *testing.T) { - t.Parallel() - querier, reg, err := scenario.init() - require.NoError(t, err) - - // inject tenants into context - ctx := context.Background() - if len(scenario.tenants) > 0 { - ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) - } + for _, useRegexResolver := range []bool{true, false} { + t.Run(fmt.Sprintf("%s, useRegexResolver: %v", scenario.mergeQueryableScenario.name, useRegexResolver), func(t *testing.T) { + ctx := context.Background() + if useRegexResolver { + reg := prometheus.NewRegistry() + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", scenario.tenants, nil) + bucketClient.MockIter("__markers__", []string{}, nil) + + for _, tenant := range scenario.tenants { + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(tenant), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(tenant), false, nil) + } - t.Run(scenario.labelNamesTestCase.name, func(t *testing.T) { - t.Parallel() - labelNames, warnings, err := querier.LabelNames(ctx, nil, scenario.matchers...) 
- if scenario.expectedQueryErr != nil { - require.EqualError(t, err, scenario.expectedQueryErr.Error()) - } else { + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { + return bucketClient, nil + } + usersScannerConfig := cortex_tsdb.UsersScannerConfig{Strategy: cortex_tsdb.UserScanStrategyList} + + regexResolver, err := NewRegexResolver(usersScannerConfig, reg, bucketClientFactory, time.Second, log.NewNopLogger()) require.NoError(t, err) - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(scenario.expectedMetrics), "cortex_querier_federated_tenants_per_query")) - assert.Equal(t, scenario.expectedLabelNames, labelNames) - assertEqualWarnings(t, scenario.expectedWarnings, warnings) + + // set a regex tenant resolver + tenant.WithDefaultResolver(regexResolver) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), regexResolver)) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 && testutil.ToFloat64(regexResolver.discoveredUsers) == float64(len(scenario.tenants)) + }) + + ctx = user.InjectOrgID(ctx, "team-.+") + } else { + // Set a multi tenant resolver. + tenant.WithDefaultResolver(tenant.NewMultiResolver()) + + // inject tenants into context + if len(scenario.tenants) > 0 { + ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) + } } + + querier, reg, err := scenario.init() + require.NoError(t, err) + + t.Run(scenario.labelNamesTestCase.name, func(t *testing.T) { + t.Parallel() + labelNames, warnings, err := querier.LabelNames(ctx, nil, scenario.matchers...) + if scenario.expectedQueryErr != nil { + require.EqualError(t, err, scenario.expectedQueryErr.Error()) + } else { + require.NoError(t, err) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(scenario.expectedMetrics), "cortex_querier_federated_tenants_per_query")) + assert.Equal(t, scenario.expectedLabelNames, labelNames) + assertEqualWarnings(t, scenario.expectedWarnings, warnings) + } + }) }) - }) + } } } func TestMergeQueryable_LabelValues(t *testing.T) { - t.Parallel() - // set a multi tenant resolver - tenant.WithDefaultResolver(tenant.NewMultiResolver()) - for _, scenario := range []labelValuesScenario{ { mergeQueryableScenario: singleTenantScenario, @@ -1028,29 +1095,63 @@ func TestMergeQueryable_LabelValues(t *testing.T) { } { scenario := scenario t.Run(scenario.name, func(t *testing.T) { - for _, tc := range scenario.labelValuesTestCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - querier, reg, err := scenario.init() - require.NoError(t, err) - - // inject tenants into context - ctx := context.Background() - if len(scenario.tenants) > 0 { - ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) - } + for _, useRegexResolver := range []bool{true, false} { + for _, tc := range scenario.labelValuesTestCases { + t.Run(fmt.Sprintf("%s, useRegexResolver: %v", tc.name, useRegexResolver), func(t *testing.T) { + ctx := context.Background() + if useRegexResolver { + reg := prometheus.NewRegistry() + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", scenario.tenants, nil) + bucketClient.MockIter("__markers__", []string{}, nil) + + for _, tenant := range scenario.tenants { + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(tenant), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(tenant), false, nil) + } + + bucketClientFactory := func(ctx 
context.Context) (objstore.InstrumentedBucket, error) { + return bucketClient, nil + } + usersScannerConfig := cortex_tsdb.UsersScannerConfig{Strategy: cortex_tsdb.UserScanStrategyList} + + regexResolver, err := NewRegexResolver(usersScannerConfig, reg, bucketClientFactory, time.Second, log.NewNopLogger()) + require.NoError(t, err) + + // set a regex tenant resolver + tenant.WithDefaultResolver(regexResolver) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), regexResolver)) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 && testutil.ToFloat64(regexResolver.discoveredUsers) == float64(len(scenario.tenants)) + }) + + ctx = user.InjectOrgID(ctx, "team-.+") + } else { + // Set a multi tenant resolver. + tenant.WithDefaultResolver(tenant.NewMultiResolver()) + + // inject tenants into context + if len(scenario.tenants) > 0 { + ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) + } + } - actLabelValues, warnings, err := querier.LabelValues(ctx, tc.labelName, nil, tc.matchers...) - if tc.expectedQueryErr != nil { - require.EqualError(t, err, tc.expectedQueryErr.Error()) - } else { + querier, reg, err := scenario.init() require.NoError(t, err) - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics), "cortex_querier_federated_tenants_per_query")) - assert.Equal(t, tc.expectedLabelValues, actLabelValues, fmt.Sprintf("unexpected values for label '%s'", tc.labelName)) - assertEqualWarnings(t, tc.expectedWarnings, warnings) - } - }) + + actLabelValues, warnings, err := querier.LabelValues(ctx, tc.labelName, nil, tc.matchers...) + if tc.expectedQueryErr != nil { + require.EqualError(t, err, tc.expectedQueryErr.Error()) + } else { + require.NoError(t, err) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics), "cortex_querier_federated_tenants_per_query")) + assert.Equal(t, tc.expectedLabelValues, actLabelValues, fmt.Sprintf("unexpected values for label '%s'", tc.labelName)) + assertEqualWarnings(t, tc.expectedWarnings, warnings) + } + }) + } } }) } diff --git a/pkg/querier/tenantfederation/metadata_merge_querier_test.go b/pkg/querier/tenantfederation/metadata_merge_querier_test.go index a9a93147338..0f930d57dca 100644 --- a/pkg/querier/tenantfederation/metadata_merge_querier_test.go +++ b/pkg/querier/tenantfederation/metadata_merge_querier_test.go @@ -5,15 +5,22 @@ import ( "fmt" "strings" "testing" + "time" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/scrape" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/test" ) var ( @@ -145,3 +152,86 @@ func Test_mergeMetadataQuerier_MetricsMetadata(t *testing.T) { }) } } + +func Test_mergeMetadataQuerier_MetricsMetadata_WhenUseRegexResolver(t *testing.T) { + // set a regex tenant resolver + reg := prometheus.NewRegistry() + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("__markers__", 
[]string{}, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-2"), false, nil) + + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { + return bucketClient, nil + } + + usersScannerConfig := cortex_tsdb.UsersScannerConfig{Strategy: cortex_tsdb.UserScanStrategyList} + regexResolver, err := NewRegexResolver(usersScannerConfig, reg, bucketClientFactory, time.Second, log.NewNopLogger()) + require.NoError(t, err) + tenant.WithDefaultResolver(regexResolver) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), regexResolver)) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 && testutil.ToFloat64(regexResolver.discoveredUsers) == 2 + }) + + tests := []struct { + name string + tenantIdToMetadata map[string][]scrape.MetricMetadata + orgId string + expectedResults []scrape.MetricMetadata + expectedMetrics string + }{ + { + name: "single tenant", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": { + {MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + }, + }, + orgId: "user-1", + expectedResults: []scrape.MetricMetadata{ + {MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + }, + expectedMetrics: expectedSingleTenantsMetadataMetrics, + }, + { + name: "should be merged two tenants results", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": { + {MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + }, + "user-2": { + {MetricFamily: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, + {MetricFamily: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""}, + }, + }, + orgId: "user-.+", + expectedResults: []scrape.MetricMetadata{ + {MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + {MetricFamily: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, + {MetricFamily: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""}, + }, + expectedMetrics: expectedTwoTenantsMetadataMetrics, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reg = prometheus.NewPedanticRegistry() + upstream := mockMetadataQuerier{ + tenantIdToMetadata: test.tenantIdToMetadata, + } + + mergeMetadataQuerier := NewMetadataQuerier(&upstream, defaultMaxConcurrency, reg) + metadata, err := mergeMetadataQuerier.MetricsMetadata(user.InjectOrgID(context.Background(), test.orgId), &client.MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""}) + require.NoError(t, err) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_metadata_query")) + require.Equal(t, test.expectedResults, metadata) + }) + } +} diff --git a/pkg/querier/tenantfederation/regex_resolver.go b/pkg/querier/tenantfederation/regex_resolver.go new file mode 100644 index 00000000000..a17f57c0474 --- /dev/null +++ b/pkg/querier/tenantfederation/regex_resolver.go @@ -0,0 +1,197 @@ +package tenantfederation + +import ( + "context" + "sort" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + 
"github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/services" +) + +var ( + errInvalidRegex = errors.New("invalid regex present") +) + +// RegexResolver resolves tenantIDs matched given regex. +type RegexResolver struct { + services.Service + + knownUsers []string + userSyncInterval time.Duration + userScanner users.Scanner + logger log.Logger + sync.Mutex + + // lastUpdateUserRun stores the timestamps of the latest update user loop run + lastUpdateUserRun prometheus.Gauge + // discoveredUsers stores the number of discovered user + discoveredUsers prometheus.Gauge +} + +func NewRegexResolver(cfg tsdb.UsersScannerConfig, reg prometheus.Registerer, bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error), userSyncInterval time.Duration, logger log.Logger) (*RegexResolver, error) { + bucketClient, err := bucketClientFactory(context.Background()) + if err != nil { + return nil, errors.Wrap(err, "failed to create the bucket client") + } + + userScanner, err := users.NewScanner(cfg, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "regex-resolver"}, reg)) + if err != nil { + return nil, errors.Wrap(err, "failed to create users scanner") + } + + r := &RegexResolver{ + userSyncInterval: userSyncInterval, + userScanner: userScanner, + logger: logger, + } + + r.lastUpdateUserRun = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_regex_resolver_last_update_run_timestamp_seconds", + Help: "Unix timestamp of the last successful regex resolver update user run.", + }) + r.discoveredUsers = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_regex_resolver_discovered_users", + Help: "Number of discovered users.", + }) + + r.Service = services.NewBasicService(nil, r.running, nil) + + return r, nil +} + +func (r *RegexResolver) running(ctx context.Context) error { + level.Info(r.logger).Log("msg", "regex-resolver started") + ticker := time.NewTicker(r.userSyncInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + // active and deleting users are considered + // The store-gateway can query for deleting users. + active, deleting, _, err := r.userScanner.ScanUsers(ctx) + if err != nil { + level.Error(r.logger).Log("msg", "failed to discover users from bucket", "err", err) + } + + r.Lock() + r.knownUsers = append(active, deleting...) 
+			// We keep it sorted.
+			sort.Strings(r.knownUsers)
+			r.Unlock()
+			r.lastUpdateUserRun.SetToCurrentTime()
+			r.discoveredUsers.Set(float64(len(active) + len(deleting)))
+		}
+	}
+}
+
+func (r *RegexResolver) TenantID(ctx context.Context) (string, error) {
+	orgIDs, err := r.TenantIDs(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	if len(orgIDs) > 1 {
+		return "", user.ErrTooManyOrgIDs
+	}
+
+	return orgIDs[0], nil
+}
+
+func (r *RegexResolver) TenantIDs(ctx context.Context) ([]string, error) {
+	//lint:ignore faillint wrapper around upstream method
+	orgID, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	orgIDs, err := r.getRegexMatchedOrgIds(orgID)
+	if err != nil {
+		return nil, err
+	}
+
+	return tenant.ValidateOrgIDs(orgIDs)
+}
+
+func (r *RegexResolver) getRegexMatchedOrgIds(orgID string) ([]string, error) {
+	var matched []string
+
+	// Use the Prometheus FastRegexMatcher.
+	m, err := labels.NewFastRegexMatcher(orgID)
+	if err != nil {
+		return nil, errInvalidRegex
+	}
+
+	r.Lock()
+	defer r.Unlock()
+	for _, id := range r.knownUsers {
+		if m.MatchString(id) {
+			matched = append(matched, id)
+		}
+	}
+
+	if len(matched) == 0 {
+		if err := tenant.ValidTenantID(orgID); err == nil {
+			// When querying a newly created orgID, the query may not work
+			// because no block for it has been uploaded to object storage yet.
+			// To keep the existing behavior working, pass the orgID through
+			// if it is a valid tenantID.
+			return []string{orgID}, nil
+		}
+
+		// When the entered regex is not a valid tenantID itself,
+		// fall back to the `fake` tenant as the `X-Scope-OrgID`.
+		return []string{"fake"}, nil
+	}
+
+	return matched, nil
+}
+
+// RegexValidator is used to pass a regex orgID to the querier.
+// Using an existing tenant resolver could emit an errTenantIDUnsupportedCharacter
+// since the regex would contain unsupported characters like a `+`.
+type RegexValidator struct{} + +func NewRegexValidator() *RegexValidator { + return &RegexValidator{} +} + +func (r *RegexValidator) TenantID(ctx context.Context) (string, error) { + //lint:ignore faillint wrapper around upstream method + id, err := user.ExtractOrgID(ctx) + if err != nil { + return "", err + } + + _, err = labels.NewFastRegexMatcher(id) + if err != nil { + return "", errInvalidRegex + } + + return id, nil +} + +func (r *RegexValidator) TenantIDs(ctx context.Context) ([]string, error) { + orgID, err := r.TenantID(ctx) + if err != nil { + return nil, err + } + + return []string{orgID}, nil +} diff --git a/pkg/querier/tenantfederation/regex_resolver_test.go b/pkg/querier/tenantfederation/regex_resolver_test.go new file mode 100644 index 00000000000..6e759e99837 --- /dev/null +++ b/pkg/querier/tenantfederation/regex_resolver_test.go @@ -0,0 +1,133 @@ +package tenantfederation + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/test" +) + +func Test_RegexResolver(t *testing.T) { + tests := []struct { + description string + existingTenants []string + orgID string + expectedErr error + expectedOrgIDs []string + }{ + { + description: "invalid regex", + existingTenants: []string{}, + orgID: "[a-z", + expectedErr: errInvalidRegex, + }, + { + description: "no matched tenantID", + existingTenants: []string{"user-1", "user-2", "user-3"}, + orgID: "user-[4-6]", + expectedOrgIDs: []string{"fake"}, + }, + { + description: "use tenantIDsLabelSeparator", + existingTenants: []string{"user-1", "user-2", "user-3"}, + orgID: "user-1|user-2|user-3", + expectedOrgIDs: []string{"user-1", "user-2", "user-3"}, + }, + { + description: "use regex", + existingTenants: []string{"user-1", "user-2", "user-3"}, + orgID: "user-.+", + expectedOrgIDs: []string{"user-1", "user-2", "user-3"}, + }, + { + description: "newly created tenant", + existingTenants: []string{}, + orgID: "user-1", + expectedOrgIDs: []string{"user-1"}, + }, + { + description: "user-2 hasn't been uploaded yet", + existingTenants: []string{"user-1"}, + orgID: "user-1|user-2", + expectedOrgIDs: []string{"user-1"}, + }, + } + + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + reg := prometheus.NewRegistry() + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", tc.existingTenants, nil) + bucketClient.MockIter("__markers__", []string{}, nil) + for _, tenant := range tc.existingTenants { + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(tenant), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath(tenant), false, nil) + } + + bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) { + return bucketClient, nil + } + + usersScannerConfig := cortex_tsdb.UsersScannerConfig{Strategy: cortex_tsdb.UserScanStrategyList} + regexResolver, err := NewRegexResolver(usersScannerConfig, reg, bucketClientFactory, time.Second, log.NewNopLogger()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), regexResolver)) + + // wait update knownUsers + test.Poll(t, 
time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 && testutil.ToFloat64(regexResolver.discoveredUsers) == float64(len(tc.existingTenants)) + }) + + // set regexOrgID + ctx := context.Background() + ctx = user.InjectOrgID(ctx, tc.orgID) + orgIDs, err := regexResolver.TenantIDs(ctx) + + if tc.expectedErr != nil { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.expectedOrgIDs, orgIDs) + } + }) + } +} + +func Test_RegexValidator(t *testing.T) { + tests := []struct { + description string + orgID string + expectedErr error + }{ + { + description: "valid regex", + orgID: "user-.*", + expectedErr: nil, + }, + { + description: "invalid regex", + orgID: "[a-z", + expectedErr: errInvalidRegex, + }, + } + + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + regexValidator := NewRegexValidator() + ctx := user.InjectOrgID(context.Background(), tc.orgID) + _, err := regexValidator.TenantID(ctx) + require.Equal(t, tc.expectedErr, err) + }) + } +} diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go index 4b161ab7328..9023f3b4b1e 100644 --- a/pkg/querier/tenantfederation/tenant_federation.go +++ b/pkg/querier/tenantfederation/tenant_federation.go @@ -2,6 +2,7 @@ package tenantfederation import ( "flag" + "time" ) type Config struct { @@ -11,10 +12,16 @@ type Config struct { MaxConcurrent int `yaml:"max_concurrent"` // MaxTenant A maximum number of tenants to query at once. MaxTenant int `yaml:"max_tenant"` + // RegexMatcherEnabled If true, the `X-Scope-OrgID` header can accept a regex, matched tenantIDs are automatically involved. + RegexMatcherEnabled bool `yaml:"regex_matcher_enabled"` + // UserSyncInterval How frequently to scan users, scanned users are used to calculate matched tenantIDs if the regex matcher is enabled. + UserSyncInterval time.Duration `yaml:"user_sync_interval"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).") f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used to process each federated query.") f.IntVar(&cfg.MaxTenant, "tenant-federation.max-tenant", 0, "A maximum number of tenants to query at once. 0 means no limit.") + f.BoolVar(&cfg.RegexMatcherEnabled, "tenant-federation.regex-matcher-enabled", false, "[Experimental] If enabled, the `X-Scope-OrgID` header value can accept a regex and the matched tenantIDs are automatically involved. The regex matching rule follows the Prometheus, see the detail: https://prometheus.io/docs/prometheus/latest/querying/basics/#regular-expressions. The user discovery is based on scanning block storage, so new users can get queries after uploading a block (generally 2h).") + f.DurationVar(&cfg.UserSyncInterval, "tenant-federation.user-sync-interval", time.Minute*5, "[Experimental] If the regex matcher is enabled, it specifies how frequently to scan users. The scanned users are used to calculate matched tenantIDs. 
The scanning strategy depends on the `-blocks-storage.users-scanner.strategy`.") } diff --git a/pkg/tenant/resolver.go b/pkg/tenant/resolver.go index 48e40fd5ec7..3505030c674 100644 --- a/pkg/tenant/resolver.go +++ b/pkg/tenant/resolver.go @@ -123,6 +123,11 @@ func (t *MultiResolver) TenantIDs(ctx context.Context) ([]string, error) { } orgIDs := strings.Split(orgID, tenantIDsLabelSeparator) + + return ValidateOrgIDs(orgIDs) +} + +func ValidateOrgIDs(orgIDs []string) ([]string, error) { for _, orgID := range orgIDs { if err := ValidTenantID(orgID); err != nil { return nil, err diff --git a/pkg/tenant/tenant.go b/pkg/tenant/tenant.go index 9d9a7f4aad6..b38b50c742f 100644 --- a/pkg/tenant/tenant.go +++ b/pkg/tenant/tenant.go @@ -34,7 +34,7 @@ func (e *errTenantIDUnsupportedCharacter) Error() string { const tenantIDsLabelSeparator = "|" -// NormalizeTenantIDs is creating a normalized form by sortiing and de-duplicating the list of tenantIDs +// NormalizeTenantIDs is creating a normalized form by sorting and de-duplicating the list of tenantIDs func NormalizeTenantIDs(tenantIDs []string) []string { sort.Strings(tenantIDs)