From 4b42a00663c1d3c610f491bd20225da9fe51cdbb Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 14 Apr 2025 18:07:28 +0900 Subject: [PATCH] Add cortex_ingester_active_native_histogram_series metric to track active # of NH series Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + pkg/ingester/active_series.go | 55 +++++++++++--- pkg/ingester/active_series_test.go | 34 +++++---- pkg/ingester/ingester.go | 8 +- pkg/ingester/ingester_test.go | 114 +++++++++++++---------------- pkg/ingester/metrics.go | 15 +++- 6 files changed, 135 insertions(+), 92 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9764faa9ba..4904e9b425d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663 * [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680 * [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681 +* [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695 * [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607 * [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659 * [ENHANCEMENT] Querier: limit label APIs to query only ingesters if `start` param is not been specified. #6618 diff --git a/pkg/ingester/active_series.go b/pkg/ingester/active_series.go index 5285f279639..1c3bf4c6d86 100644 --- a/pkg/ingester/active_series.go +++ b/pkg/ingester/active_series.go @@ -25,15 +25,17 @@ type activeSeriesStripe struct { // without holding the lock -- hence the atomic). oldestEntryTs atomic.Int64 - mu sync.RWMutex - refs map[uint64][]activeSeriesEntry - active int // Number of active entries in this stripe. Only decreased during purge or clear. + mu sync.RWMutex + refs map[uint64][]activeSeriesEntry + active int // Number of active entries in this stripe. Only decreased during purge or clear. + activeNativeHistogram int // Number of active entries only for Native Histogram in this stripe. Only decreased during purge or clear. } // activeSeriesEntry holds a timestamp for single series. type activeSeriesEntry struct { - lbs labels.Labels - nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe. + lbs labels.Labels + nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe. + isNativeHistogram bool } func NewActiveSeries() *ActiveSeries { @@ -48,10 +50,10 @@ func NewActiveSeries() *ActiveSeries { } // Updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet. -func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, labelsCopy func(labels.Labels) labels.Labels) { +func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels) { stripeID := hash % numActiveSeriesStripes - c.stripes[stripeID].updateSeriesTimestamp(now, series, hash, labelsCopy) + c.stripes[stripeID].updateSeriesTimestamp(now, series, hash, nativeHistogram, labelsCopy) } // Purge removes expired entries from the cache. This function should be called @@ -77,13 +79,21 @@ func (c *ActiveSeries) Active() int { return total } -func (s *activeSeriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, fingerprint uint64, labelsCopy func(labels.Labels) labels.Labels) { +func (c *ActiveSeries) ActiveNativeHistogram() int { + total := 0 + for s := 0; s < numActiveSeriesStripes; s++ { + total += c.stripes[s].getActiveNativeHistogram() + } + return total +} + +func (s *activeSeriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, fingerprint uint64, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels) { nowNanos := now.UnixNano() e := s.findEntryForSeries(fingerprint, series) entryTimeSet := false if e == nil { - e, entryTimeSet = s.findOrCreateEntryForSeries(fingerprint, series, nowNanos, labelsCopy) + e, entryTimeSet = s.findOrCreateEntryForSeries(fingerprint, series, nowNanos, nativeHistogram, labelsCopy) } if !entryTimeSet { @@ -117,7 +127,7 @@ func (s *activeSeriesStripe) findEntryForSeries(fingerprint uint64, series label return nil } -func (s *activeSeriesStripe) findOrCreateEntryForSeries(fingerprint uint64, series labels.Labels, nowNanos int64, labelsCopy func(labels.Labels) labels.Labels) (*atomic.Int64, bool) { +func (s *activeSeriesStripe) findOrCreateEntryForSeries(fingerprint uint64, series labels.Labels, nowNanos int64, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels) (*atomic.Int64, bool) { s.mu.Lock() defer s.mu.Unlock() @@ -129,9 +139,13 @@ func (s *activeSeriesStripe) findOrCreateEntryForSeries(fingerprint uint64, seri } s.active++ + if nativeHistogram { + s.activeNativeHistogram++ + } e := activeSeriesEntry{ - lbs: labelsCopy(series), - nanos: atomic.NewInt64(nowNanos), + lbs: labelsCopy(series), + nanos: atomic.NewInt64(nowNanos), + isNativeHistogram: nativeHistogram, } s.refs[fingerprint] = append(s.refs[fingerprint], e) @@ -160,6 +174,7 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) { defer s.mu.Unlock() active := 0 + activeNativeHistogram := 0 oldest := int64(math.MaxInt64) for fp, entries := range s.refs { @@ -173,6 +188,9 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) { } active++ + if entries[0].isNativeHistogram { + activeNativeHistogram++ + } if ts < oldest { oldest = ts } @@ -199,6 +217,11 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) { delete(s.refs, fp) } else { active += cnt + for _, e := range entries { + if e.isNativeHistogram { + activeNativeHistogram++ + } + } s.refs[fp] = entries } } @@ -209,6 +232,7 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) { s.oldestEntryTs.Store(oldest) } s.active = active + s.activeNativeHistogram = activeNativeHistogram } func (s *activeSeriesStripe) getActive() int { @@ -217,3 +241,10 @@ func (s *activeSeriesStripe) getActive() int { return s.active } + +func (s *activeSeriesStripe) getActiveNativeHistogram() int { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.activeNativeHistogram +} diff --git a/pkg/ingester/active_series_test.go b/pkg/ingester/active_series_test.go index dc97b0d4621..3d84d7570cc 100644 --- a/pkg/ingester/active_series_test.go +++ b/pkg/ingester/active_series_test.go @@ -26,16 +26,20 @@ func TestActiveSeries_UpdateSeries(t *testing.T) { c := NewActiveSeries() assert.Equal(t, 0, c.Active()) + assert.Equal(t, 0, c.ActiveNativeHistogram()) labels1Hash := fromLabelToLabels(ls1).Hash() labels2Hash := fromLabelToLabels(ls2).Hash() - c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn) + c.UpdateSeries(ls1, labels1Hash, time.Now(), true, copyFn) assert.Equal(t, 1, c.Active()) + assert.Equal(t, 1, c.ActiveNativeHistogram()) - c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn) + c.UpdateSeries(ls1, labels1Hash, time.Now(), true, copyFn) assert.Equal(t, 1, c.Active()) + assert.Equal(t, 1, c.ActiveNativeHistogram()) - c.UpdateSeries(ls2, labels2Hash, time.Now(), copyFn) + c.UpdateSeries(ls2, labels2Hash, time.Now(), true, copyFn) assert.Equal(t, 2, c.Active()) + assert.Equal(t, 2, c.ActiveNativeHistogram()) } func TestActiveSeries_Purge(t *testing.T) { @@ -52,7 +56,7 @@ func TestActiveSeries_Purge(t *testing.T) { c := NewActiveSeries() for i := 0; i < len(series); i++ { - c.UpdateSeries(series[i], fromLabelToLabels(series[i]).Hash(), time.Unix(int64(i), 0), copyFn) + c.UpdateSeries(series[i], fromLabelToLabels(series[i]).Hash(), time.Unix(int64(i), 0), true, copyFn) } c.Purge(time.Unix(int64(ttl+1), 0)) @@ -61,6 +65,7 @@ func TestActiveSeries_Purge(t *testing.T) { exp := len(series) - (ttl + 1) assert.Equal(t, exp, c.Active()) + assert.Equal(t, exp, c.ActiveNativeHistogram()) } } @@ -71,23 +76,26 @@ func TestActiveSeries_PurgeOpt(t *testing.T) { c := NewActiveSeries() now := time.Now() - c.UpdateSeries(ls1, ls1.Hash(), now.Add(-2*time.Minute), copyFn) - c.UpdateSeries(ls2, ls2.Hash(), now, copyFn) + c.UpdateSeries(ls1, ls1.Hash(), now.Add(-2*time.Minute), true, copyFn) + c.UpdateSeries(ls2, ls2.Hash(), now, true, copyFn) c.Purge(now) assert.Equal(t, 1, c.Active()) + assert.Equal(t, 1, c.ActiveNativeHistogram()) - c.UpdateSeries(ls1, ls1.Hash(), now.Add(-1*time.Minute), copyFn) - c.UpdateSeries(ls2, ls2.Hash(), now, copyFn) + c.UpdateSeries(ls1, ls1.Hash(), now.Add(-1*time.Minute), true, copyFn) + c.UpdateSeries(ls2, ls2.Hash(), now, true, copyFn) c.Purge(now) assert.Equal(t, 1, c.Active()) + assert.Equal(t, 1, c.ActiveNativeHistogram()) // This will *not* update the series, since there is already newer timestamp. - c.UpdateSeries(ls2, ls2.Hash(), now.Add(-1*time.Minute), copyFn) + c.UpdateSeries(ls2, ls2.Hash(), now.Add(-1*time.Minute), true, copyFn) c.Purge(now) assert.Equal(t, 1, c.Active()) + assert.Equal(t, 1, c.ActiveNativeHistogram()) } var activeSeriesTestGoroutines = []int{50, 100, 500} @@ -121,7 +129,7 @@ func benchmarkActiveSeriesConcurrencySingleSeries(b *testing.B, goroutines int) for ix := 0; ix < max; ix++ { now = now.Add(time.Duration(ix) * time.Millisecond) - c.UpdateSeries(series, labelhash, now, copyFn) + c.UpdateSeries(series, labelhash, now, false, copyFn) } }() } @@ -152,7 +160,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) { b.ResetTimer() for ix := 0; ix < b.N; ix++ { - c.UpdateSeries(series[ix], labelhash[ix], time.Unix(0, now+int64(ix)), copyFn) + c.UpdateSeries(series[ix], labelhash[ix], time.Unix(0, now+int64(ix)), false, copyFn) } } @@ -184,9 +192,9 @@ func benchmarkPurge(b *testing.B, twice bool) { // Prepare series for ix, s := range series { if ix < numExpiresSeries { - c.UpdateSeries(s, labelhash[ix], now.Add(-time.Minute), copyFn) + c.UpdateSeries(s, labelhash[ix], now.Add(-time.Minute), false, copyFn) } else { - c.UpdateSeries(s, labelhash[ix], now, copyFn) + c.UpdateSeries(s, labelhash[ix], now, false, copyFn) } } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d34fbb98649..4257569b14b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1038,6 +1038,7 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) { userDB.activeSeries.Purge(purgeTime) i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active())) + i.metrics.activeNHSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.ActiveNativeHistogram())) if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics); err != nil { level.Warn(i.logger).Log("msg", "failed to update per labelSet metrics", "user", userID, "err", err) } @@ -1376,9 +1377,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte } else { discardedNativeHistogramCount += len(ts.Histograms) } - shouldUpdateSeries := (succeededSamplesCount > oldSucceededSamplesCount) || (succeededHistogramsCount > oldSucceededHistogramsCount) + + isNHAppended := succeededHistogramsCount > oldSucceededHistogramsCount + shouldUpdateSeries := (succeededSamplesCount > oldSucceededSamplesCount) || isNHAppended if i.cfg.ActiveSeriesMetricsEnabled && shouldUpdateSeries { - db.activeSeries.UpdateSeries(tsLabels, tsLabelsHash, startAppend, func(l labels.Labels) labels.Labels { + db.activeSeries.UpdateSeries(tsLabels, tsLabelsHash, startAppend, isNHAppended, func(l labels.Labels) labels.Labels { // we must already have copied the labels if succeededSamplesCount or succeededHistogramsCount has been incremented. return copiedLabels }) @@ -2526,6 +2529,7 @@ func (i *Ingester) closeAllTSDB() { i.metrics.memUsers.Dec() i.metrics.activeSeriesPerUser.DeleteLabelValues(userID) + i.metrics.activeNHSeriesPerUser.DeleteLabelValues(userID) }(userDB) } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index ec5ce7ea788..4d76bbbaad4 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -1051,7 +1051,7 @@ func TestIngester_Push(t *testing.T) { expectedMetadataIngested: []*cortexpb.MetricMetadata{ {MetricFamilyName: "metric_name_2", Help: "a help for metric_name_2", Unit: "", Type: cortexpb.GAUGE}, }, - additionalMetrics: []string{"cortex_discarded_samples_total", "cortex_ingester_active_series"}, + additionalMetrics: []string{"cortex_discarded_samples_total", "cortex_ingester_active_series", "cortex_ingester_active_native_histogram_series"}, disableNativeHistogram: true, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. @@ -1078,6 +1078,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 0 `, }, "should succeed on valid series and metadata": { @@ -1114,6 +1117,7 @@ func TestIngester_Push(t *testing.T) { "cortex_ingester_ingested_metadata_total", "cortex_ingester_ingested_metadata_failures_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", }, expectedMetrics: ` # HELP cortex_ingester_ingested_metadata_failures_total The total number of metadata that errored on ingestion. @@ -1149,6 +1153,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 0 `, }, "should succeed on valid series with exemplars": { @@ -1212,6 +1219,7 @@ func TestIngester_Push(t *testing.T) { "cortex_ingester_tsdb_exemplar_last_exemplars_timestamp_seconds", "cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", }, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. @@ -1235,6 +1243,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 0 # HELP cortex_ingester_tsdb_exemplar_exemplars_appended_total Total number of TSDB exemplars appended. # TYPE cortex_ingester_tsdb_exemplar_exemplars_appended_total counter @@ -1324,6 +1335,7 @@ func TestIngester_Push(t *testing.T) { "cortex_ingester_tsdb_head_out_of_order_samples_appended_total", "cortex_discarded_samples_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", }, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. @@ -1364,6 +1376,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 0 `, }, "ooo disabled, should soft fail on sample out of bound": { @@ -1387,7 +1402,7 @@ func TestIngester_Push(t *testing.T) { expectedIngested: []cortexpb.TimeSeries{ {Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}}}, }, - additionalMetrics: []string{"cortex_ingester_active_series"}, + additionalMetrics: []string{"cortex_ingester_active_series", "cortex_ingester_active_native_histogram_series"}, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. # TYPE cortex_ingester_ingested_samples_total counter @@ -1419,6 +1434,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 0 `, }, "ooo enabled, should soft fail on sample too old": { @@ -1444,6 +1462,7 @@ func TestIngester_Push(t *testing.T) { additionalMetrics: []string{ "cortex_discarded_samples_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", }, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. @@ -1470,6 +1489,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 0 `, }, "ooo enabled, should succeed": { @@ -1491,7 +1513,7 @@ func TestIngester_Push(t *testing.T) { expectedIngested: []cortexpb.TimeSeries{ {Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}, {Value: 2, TimestampMs: 1575043969}}}, }, - additionalMetrics: []string{"cortex_ingester_active_series"}, + additionalMetrics: []string{"cortex_ingester_active_series", "cortex_ingester_active_native_histogram_series"}, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. # TYPE cortex_ingester_ingested_samples_total counter @@ -1514,6 +1536,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 0 `, }, "native histogram ooo enabled, should soft fail on sample too old": { @@ -1539,6 +1564,7 @@ func TestIngester_Push(t *testing.T) { additionalMetrics: []string{ "cortex_ingester_tsdb_head_samples_appended_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", "cortex_discarded_samples_total", }, expectedMetrics: ` @@ -1576,6 +1602,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_discarded_samples_total The total number of samples that were discarded. # TYPE cortex_discarded_samples_total counter cortex_discarded_samples_total{reason="sample-too-old",user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 1 `, }, "native histogram ooo enabled, should succeed": { @@ -1600,6 +1629,7 @@ func TestIngester_Push(t *testing.T) { additionalMetrics: []string{ "cortex_ingester_tsdb_head_samples_appended_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", }, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion. @@ -1633,6 +1663,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 1 `, }, "should soft fail on two different sample values at the same timestamp": { @@ -1654,7 +1687,7 @@ func TestIngester_Push(t *testing.T) { expectedIngested: []cortexpb.TimeSeries{ {Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}}}, }, - additionalMetrics: []string{"cortex_discarded_samples_total", "cortex_ingester_active_series"}, + additionalMetrics: []string{"cortex_discarded_samples_total", "cortex_ingester_active_series", "cortex_ingester_active_native_histogram_series"}, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. # TYPE cortex_ingester_ingested_samples_total counter @@ -1680,6 +1713,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 0 `, }, "should soft fail on exemplar with unknown series": { @@ -1714,6 +1750,7 @@ func TestIngester_Push(t *testing.T) { "cortex_ingester_tsdb_exemplar_last_exemplars_timestamp_seconds", "cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", }, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. @@ -1737,6 +1774,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 0 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 0 # HELP cortex_ingester_tsdb_exemplar_exemplars_appended_total Total number of TSDB exemplars appended. # TYPE cortex_ingester_tsdb_exemplar_exemplars_appended_total counter @@ -1775,6 +1815,7 @@ func TestIngester_Push(t *testing.T) { additionalMetrics: []string{ "cortex_ingester_tsdb_head_samples_appended_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", }, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion. @@ -1813,6 +1854,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 1 `, }, "should succeed when only float native histogram present if enabled": { @@ -1831,6 +1875,7 @@ func TestIngester_Push(t *testing.T) { additionalMetrics: []string{ "cortex_ingester_tsdb_head_samples_appended_total", "cortex_ingester_active_series", + "cortex_ingester_active_native_histogram_series", }, expectedMetrics: ` # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. @@ -1869,64 +1914,9 @@ func TestIngester_Push(t *testing.T) { # HELP cortex_ingester_active_series Number of currently active series per user. # TYPE cortex_ingester_active_series gauge cortex_ingester_active_series{user="test"} 1 - `, - }, - "should fail to ingest histogram due to OOO native histogram. Sample and histogram has same timestamp but sample got ingested first": { - reqs: []*cortexpb.WriteRequest{ - cortexpb.ToWriteRequest( - []labels.Labels{metricLabels}, - []cortexpb.Sample{{Value: 2, TimestampMs: 11}}, - nil, - []cortexpb.Histogram{testHistogram}, - cortexpb.API), - }, - expectedErr: nil, - expectedIngested: []cortexpb.TimeSeries{ - {Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 2, TimestampMs: 11}}}, - }, - additionalMetrics: []string{ - "cortex_ingester_tsdb_head_samples_appended_total", - "cortex_ingester_tsdb_out_of_order_samples_total", - "cortex_ingester_active_series", - }, - expectedMetrics: ` - # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. - # TYPE cortex_ingester_ingested_samples_total counter - cortex_ingester_ingested_samples_total 1 - # HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion. - # TYPE cortex_ingester_ingested_samples_failures_total counter - cortex_ingester_ingested_samples_failures_total 0 - # HELP cortex_ingester_ingested_native_histograms_total The total number of native histograms ingested. - # TYPE cortex_ingester_ingested_native_histograms_total counter - cortex_ingester_ingested_native_histograms_total 1 - # HELP cortex_ingester_ingested_native_histograms_failures_total The total number of native histograms that errored on ingestion. - # TYPE cortex_ingester_ingested_native_histograms_failures_total counter - cortex_ingester_ingested_native_histograms_failures_total 0 - # HELP cortex_ingester_memory_users The current number of users in memory. - # TYPE cortex_ingester_memory_users gauge - cortex_ingester_memory_users 1 - # HELP cortex_ingester_tsdb_head_samples_appended_total Total number of appended samples. - # TYPE cortex_ingester_tsdb_head_samples_appended_total counter - cortex_ingester_tsdb_head_samples_appended_total{type="float",user="test"} 1 - cortex_ingester_tsdb_head_samples_appended_total{type="histogram",user="test"} 0 - # HELP cortex_ingester_tsdb_out_of_order_samples_total Total number of out of order samples ingestion failed attempts due to out of order being disabled. - # TYPE cortex_ingester_tsdb_out_of_order_samples_total counter - cortex_ingester_tsdb_out_of_order_samples_total{type="float",user="test"} 0 - cortex_ingester_tsdb_out_of_order_samples_total{type="histogram",user="test"} 1 - # HELP cortex_ingester_memory_series The current number of series in memory. - # TYPE cortex_ingester_memory_series gauge - cortex_ingester_memory_series 1 - # HELP cortex_ingester_memory_series_created_total The total number of series that were created per user. - # TYPE cortex_ingester_memory_series_created_total counter - cortex_ingester_memory_series_created_total{user="test"} 1 - # HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user. - # TYPE cortex_ingester_memory_series_removed_total counter - cortex_ingester_memory_series_removed_total{user="test"} 0 - # HELP cortex_discarded_samples_total The total number of samples that were discarded. - # TYPE cortex_discarded_samples_total counter - # HELP cortex_ingester_active_series Number of currently active series per user. - # TYPE cortex_ingester_active_series gauge - cortex_ingester_active_series{user="test"} 1 + # HELP cortex_ingester_active_native_histogram_series Number of currently active native histogram series per user. + # TYPE cortex_ingester_active_native_histogram_series gauge + cortex_ingester_active_native_histogram_series{user="test"} 1 `, }, } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 6cfe49dc1a2..bcb8148149d 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -44,9 +44,10 @@ type ingesterMetrics struct { memSeriesRemovedTotal *prometheus.CounterVec memMetadataRemovedTotal *prometheus.CounterVec - activeSeriesPerUser *prometheus.GaugeVec - limitsPerLabelSet *prometheus.GaugeVec - usagePerLabelSet *prometheus.GaugeVec + activeSeriesPerUser *prometheus.GaugeVec + activeNHSeriesPerUser *prometheus.GaugeVec + limitsPerLabelSet *prometheus.GaugeVec + usagePerLabelSet *prometheus.GaugeVec // Global limit metrics maxUsersGauge prometheus.GaugeFunc @@ -249,6 +250,12 @@ func newIngesterMetrics(r prometheus.Registerer, Name: "cortex_ingester_active_series", Help: "Number of currently active series per user.", }, []string{"user"}), + + // Not registered automatically, but only if activeSeriesEnabled is true. + activeNHSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_native_histogram_series", + Help: "Number of currently active native histogram series per user.", + }, []string{"user"}), } if postingsCacheEnabled && r != nil { @@ -257,6 +264,7 @@ func newIngesterMetrics(r prometheus.Registerer, if activeSeriesEnabled && r != nil { r.MustRegister(m.activeSeriesPerUser) + r.MustRegister(m.activeNHSeriesPerUser) } if createMetricsConflictingWithTSDB { @@ -278,6 +286,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.memMetadataCreatedTotal.DeleteLabelValues(userID) m.memMetadataRemovedTotal.DeleteLabelValues(userID) m.activeSeriesPerUser.DeleteLabelValues(userID) + m.activeNHSeriesPerUser.DeleteLabelValues(userID) m.usagePerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID})