diff --git a/CHANGELOG.md b/CHANGELOG.md index bf78a57f30e..37e2c51eb9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ * [ENHANCEMENT] Ingester: Improved the performance of active series custom trackers matchers. #12663 * [ENHANCEMENT] Compactor: Log sizes of downloaded and uploaded blocks. #12656 * [ENHANCEMENT] Ingester: Add `cortex_ingest_storage_reader_receive_and_consume_delay_seconds` metric tracking the time between when a write request is received in the distributor and its content is ingested in ingesters, when the ingest storage is enabled. #12751 +* [ENHANCEMENT] Distributor: Add experimental configuration option `-distributor.sharding.exclude-classic-histogram-bucket-label` to compute the series hash – used for sharding – excluding the 'le' label. When this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. #12815 * [ENHANCEMENT] Ruler: Add `ruler_evaluation_consistency_max_delay` per-tenant configuration option support, to specify the maximum tolerated ingestion delay for eventually consistent rule evaluations. This feature is used only when ingest storage is enabled. By default, no maximum delay is enforced. #12751 * [BUGFIX] Distributor: Calculate `WriteResponseStats` before validation and `PushWrappers`. This prevents clients using Remote-Write 2.0 from seeing a diff in written samples, histograms and exemplars. #12682 * [BUGFIX] Compactor: Fix cortex_compactor_block_uploads_failed_total metric showing type="unknown". #12477 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index ebf37352e83..84b41d364f5 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -1794,6 +1794,27 @@ "fieldValue": null, "fieldDefaultValue": null }, + { + "kind": "block", + "name": "sharding", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "exclude_classic_histogram_bucket_label", + "required": false, + "desc": "When set to true, the distributor computes the series hash – used for sharding – excluding the 'le' label. This means that, when this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. This configuration option should be set for distributors, rulers and ingesters.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "distributor.sharding.exclude-classic-histogram-bucket-label", + "fieldType": "boolean", + "fieldCategory": "experimental" + } + ], + "fieldValue": null, + "fieldDefaultValue": null + }, { "kind": "field", "name": "write_requests_buffer_pooling_enabled", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 9fc4087075c..51df5480a12 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1373,6 +1373,8 @@ Usage of ./cmd/mimir/mimir: Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist") -distributor.service-overload-status-code-on-rate-limit-enabled [experimental] If enabled, rate limit errors will be reported to the client with HTTP status code 529 (Service is overloaded). If disabled, status code 429 (Too Many Requests) is used. Enabling -distributor.retry-after-header.enabled before utilizing this option is strongly recommended as it helps prevent premature request retries by the client. 
+ -distributor.sharding.exclude-classic-histogram-bucket-label + [experimental] When set to true, the distributor computes the series hash – used for sharding – excluding the 'le' label. This means that, when this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. This configuration option should be set for distributors, rulers and ingesters. -distributor.usage-tracker-client.grpc-client-config.backoff-max-period duration Maximum delay when backing off. (default 10s) -distributor.usage-tracker-client.grpc-client-config.backoff-min-period duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 51ea8fa2005..cacab323830 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -2003,6 +2003,15 @@ instance_limits: # CLI flag: -distributor.instance-limits.max-inflight-push-requests-bytes [max_inflight_push_requests_bytes: <int> | default = 0] +sharding: + # (experimental) When set to true, the distributor computes the series hash – + # used for sharding – excluding the 'le' label. This means that, when this + # configuration option is set to true, all buckets of a classic histogram + # metric are stored in the same shard. This configuration option should be set + # for distributors, rulers and ingesters. + # CLI flag: -distributor.sharding.exclude-classic-histogram-bucket-label + [exclude_classic_histogram_bucket_label: <boolean> | default = false] + # (experimental) Enable pooling of buffers used for marshaling write requests. # CLI flag: -distributor.write-requests-buffer-pooling-enabled [write_requests_buffer_pooling_enabled: <boolean> | default = true] diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index 9fec27ffaf2..42f8d314666 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -379,6 +379,7 @@ "validation.max-cost-attribution-cardinality": 2000, "validation.cost-attribution-cooldown": 0, "ruler.evaluation-delay-duration": 60000000000, + "ruler.evaluation-consistency-max-delay": 0, "ruler.tenant-shard-size": 0, "ruler.max-rules-per-rule-group": 20, "ruler.max-rule-groups-per-tenant": 70, diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 961a599d5a6..b0267fd26f0 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -259,6 +259,9 @@ type Config struct { DefaultLimits InstanceLimits `yaml:"instance_limits"` InstanceLimitsFn func() *InstanceLimits `yaml:"-"` + // ShardingConfig is the configuration used by distributors to shard series across ingesters / partitions. + ShardingConfig mimirpb.ShardingConfig `yaml:"sharding"` + // This allows downstream projects to wrap the distributor push function // and access the deserialized write requests before/after they are pushed. // These functions will only receive samples that don't get dropped by HA deduplication. 
@@ -294,6 +297,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { cfg.DistributorRing.RegisterFlags(f, logger) cfg.RetryConfig.RegisterFlags(f) cfg.UsageTrackerClient.RegisterFlagsWithPrefix("distributor.usage-tracker-client.", f) + cfg.ShardingConfig.RegisterFlags(f) f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.") f.IntVar(&cfg.MaxOTLPRequestSize, maxOTLPRequestSizeFlag, 100<<20, "Maximum OTLP request size in bytes that the distributors accept. Requests exceeding this limit are rejected.") @@ -1876,7 +1880,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error { } // Get both series and metadata keys in one slice. - keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req) + keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req, d.cfg.ShardingConfig) var ( ingestersSubring ring.DoBatchRing @@ -2072,8 +2076,8 @@ func (d *Distributor) sendWriteRequestToPartitions(ctx context.Context, tenantID // getSeriesAndMetadataTokens returns a slice of tokens for the series and metadata from the request in this specific order. // Metadata tokens start at initialMetadataIndex. -func getSeriesAndMetadataTokens(userID string, req *mimirpb.WriteRequest) (keys []uint32, initialMetadataIndex int) { - seriesKeys := getTokensForSeries(userID, req.Timeseries) +func getSeriesAndMetadataTokens(userID string, req *mimirpb.WriteRequest, cfg mimirpb.ShardingConfig) (keys []uint32, initialMetadataIndex int) { + seriesKeys := getTokensForSeries(userID, req.Timeseries, cfg) metadataKeys := getTokensForMetadata(userID, req.Metadata) // All tokens, stored in order: series, metadata. @@ -2084,14 +2088,14 @@ func getSeriesAndMetadataTokens(userID string, req *mimirpb.WriteRequest) (keys return keys, initialMetadataIndex } -func getTokensForSeries(userID string, series []mimirpb.PreallocTimeseries) []uint32 { +func getTokensForSeries(userID string, series []mimirpb.PreallocTimeseries, cfg mimirpb.ShardingConfig) []uint32 { if len(series) == 0 { return nil } result := make([]uint32, 0, len(series)) for _, ts := range series { - result = append(result, tokenForLabels(userID, ts.Labels)) + result = append(result, tokenForLabels(userID, ts.Labels, cfg)) } return result } @@ -2108,8 +2112,8 @@ func getTokensForMetadata(userID string, metadata []*mimirpb.MetricMetadata) []u return metadataKeys } -func tokenForLabels(userID string, labels []mimirpb.LabelAdapter) uint32 { - return mimirpb.ShardByAllLabelAdapters(userID, labels) +func tokenForLabels(userID string, labels []mimirpb.LabelAdapter, cfg mimirpb.ShardingConfig) uint32 { + return mimirpb.ShardBySeriesLabelAdapters(userID, labels, cfg) } func tokenForMetadata(userID string, metricName string) uint32 { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 1e971882560..ef820de7535 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -6526,7 +6526,7 @@ func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ .. 
return nil, err } - hash := mimirpb.ShardByAllLabelAdapters(orgid, series.Labels) + hash := mimirpb.ShardBySeriesLabelAdapters(orgid, series.Labels, mimirpb.ShardingConfig{}) existing, ok := i.timeseries[hash] if !ok { i.timeseries[hash] = &series @@ -8083,7 +8083,7 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) { totalMetadata += len(ing.metadata) for _, ts := range ing.timeseries { - token := tokenForLabels(userName, ts.Labels) + token := tokenForLabels(userName, ts.Labels, distrib.cfg.ShardingConfig) ingIx := getIngesterIndexForToken(token, ingesters) assert.Equal(t, ix, ingIx) } @@ -8099,7 +8099,7 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) { // Verify that all timeseries were forwarded to ingesters. for _, ts := range req.Timeseries { - token := tokenForLabels(userName, ts.Labels) + token := tokenForLabels(userName, ts.Labels, distrib.cfg.ShardingConfig) ingIx := getIngesterIndexForToken(token, ingesters) assert.Equal(t, ts.Labels, ingesters[ingIx].timeseries[token].Labels) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index c3700c793f4..afef855aa3d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -190,6 +190,9 @@ type Config struct { DefaultLimits InstanceLimits `yaml:"instance_limits"` InstanceLimitsFn func() *InstanceLimits `yaml:"-"` + // DistributorShardingConfig is the configuration used by distributors to shard series across ingesters / partitions. + DistributorShardingConfig mimirpb.ShardingConfig `yaml:"-"` + IgnoreSeriesLimitForMetricNames string `yaml:"ignore_series_limit_for_metric_names" category:"advanced"` ReadPathCPUUtilizationLimit float64 `yaml:"read_path_cpu_utilization_limit" category:"experimental"` @@ -2878,7 +2881,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD ), PostingsClonerFactory: tsdb.DefaultPostingsClonerFactory{}, EnableNativeHistograms: i.limits.NativeHistogramsIngestionEnabled(userID), - SecondaryHashFunction: secondaryTSDBHashFunctionForUser(userID), + SecondaryHashFunction: secondaryTSDBHashFunctionForUser(userID, i.cfg.DistributorShardingConfig), IndexLookupPlannerFunc: i.getIndexLookupPlannerFunc(userID), BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { return i.createBlockChunkQuerier(userID, b, mint, maxt) diff --git a/pkg/ingester/owned_series.go b/pkg/ingester/owned_series.go index 7a546716fee..47c311b6d34 100644 --- a/pkg/ingester/owned_series.go +++ b/pkg/ingester/owned_series.go @@ -246,9 +246,11 @@ func (oss *ownedSeriesService) updateTenant(userID string, db *userTSDB, ringCha return false } -func secondaryTSDBHashFunctionForUser(userID string) func(labels.Labels) uint32 { +func secondaryTSDBHashFunctionForUser(userID string, cfg mimirpb.ShardingConfig) func(labels.Labels) uint32 { return func(ls labels.Labels) uint32 { - return mimirpb.ShardByAllLabels(userID, ls) + // The TSDB secondary hash is just used to compute the series owned by the ingester, and it's not used + // by query sharding, so it's safe – and desired - to use the same sharding config used by distributors. 
+ return mimirpb.ShardBySeriesLabels(userID, ls, cfg) } } diff --git a/pkg/ingester/owned_series_test.go b/pkg/ingester/owned_series_test.go index 7a77bc38687..ba7c58e5239 100644 --- a/pkg/ingester/owned_series_test.go +++ b/pkg/ingester/owned_series_test.go @@ -807,7 +807,7 @@ func generateSeriesWithTokensAt(testUser string, startTime time.Time) ([]util_te Samples: []util_test.Sample{{TS: startTime.Add(time.Duration(seriesIdx) * time.Millisecond).UnixMilli(), Val: float64(0)}}, } seriesToWrite = append(seriesToWrite, s) - seriesTokens = append(seriesTokens, mimirpb.ShardByAllLabels(testUser, s.Labels)) + seriesTokens = append(seriesTokens, mimirpb.ShardBySeriesLabels(testUser, s.Labels, mimirpb.ShardingConfig{})) } return seriesToWrite, seriesTokens } @@ -822,7 +822,7 @@ func generateSeriesWithTargetInfoAt(testUser string, startTime time.Time, target Samples: []util_test.Sample{{TS: startTime.Add(time.Duration(i) * time.Millisecond).UnixMilli(), Val: float64(1)}}, } seriesToWrite = append(seriesToWrite, s) - seriesTokens = append(seriesTokens, mimirpb.ShardByAllLabels(testUser, s.Labels)) + seriesTokens = append(seriesTokens, mimirpb.ShardBySeriesLabels(testUser, s.Labels, mimirpb.ShardingConfig{})) } return seriesToWrite, seriesTokens } diff --git a/pkg/mimirpb/series_sharding.go b/pkg/mimirpb/series_sharding.go index 2c0f6d72599..21cf031bc48 100644 --- a/pkg/mimirpb/series_sharding.go +++ b/pkg/mimirpb/series_sharding.go @@ -6,9 +6,19 @@ package mimirpb import ( + "flag" + "github.com/prometheus/prometheus/model/labels" ) +type ShardingConfig struct { + ExcludeClassicHistogramBucketLabel bool `yaml:"exclude_classic_histogram_bucket_label" category:"experimental"` +} + +func (l *ShardingConfig) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&l.ExcludeClassicHistogramBucketLabel, "distributor.sharding.exclude-classic-histogram-bucket-label", false, "When set to true, the distributor computes the series hash – used for sharding – excluding the 'le' label. This means that, when this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. This configuration option should be set for distributors, rulers and ingesters.") +} + // ShardByMetricName returns the token for the given metric. The provided metricName // is guaranteed to not be retained. func ShardByMetricName(userID string, metricName string) uint32 { @@ -23,22 +33,30 @@ func ShardByUser(userID string) uint32 { return h } -// ShardByAllLabels returns the token for given user and series. +// ShardBySeriesLabels returns the token that must be used to shard a series across ingesters / partitions for the given user. // -// ShardByAllLabels generates different values for different order of same labels. -func ShardByAllLabels(userID string, ls labels.Labels) uint32 { +// ShardBySeriesLabels generates different values for different order of same labels. +func ShardBySeriesLabels(userID string, ls labels.Labels, cfg ShardingConfig) uint32 { h := ShardByUser(userID) ls.Range(func(l labels.Label) { + if cfg.ExcludeClassicHistogramBucketLabel && l.Name == labels.BucketLabel { + return + } + h = HashAdd32(h, l.Name) h = HashAdd32(h, l.Value) }) return h } -// ShardByAllLabelAdapters is like ShardByAllLabel, but uses LabelAdapter type. +// ShardBySeriesLabelAdapters is like ShardBySeriesLabels, but uses the LabelAdapter type. 
+func ShardBySeriesLabelAdapters(userID string, ls []LabelAdapter, cfg ShardingConfig) uint32 { h := ShardByUser(userID) for _, l := range ls { + if cfg.ExcludeClassicHistogramBucketLabel && l.Name == labels.BucketLabel { + continue + } + h = HashAdd32(h, l.Name) h = HashAdd32(h, l.Value) } diff --git a/pkg/mimirpb/series_sharding_test.go b/pkg/mimirpb/series_sharding_test.go index 971079922b9..c6e95185581 100644 --- a/pkg/mimirpb/series_sharding_test.go +++ b/pkg/mimirpb/series_sharding_test.go @@ -8,22 +8,138 @@ package mimirpb import ( "testing" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" ) // This is not great, but we deal with unsorted labels in prePushRelabelMiddleware. -func TestShardByAllLabelAdaptersReturnsWrongResultsForUnsortedLabels(t *testing.T) { - val1 := ShardByAllLabelAdapters("test", []LabelAdapter{ +func TestShardBySeriesLabelAdaptersReturnsWrongResultsForUnsortedLabels(t *testing.T) { + val1 := ShardBySeriesLabelAdapters("test", []LabelAdapter{ {Name: "__name__", Value: "foo"}, {Name: "bar", Value: "baz"}, {Name: "sample", Value: "1"}, - }) + }, ShardingConfig{}) - val2 := ShardByAllLabelAdapters("test", []LabelAdapter{ + val2 := ShardBySeriesLabelAdapters("test", []LabelAdapter{ {Name: "__name__", Value: "foo"}, {Name: "sample", Value: "1"}, {Name: "bar", Value: "baz"}, - }) + }, ShardingConfig{}) assert.NotEqual(t, val1, val2) } + +func TestShardBySeriesLabelsAndShardBySeriesLabelAdapters(t *testing.T) { + userID := "test-user" + + // Same series with different "le" label values + series1 := labels.FromStrings( + "__name__", "http_request_duration_seconds_bucket", + "job", "prometheus", + "le", "0.1", + ) + series2 := labels.FromStrings( + "__name__", "http_request_duration_seconds_bucket", + "job", "prometheus", + "le", "0.5", + ) + + series1Adapter := FromLabelsToLabelAdapters(series1) + series2Adapter := FromLabelsToLabelAdapters(series2) + + t.Run("ExcludeClassicHistogramBucketLabel is disabled", func(t *testing.T) { + cfg := ShardingConfig{ExcludeClassicHistogramBucketLabel: false} + + series1Hash := ShardBySeriesLabels(userID, series1, cfg) + series2Hash := ShardBySeriesLabels(userID, series2, cfg) + assert.NotEqual(t, series1Hash, series2Hash) + + series1AdapterHash := ShardBySeriesLabelAdapters(userID, series1Adapter, cfg) + series2AdapterHash := ShardBySeriesLabelAdapters(userID, series2Adapter, cfg) + assert.NotEqual(t, series1AdapterHash, series2AdapterHash) + + assert.Equal(t, series1Hash, series1AdapterHash) + assert.Equal(t, series2Hash, series2AdapterHash) + }) + + t.Run("ExcludeClassicHistogramBucketLabel is enabled", func(t *testing.T) { + cfg := ShardingConfig{ExcludeClassicHistogramBucketLabel: true} + + series1Hash := ShardBySeriesLabels(userID, series1, cfg) + series2Hash := ShardBySeriesLabels(userID, series2, cfg) + assert.Equal(t, series1Hash, series2Hash) + + series1AdapterHash := ShardBySeriesLabelAdapters(userID, series1Adapter, cfg) + series2AdapterHash := ShardBySeriesLabelAdapters(userID, series2Adapter, cfg) + assert.Equal(t, series1AdapterHash, series2AdapterHash) + + assert.Equal(t, series1Hash, series1AdapterHash) + assert.Equal(t, series2Hash, series2AdapterHash) + }) +} + +func BenchmarkShardBySeriesLabels(b *testing.B) { + userID := "test-user" + series := labels.FromStrings( + "__name__", "http_requests_total", + "job", "prometheus", + "instance", "localhost:9090", + "method", "GET", + "handler", "/api/v1/query", + "status", "200", + "cluster", "prod", + "environment", "production", + 
"region", "us-west-2", + "az", "us-west-2a", + ) + + scenarios := map[string]ShardingConfig{ + "ExcludeClassicHistogramBucketLabel is disabled": { + ExcludeClassicHistogramBucketLabel: false, + }, + "ExcludeClassicHistogramBucketLabel is enabled": { + ExcludeClassicHistogramBucketLabel: true, + }, + } + + for scenarioName, cfg := range scenarios { + b.Run(scenarioName, func(b *testing.B) { + for i := 0; i < b.N; i++ { + ShardBySeriesLabels(userID, series, cfg) + } + }) + } +} + +func BenchmarkShardBySeriesLabelAdapters(b *testing.B) { + userID := "test-user" + series := []LabelAdapter{ + {Name: "__name__", Value: "http_requests_total"}, + {Name: "job", Value: "prometheus"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "method", Value: "GET"}, + {Name: "handler", Value: "/api/v1/query"}, + {Name: "status", Value: "200"}, + {Name: "cluster", Value: "prod"}, + {Name: "environment", Value: "production"}, + {Name: "region", Value: "us-west-2"}, + {Name: "az", Value: "us-west-2a"}, + } + + scenarios := map[string]ShardingConfig{ + "ExcludeClassicHistogramBucketLabel is disabled": { + ExcludeClassicHistogramBucketLabel: false, + }, + "ExcludeClassicHistogramBucketLabel is enabled": { + ExcludeClassicHistogramBucketLabel: true, + }, + } + + for scenarioName, cfg := range scenarios { + b.Run(scenarioName, func(b *testing.B) { + for i := 0; i < b.N; i++ { + ShardBySeriesLabelAdapters(userID, series, cfg) + } + }) + } +}