Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
* [BUGFIX] Query-frontend: Fix silent panic when executing a remote read API request if the request has no matchers. #13745
* [BUGFIX] Ruler: Fixed `-ruler.max-rule-groups-per-tenant-by-namespace` to only count rule groups in the specified namespace instead of all namespaces. #13743
* [BUGFIX] Query-frontend: Fix race condition that could sometimes cause unnecessary resharding of queriers if querier shuffle sharding and remote execution is enabled. #13794 #13838
* [ENHANCEMENT] MQE: Add metrics to track step-invariant expression usage and data point reuse savings: `cortex_mimir_query_engine_step_invariant_nodes_total` and `cortex_mimir_query_engine_step_invariant_points_saved_total`. #13911

### Mixin

Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (e *Engine) materializeAndCreateEvaluator(ctx context.Context, queryable st
operatorParams := &planning.OperatorParameters{
Queryable: queryable,
MemoryConsumptionTracker: memoryConsumptionTracker,
OperatorMetricsTracker: e.planner.operatorMetricsTracker,
Annotations: annotations.New(),
QueryStats: types.NewQueryStats(),
LookbackDelta: lookbackDelta,
Expand Down
146 changes: 146 additions & 0 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/lazyquery"
"github.com/grafana/mimir/pkg/streamingpromql/compat"
operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics"
"github.com/grafana/mimir/pkg/streamingpromql/planning"
"github.com/grafana/mimir/pkg/streamingpromql/testutils"
"github.com/grafana/mimir/pkg/streamingpromql/types"
Expand Down Expand Up @@ -4771,6 +4772,151 @@ func TestExtendedRangeSelectorsIrregular(t *testing.T) {
}
}

// TestStepInvariantMetricsTracker runs a set of range queries through an engine whose planner
// has an operator metrics tracker registered. After each query it checks the step invariant
// node counter and the per-point-type "points saved" counters against the expected values.
func TestStepInvariantMetricsTracker(t *testing.T) {
	storage := promqltest.LoadedStorage(t, `
load 1m
metric 0 1 2 3 4 5
nodata _ _ _ _ _ _
histogram {{schema:3 sum:4 count:4 buckets:[1 2 1]}}+{{schema:5 sum:2 count:1 buckets:[1] offset:1}}x6
`)
	t.Cleanup(func() { storage.Close() })

	type pointCounts = map[operatormetrics.StepInvariantPointType]int

	// saved builds the expected counter values for float and histogram points.
	saved := func(floats, histograms int) pointCounts {
		return pointCounts{
			operatormetrics.FPoint: floats,
			operatormetrics.HPoint: histograms,
		}
	}

	const step = 10 * time.Second
	epoch := time.Unix(0, 0)
	epochPlus50s := epoch.Add(50 * time.Second)

	testCases := []struct {
		query    string
		start    time.Time
		end      time.Time
		interval time.Duration

		expectedNodes  int
		expectedPoints pointCounts
	}{
		{
			query:          "metric @ 20",
			start:          epoch,
			end:            epochPlus50s,
			interval:       step,
			expectedNodes:  1,
			expectedPoints: saved(5, 0), // 6 steps, so 5 were saved
		},
		{
			query:          "metric @ 20",
			start:          epochPlus50s,
			end:            epochPlus50s,
			interval:       step,
			expectedNodes:  0, // step invariant operation has been removed
			expectedPoints: saved(0, 0),
		},
		{
			query:          "nodata @ 20",
			start:          epoch,
			end:            epochPlus50s,
			interval:       step,
			expectedNodes:  1,
			expectedPoints: saved(0, 0),
		},
		{
			query:          "metric @ 20 + metric @ 30",
			start:          epoch,
			end:            epochPlus50s,
			interval:       step,
			expectedNodes:  1, // the left and right are all wrapped into a single step invariant
			expectedPoints: saved(5, 0),
		},
		{
			query:          "metric @ 20 * metric + metric @ 30",
			start:          epoch,
			end:            epochPlus50s,
			interval:       step,
			expectedNodes:  2,
			expectedPoints: saved(10, 0),
		},
		{
			query:          "abs(metric @ 20 * metric + metric @ 30)",
			start:          epoch,
			end:            epochPlus50s,
			interval:       step,
			expectedNodes:  2,
			expectedPoints: saved(10, 0),
		},
		{
			query:          "scalar(metric @ 20)",
			start:          epoch,
			end:            epochPlus50s,
			interval:       step,
			expectedNodes:  1,
			expectedPoints: saved(5, 0),
		},
		{
			query:          "histogram @ 20",
			start:          epoch,
			end:            epochPlus50s,
			interval:       step,
			expectedNodes:  1,
			expectedPoints: saved(0, 5),
		},
		{
			query:          "(metric @ 20 or metric) or (histogram @ 20 or histogram) or (vector(scalar(metric @ 30)) or metric)",
			start:          epoch,
			end:            epochPlus50s,
			interval:       step,
			expectedNodes:  3,
			expectedPoints: saved(10, 5),
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.query, func(t *testing.T) {
			ctx := context.Background()

			// Each subtest gets its own registry so counters start from zero.
			tracker := operatormetrics.NewOperatorMetricsTracker(prometheus.NewRegistry())

			opts := NewTestEngineOpts()
			planner, err := NewQueryPlannerWithoutOptimizationPasses(opts, NewMaximumSupportedVersionQueryPlanVersionProvider())
			require.NoError(t, err)

			planner.RegisterOperatorMetrics(tracker)
			engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner)
			require.NoError(t, err)

			qry, err := engine.NewRangeQuery(ctx, storage, nil, testCase.query, testCase.start, testCase.end, testCase.interval)
			require.NoError(t, err)

			result := qry.Exec(ctx)
			require.NoError(t, result.Err)

			stepInvariant := tracker.StepInvariantTracker
			require.Equal(t, float64(testCase.expectedNodes), testutil.ToFloat64(stepInvariant.NodeCounter()))

			for pointType, want := range testCase.expectedPoints {
				require.Equal(t, float64(want), testutil.ToFloat64(stepInvariant.Counter(pointType)))
			}
		})
	}
}

