Skip to content

Commit 4bd5b2d

Browse files
committed
Add config option to exclude 'le' from series sharding to ingesters / partitions
Signed-off-by: Marco Pracucci <[email protected]>
1 parent bdb1045 commit 4bd5b2d

File tree

12 files changed

+202
-25
lines changed

12 files changed

+202
-25
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
* [ENHANCEMENT] Ingester: Improved the performance of active series custom trackers matchers. #12663
6565
* [ENHANCEMENT] Compactor: Log sizes of downloaded and uploaded blocks. #12656
6666
* [ENHANCEMENT] Ingester: Add `cortex_ingest_storage_reader_receive_and_consume_delay_seconds` metric tracking the time between when a write request is received in the distributor and its content is ingested in ingesters, when the ingest storage is enabled. #12751
67+
* [ENHANCEMENT] Distributor: Add experimental configuration option `-distributor.sharding.exclude-classic-histogram-bucket-label` to compute the series hash – used for sharding – excluding the 'le' label. When this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. #12815
6768
* [ENHANCEMENT] Ruler: Add `ruler_evaluation_consistency_max_delay` per-tenant configuration option support, to specify the maximum tolerated ingestion delay for eventually consistent rule evaluations. This feature is used only when ingest storage is enabled. By default, no maximum delay is enforced. #12751
6869
* [BUGFIX] Distributor: Calculate `WriteResponseStats` before validation and `PushWrappers`. This prevents clients using Remote-Write 2.0 from seeing a diff in written samples, histograms and exemplars. #12682
6970
* [BUGFIX] Compactor: Fix cortex_compactor_block_uploads_failed_total metric showing type="unknown". #12477

