1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -64,6 +64,7 @@
* [ENHANCEMENT] Ingester: Improved the performance of active series custom trackers matchers. #12663
* [ENHANCEMENT] Compactor: Log sizes of downloaded and uploaded blocks. #12656
* [ENHANCEMENT] Ingester: Add `cortex_ingest_storage_reader_receive_and_consume_delay_seconds` metric tracking the time between when a write request is received in the distributor and its content is ingested in ingesters, when the ingest storage is enabled. #12751
* [ENHANCEMENT] Distributor: Add experimental configuration option `-distributor.sharding.exclude-classic-histogram-bucket-label` to compute the series hash – used for sharding – excluding the 'le' label. When this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. #12815
Contributor
Suggested change
* [ENHANCEMENT] Distributor: Add experimental configuration option `-distributor.sharding.exclude-classic-histogram-bucket-label` to compute the series hash – used for sharding – excluding the 'le' label. When this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. #12815
* [ENHANCEMENT] Distributor: Add an experimental configuration option, `-distributor.sharding.exclude-classic-histogram-bucket-label`, to compute the series hash, used for sharding, excluding the 'le' label. When you set this configuration option to true, all buckets of a classic histogram metric are stored in the same shard. #12815

* [ENHANCEMENT] Ruler: Add `ruler_evaluation_consistency_max_delay` per-tenant configuration option support, to specify the maximum tolerated ingestion delay for eventually consistent rule evaluations. This feature is used only when ingest storage is enabled. By default, no maximum delay is enforced. #12751
* [BUGFIX] Distributor: Calculate `WriteResponseStats` before validation and `PushWrappers`. This prevents clients using Remote-Write 2.0 from seeing a diff in written samples, histograms and exemplars. #12682
* [BUGFIX] Compactor: Fix cortex_compactor_block_uploads_failed_total metric showing type="unknown". #12477
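
To illustrate the distributor sharding entry above, here is a minimal sketch, not part of this diff, that calls the `ShardBySeriesLabelAdapters` helper introduced later in this PR. The tenant ID, metric name, and label values are invented; the point is only that every `le` bucket of a classic histogram hashes to the same token once the option is enabled.

```go
package main

import (
	"fmt"

	"github.com/grafana/mimir/pkg/mimirpb"
)

func main() {
	cfg := mimirpb.ShardingConfig{ExcludeClassicHistogramBucketLabel: true}

	// Two buckets of the same classic histogram: only the 'le' value differs.
	bucket01 := []mimirpb.LabelAdapter{
		{Name: "__name__", Value: "http_request_duration_seconds_bucket"},
		{Name: "job", Value: "api"},
		{Name: "le", Value: "0.1"},
	}
	bucket05 := []mimirpb.LabelAdapter{
		{Name: "__name__", Value: "http_request_duration_seconds_bucket"},
		{Name: "job", Value: "api"},
		{Name: "le", Value: "0.5"},
	}

	// With the option enabled, the 'le' label is skipped while hashing, so both
	// bucket series map to the same token and therefore to the same shard.
	t1 := mimirpb.ShardBySeriesLabelAdapters("tenant-1", bucket01, cfg)
	t2 := mimirpb.ShardBySeriesLabelAdapters("tenant-1", bucket05, cfg)
	fmt.Println(t1 == t2) // true

	// With the default (zero-value) config, the 'le' value is part of the hash
	// and the buckets are, in general, spread across different shards.
	d1 := mimirpb.ShardBySeriesLabelAdapters("tenant-1", bucket01, mimirpb.ShardingConfig{})
	d2 := mimirpb.ShardBySeriesLabelAdapters("tenant-1", bucket05, mimirpb.ShardingConfig{})
	fmt.Println(d1 == d2) // almost certainly false
}
```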
21 changes: 21 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -1794,6 +1794,27 @@
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "block",
"name": "sharding",
"required": false,
"desc": "",
"blockEntries": [
{
"kind": "field",
"name": "exclude_classic_histogram_bucket_label",
"required": false,
"desc": "When set to true, the distributor computes the series hash – used for sharding – excluding the 'le' label. This means that, when this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. This configuration option should be set for distributors, rulers and ingesters.",
Contributor
Suggested change
"desc": "When set to true, the distributor computes the series hashused for shardingexcluding the 'le' label. This means that, when this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. This configuration option should be set for distributors, rulers and ingesters.",
"desc": "When set to true, the distributor computes the series hash, used for sharding, excluding the 'le' label. This means that when you set this configuration option to true, all buckets of a classic histogram metric are stored in the same shard. Set this configuration option for distributors, rulers, and ingesters.",

"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "distributor.sharding.exclude-classic-histogram-bucket-label",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "write_requests_buffer_pooling_enabled",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1373,6 +1373,8 @@ Usage of ./cmd/mimir/mimir:
Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist")
-distributor.service-overload-status-code-on-rate-limit-enabled
[experimental] If enabled, rate limit errors will be reported to the client with HTTP status code 529 (Service is overloaded). If disabled, status code 429 (Too Many Requests) is used. Enabling -distributor.retry-after-header.enabled before utilizing this option is strongly recommended as it helps prevent premature request retries by the client.
-distributor.sharding.exclude-classic-histogram-bucket-label
[experimental] When set to true, the distributor computes the series hash – used for sharding – excluding the 'le' label. This means that, when this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. This configuration option should be set for distributors, rulers and ingesters.
-distributor.usage-tracker-client.grpc-client-config.backoff-max-period duration
Maximum delay when backing off. (default 10s)
-distributor.usage-tracker-client.grpc-client-config.backoff-min-period duration
@@ -2003,6 +2003,15 @@ instance_limits:
# CLI flag: -distributor.instance-limits.max-inflight-push-requests-bytes
[max_inflight_push_requests_bytes: <int> | default = 0]

sharding:
# (experimental) When set to true, the distributor computes the series hash –
# used for sharding – excluding the 'le' label. This means that, when this
# configuration option is set to true, all buckets of a classic histogram
# metric are stored in the same shard. This configuration option should be set
# for distributors, rulers and ingesters.
# CLI flag: -distributor.sharding.exclude-classic-histogram-bucket-label
[exclude_classic_histogram_bucket_label: <boolean> | default = false]

# (experimental) Enable pooling of buffers used for marshaling write requests.
# CLI flag: -distributor.write-requests-buffer-pooling-enabled
[write_requests_buffer_pooling_enabled: <boolean> | default = true]
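
As a cross-check of the reference entry above, the following hypothetical sketch, not part of this PR, shows that the `exclude_classic_histogram_bucket_label` YAML key and the `-distributor.sharding.exclude-classic-histogram-bucket-label` CLI flag populate the same `ShardingConfig` field; the flag-set name and parse arguments are made up.

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/mimir/pkg/mimirpb"
)

func main() {
	// Register the experimental flag added by this PR on a throwaway FlagSet.
	var cfg mimirpb.ShardingConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)

	// Equivalent to setting `exclude_classic_histogram_bucket_label: true` in the
	// `sharding` block documented above.
	if err := fs.Parse([]string{"-distributor.sharding.exclude-classic-histogram-bucket-label=true"}); err != nil {
		panic(err)
	}

	fmt.Println(cfg.ExcludeClassicHistogramBucketLabel) // true
}
```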
1 change: 1 addition & 0 deletions operations/mimir/mimir-flags-defaults.json
@@ -379,6 +379,7 @@
"validation.max-cost-attribution-cardinality": 2000,
"validation.cost-attribution-cooldown": 0,
"ruler.evaluation-delay-duration": 60000000000,
"ruler.evaluation-consistency-max-delay": 0,
Contributor

Know where this diff came from?

Collaborator Author
This is a new file introduced in #12766. There was a race between merging that PR and #12751, which is why the new config option (-ruler.evaluation-consistency-max-delay) wasn't included in Dimitar's PR.

"ruler.tenant-shard-size": 0,
"ruler.max-rules-per-rule-group": 20,
"ruler.max-rule-groups-per-tenant": 70,
18 changes: 11 additions & 7 deletions pkg/distributor/distributor.go
@@ -259,6 +259,9 @@ type Config struct {
DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

// ShardingConfig is the configuration used by distributors to shard series across ingesters / partitions.
ShardingConfig mimirpb.ShardingConfig `yaml:"sharding"`

// This allows downstream projects to wrap the distributor push function
// and access the deserialized write requests before/after they are pushed.
// These functions will only receive samples that don't get dropped by HA deduplication.
@@ -294,6 +297,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
cfg.DistributorRing.RegisterFlags(f, logger)
cfg.RetryConfig.RegisterFlags(f)
cfg.UsageTrackerClient.RegisterFlagsWithPrefix("distributor.usage-tracker-client.", f)
cfg.ShardingConfig.RegisterFlags(f)

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.")
f.IntVar(&cfg.MaxOTLPRequestSize, maxOTLPRequestSizeFlag, 100<<20, "Maximum OTLP request size in bytes that the distributors accept. Requests exceeding this limit are rejected.")
@@ -1876,7 +1880,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
}

// Get both series and metadata keys in one slice.
keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req)
keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req, d.cfg.ShardingConfig)

var (
ingestersSubring ring.DoBatchRing
@@ -2072,8 +2076,8 @@ func (d *Distributor) sendWriteRequestToPartitions(ctx context.Context, tenantID

// getSeriesAndMetadataTokens returns a slice of tokens for the series and metadata from the request in this specific order.
// Metadata tokens start at initialMetadataIndex.
func getSeriesAndMetadataTokens(userID string, req *mimirpb.WriteRequest) (keys []uint32, initialMetadataIndex int) {
seriesKeys := getTokensForSeries(userID, req.Timeseries)
func getSeriesAndMetadataTokens(userID string, req *mimirpb.WriteRequest, cfg mimirpb.ShardingConfig) (keys []uint32, initialMetadataIndex int) {
seriesKeys := getTokensForSeries(userID, req.Timeseries, cfg)
metadataKeys := getTokensForMetadata(userID, req.Metadata)

// All tokens, stored in order: series, metadata.
@@ -2084,14 +2088,14 @@ func getSeriesAndMetadataTokens(userID string, req *mimirpb.WriteRequest) (keys
return keys, initialMetadataIndex
}

func getTokensForSeries(userID string, series []mimirpb.PreallocTimeseries) []uint32 {
func getTokensForSeries(userID string, series []mimirpb.PreallocTimeseries, cfg mimirpb.ShardingConfig) []uint32 {
if len(series) == 0 {
return nil
}

result := make([]uint32, 0, len(series))
for _, ts := range series {
result = append(result, tokenForLabels(userID, ts.Labels))
result = append(result, tokenForLabels(userID, ts.Labels, cfg))
}
return result
}
@@ -2108,8 +2112,8 @@ func getTokensForMetadata(userID string, metadata []*mimirpb.MetricMetadata) []u
return metadataKeys
}

func tokenForLabels(userID string, labels []mimirpb.LabelAdapter) uint32 {
return mimirpb.ShardByAllLabelAdapters(userID, labels)
func tokenForLabels(userID string, labels []mimirpb.LabelAdapter, cfg mimirpb.ShardingConfig) uint32 {
return mimirpb.ShardBySeriesLabelAdapters(userID, labels, cfg)
}

func tokenForMetadata(userID string, metricName string) uint32 {
6 changes: 3 additions & 3 deletions pkg/distributor/distributor_test.go
@@ -6526,7 +6526,7 @@ func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ ...
return nil, err
}

hash := mimirpb.ShardByAllLabelAdapters(orgid, series.Labels)
hash := mimirpb.ShardBySeriesLabelAdapters(orgid, series.Labels, mimirpb.ShardingConfig{})
existing, ok := i.timeseries[hash]
if !ok {
i.timeseries[hash] = &series
@@ -8083,7 +8083,7 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) {
totalMetadata += len(ing.metadata)

for _, ts := range ing.timeseries {
token := tokenForLabels(userName, ts.Labels)
token := tokenForLabels(userName, ts.Labels, distrib.cfg.ShardingConfig)
ingIx := getIngesterIndexForToken(token, ingesters)
assert.Equal(t, ix, ingIx)
}
@@ -8099,7 +8099,7 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) {

// Verify that all timeseries were forwarded to ingesters.
for _, ts := range req.Timeseries {
token := tokenForLabels(userName, ts.Labels)
token := tokenForLabels(userName, ts.Labels, distrib.cfg.ShardingConfig)
ingIx := getIngesterIndexForToken(token, ingesters)

assert.Equal(t, ts.Labels, ingesters[ingIx].timeseries[token].Labels)
5 changes: 4 additions & 1 deletion pkg/ingester/ingester.go
@@ -190,6 +190,9 @@ type Config struct {
DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

// DistributorShardingConfig is the configuration used by distributors to shard series across ingesters / partitions.
DistributorShardingConfig mimirpb.ShardingConfig `yaml:"-"`

IgnoreSeriesLimitForMetricNames string `yaml:"ignore_series_limit_for_metric_names" category:"advanced"`

ReadPathCPUUtilizationLimit float64 `yaml:"read_path_cpu_utilization_limit" category:"experimental"`
@@ -2878,7 +2881,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
),
PostingsClonerFactory: tsdb.DefaultPostingsClonerFactory{},
EnableNativeHistograms: i.limits.NativeHistogramsIngestionEnabled(userID),
SecondaryHashFunction: secondaryTSDBHashFunctionForUser(userID),
SecondaryHashFunction: secondaryTSDBHashFunctionForUser(userID, i.cfg.DistributorShardingConfig),
IndexLookupPlannerFunc: i.getIndexLookupPlannerFunc(userID),
BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
return i.createBlockChunkQuerier(userID, b, mint, maxt)
6 changes: 4 additions & 2 deletions pkg/ingester/owned_series.go
@@ -246,9 +246,11 @@ func (oss *ownedSeriesService) updateTenant(userID string, db *userTSDB, ringCha
return false
}

func secondaryTSDBHashFunctionForUser(userID string) func(labels.Labels) uint32 {
func secondaryTSDBHashFunctionForUser(userID string, cfg mimirpb.ShardingConfig) func(labels.Labels) uint32 {
return func(ls labels.Labels) uint32 {
return mimirpb.ShardByAllLabels(userID, ls)
// The TSDB secondary hash is just used to compute the series owned by the ingester, and it's not used
// by query sharding, so it's safe, and desired, to use the same sharding config used by distributors.
return mimirpb.ShardBySeriesLabels(userID, ls, cfg)
}
}

4 changes: 2 additions & 2 deletions pkg/ingester/owned_series_test.go
@@ -807,7 +807,7 @@ func generateSeriesWithTokensAt(testUser string, startTime time.Time) ([]util_te
Samples: []util_test.Sample{{TS: startTime.Add(time.Duration(seriesIdx) * time.Millisecond).UnixMilli(), Val: float64(0)}},
}
seriesToWrite = append(seriesToWrite, s)
seriesTokens = append(seriesTokens, mimirpb.ShardByAllLabels(testUser, s.Labels))
seriesTokens = append(seriesTokens, mimirpb.ShardBySeriesLabels(testUser, s.Labels, mimirpb.ShardingConfig{}))
}
return seriesToWrite, seriesTokens
}
@@ -822,7 +822,7 @@ func generateSeriesWithTargetInfoAt(testUser string, startTime time.Time, target
Samples: []util_test.Sample{{TS: startTime.Add(time.Duration(i) * time.Millisecond).UnixMilli(), Val: float64(1)}},
}
seriesToWrite = append(seriesToWrite, s)
seriesTokens = append(seriesTokens, mimirpb.ShardByAllLabels(testUser, s.Labels))
seriesTokens = append(seriesTokens, mimirpb.ShardBySeriesLabels(testUser, s.Labels, mimirpb.ShardingConfig{}))
}
return seriesToWrite, seriesTokens
}
28 changes: 23 additions & 5 deletions pkg/mimirpb/series_sharding.go
@@ -6,9 +6,19 @@
package mimirpb

import (
"flag"

"github.com/prometheus/prometheus/model/labels"
)

type ShardingConfig struct {
ExcludeClassicHistogramBucketLabel bool `yaml:"exclude_classic_histogram_bucket_label" category:"experimental"`
}

func (l *ShardingConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&l.ExcludeClassicHistogramBucketLabel, "distributor.sharding.exclude-classic-histogram-bucket-label", false, "When set to true, the distributor computes the series hash – used for sharding – excluding the 'le' label. This means that, when this configuration option is set to true, all buckets of a classic histogram metric are stored in the same shard. This configuration option should be set for distributors, rulers and ingesters.")
}

// ShardByMetricName returns the token for the given metric. The provided metricName
// is guaranteed to not be retained.
func ShardByMetricName(userID string, metricName string) uint32 {
@@ -23,22 +33,30 @@ func ShardByUser(userID string) uint32 {
return h
}

// ShardByAllLabels returns the token for given user and series.
// ShardBySeriesLabels returns the token that must be used to shard a series across ingesters / partitions for given user.
//
// ShardByAllLabels generates different values for different order of same labels.
func ShardByAllLabels(userID string, ls labels.Labels) uint32 {
// ShardBySeriesLabels generates different values for different order of same labels.
func ShardBySeriesLabels(userID string, ls labels.Labels, cfg ShardingConfig) uint32 {
h := ShardByUser(userID)
ls.Range(func(l labels.Label) {
if cfg.ExcludeClassicHistogramBucketLabel && l.Name == labels.BucketLabel {
return
}

h = HashAdd32(h, l.Name)
Contributor

I suppose it wouldn't improve anything to include the label name in the hash as all series in foo_seconds_histogram have that label, correct?

Collaborator Author

I don't think adding just the label name makes any practical difference.

h = HashAdd32(h, l.Value)
})
return h
}

// ShardByAllLabelAdapters is like ShardByAllLabel, but uses LabelAdapter type.
func ShardByAllLabelAdapters(userID string, ls []LabelAdapter) uint32 {
// ShardBySeriesLabelAdapters is like ShardBySeriesLabels, but uses the LabelAdapter type.
func ShardBySeriesLabelAdapters(userID string, ls []LabelAdapter, cfg ShardingConfig) uint32 {
h := ShardByUser(userID)
for _, l := range ls {
if cfg.ExcludeClassicHistogramBucketLabel && l.Name == labels.BucketLabel {
continue
}

h = HashAdd32(h, l.Name)
h = HashAdd32(h, l.Value)
}
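
Finally, a hedged usage sketch, not part of the PR, of the renamed `ShardBySeriesLabels` from a caller's point of view: series without an 'le' label hash identically whether or not the option is enabled, because the skip only triggers on `labels.BucketLabel`, while classic-histogram bucket series change token only when the option is turned on. The tenant and series below are invented.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/mimir/pkg/mimirpb"
)

func main() {
	enabled := mimirpb.ShardingConfig{ExcludeClassicHistogramBucketLabel: true}
	disabled := mimirpb.ShardingConfig{}

	// A series without the 'le' label: the option has no effect on its token.
	gauge := labels.FromStrings("__name__", "process_resident_memory_bytes", "job", "api")
	fmt.Println(mimirpb.ShardBySeriesLabels("tenant-1", gauge, enabled) ==
		mimirpb.ShardBySeriesLabels("tenant-1", gauge, disabled)) // true

	// A classic-histogram bucket series: enabling the option drops the 'le' label
	// from the hash, so the token changes compared to the default behaviour.
	bucket := labels.FromStrings("__name__", "http_request_duration_seconds_bucket", "job", "api", "le", "0.5")
	fmt.Println(mimirpb.ShardBySeriesLabels("tenant-1", bucket, enabled) ==
		mimirpb.ShardBySeriesLabels("tenant-1", bucket, disabled)) // almost certainly false
}
```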