Skip to content

Commit 7aca81c

Browse files
committed
Add cortex_ingester_active_native_histogram_series metric to track active # of NH series
Signed-off-by: SungJin1212 <[email protected]>
1 parent 06cedd9 commit 7aca81c

File tree

6 files changed

+135
-92
lines changed

6 files changed

+135
-92
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
77
* [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
88
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
9+
* [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695
910
* [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607
1011
* [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659
1112
* [ENHANCEMENT] Querier: limit label APIs to query only ingesters if `start` param is not been specified. #6618

pkg/ingester/active_series.go

+43-12
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,17 @@ type activeSeriesStripe struct {
2525
// without holding the lock -- hence the atomic).
2626
oldestEntryTs atomic.Int64
2727

28-
mu sync.RWMutex
29-
refs map[uint64][]activeSeriesEntry
30-
active int // Number of active entries in this stripe. Only decreased during purge or clear.
28+
mu sync.RWMutex
29+
refs map[uint64][]activeSeriesEntry
30+
active int // Number of active entries in this stripe. Only decreased during purge or clear.
31+
activeNativeHistogram int // Number of active entries only for Native Histogram in this stripe. Only decreased during purge or clear.
3132
}
3233

3334
// activeSeriesEntry holds a timestamp for single series.
3435
type activeSeriesEntry struct {
35-
lbs labels.Labels
36-
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
36+
lbs labels.Labels
37+
nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe.
38+
isNativeHistogram bool
3739
}
3840

3941
func NewActiveSeries() *ActiveSeries {
@@ -48,10 +50,10 @@ func NewActiveSeries() *ActiveSeries {
4850
}
4951

5052
// Updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet.
51-
func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, labelsCopy func(labels.Labels) labels.Labels) {
53+
func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels) {
5254
stripeID := hash % numActiveSeriesStripes
5355

54-
c.stripes[stripeID].updateSeriesTimestamp(now, series, hash, labelsCopy)
56+
c.stripes[stripeID].updateSeriesTimestamp(now, series, hash, nativeHistogram, labelsCopy)
5557
}
5658

5759
// Purge removes expired entries from the cache. This function should be called
@@ -77,13 +79,21 @@ func (c *ActiveSeries) Active() int {
7779
return total
7880
}
7981

80-
func (s *activeSeriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, fingerprint uint64, labelsCopy func(labels.Labels) labels.Labels) {
82+
func (c *ActiveSeries) ActiveNativeHistogram() int {
83+
total := 0
84+
for s := 0; s < numActiveSeriesStripes; s++ {
85+
total += c.stripes[s].getActiveNativeHistogram()
86+
}
87+
return total
88+
}
89+
90+
func (s *activeSeriesStripe) updateSeriesTimestamp(now time.Time, series labels.Labels, fingerprint uint64, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels) {
8191
nowNanos := now.UnixNano()
8292

8393
e := s.findEntryForSeries(fingerprint, series)
8494
entryTimeSet := false
8595
if e == nil {
86-
e, entryTimeSet = s.findOrCreateEntryForSeries(fingerprint, series, nowNanos, labelsCopy)
96+
e, entryTimeSet = s.findOrCreateEntryForSeries(fingerprint, series, nowNanos, nativeHistogram, labelsCopy)
8797
}
8898

8999
if !entryTimeSet {
@@ -117,7 +127,7 @@ func (s *activeSeriesStripe) findEntryForSeries(fingerprint uint64, series label
117127
return nil
118128
}
119129

120-
func (s *activeSeriesStripe) findOrCreateEntryForSeries(fingerprint uint64, series labels.Labels, nowNanos int64, labelsCopy func(labels.Labels) labels.Labels) (*atomic.Int64, bool) {
130+
func (s *activeSeriesStripe) findOrCreateEntryForSeries(fingerprint uint64, series labels.Labels, nowNanos int64, nativeHistogram bool, labelsCopy func(labels.Labels) labels.Labels) (*atomic.Int64, bool) {
121131
s.mu.Lock()
122132
defer s.mu.Unlock()
123133

@@ -129,9 +139,13 @@ func (s *activeSeriesStripe) findOrCreateEntryForSeries(fingerprint uint64, seri
129139
}
130140

131141
s.active++
142+
if nativeHistogram {
143+
s.activeNativeHistogram++
144+
}
132145
e := activeSeriesEntry{
133-
lbs: labelsCopy(series),
134-
nanos: atomic.NewInt64(nowNanos),
146+
lbs: labelsCopy(series),
147+
nanos: atomic.NewInt64(nowNanos),
148+
isNativeHistogram: nativeHistogram,
135149
}
136150

137151
s.refs[fingerprint] = append(s.refs[fingerprint], e)
@@ -160,6 +174,7 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) {
160174
defer s.mu.Unlock()
161175

162176
active := 0
177+
activeNativeHistogram := 0
163178

164179
oldest := int64(math.MaxInt64)
165180
for fp, entries := range s.refs {
@@ -173,6 +188,9 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) {
173188
}
174189

175190
active++
191+
if entries[0].isNativeHistogram {
192+
activeNativeHistogram++
193+
}
176194
if ts < oldest {
177195
oldest = ts
178196
}
@@ -199,6 +217,11 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) {
199217
delete(s.refs, fp)
200218
} else {
201219
active += cnt
220+
for _, e := range entries {
221+
if e.isNativeHistogram {
222+
activeNativeHistogram++
223+
}
224+
}
202225
s.refs[fp] = entries
203226
}
204227
}
@@ -209,6 +232,7 @@ func (s *activeSeriesStripe) purge(keepUntil time.Time) {
209232
s.oldestEntryTs.Store(oldest)
210233
}
211234
s.active = active
235+
s.activeNativeHistogram = activeNativeHistogram
212236
}
213237

214238
func (s *activeSeriesStripe) getActive() int {
@@ -217,3 +241,10 @@ func (s *activeSeriesStripe) getActive() int {
217241

218242
return s.active
219243
}
244+
245+
func (s *activeSeriesStripe) getActiveNativeHistogram() int {
246+
s.mu.RLock()
247+
defer s.mu.RUnlock()
248+
249+
return s.activeNativeHistogram
250+
}

pkg/ingester/active_series_test.go

+21-13
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,20 @@ func TestActiveSeries_UpdateSeries(t *testing.T) {
2626

2727
c := NewActiveSeries()
2828
assert.Equal(t, 0, c.Active())
29+
assert.Equal(t, 0, c.ActiveNativeHistogram())
2930
labels1Hash := fromLabelToLabels(ls1).Hash()
3031
labels2Hash := fromLabelToLabels(ls2).Hash()
31-
c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn)
32+
c.UpdateSeries(ls1, labels1Hash, time.Now(), true, copyFn)
3233
assert.Equal(t, 1, c.Active())
34+
assert.Equal(t, 1, c.ActiveNativeHistogram())
3335

34-
c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn)
36+
c.UpdateSeries(ls1, labels1Hash, time.Now(), true, copyFn)
3537
assert.Equal(t, 1, c.Active())
38+
assert.Equal(t, 1, c.ActiveNativeHistogram())
3639

37-
c.UpdateSeries(ls2, labels2Hash, time.Now(), copyFn)
40+
c.UpdateSeries(ls2, labels2Hash, time.Now(), true, copyFn)
3841
assert.Equal(t, 2, c.Active())
42+
assert.Equal(t, 2, c.ActiveNativeHistogram())
3943
}
4044

4145
func TestActiveSeries_Purge(t *testing.T) {
@@ -52,7 +56,7 @@ func TestActiveSeries_Purge(t *testing.T) {
5256
c := NewActiveSeries()
5357

5458
for i := 0; i < len(series); i++ {
55-
c.UpdateSeries(series[i], fromLabelToLabels(series[i]).Hash(), time.Unix(int64(i), 0), copyFn)
59+
c.UpdateSeries(series[i], fromLabelToLabels(series[i]).Hash(), time.Unix(int64(i), 0), true, copyFn)
5660
}
5761

5862
c.Purge(time.Unix(int64(ttl+1), 0))
@@ -61,6 +65,7 @@ func TestActiveSeries_Purge(t *testing.T) {
6165

6266
exp := len(series) - (ttl + 1)
6367
assert.Equal(t, exp, c.Active())
68+
assert.Equal(t, exp, c.ActiveNativeHistogram())
6469
}
6570
}
6671

@@ -71,23 +76,26 @@ func TestActiveSeries_PurgeOpt(t *testing.T) {
7176
c := NewActiveSeries()
7277

7378
now := time.Now()
74-
c.UpdateSeries(ls1, ls1.Hash(), now.Add(-2*time.Minute), copyFn)
75-
c.UpdateSeries(ls2, ls2.Hash(), now, copyFn)
79+
c.UpdateSeries(ls1, ls1.Hash(), now.Add(-2*time.Minute), true, copyFn)
80+
c.UpdateSeries(ls2, ls2.Hash(), now, true, copyFn)
7681
c.Purge(now)
7782

7883
assert.Equal(t, 1, c.Active())
84+
assert.Equal(t, 1, c.ActiveNativeHistogram())
7985

80-
c.UpdateSeries(ls1, ls1.Hash(), now.Add(-1*time.Minute), copyFn)
81-
c.UpdateSeries(ls2, ls2.Hash(), now, copyFn)
86+
c.UpdateSeries(ls1, ls1.Hash(), now.Add(-1*time.Minute), true, copyFn)
87+
c.UpdateSeries(ls2, ls2.Hash(), now, true, copyFn)
8288
c.Purge(now)
8389

8490
assert.Equal(t, 1, c.Active())
91+
assert.Equal(t, 1, c.ActiveNativeHistogram())
8592

8693
// This will *not* update the series, since there is already newer timestamp.
87-
c.UpdateSeries(ls2, ls2.Hash(), now.Add(-1*time.Minute), copyFn)
94+
c.UpdateSeries(ls2, ls2.Hash(), now.Add(-1*time.Minute), true, copyFn)
8895
c.Purge(now)
8996

9097
assert.Equal(t, 1, c.Active())
98+
assert.Equal(t, 1, c.ActiveNativeHistogram())
9199
}
92100

93101
var activeSeriesTestGoroutines = []int{50, 100, 500}
@@ -121,7 +129,7 @@ func benchmarkActiveSeriesConcurrencySingleSeries(b *testing.B, goroutines int)
121129

122130
for ix := 0; ix < max; ix++ {
123131
now = now.Add(time.Duration(ix) * time.Millisecond)
124-
c.UpdateSeries(series, labelhash, now, copyFn)
132+
c.UpdateSeries(series, labelhash, now, false, copyFn)
125133
}
126134
}()
127135
}
@@ -152,7 +160,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) {
152160

153161
b.ResetTimer()
154162
for ix := 0; ix < b.N; ix++ {
155-
c.UpdateSeries(series[ix], labelhash[ix], time.Unix(0, now+int64(ix)), copyFn)
163+
c.UpdateSeries(series[ix], labelhash[ix], time.Unix(0, now+int64(ix)), false, copyFn)
156164
}
157165
}
158166

@@ -184,9 +192,9 @@ func benchmarkPurge(b *testing.B, twice bool) {
184192
// Prepare series
185193
for ix, s := range series {
186194
if ix < numExpiresSeries {
187-
c.UpdateSeries(s, labelhash[ix], now.Add(-time.Minute), copyFn)
195+
c.UpdateSeries(s, labelhash[ix], now.Add(-time.Minute), false, copyFn)
188196
} else {
189-
c.UpdateSeries(s, labelhash[ix], now, copyFn)
197+
c.UpdateSeries(s, labelhash[ix], now, false, copyFn)
190198
}
191199
}
192200

pkg/ingester/ingester.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -1038,6 +1038,7 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) {
10381038

10391039
userDB.activeSeries.Purge(purgeTime)
10401040
i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
1041+
i.metrics.activeNHSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.ActiveNativeHistogram()))
10411042
if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics); err != nil {
10421043
level.Warn(i.logger).Log("msg", "failed to update per labelSet metrics", "user", userID, "err", err)
10431044
}
@@ -1376,9 +1377,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
13761377
} else {
13771378
discardedNativeHistogramCount += len(ts.Histograms)
13781379
}
1379-
shouldUpdateSeries := (succeededSamplesCount > oldSucceededSamplesCount) || (succeededHistogramsCount > oldSucceededHistogramsCount)
1380+
1381+
isNHAppended := succeededHistogramsCount > oldSucceededHistogramsCount
1382+
shouldUpdateSeries := (succeededSamplesCount > oldSucceededSamplesCount) || isNHAppended
13801383
if i.cfg.ActiveSeriesMetricsEnabled && shouldUpdateSeries {
1381-
db.activeSeries.UpdateSeries(tsLabels, tsLabelsHash, startAppend, func(l labels.Labels) labels.Labels {
1384+
db.activeSeries.UpdateSeries(tsLabels, tsLabelsHash, startAppend, isNHAppended, func(l labels.Labels) labels.Labels {
13821385
// we must already have copied the labels if succeededSamplesCount or succeededHistogramsCount has been incremented.
13831386
return copiedLabels
13841387
})
@@ -2526,6 +2529,7 @@ func (i *Ingester) closeAllTSDB() {
25262529

25272530
i.metrics.memUsers.Dec()
25282531
i.metrics.activeSeriesPerUser.DeleteLabelValues(userID)
2532+
i.metrics.activeNHSeriesPerUser.DeleteLabelValues(userID)
25292533
}(userDB)
25302534
}
25312535

0 commit comments

Comments
 (0)