// dummyMaterializer is a field-less type; its Materialize method is declared just below.
type dummyMaterializer struct{}

func (d dummyMaterializer) Materialize(n planning.Node, materializer *planning.Materializer, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should track only two things: the number of times a step-invariant node is added, and the number of steps (not samples) saved.

This would simplify things greatly as we could calculate these at planning time, rather than during evaluation.

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operatormetrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// MetricsTracker holds metrics trackers used by operators.
// These trackers can be passed into operators so that the operators can emit metrics
// during their lifecycle.
type MetricsTracker struct {
// StepInvariantTracker records metrics about step invariant nodes and the points they save.
StepInvariantTracker *StepInvariantExpressionMetricsTracker
}

// NewOperatorMetricsTracker builds a MetricsTracker whose underlying metrics are
// registered with reg.
func NewOperatorMetricsTracker(reg prometheus.Registerer) *MetricsTracker {
	tracker := new(MetricsTracker)
	tracker.StepInvariantTracker = newStepInvariantExpressionMetricsTracker(reg)
	return tracker
}

// StepInvariantPointType identifies the kind of point a step invariant saving applies to.
// Its values index into stepInvariantPointTypeLabels.
type StepInvariantPointType int

const (
	// FPoint identifies float points.
	FPoint StepInvariantPointType = iota
	// HPoint identifies histogram points.
	HPoint
)

// stepInvariantPointTypeLabels maps each StepInvariantPointType to the value used for
// the "type" label on the points-saved counter.
var stepInvariantPointTypeLabels = []string{
	FPoint: "fpoint",
	HPoint: "hpoint",
}

// StepInvariantExpressionMetricsTracker holds the counters used to track step invariant
// expression usage: how many step invariant nodes were observed, and how many points
// did not need to be retrieved from the inner operation.
type StepInvariantExpressionMetricsTracker struct {
// total number of observed step invariant nodes
nodes prometheus.Counter

// total number of points which were saved from being retrieved from the inner operation,
// partitioned by a "type" label
points *prometheus.CounterVec
}

// OnStepInvariantNodeObserved records that one more step invariant node has been
// observed by adding 1 to the nodes counter.
func (t *StepInvariantExpressionMetricsTracker) OnStepInvariantNodeObserved() {
	t.nodes.Add(1)
}

// NodeCounter exposes the counter of observed step invariant nodes.
// It exists so unit tests can read the value while the field itself stays unexported.
func (t *StepInvariantExpressionMetricsTracker) NodeCounter() prometheus.Counter {
	nodeCounter := t.nodes
	return nodeCounter
}

// Counter exposes the points-saved counter for the given point type.
// It exists so unit tests can read the value while the counter vector itself stays unexported.
func (t *StepInvariantExpressionMetricsTracker) Counter(pointType StepInvariantPointType) prometheus.Counter {
	typeLabel := stepInvariantPointTypeLabels[pointType]
	return t.points.WithLabelValues(typeLabel)
}

// OnStepInvariantStepsSaved records the points saved for a series of the given point type.
// The counter grows by stepCount-1, because the first step still had to load its data.
// A stepCount of 1 or less records nothing.
func (t *StepInvariantExpressionMetricsTracker) OnStepInvariantStepsSaved(pointType StepInvariantPointType, stepCount int) {
	if stepCount > 1 {
		t.Counter(pointType).Add(float64(stepCount - 1))
	}
}

// newStepInvariantExpressionMetricsTracker creates the step invariant counters and
// registers them with reg: first the nodes counter, then the points-saved counter
// vector keyed by the "type" label.
func newStepInvariantExpressionMetricsTracker(reg prometheus.Registerer) *StepInvariantExpressionMetricsTracker {
	factory := promauto.With(reg)

	nodesOpts := prometheus.CounterOpts{
		Name: "cortex_mimir_query_engine_step_invariant_nodes_total",
		Help: "Total number of step invariant nodes.",
	}
	pointsOpts := prometheus.CounterOpts{
		Name: "cortex_mimir_query_engine_step_invariant_points_saved_total",
		Help: "Total number of samples which were saved from being queried / loaded due to step invariant handling.",
	}

	tracker := &StepInvariantExpressionMetricsTracker{}
	tracker.nodes = factory.NewCounter(nodesOpts)
	tracker.points = factory.NewCounterVec(pointsOpts, []string{"type"})
	return tracker
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operatormetrics

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)

// TestNewStepInvariantExpressionMetricsTrackerNodes checks that NodeCounter returns the
// tracker's own nodes counter and that observing a node moves it from 0 to 1.
func TestNewStepInvariantExpressionMetricsTrackerNodes(t *testing.T) {
	stepInvariant := NewOperatorMetricsTracker(prometheus.NewRegistry()).StepInvariantTracker

	// The accessor must hand back the very same counter the tracker increments.
	require.Same(t, stepInvariant.nodes, stepInvariant.NodeCounter())

	require.Equal(t, float64(0), testutil.ToFloat64(stepInvariant.nodes))

	stepInvariant.OnStepInvariantNodeObserved()

	require.Equal(t, float64(1), testutil.ToFloat64(stepInvariant.nodes))
}

// TestNewStepInvariantExpressionMetricsTrackerFloatPoints feeds a sequence of step counts
// into the tracker for float points and checks the running total after each one.
func TestNewStepInvariantExpressionMetricsTrackerFloatPoints(t *testing.T) {
	stepInvariant := NewOperatorMetricsTracker(prometheus.NewRegistry()).StepInvariantTracker
	savedSoFar := func() float64 {
		return testutil.ToFloat64(stepInvariant.Counter(FPoint))
	}

	// Nothing has been recorded yet.
	require.Equal(t, float64(0), savedSoFar())

	observations := []struct {
		stepCount     int
		expectedTotal float64
	}{
		// Passing in a step count <= 1 has no effect
		{stepCount: 0, expectedTotal: 0},
		{stepCount: 1, expectedTotal: 0},
		// Larger step counts accumulate stepCount-1 each.
		{stepCount: 2, expectedTotal: 1},
		{stepCount: 12, expectedTotal: 1 + 11},
	}

	for _, observation := range observations {
		stepInvariant.OnStepInvariantStepsSaved(FPoint, observation.stepCount)
		require.Equal(t, observation.expectedTotal, savedSoFar())
	}
}

// TestNewStepInvariantExpressionMetricsTrackerHistogramPoints feeds a sequence of step counts
// into the tracker for histogram points and checks the running total after each one.
func TestNewStepInvariantExpressionMetricsTrackerHistogramPoints(t *testing.T) {
	stepInvariant := NewOperatorMetricsTracker(prometheus.NewRegistry()).StepInvariantTracker
	savedSoFar := func() float64 {
		return testutil.ToFloat64(stepInvariant.Counter(HPoint))
	}

	// Nothing has been recorded yet.
	require.Equal(t, float64(0), savedSoFar())

	observations := []struct {
		stepCount     int
		expectedTotal float64
	}{
		// Passing in a step count <= 1 has no effect
		{stepCount: 0, expectedTotal: 0},
		{stepCount: 1, expectedTotal: 0},
		// Larger step counts accumulate stepCount-1 each.
		{stepCount: 2, expectedTotal: 1},
		{stepCount: 12, expectedTotal: 1 + 11},
	}

	for _, observation := range observations {
		stepInvariant.OnStepInvariantStepsSaved(HPoint, observation.stepCount)
		require.Equal(t, observation.expectedTotal, savedSoFar())
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser/posrange"

operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics"
"github.com/grafana/mimir/pkg/streamingpromql/types"
"github.com/grafana/mimir/pkg/util/limiter"
)
Expand All @@ -17,13 +18,15 @@ type StepInvariantInstantVectorOperator struct {
inner types.InstantVectorOperator
originalTimeRange types.QueryTimeRange
memoryConsumptionTracker *limiter.MemoryConsumptionTracker
metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker
}

func NewStepInvariantInstantVectorOperator(op types.InstantVectorOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) *StepInvariantInstantVectorOperator {
// NewStepInvariantInstantVectorOperator wraps op in a StepInvariantInstantVectorOperator.
// The returned operator keeps the original query time range, the memory consumption
// tracker and the step invariant metrics tracker it is given.
func NewStepInvariantInstantVectorOperator(op types.InstantVectorOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker, metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker) *StepInvariantInstantVectorOperator {
	s := new(StepInvariantInstantVectorOperator)
	s.inner = op
	s.originalTimeRange = originalTimeRange
	s.memoryConsumptionTracker = memoryConsumptionTracker
	s.metricsTracker = metricsTracker
	return s
}

Expand All @@ -36,6 +39,7 @@ func (s *StepInvariantInstantVectorOperator) Close() {
}

// Prepare reports this step invariant node to the metrics tracker and then prepares the
// inner operator, returning whatever error the inner operator returns.
func (s *StepInvariantInstantVectorOperator) Prepare(ctx context.Context, params *types.PrepareParams) error {
	// Count the node before delegating, so it is recorded even if the inner Prepare fails.
	s.metricsTracker.OnStepInvariantNodeObserved()

	err := s.inner.Prepare(ctx, params)
	return err
}

Expand Down Expand Up @@ -70,6 +74,8 @@ func (s *StepInvariantInstantVectorOperator) NextSeries(ctx context.Context) (ty
return types.InstantVectorSeriesData{}, err
}

s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.FPoint, s.originalTimeRange.StepCount)

// Fill the expected steps with the same point.
for ts := s.originalTimeRange.StartT; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds {
floats = append(floats, promql.FPoint{
Expand All @@ -89,6 +95,8 @@ func (s *StepInvariantInstantVectorOperator) NextSeries(ctx context.Context) (ty
return types.InstantVectorSeriesData{}, err
}

s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.HPoint, s.originalTimeRange.StepCount)

histograms = append(histograms, promql.HPoint{T: data.Histograms[0].T, H: data.Histograms[0].H})
// Note that we create a copy of the histogram for each step as we can not re-use the same *FloatHistogram in the slice from the pool.
for ts := s.originalTimeRange.StartT + s.originalTimeRange.IntervalMilliseconds; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds {
Expand Down
Loading