cmd/mimir/config-descriptor.json

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1794,6 +1794,27 @@
17941794
"fieldValue": null,
17951795
"fieldDefaultValue": null
17961796
},
1797+
{
1798+
"kind": "block",
1799+
"name": "sharding",
1800+
"required": false,
1801+
"desc": "",
1802+
"blockEntries": [
1803+
{
1804+
"kind": "field",
1805+
"name": "exclude_classic_histogram_bucket_label",
1806+
"required": false,
1807+
"desc": "When set to true, the distributor computes the series hash – used for sharding – excluding the 'le' label. This means that, when this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. This configuration option should be set for distributors, rulers and ingesters.",
1808+
"fieldValue": null,
1809+
"fieldDefaultValue": false,
1810+
"fieldFlag": "distributor.sharding.exclude-classic-histogram-bucket-label",
1811+
"fieldType": "boolean",
1812+
"fieldCategory": "experimental"
1813+
}
1814+
],
1815+
"fieldValue": null,
1816+
"fieldDefaultValue": null
1817+
},
17971818
{
17981819
"kind": "field",
17991820
"name": "write_requests_buffer_pooling_enabled",

cmd/mimir/help-all.txt.tmpl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1373,6 +1373,8 @@ Usage of ./cmd/mimir/mimir:
13731373
Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist")
13741374
-distributor.service-overload-status-code-on-rate-limit-enabled
13751375
[experimental] If enabled, rate limit errors will be reported to the client with HTTP status code 529 (Service is overloaded). If disabled, status code 429 (Too Many Requests) is used. Enabling -distributor.retry-after-header.enabled before utilizing this option is strongly recommended as it helps prevent premature request retries by the client.
1376+
-distributor.sharding.exclude-classic-histogram-bucket-label
1377+
[experimental] When set to true, the distributor computes the series hash – used for sharding – excluding the 'le' label. This means that, when this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. This configuration option should be set for distributors, rulers and ingesters.
13761378
-distributor.usage-tracker-client.grpc-client-config.backoff-max-period duration
13771379
Maximum delay when backing off. (default 10s)
13781380
-distributor.usage-tracker-client.grpc-client-config.backoff-min-period duration

docs/sources/mimir/configure/configuration-parameters/index.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2003,6 +2003,15 @@ instance_limits:
20032003
# CLI flag: -distributor.instance-limits.max-inflight-push-requests-bytes
20042004
[max_inflight_push_requests_bytes: <int> | default = 0]
20052005
2006+
sharding:
2007+
# (experimental) When set to true, the distributor computes the series hash –
2008+
# used for sharding – excluding the 'le' label. This means that, when this
2009+
# configuration option is set to true, all buckets of a classic histogram
2010+
# metric are stored in the same shard. This configuration option should be set
2011+
# for distributors, rulers and ingesters.
2012+
# CLI flag: -distributor.sharding.exclude-classic-histogram-bucket-label
2013+
[exclude_classic_histogram_bucket_label: <boolean> | default = false]
2014+
20062015
# (experimental) Enable pooling of buffers used for marshaling write requests.
20072016
# CLI flag: -distributor.write-requests-buffer-pooling-enabled
20082017
[write_requests_buffer_pooling_enabled: <boolean> | default = true]

operations/mimir/mimir-flags-defaults.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@
379379
"validation.max-cost-attribution-cardinality": 2000,
380380
"validation.cost-attribution-cooldown": 0,
381381
"ruler.evaluation-delay-duration": 60000000000,
382+
"ruler.evaluation-consistency-max-delay": 0,
382383
"ruler.tenant-shard-size": 0,
383384
"ruler.max-rules-per-rule-group": 20,
384385
"ruler.max-rule-groups-per-tenant": 70,

pkg/distributor/distributor.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,9 @@ type Config struct {
259259
DefaultLimits InstanceLimits `yaml:"instance_limits"`
260260
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`
261261

262+
// ShardingConfig is the configuration used by distributors to shard series across ingesters / partitions.
263+
ShardingConfig mimirpb.ShardingConfig `yaml:"sharding"`
264+
262265
// This allows downstream projects to wrap the distributor push function
263266
// and access the deserialized write requests before/after they are pushed.
264267
// These functions will only receive samples that don't get dropped by HA deduplication.
@@ -294,6 +297,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
294297
cfg.DistributorRing.RegisterFlags(f, logger)
295298
cfg.RetryConfig.RegisterFlags(f)
296299
cfg.UsageTrackerClient.RegisterFlagsWithPrefix("distributor.usage-tracker-client.", f)
300+
cfg.ShardingConfig.RegisterFlags(f)
297301

298302
f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.")
299303
f.IntVar(&cfg.MaxOTLPRequestSize, maxOTLPRequestSizeFlag, 100<<20, "Maximum OTLP request size in bytes that the distributors accept. Requests exceeding this limit are rejected.")
@@ -1876,7 +1880,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
18761880
}
18771881

18781882
// Get both series and metadata keys in one slice.
1879-
keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req)
1883+
keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req, d.cfg.ShardingConfig)
18801884

18811885
var (
18821886
ingestersSubring ring.DoBatchRing
@@ -2072,8 +2076,8 @@ func (d *Distributor) sendWriteRequestToPartitions(ctx context.Context, tenantID
20722076

20732077
// getSeriesAndMetadataTokens returns a slice of tokens for the series and metadata from the request in this specific order.
20742078
// Metadata tokens start at initialMetadataIndex.
2075-
func getSeriesAndMetadataTokens(userID string, req *mimirpb.WriteRequest) (keys []uint32, initialMetadataIndex int) {
2076-
seriesKeys := getTokensForSeries(userID, req.Timeseries)
2079+
func getSeriesAndMetadataTokens(userID string, req *mimirpb.WriteRequest, cfg mimirpb.ShardingConfig) (keys []uint32, initialMetadataIndex int) {
2080+
seriesKeys := getTokensForSeries(userID, req.Timeseries, cfg)
20772081
metadataKeys := getTokensForMetadata(userID, req.Metadata)
20782082

20792083
// All tokens, stored in order: series, metadata.
@@ -2084,14 +2088,14 @@ func getSeriesAndMetadataTokens(userID string, req *mimirpb.WriteRequest) (keys
20842088
return keys, initialMetadataIndex
20852089
}
20862090

2087-
func getTokensForSeries(userID string, series []mimirpb.PreallocTimeseries) []uint32 {
2091+
func getTokensForSeries(userID string, series []mimirpb.PreallocTimeseries, cfg mimirpb.ShardingConfig) []uint32 {
20882092
if len(series) == 0 {
20892093
return nil
20902094
}
20912095

20922096
result := make([]uint32, 0, len(series))
20932097
for _, ts := range series {
2094-
result = append(result, tokenForLabels(userID, ts.Labels))
2098+
result = append(result, tokenForLabels(userID, ts.Labels, cfg))
20952099
}
20962100
return result
20972101
}
@@ -2108,8 +2112,8 @@ func getTokensForMetadata(userID string, metadata []*mimirpb.MetricMetadata) []u
21082112
return metadataKeys
21092113
}
21102114

2111-
func tokenForLabels(userID string, labels []mimirpb.LabelAdapter) uint32 {
2112-
return mimirpb.ShardByAllLabelAdapters(userID, labels)
2115+
func tokenForLabels(userID string, labels []mimirpb.LabelAdapter, cfg mimirpb.ShardingConfig) uint32 {
2116+
return mimirpb.ShardBySeriesLabelAdapters(userID, labels, cfg)
21132117
}
21142118

21152119
func tokenForMetadata(userID string, metricName string) uint32 {

pkg/distributor/distributor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6526,7 +6526,7 @@ func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ ..
65266526
return nil, err
65276527
}
65286528

6529-
hash := mimirpb.ShardByAllLabelAdapters(orgid, series.Labels)
6529+
hash := mimirpb.ShardBySeriesLabelAdapters(orgid, series.Labels, mimirpb.ShardingConfig{})
65306530
existing, ok := i.timeseries[hash]
65316531
if !ok {
65326532
i.timeseries[hash] = &series
@@ -8083,7 +8083,7 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) {
80838083
totalMetadata += len(ing.metadata)
80848084

80858085
for _, ts := range ing.timeseries {
8086-
token := tokenForLabels(userName, ts.Labels)
8086+
token := tokenForLabels(userName, ts.Labels, distrib.cfg.ShardingConfig)
80878087
ingIx := getIngesterIndexForToken(token, ingesters)
80888088
assert.Equal(t, ix, ingIx)
80898089
}
@@ -8099,7 +8099,7 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) {
80998099

81008100
// Verify that all timeseries were forwarded to ingesters.
81018101
for _, ts := range req.Timeseries {
8102-
token := tokenForLabels(userName, ts.Labels)
8102+
token := tokenForLabels(userName, ts.Labels, distrib.cfg.ShardingConfig)
81038103
ingIx := getIngesterIndexForToken(token, ingesters)
81048104

81058105
assert.Equal(t, ts.Labels, ingesters[ingIx].timeseries[token].Labels)

pkg/ingester/ingester.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ type Config struct {
190190
DefaultLimits InstanceLimits `yaml:"instance_limits"`
191191
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`
192192

193+
// DistributorShardingConfig is the configuration used by distributors to shard series across ingesters / partitions.
194+
DistributorShardingConfig mimirpb.ShardingConfig `yaml:"-"`
195+
193196
IgnoreSeriesLimitForMetricNames string `yaml:"ignore_series_limit_for_metric_names" category:"advanced"`
194197

195198
ReadPathCPUUtilizationLimit float64 `yaml:"read_path_cpu_utilization_limit" category:"experimental"`
@@ -2878,7 +2881,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
28782881
),
28792882
PostingsClonerFactory: tsdb.DefaultPostingsClonerFactory{},
28802883
EnableNativeHistograms: i.limits.NativeHistogramsIngestionEnabled(userID),
2881-
SecondaryHashFunction: secondaryTSDBHashFunctionForUser(userID),
2884+
SecondaryHashFunction: secondaryTSDBHashFunctionForUser(userID, i.cfg.DistributorShardingConfig),
28822885
IndexLookupPlannerFunc: i.getIndexLookupPlannerFunc(userID),
28832886
BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
28842887
return i.createBlockChunkQuerier(userID, b, mint, maxt)

pkg/ingester/owned_series.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,11 @@ func (oss *ownedSeriesService) updateTenant(userID string, db *userTSDB, ringCha
246246
return false
247247
}
248248

249-
func secondaryTSDBHashFunctionForUser(userID string) func(labels.Labels) uint32 {
249+
func secondaryTSDBHashFunctionForUser(userID string, cfg mimirpb.ShardingConfig) func(labels.Labels) uint32 {
250250
return func(ls labels.Labels) uint32 {
251-
return mimirpb.ShardByAllLabels(userID, ls)
251+
// The TSDB secondary hash is just used to compute the series owned by the ingester, and it's not used
252+
// by query sharding, so it's safe – and desired – to use the same sharding config used by distributors.
253+
return mimirpb.ShardBySeriesLabels(userID, ls, cfg)
252254
}
253255
}
254256

pkg/ingester/owned_series_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,7 @@ func generateSeriesWithTokensAt(testUser string, startTime time.Time) ([]util_te
807807
Samples: []util_test.Sample{{TS: startTime.Add(time.Duration(seriesIdx) * time.Millisecond).UnixMilli(), Val: float64(0)}},
808808
}
809809
seriesToWrite = append(seriesToWrite, s)
810-
seriesTokens = append(seriesTokens, mimirpb.ShardByAllLabels(testUser, s.Labels))
810+
seriesTokens = append(seriesTokens, mimirpb.ShardBySeriesLabels(testUser, s.Labels, mimirpb.ShardingConfig{}))
811811
}
812812
return seriesToWrite, seriesTokens
813813
}
@@ -822,7 +822,7 @@ func generateSeriesWithTargetInfoAt(testUser string, startTime time.Time, target
822822
Samples: []util_test.Sample{{TS: startTime.Add(time.Duration(i) * time.Millisecond).UnixMilli(), Val: float64(1)}},
823823
}
824824
seriesToWrite = append(seriesToWrite, s)
825-
seriesTokens = append(seriesTokens, mimirpb.ShardByAllLabels(testUser, s.Labels))
825+
seriesTokens = append(seriesTokens, mimirpb.ShardBySeriesLabels(testUser, s.Labels, mimirpb.ShardingConfig{}))
826826
}
827827
return seriesToWrite, seriesTokens
828828
}

0 commit comments

Comments
 (0)