Skip to content

Add cortex_ingester_active_native_histogram_series metric to track ac… #6695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681
* [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695
* [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607
* [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659
* [ENHANCEMENT] Querier: limit label APIs to query only ingesters if `start` param is not been specified. #6618
Expand Down
55 changes: 43 additions & 12 deletions pkg/ingester/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ type activeSeriesStripe struct {
// without holding the lock -- hence the atomic).
oldestEntryTs atomic.Int64

mu sync.RWMutex
refs map[uint64][]activeSeriesEntry
active int // Number of active entries in this stripe. Only decreased during purge or clear.
mu sync.RWMutex
refs map[uint64][]activeSeriesEntry
active int // Number of active entries in this stripe. Only decreased during purge or clear.
activeNativeHistogram int // Number of active entries only for Native Histogram in this stripe. Only decreased during purge or clear.
}

// activeSeriesEntry holds a timestamp for single series.
type activeSeriesEntry struct {
lbs labels.Labels
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
lbs labels.Labels
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
isNativeHistogram bool
}

func NewActiveSeries() *ActiveSeries {
Expand All @@ -48,10 +50,10 @@ func NewActiveSeries() *ActiveSeries {
}

// Updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet.
func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, labelsCopy func(labels.Labels) labels.Labels) {
func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels) {
stripeID := hash % numActiveSeriesStripes

c.stripes[stripeID].updateSeriesTimestamp(now, series, hash, labelsCopy)
c.stripes[stripeID].updateSeriesTimestamp(now, series, hash, nativeHistogram, labelsCopy)
}

// Purge removes expired entries from the cache. This function should be called
Expand All @@ -77,13 +79,21 @@ func (c *ActiveSeries) Active() int {
return total
}

func (s *activeSeriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, fingerprint uint64, labelsCopy func(labels.Labels) labels.Labels) {
func (c *ActiveSeries) ActiveNativeHistogram() int {
total := 0
for s := 0; s < numActiveSeriesStripes; s++ {
total += c.stripes[s].getActiveNativeHistogram()
}
return total
}

func (s *activeSeriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, fingerprint uint64, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels) {
nowNanos := now.UnixNano()

e := s.findEntryForSeries(fingerprint, series)
entryTimeSet := false
if e == nil {
e, entryTimeSet = s.findOrCreateEntryForSeries(fingerprint, series, nowNanos, labelsCopy)
e, entryTimeSet = s.findOrCreateEntryForSeries(fingerprint, series, nowNanos, nativeHistogram, labelsCopy)
}

if !entryTimeSet {
Expand Down Expand Up @@ -117,7 +127,7 @@ func (s *activeSeriesStripe) findEntryForSeries(fingerprint uint64, series label
return nil
}

func (s *activeSeriesStripe) findOrCreateEntryForSeries(fingerprint uint64, series labels.Labels, nowNanos int64, labelsCopy func(labels.Labels) labels.Labels) (*atomic.Int64, bool) {
func (s *activeSeriesStripe) findOrCreateEntryForSeries(fingerprint uint64, series labels.Labels, nowNanos int64, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels) (*atomic.Int64, bool) {
s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -129,9 +139,13 @@ func (s *activeSeriesStripe) findOrCreateEntryForSeries(fingerprint uint64, seri
}

s.active++
if nativeHistogram {
s.activeNativeHistogram++
}
e := activeSeriesEntry{
lbs: labelsCopy(series),
nanos: atomic.NewInt64(nowNanos),
lbs: labelsCopy(series),
nanos: atomic.NewInt64(nowNanos),
isNativeHistogram: nativeHistogram,
}

s.refs[fingerprint] = append(s.refs[fingerprint], e)
Expand Down Expand Up @@ -160,6 +174,7 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) {
defer s.mu.Unlock()

active := 0
activeNativeHistogram := 0

oldest := int64(math.MaxInt64)
for fp, entries := range s.refs {
Expand All @@ -173,6 +188,9 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) {
}

active++
if entries[0].isNativeHistogram {
activeNativeHistogram++
}
if ts < oldest {
oldest = ts
}
Expand All @@ -199,6 +217,11 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) {
delete(s.refs, fp)
} else {
active += cnt
for _, e := range entries {
if e.isNativeHistogram {
activeNativeHistogram++
}
}
s.refs[fp] = entries
}
}
Expand All @@ -209,6 +232,7 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) {
s.oldestEntryTs.Store(oldest)
}
s.active = active
s.activeNativeHistogram = activeNativeHistogram
}

func (s *activeSeriesStripe) getActive() int {
Expand All @@ -217,3 +241,10 @@ func (s *activeSeriesStripe) getActive() int {

return s.active
}

func (s *activeSeriesStripe) getActiveNativeHistogram() int {
s.mu.RLock()
defer s.mu.RUnlock()

return s.activeNativeHistogram
}
34 changes: 21 additions & 13 deletions pkg/ingester/active_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ func TestActiveSeries_UpdateSeries(t *testing.T) {

c := NewActiveSeries()
assert.Equal(t, 0, c.Active())
assert.Equal(t, 0, c.ActiveNativeHistogram())
labels1Hash := fromLabelToLabels(ls1).Hash()
labels2Hash := fromLabelToLabels(ls2).Hash()
c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn)
c.UpdateSeries(ls1, labels1Hash, time.Now(), true, copyFn)
assert.Equal(t, 1, c.Active())
assert.Equal(t, 1, c.ActiveNativeHistogram())

c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn)
c.UpdateSeries(ls1, labels1Hash, time.Now(), true, copyFn)
assert.Equal(t, 1, c.Active())
assert.Equal(t, 1, c.ActiveNativeHistogram())

c.UpdateSeries(ls2, labels2Hash, time.Now(), copyFn)
c.UpdateSeries(ls2, labels2Hash, time.Now(), true, copyFn)
assert.Equal(t, 2, c.Active())
assert.Equal(t, 2, c.ActiveNativeHistogram())
}

func TestActiveSeries_Purge(t *testing.T) {
Expand All @@ -52,7 +56,7 @@ func TestActiveSeries_Purge(t *testing.T) {
c := NewActiveSeries()

for i := 0; i < len(series); i++ {
c.UpdateSeries(series[i], fromLabelToLabels(series[i]).Hash(), time.Unix(int64(i), 0), copyFn)
c.UpdateSeries(series[i], fromLabelToLabels(series[i]).Hash(), time.Unix(int64(i), 0), true, copyFn)
}

c.Purge(time.Unix(int64(ttl+1), 0))
Expand All @@ -61,6 +65,7 @@ func TestActiveSeries_Purge(t *testing.T) {

exp := len(series) - (ttl + 1)
assert.Equal(t, exp, c.Active())
assert.Equal(t, exp, c.ActiveNativeHistogram())
}
}

Expand All @@ -71,23 +76,26 @@ func TestActiveSeries_PurgeOpt(t *testing.T) {
c := NewActiveSeries()

now := time.Now()
c.UpdateSeries(ls1, ls1.Hash(), now.Add(-2*time.Minute), copyFn)
c.UpdateSeries(ls2, ls2.Hash(), now, copyFn)
c.UpdateSeries(ls1, ls1.Hash(), now.Add(-2*time.Minute), true, copyFn)
c.UpdateSeries(ls2, ls2.Hash(), now, true, copyFn)
c.Purge(now)

assert.Equal(t, 1, c.Active())
assert.Equal(t, 1, c.ActiveNativeHistogram())

c.UpdateSeries(ls1, ls1.Hash(), now.Add(-1*time.Minute), copyFn)
c.UpdateSeries(ls2, ls2.Hash(), now, copyFn)
c.UpdateSeries(ls1, ls1.Hash(), now.Add(-1*time.Minute), true, copyFn)
c.UpdateSeries(ls2, ls2.Hash(), now, true, copyFn)
c.Purge(now)

assert.Equal(t, 1, c.Active())
assert.Equal(t, 1, c.ActiveNativeHistogram())

// This will *not* update the series, since there is already newer timestamp.
c.UpdateSeries(ls2, ls2.Hash(), now.Add(-1*time.Minute), copyFn)
c.UpdateSeries(ls2, ls2.Hash(), now.Add(-1*time.Minute), true, copyFn)
c.Purge(now)

assert.Equal(t, 1, c.Active())
assert.Equal(t, 1, c.ActiveNativeHistogram())
}

var activeSeriesTestGoroutines = []int{50, 100, 500}
Expand Down Expand Up @@ -121,7 +129,7 @@ func benchmarkActiveSeriesConcurrencySingleSeries(b *testing.B, goroutines int)

for ix := 0; ix < max; ix++ {
now = now.Add(time.Duration(ix) * time.Millisecond)
c.UpdateSeries(series, labelhash, now, copyFn)
c.UpdateSeries(series, labelhash, now, false, copyFn)
}
}()
}
Expand Down Expand Up @@ -152,7 +160,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) {

b.ResetTimer()
for ix := 0; ix < b.N; ix++ {
c.UpdateSeries(series[ix], labelhash[ix], time.Unix(0, now+int64(ix)), copyFn)
c.UpdateSeries(series[ix], labelhash[ix], time.Unix(0, now+int64(ix)), false, copyFn)
}
}

Expand Down Expand Up @@ -184,9 +192,9 @@ func benchmarkPurge(b *testing.B, twice bool) {
// Prepare series
for ix, s := range series {
if ix < numExpiresSeries {
c.UpdateSeries(s, labelhash[ix], now.Add(-time.Minute), copyFn)
c.UpdateSeries(s, labelhash[ix], now.Add(-time.Minute), false, copyFn)
} else {
c.UpdateSeries(s, labelhash[ix], now, copyFn)
c.UpdateSeries(s, labelhash[ix], now, false, copyFn)
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,7 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) {

userDB.activeSeries.Purge(purgeTime)
i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
i.metrics.activeNHSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.ActiveNativeHistogram()))
if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics); err != nil {
level.Warn(i.logger).Log("msg", "failed to update per labelSet metrics", "user", userID, "err", err)
}
Expand Down Expand Up @@ -1376,9 +1377,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
} else {
discardedNativeHistogramCount += len(ts.Histograms)
}
shouldUpdateSeries := (succeededSamplesCount > oldSucceededSamplesCount) || (succeededHistogramsCount > oldSucceededHistogramsCount)

isNHAppended := succeededHistogramsCount > oldSucceededHistogramsCount
shouldUpdateSeries := (succeededSamplesCount > oldSucceededSamplesCount) || isNHAppended
if i.cfg.ActiveSeriesMetricsEnabled && shouldUpdateSeries {
db.activeSeries.UpdateSeries(tsLabels, tsLabelsHash, startAppend, func(l labels.Labels) labels.Labels {
db.activeSeries.UpdateSeries(tsLabels, tsLabelsHash, startAppend, isNHAppended, func(l labels.Labels) labels.Labels {
// we must already have copied the labels if succeededSamplesCount or succeededHistogramsCount has been incremented.
return copiedLabels
})
Expand Down Expand Up @@ -2526,6 +2529,7 @@ func (i *Ingester) closeAllTSDB() {

i.metrics.memUsers.Dec()
i.metrics.activeSeriesPerUser.DeleteLabelValues(userID)
i.metrics.activeNHSeriesPerUser.DeleteLabelValues(userID)
}(userDB)
}

Expand Down
Loading
Loading