From 0f539615517e381567acda69aadd92f278612e5c Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Fri, 2 Jan 2026 14:05:33 +0800 Subject: [PATCH 01/12] Adding step invariant metrics and past path for instant queries. --- pkg/streamingpromql/engine.go | 1 + pkg/streamingpromql/engine_test.go | 146 ++++++++++++++++++ .../metrics/operator_metrics_tracker.go | 79 ++++++++++ .../metrics/operator_metrics_tracker_test.go | 50 ++++++ .../step_invariant_instant_vector_operator.go | 10 +- .../step_invariant_scalar_operator.go | 8 +- pkg/streamingpromql/planning.go | 12 +- .../core/step_invariant_expression.go | 9 +- pkg/streamingpromql/planning/plan.go | 2 + 9 files changed, 311 insertions(+), 6 deletions(-) create mode 100644 pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go create mode 100644 pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go diff --git a/pkg/streamingpromql/engine.go b/pkg/streamingpromql/engine.go index 4c5dc0ff780..39cd88d264c 100644 --- a/pkg/streamingpromql/engine.go +++ b/pkg/streamingpromql/engine.go @@ -273,6 +273,7 @@ func (e *Engine) materializeAndCreateEvaluator(ctx context.Context, queryable st operatorParams := &planning.OperatorParameters{ Queryable: queryable, MemoryConsumptionTracker: memoryConsumptionTracker, + OperatorMetricsTracker: e.planner.operatorMetricsTracker, Annotations: annotations.New(), QueryStats: types.NewQueryStats(), LookbackDelta: lookbackDelta, diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 8cdf235a0f2..1a5249a0cde 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -43,6 +43,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/storage/lazyquery" "github.com/grafana/mimir/pkg/streamingpromql/compat" + operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/planning" "github.com/grafana/mimir/pkg/streamingpromql/testutils" "github.com/grafana/mimir/pkg/streamingpromql/types" @@ -4771,6 +4772,151 @@ func TestExtendedRangeSelectorsIrregular(t *testing.T) { } } +func TestStepInvariantMetricsTracker(t *testing.T) { + storage := promqltest.LoadedStorage(t, ` + load 1m + metric 0 1 2 3 4 5 + nodata _ _ _ _ _ _ + histogram {{schema:3 sum:4 count:4 buckets:[1 2 1]}}+{{schema:5 sum:2 count:1 buckets:[1] offset:1}}x6 + `) + t.Cleanup(func() { storage.Close() }) + + tc := []struct { + query string + start time.Time + end time.Time + interval time.Duration + + expectedNodes int + expectedPoints map[operatormetrics.StepInvariantPointType]int + }{ + { + query: "metric @ 20", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 5, // 6 steps + operatormetrics.HPoint: 0, + }, + }, + { + query: "metric @ 20", + start: time.Unix(0, 0).Add(time.Second * 50), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 0, // step invariant operation has been removed + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 0, + operatormetrics.HPoint: 0, + }, + }, + { + query: "nodata @ 20", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 0, + operatormetrics.HPoint: 0, + }, + }, + { + query: "metric @ 20 + metric @ 30", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, // the left and right are all wrapped into a single step invariant + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 5, + operatormetrics.HPoint: 0, + }, + }, + { + query: "metric @ 20 * metric + metric @ 30", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 2, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 10, + operatormetrics.HPoint: 0, + }, + }, + { + query: "abs(metric @ 20 * metric + metric @ 30)", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 2, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 10, + operatormetrics.HPoint: 0, + }, + }, + { + query: "scalar(metric @ 20)", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 5, + operatormetrics.HPoint: 0, + }, + }, + { + query: "histogram @ 20", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 0, + operatormetrics.HPoint: 5, + }, + }, + { + query: "(metric @ 20 or metric) or (histogram @ 20 or histogram) or (vector(scalar(metric @ 30)) or metric)", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 3, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 10, + operatormetrics.HPoint: 5, + }, + }, + } + + for _, tc := range tc { + t.Run(tc.query, func(t *testing.T) { + + tracker := operatormetrics.NewOperatorMetricsTracker(prometheus.NewRegistry()) + + opts := NewTestEngineOpts() + planner, err := NewQueryPlannerWithoutOptimizationPasses(opts, NewMaximumSupportedVersionQueryPlanVersionProvider()) + require.NoError(t, err) + + planner.RegisterOperatorMetrics(tracker) + engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner) + require.NoError(t, err) + + qry, err := engine.NewRangeQuery(context.Background(), storage, nil, tc.query, tc.start, tc.end, tc.interval) + require.NoError(t, err) + res := qry.Exec(context.Background()) + require.NoError(t, res.Err) + require.Equal(t, float64(tc.expectedNodes), testutil.ToFloat64(tracker.StepInvariantTracker.NodeCounter())) + + for pointType, expectedCount := range tc.expectedPoints { + require.Equal(t, float64(expectedCount), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(pointType))) + } + }) + } +} + type dummyMaterializer struct{} func (d dummyMaterializer) Materialize(n planning.Node, materializer *planning.Materializer, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) { diff --git a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go new file mode 100644 index 00000000000..516e7366a1c --- /dev/null +++ b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go @@ -0,0 +1,79 @@ +package operatormetrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// MetricsTracker holds metrics trackers used by operators. +// These trackers can be passed into operators to elicit metrics +// during their lifecycle. +type MetricsTracker struct { + StepInvariantTracker *StepInvariantExpressionMetricsTracker +} + +func NewOperatorMetricsTracker(reg prometheus.Registerer) *MetricsTracker { + return &MetricsTracker{ + StepInvariantTracker: newStepInvariantExpressionMetricsTracker(reg), + } +} + +type StepInvariantPointType int + +const ( + FPoint StepInvariantPointType = iota + HPoint StepInvariantPointType = iota +) + +var stepInvariantPointTypeLabels = []string{ + "fpoint", "hpoint", +} + +type StepInvariantExpressionMetricsTracker struct { + // total number of observed step invariant nodes + nodes prometheus.Counter + + // total number of points which were saved from being retrieved from the inner operation + points *prometheus.CounterVec +} + +// OnStepInvariantNodeObserved is called when a step invariant node is observed. +// It increments the nodes counter by 1. +func (t *StepInvariantExpressionMetricsTracker) OnStepInvariantNodeObserved() { + t.nodes.Inc() +} + +// NodeCounter returns the counter tracking step invariant nodes. +// This is provided for unit tests and allowing the underlying counter to remain unexported. +func (t *StepInvariantExpressionMetricsTracker) NodeCounter() prometheus.Counter { + return t.nodes +} + +// Counter returns the counter tracking step invariant point savings for a given type. +// This is provided for unit tests and allowing the underlying counter to remain unexported. +func (t *StepInvariantExpressionMetricsTracker) Counter(pointType StepInvariantPointType) prometheus.Counter { + return t.points.WithLabelValues(stepInvariantPointTypeLabels[pointType]) +} + +// OnStepInvariantStepsSaved increments the counter related to tracking step invariant point savings for the given point type. +// The actual counter will be incremented by stepCount-1, reflecting that the first step loaded data. +// If the stepCount is less than or equal to 1 then this function is a no-op. +func (t *StepInvariantExpressionMetricsTracker) OnStepInvariantStepsSaved(pointType StepInvariantPointType, stepCount int) { + if stepCount <= 1 { + return + } + t.points.WithLabelValues(stepInvariantPointTypeLabels[pointType]).Add(float64(stepCount - 1)) +} + +func newStepInvariantExpressionMetricsTracker(reg prometheus.Registerer) *StepInvariantExpressionMetricsTracker { + return &StepInvariantExpressionMetricsTracker{ + nodes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_mimir_query_engine_step_invariant_nodes_total", + Help: "Total number of step invariant nodes.", + }), + points: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_mimir_query_engine_step_invariant_points_saved_total", + Help: "Total number of samples which were saved from being queried / loaded due to step invariant handling.", + }, []string{"type"}), + } +} diff --git a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go new file mode 100644 index 00000000000..42bdd3f6a92 --- /dev/null +++ b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go @@ -0,0 +1,50 @@ +package operatormetrics + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestNewStepInvariantExpressionMetricsTrackerNodes(t *testing.T) { + tracker := NewOperatorMetricsTracker(prometheus.NewRegistry()) + require.Same(t, tracker.StepInvariantTracker.nodes, tracker.StepInvariantTracker.NodeCounter()) + + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) + tracker.StepInvariantTracker.OnStepInvariantNodeObserved() + require.Equal(t, float64(1), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) +} + +func TestNewStepInvariantExpressionMetricsTrackerFloatPoints(t *testing.T) { + tracker := NewOperatorMetricsTracker(prometheus.NewRegistry()) + + // Passing in a step count <= 1 has no effect + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 0) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 1) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) + + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 2) + require.Equal(t, float64(1), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 12) + require.Equal(t, float64(1+11), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) +} + +func TestNewStepInvariantExpressionMetricsTrackerHistogramPoints(t *testing.T) { + tracker := NewOperatorMetricsTracker(prometheus.NewRegistry()) + + // Passing in a step count <= 1 has no effect + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 0) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 1) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) + + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 2) + require.Equal(t, float64(1), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 12) + require.Equal(t, float64(1+11), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) +} diff --git a/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go b/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go index b6fde89fd3b..0fa33463dd7 100644 --- a/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go +++ b/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser/posrange" + operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/limiter" ) @@ -17,13 +18,15 @@ type StepInvariantInstantVectorOperator struct { inner types.InstantVectorOperator originalTimeRange types.QueryTimeRange memoryConsumptionTracker *limiter.MemoryConsumptionTracker + metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker } -func NewStepInvariantInstantVectorOperator(op types.InstantVectorOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) *StepInvariantInstantVectorOperator { +func NewStepInvariantInstantVectorOperator(op types.InstantVectorOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker, metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker) *StepInvariantInstantVectorOperator { return &StepInvariantInstantVectorOperator{ inner: op, originalTimeRange: originalTimeRange, memoryConsumptionTracker: memoryConsumptionTracker, + metricsTracker: metricsTracker, } } @@ -36,6 +39,7 @@ func (s *StepInvariantInstantVectorOperator) Close() { } func (s *StepInvariantInstantVectorOperator) Prepare(ctx context.Context, params *types.PrepareParams) error { + s.metricsTracker.OnStepInvariantNodeObserved() return s.inner.Prepare(ctx, params) } @@ -70,6 +74,8 @@ func (s *StepInvariantInstantVectorOperator) NextSeries(ctx context.Context) (ty return types.InstantVectorSeriesData{}, err } + s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.FPoint, s.originalTimeRange.StepCount) + // Fill the expected steps with the same point. for ts := s.originalTimeRange.StartT; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds { floats = append(floats, promql.FPoint{ @@ -89,6 +95,8 @@ func (s *StepInvariantInstantVectorOperator) NextSeries(ctx context.Context) (ty return types.InstantVectorSeriesData{}, err } + s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.HPoint, s.originalTimeRange.StepCount) + histograms = append(histograms, promql.HPoint{T: data.Histograms[0].T, H: data.Histograms[0].H}) // Note that we create a copy of the histogram for each step as we can not re-use the same *FloatHistogram in the slice from the pool. for ts := s.originalTimeRange.StartT + s.originalTimeRange.IntervalMilliseconds; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds { diff --git a/pkg/streamingpromql/operators/step_invariant_scalar_operator.go b/pkg/streamingpromql/operators/step_invariant_scalar_operator.go index 877e755dd6c..b75cd746a52 100644 --- a/pkg/streamingpromql/operators/step_invariant_scalar_operator.go +++ b/pkg/streamingpromql/operators/step_invariant_scalar_operator.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser/posrange" + operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/limiter" ) @@ -17,13 +18,15 @@ type StepInvariantScalarOperator struct { inner types.ScalarOperator originalTimeRange types.QueryTimeRange memoryConsumptionTracker *limiter.MemoryConsumptionTracker + metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker } -func NewStepInvariantScalarOperator(op types.ScalarOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) *StepInvariantScalarOperator { +func NewStepInvariantScalarOperator(op types.ScalarOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker, metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker) *StepInvariantScalarOperator { return &StepInvariantScalarOperator{ inner: op, originalTimeRange: originalTimeRange, memoryConsumptionTracker: memoryConsumptionTracker, + metricsTracker: metricsTracker, } } @@ -36,6 +39,7 @@ func (s *StepInvariantScalarOperator) Close() { } func (s *StepInvariantScalarOperator) Prepare(ctx context.Context, params *types.PrepareParams) error { + s.metricsTracker.OnStepInvariantNodeObserved() return s.inner.Prepare(ctx, params) } @@ -64,6 +68,8 @@ func (s *StepInvariantScalarOperator) GetValues(ctx context.Context) (types.Scal return types.ScalarData{}, err } + s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.FPoint, s.originalTimeRange.StepCount) + floats = append(floats, data.Samples[0]) for ts := s.originalTimeRange.StartT + s.originalTimeRange.IntervalMilliseconds; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds { diff --git a/pkg/streamingpromql/planning.go b/pkg/streamingpromql/planning.go index cef5f8c3b02..723f44240f7 100644 --- a/pkg/streamingpromql/planning.go +++ b/pkg/streamingpromql/planning.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/streamingpromql/operators/functions" + operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/optimize" "github.com/grafana/mimir/pkg/streamingpromql/optimize/ast" "github.com/grafana/mimir/pkg/streamingpromql/optimize/plan" @@ -61,6 +62,7 @@ type QueryPlanner struct { planOptimizationPasses []optimize.QueryPlanOptimizationPass planStageLatency *prometheus.HistogramVec generatedPlans *prometheus.CounterVec + operatorMetricsTracker *operatormetrics.MetricsTracker versionProvider QueryPlanVersionProvider logger log.Logger @@ -139,14 +141,20 @@ func NewQueryPlannerWithoutOptimizationPasses(opts EngineOpts, versionProvider Q Name: "cortex_mimir_query_engine_plans_generated_total", Help: "Total number of query plans generated.", }, []string{"version"}), - - versionProvider: versionProvider, + operatorMetricsTracker: operatormetrics.NewOperatorMetricsTracker(opts.CommonOpts.Reg), + versionProvider: versionProvider, logger: opts.Logger, TimeSince: time.Since, }, nil } +// RegisterOperatorMetrics replaces the operatorMetricsTracker with the given tracker. +// This is only needed for unit tests +func (p *QueryPlanner) RegisterOperatorMetrics(tracker *operatormetrics.MetricsTracker) { + p.operatorMetricsTracker = tracker +} + // RegisterASTOptimizationPass registers an AST optimization pass used with this engine. // // This method is not thread-safe and must not be called concurrently with any other method on this type. diff --git a/pkg/streamingpromql/planning/core/step_invariant_expression.go b/pkg/streamingpromql/planning/core/step_invariant_expression.go index 88fe47bbcee..7ec767bf516 100644 --- a/pkg/streamingpromql/planning/core/step_invariant_expression.go +++ b/pkg/streamingpromql/planning/core/step_invariant_expression.go @@ -98,11 +98,16 @@ func MaterializeStepInvariantExpression(s *StepInvariantExpression, materializer return nil, err } + // There is no advantage to wrapping an instant query in a step invariant + if timeRange.StepCount <= 1 { + return planning.NewSingleUseOperatorFactory(op), nil + } + switch op := op.(type) { case types.InstantVectorOperator: - return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantInstantVectorOperator(op, timeRange, params.MemoryConsumptionTracker)), nil + return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantInstantVectorOperator(op, timeRange, params.MemoryConsumptionTracker, params.OperatorMetricsTracker.StepInvariantTracker)), nil case types.ScalarOperator: - return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantScalarOperator(op, timeRange, params.MemoryConsumptionTracker)), nil + return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantScalarOperator(op, timeRange, params.MemoryConsumptionTracker, params.OperatorMetricsTracker.StepInvariantTracker)), nil case types.RangeVectorOperator: // Notes on range vector handling: // diff --git a/pkg/streamingpromql/planning/plan.go b/pkg/streamingpromql/planning/plan.go index 1a51aeb58e4..9bd0edf028e 100644 --- a/pkg/streamingpromql/planning/plan.go +++ b/pkg/streamingpromql/planning/plan.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/util/annotations" apierror "github.com/grafana/mimir/pkg/api/error" + operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/limiter" ) @@ -219,6 +220,7 @@ func (t QueriedTimeRange) Union(other QueriedTimeRange) QueriedTimeRange { type OperatorParameters struct { Queryable storage.Queryable MemoryConsumptionTracker *limiter.MemoryConsumptionTracker + OperatorMetricsTracker *operatormetrics.MetricsTracker Annotations *annotations.Annotations QueryStats *types.QueryStats LookbackDelta time.Duration From 54be3e3f51671ec1f611449ddb8606e250211040 Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Fri, 2 Jan 2026 14:17:56 +0800 Subject: [PATCH 02/12] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0658517cb2..71ffee8d230 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -126,6 +126,7 @@ * [BUGFIX] Query-frontend: Fix silent panic when executing a remote read API request if the request has no matchers. #13745 * [BUGFIX] Ruler: Fixed `-ruler.max-rule-groups-per-tenant-by-namespace` to only count rule groups in the specified namespace instead of all namespaces. #13743 * [BUGFIX] Query-frontend: Fix race condition that could sometimes cause unnecessary resharding of queriers if querier shuffle sharding and remote execution is enabled. #13794 #13838 +* [ENHANCEMENT] MQE: Add metrics to track step-invariant expression usage and data point reuse savings: `cortex_mimir_query_engine_step_invariant_nodes_total` and `cortex_mimir_query_engine_step_invariant_points_saved_total`. #13911 ### Mixin From 91ae18af10d2e5f28768fffab89e0273afdae590 Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Fri, 2 Jan 2026 14:32:34 +0800 Subject: [PATCH 03/12] Add license --- .../operators/metrics/operator_metrics_tracker.go | 2 ++ .../operators/metrics/operator_metrics_tracker_test.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go index 516e7366a1c..4ebb57e95da 100644 --- a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go +++ b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: AGPL-3.0-only + package operatormetrics import ( diff --git a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go index 42bdd3f6a92..517b06f3b44 100644 --- a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go +++ b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: AGPL-3.0-only + package operatormetrics import ( From 777774bce554caf4417ce774d2e9d7cd8429b9a8 Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Tue, 20 Jan 2026 08:11:56 +0800 Subject: [PATCH 04/12] Remove step invariant in planning - WIP --- pkg/streamingpromql/planning.go | 29 ++++--- .../core/step_invariant_expression.go | 5 -- pkg/streamingpromql/planning_test.go | 85 ++++++++++++++++--- 3 files changed, 88 insertions(+), 31 deletions(-) diff --git a/pkg/streamingpromql/planning.go b/pkg/streamingpromql/planning.go index 723f44240f7..3524cb8b5fa 100644 --- a/pkg/streamingpromql/planning.go +++ b/pkg/streamingpromql/planning.go @@ -248,7 +248,7 @@ func (p *QueryPlanner) NewQueryPlan(ctx context.Context, qs string, timeRange ty spanLogger.DebugLog("msg", "AST optimisation passes completed", "expression", expr) plan, err := p.runPlanningStage("Original plan", observer, func() (*planning.QueryPlan, error) { - root, err := p.nodeFromExpr(expr) + root, err := p.nodeFromExpr(expr, timeRange) if err != nil { return nil, err } @@ -401,7 +401,7 @@ func (p *QueryPlanner) runPlanningStage(stageName string, observer PlanningObser return plan, nil } -func (p *QueryPlanner) nodeFromExpr(expr parser.Expr) (planning.Node, error) { +func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeRange) (planning.Node, error) { switch expr := expr.(type) { case *parser.VectorSelector: return &core.VectorSelector{ @@ -438,7 +438,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr) (planning.Node, error) { }, nil case *parser.AggregateExpr: - inner, err := p.nodeFromExpr(expr.Expr) + inner, err := p.nodeFromExpr(expr.Expr, timeRange) if err != nil { return nil, err } @@ -446,7 +446,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr) (planning.Node, error) { var param planning.Node if expr.Param != nil { - param, err = p.nodeFromExpr(expr.Param) + param, err = p.nodeFromExpr(expr.Param, timeRange) if err != nil { return nil, err } @@ -469,12 +469,12 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr) (planning.Node, error) { }, nil case *parser.BinaryExpr: - lhs, err := p.nodeFromExpr(expr.LHS) + lhs, err := p.nodeFromExpr(expr.LHS, timeRange) if err != nil { return nil, err } - rhs, err := p.nodeFromExpr(expr.RHS) + rhs, err := p.nodeFromExpr(expr.RHS, timeRange) if err != nil { return nil, err } @@ -533,7 +533,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr) (planning.Node, error) { args := make([]planning.Node, 0, len(expr.Args)) for _, arg := range expr.Args { - node, err := p.nodeFromExpr(arg) + node, err := p.nodeFromExpr(arg, timeRange) if err != nil { return nil, err } @@ -572,7 +572,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr) (planning.Node, error) { // Note that the DeduplicateAndMerge still wraps the function call as the timestamp function returns true under functionNeedsDeduplication(). // This can be removed once https://github.com/prometheus/prometheus/pull/17313 is vendored into mimir stepInvariantExpression, ok := args[0].(*core.StepInvariantExpression) - if ok { + if ok && timeRange.StepCount > 1 { vectorSelector, ok := stepInvariantExpression.Inner.(*core.VectorSelector) if ok { vectorSelector.ReturnSampleTimestamps = true @@ -602,7 +602,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr) (planning.Node, error) { return f, nil case *parser.SubqueryExpr: - inner, err := p.nodeFromExpr(expr.Expr) + inner, err := p.nodeFromExpr(expr.Expr, timeRange) if err != nil { return nil, err } @@ -625,7 +625,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr) (planning.Node, error) { }, nil case *parser.UnaryExpr: - inner, err := p.nodeFromExpr(expr.Expr) + inner, err := p.nodeFromExpr(expr.Expr, timeRange) if err != nil { return nil, err } @@ -675,14 +675,19 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr) (planning.Node, error) { }, nil case *parser.ParenExpr: - return p.nodeFromExpr(expr.Expr) + return p.nodeFromExpr(expr.Expr, timeRange) case *parser.StepInvariantExpr: - inner, err := p.nodeFromExpr(expr.Expr) + inner, err := p.nodeFromExpr(expr.Expr, timeRange) if err != nil { return nil, err } + // There is no advantage to wrapping an instant query in a step invariant + if timeRange.StepCount <= 1 { + return inner, nil + } + return &core.StepInvariantExpression{ Inner: inner, StepInvariantExpressionDetails: &core.StepInvariantExpressionDetails{}, diff --git a/pkg/streamingpromql/planning/core/step_invariant_expression.go b/pkg/streamingpromql/planning/core/step_invariant_expression.go index 7ec767bf516..aa4ac2ffdcc 100644 --- a/pkg/streamingpromql/planning/core/step_invariant_expression.go +++ b/pkg/streamingpromql/planning/core/step_invariant_expression.go @@ -98,11 +98,6 @@ func MaterializeStepInvariantExpression(s *StepInvariantExpression, materializer return nil, err } - // There is no advantage to wrapping an instant query in a step invariant - if timeRange.StepCount <= 1 { - return planning.NewSingleUseOperatorFactory(op), nil - } - switch op := op.(type) { case types.InstantVectorOperator: return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantInstantVectorOperator(op, timeRange, params.MemoryConsumptionTracker, params.OperatorMetricsTracker.StepInvariantTracker)), nil diff --git a/pkg/streamingpromql/planning_test.go b/pkg/streamingpromql/planning_test.go index 6e44fd9d1ce..d878d1f8732 100644 --- a/pkg/streamingpromql/planning_test.go +++ b/pkg/streamingpromql/planning_test.go @@ -100,14 +100,38 @@ func TestPlanCreationEncodingAndDecoding(t *testing.T) { }, }, }, - "vector selector with '@ 0'": { + "vector selector with '@ 0' instant query": { expr: `some_metric @ 0`, timeRange: instantQuery, expectedPlan: &planning.EncodedQueryPlan{ TimeRange: instantQueryEncodedTimeRange, + RootNode: 0, + Version: 0, + Nodes: []*planning.EncodedNode{ + { + NodeType: planning.NODE_TYPE_VECTOR_SELECTOR, + Details: marshalDetails(&core.VectorSelectorDetails{ + Matchers: []*core.LabelMatcher{ + {Type: 0, Name: "__name__", Value: "some_metric"}, + }, + Timestamp: timestampOf(0), + ExpressionPosition: core.PositionRange{Start: 0, End: 15}, + }), + Type: "VectorSelector", + Description: `{__name__="some_metric"} @ 0 (1970-01-01T00:00:00Z)`, + }, + }, + }, + }, + "vector selector with '@ 0' range query": { + expr: `some_metric @ 0`, + timeRange: rangeQuery, + + expectedPlan: &planning.EncodedQueryPlan{ + TimeRange: rangeQueryEncodedTimeRange, RootNode: 1, - Version: planning.QueryPlanV1, + Version: 1, Nodes: []*planning.EncodedNode{ { NodeType: planning.NODE_TYPE_VECTOR_SELECTOR, @@ -663,12 +687,53 @@ func TestPlanCreationEncodingAndDecoding(t *testing.T) { }, }, }, - "binary expression with two scalars": { + "binary expression with two scalars instant query": { expr: `2 + 3`, timeRange: instantQuery, expectedPlan: &planning.EncodedQueryPlan{ TimeRange: instantQueryEncodedTimeRange, + RootNode: 2, + Version: 0, + Nodes: []*planning.EncodedNode{ + { + NodeType: planning.NODE_TYPE_NUMBER_LITERAL, + Details: marshalDetails(&core.NumberLiteralDetails{ + Value: 2, + ExpressionPosition: core.PositionRange{Start: 0, End: 1}, + }), + Type: "NumberLiteral", + Description: `2`, + }, + { + NodeType: planning.NODE_TYPE_NUMBER_LITERAL, + Details: marshalDetails(&core.NumberLiteralDetails{ + Value: 3, + ExpressionPosition: core.PositionRange{Start: 4, End: 5}, + }), + Type: "NumberLiteral", + Description: `3`, + }, + { + NodeType: planning.NODE_TYPE_BINARY_EXPRESSION, + Details: marshalDetails(&core.BinaryExpressionDetails{ + Op: core.BINARY_ADD, + ExpressionPosition: core.PositionRange{Start: 0, End: 5}, + }), + Type: "BinaryExpression", + Children: []int64{0, 1}, + Description: `LHS + RHS`, + ChildrenLabels: []string{"LHS", "RHS"}, + }, + }, + }, + }, + "binary expression with two scalars range query": { + expr: `2 + 3`, + timeRange: rangeQuery, + + expectedPlan: &planning.EncodedQueryPlan{ + TimeRange: rangeQueryEncodedTimeRange, RootNode: 3, Version: planning.QueryPlanV1, Nodes: []*planning.EncodedNode{ @@ -1106,14 +1171,14 @@ func TestPlanCreationEncodingAndDecoding(t *testing.T) { }, }, }, - "subquery with '@'": { + "subquery with '@' instant query": { expr: `(some_metric)[1m:1s] @ 0`, timeRange: instantQuery, expectedPlan: &planning.EncodedQueryPlan{ TimeRange: instantQueryEncodedTimeRange, - RootNode: 2, - Version: 1, + RootNode: 1, + Version: 0, Nodes: []*planning.EncodedNode{ { NodeType: planning.NODE_TYPE_VECTOR_SELECTOR, @@ -1139,14 +1204,6 @@ func TestPlanCreationEncodingAndDecoding(t *testing.T) { Description: `[1m0s:1s] @ 0 (1970-01-01T00:00:00Z)`, ChildrenLabels: []string{""}, }, - { - NodeType: planning.NODE_TYPE_STEP_INVARIANT_EXPRESSION, - Details: marshalDetails(&core.StepInvariantExpressionDetails{}), - Type: "StepInvariantExpression", - Children: []int64{1}, - Description: ``, - ChildrenLabels: []string{""}, - }, }, }, }, From 5393edbdde5019597f079a33b780d1758322a6e0 Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Tue, 20 Jan 2026 11:36:40 +0800 Subject: [PATCH 05/12] Moved logic to planning --- pkg/streamingpromql/engine.go | 1 - pkg/streamingpromql/engine_test.go | 171 +++++++++--------- .../metrics/operator_metrics_tracker.go | 81 --------- .../metrics/operator_metrics_tracker_test.go | 52 ------ .../step_invariant_instant_vector_operator.go | 10 +- .../step_invariant_scalar_operator.go | 8 +- pkg/streamingpromql/planning.go | 69 ++++--- .../core/step_invariant_expression.go | 4 +- .../metrics/planning_metrics_tracker.go | 63 +++++++ .../metrics/planning_metrics_tracker_test.go | 43 +++++ pkg/streamingpromql/planning/plan.go | 2 - 11 files changed, 240 insertions(+), 264 deletions(-) delete mode 100644 pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go delete mode 100644 pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go create mode 100644 pkg/streamingpromql/planning/metrics/planning_metrics_tracker.go create mode 100644 pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go diff --git a/pkg/streamingpromql/engine.go b/pkg/streamingpromql/engine.go index 39cd88d264c..4c5dc0ff780 100644 --- a/pkg/streamingpromql/engine.go +++ b/pkg/streamingpromql/engine.go @@ -273,7 +273,6 @@ func (e *Engine) materializeAndCreateEvaluator(ctx context.Context, queryable st operatorParams := &planning.OperatorParameters{ Queryable: queryable, MemoryConsumptionTracker: memoryConsumptionTracker, - OperatorMetricsTracker: e.planner.operatorMetricsTracker, Annotations: annotations.New(), QueryStats: types.NewQueryStats(), LookbackDelta: lookbackDelta, diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 1a5249a0cde..a08b130939a 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -43,8 +43,8 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/storage/lazyquery" "github.com/grafana/mimir/pkg/streamingpromql/compat" - operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/planning" + operatormetrics2 "github.com/grafana/mimir/pkg/streamingpromql/planning/metrics" "github.com/grafana/mimir/pkg/streamingpromql/testutils" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/globalerror" @@ -4787,19 +4787,16 @@ func TestStepInvariantMetricsTracker(t *testing.T) { end time.Time interval time.Duration - expectedNodes int - expectedPoints map[operatormetrics.StepInvariantPointType]int + expectedNodes int + expectedStepsSaved int }{ { - query: "metric @ 20", - start: time.Unix(0, 0), - end: time.Unix(0, 0).Add(time.Second * 50), - interval: time.Second * 10, - expectedNodes: 1, - expectedPoints: map[operatormetrics.StepInvariantPointType]int{ - operatormetrics.FPoint: 5, // 6 steps - operatormetrics.HPoint: 0, - }, + query: "metric @ 20", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedStepsSaved: 5, }, { query: "metric @ 20", @@ -4807,94 +4804,95 @@ func TestStepInvariantMetricsTracker(t *testing.T) { end: time.Unix(0, 0).Add(time.Second * 50), interval: time.Second * 10, expectedNodes: 0, // step invariant operation has been removed - expectedPoints: map[operatormetrics.StepInvariantPointType]int{ - operatormetrics.FPoint: 0, - operatormetrics.HPoint: 0, - }, }, { - query: "nodata @ 20", - start: time.Unix(0, 0), - end: time.Unix(0, 0).Add(time.Second * 50), - interval: time.Second * 10, - expectedNodes: 1, - expectedPoints: map[operatormetrics.StepInvariantPointType]int{ - operatormetrics.FPoint: 0, - operatormetrics.HPoint: 0, - }, + query: "nodata @ 20", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedStepsSaved: 5, }, { - query: "metric @ 20 + metric @ 30", - start: time.Unix(0, 0), - end: time.Unix(0, 0).Add(time.Second * 50), - interval: time.Second * 10, - expectedNodes: 1, // the left and right are all wrapped into a single step invariant - expectedPoints: map[operatormetrics.StepInvariantPointType]int{ - operatormetrics.FPoint: 5, - operatormetrics.HPoint: 0, - }, + query: "metric @ 20 + metric @ 30", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, // the left and right are all wrapped into a single step invariant + expectedStepsSaved: 5, }, { - query: "metric @ 20 * metric + metric @ 30", - start: time.Unix(0, 0), - end: time.Unix(0, 0).Add(time.Second * 50), - interval: time.Second * 10, - expectedNodes: 2, - expectedPoints: map[operatormetrics.StepInvariantPointType]int{ - operatormetrics.FPoint: 10, - operatormetrics.HPoint: 0, - }, + query: "metric @ 20 * metric + metric @ 30", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 2, + expectedStepsSaved: 10, }, { - query: "abs(metric @ 20 * metric + metric @ 30)", - start: time.Unix(0, 0), - end: time.Unix(0, 0).Add(time.Second * 50), - interval: time.Second * 10, - expectedNodes: 2, - expectedPoints: map[operatormetrics.StepInvariantPointType]int{ - operatormetrics.FPoint: 10, - operatormetrics.HPoint: 0, - }, + query: "abs(metric @ 20 * metric + metric @ 30)", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 2, + expectedStepsSaved: 10, }, { - query: "scalar(metric @ 20)", - start: time.Unix(0, 0), - end: time.Unix(0, 0).Add(time.Second * 50), - interval: time.Second * 10, - expectedNodes: 1, - expectedPoints: map[operatormetrics.StepInvariantPointType]int{ - operatormetrics.FPoint: 5, - operatormetrics.HPoint: 0, - }, + query: "scalar(metric @ 20)", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedStepsSaved: 5, }, { - query: "histogram @ 20", - start: time.Unix(0, 0), - end: time.Unix(0, 0).Add(time.Second * 50), - interval: time.Second * 10, - expectedNodes: 1, - expectedPoints: map[operatormetrics.StepInvariantPointType]int{ - operatormetrics.FPoint: 0, - operatormetrics.HPoint: 5, - }, + query: "histogram @ 20", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedStepsSaved: 5, }, { - query: "(metric @ 20 or metric) or (histogram @ 20 or histogram) or (vector(scalar(metric @ 30)) or metric)", - start: time.Unix(0, 0), - end: time.Unix(0, 0).Add(time.Second * 50), - interval: time.Second * 10, - expectedNodes: 3, - expectedPoints: map[operatormetrics.StepInvariantPointType]int{ - operatormetrics.FPoint: 10, - operatormetrics.HPoint: 5, - }, + query: "(metric @ 20 or metric) or (histogram @ 20 or histogram) or (vector(scalar(metric @ 30)) or metric)", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 3, + expectedStepsSaved: 15, + }, + { + query: "timestamp(metric @ 20)", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedStepsSaved: 5, + }, + { + // the step invariant is relative to a sub-query. The sub-query step count is used + query: "avg_over_time((vector(1))[10m:1m])", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedStepsSaved: 9, + }, + { + // the step invariant is relative to a sub-query. The sub-query step count is used + query: "avg_over_time((rate(http_requests_total[5m]) + metric @ 10)[10m:1m])", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedStepsSaved: 9, }, } for _, tc := range tc { t.Run(tc.query, func(t *testing.T) { - tracker := operatormetrics.NewOperatorMetricsTracker(prometheus.NewRegistry()) + tracker := operatormetrics2.NewMetricsTracker(prometheus.NewRegistry()) opts := NewTestEngineOpts() planner, err := NewQueryPlannerWithoutOptimizationPasses(opts, NewMaximumSupportedVersionQueryPlanVersionProvider()) @@ -4908,10 +4906,17 @@ func TestStepInvariantMetricsTracker(t *testing.T) { require.NoError(t, err) res := qry.Exec(context.Background()) require.NoError(t, res.Err) - require.Equal(t, float64(tc.expectedNodes), testutil.ToFloat64(tracker.StepInvariantTracker.NodeCounter())) - for pointType, expectedCount := range tc.expectedPoints { - require.Equal(t, float64(expectedCount), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(pointType))) + require.Equal(t, float64(tc.expectedNodes), testutil.ToFloat64(tracker.StepInvariantTracker.NodesCounter())) + require.Equal(t, float64(tc.expectedStepsSaved), testutil.ToFloat64(tracker.StepInvariantTracker.StepsCounter())) + + // Extra assertions to ensure we do not introduce bad test data + if tc.expectedNodes == 0 { + // If there are no step invariant nodes, then we do not expect to have any steps saved + require.Equal(t, 0, tc.expectedStepsSaved) + } else { + // If there are step invariant nodes, then we expect to have saved steps + require.Greater(t, tc.expectedStepsSaved, 0) } }) } diff --git a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go deleted file mode 100644 index 4ebb57e95da..00000000000 --- a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go +++ /dev/null @@ -1,81 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package operatormetrics - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -// MetricsTracker holds metrics trackers used by operators. -// These trackers can be passed into operators to elicit metrics -// during their lifecycle. -type MetricsTracker struct { - StepInvariantTracker *StepInvariantExpressionMetricsTracker -} - -func NewOperatorMetricsTracker(reg prometheus.Registerer) *MetricsTracker { - return &MetricsTracker{ - StepInvariantTracker: newStepInvariantExpressionMetricsTracker(reg), - } -} - -type StepInvariantPointType int - -const ( - FPoint StepInvariantPointType = iota - HPoint StepInvariantPointType = iota -) - -var stepInvariantPointTypeLabels = []string{ - "fpoint", "hpoint", -} - -type StepInvariantExpressionMetricsTracker struct { - // total number of observed step invariant nodes - nodes prometheus.Counter - - // total number of points which were saved from being retrieved from the inner operation - points *prometheus.CounterVec -} - -// OnStepInvariantNodeObserved is called when a step invariant node is observed. -// It increments the nodes counter by 1. -func (t *StepInvariantExpressionMetricsTracker) OnStepInvariantNodeObserved() { - t.nodes.Inc() -} - -// NodeCounter returns the counter tracking step invariant nodes. -// This is provided for unit tests and allowing the underlying counter to remain unexported. -func (t *StepInvariantExpressionMetricsTracker) NodeCounter() prometheus.Counter { - return t.nodes -} - -// Counter returns the counter tracking step invariant point savings for a given type. -// This is provided for unit tests and allowing the underlying counter to remain unexported. -func (t *StepInvariantExpressionMetricsTracker) Counter(pointType StepInvariantPointType) prometheus.Counter { - return t.points.WithLabelValues(stepInvariantPointTypeLabels[pointType]) -} - -// OnStepInvariantStepsSaved increments the counter related to tracking step invariant point savings for the given point type. -// The actual counter will be incremented by stepCount-1, reflecting that the first step loaded data. -// If the stepCount is less than or equal to 1 then this function is a no-op. -func (t *StepInvariantExpressionMetricsTracker) OnStepInvariantStepsSaved(pointType StepInvariantPointType, stepCount int) { - if stepCount <= 1 { - return - } - t.points.WithLabelValues(stepInvariantPointTypeLabels[pointType]).Add(float64(stepCount - 1)) -} - -func newStepInvariantExpressionMetricsTracker(reg prometheus.Registerer) *StepInvariantExpressionMetricsTracker { - return &StepInvariantExpressionMetricsTracker{ - nodes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_mimir_query_engine_step_invariant_nodes_total", - Help: "Total number of step invariant nodes.", - }), - points: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_mimir_query_engine_step_invariant_points_saved_total", - Help: "Total number of samples which were saved from being queried / loaded due to step invariant handling.", - }, []string{"type"}), - } -} diff --git a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go deleted file mode 100644 index 517b06f3b44..00000000000 --- a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package operatormetrics - -import ( - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/require" -) - -func TestNewStepInvariantExpressionMetricsTrackerNodes(t *testing.T) { - tracker := NewOperatorMetricsTracker(prometheus.NewRegistry()) - require.Same(t, tracker.StepInvariantTracker.nodes, tracker.StepInvariantTracker.NodeCounter()) - - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) - tracker.StepInvariantTracker.OnStepInvariantNodeObserved() - require.Equal(t, float64(1), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) -} - -func TestNewStepInvariantExpressionMetricsTrackerFloatPoints(t *testing.T) { - tracker := NewOperatorMetricsTracker(prometheus.NewRegistry()) - - // Passing in a step count <= 1 has no effect - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) - tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 0) - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) - tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 1) - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) - - tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 2) - require.Equal(t, float64(1), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) - tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 12) - require.Equal(t, float64(1+11), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) -} - -func TestNewStepInvariantExpressionMetricsTrackerHistogramPoints(t *testing.T) { - tracker := NewOperatorMetricsTracker(prometheus.NewRegistry()) - - // Passing in a step count <= 1 has no effect - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) - tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 0) - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) - tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 1) - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) - - tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 2) - require.Equal(t, float64(1), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) - tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 12) - require.Equal(t, float64(1+11), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) -} diff --git a/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go b/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go index 0fa33463dd7..b6fde89fd3b 100644 --- a/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go +++ b/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go @@ -9,7 +9,6 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser/posrange" - operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/limiter" ) @@ -18,15 +17,13 @@ type StepInvariantInstantVectorOperator struct { inner types.InstantVectorOperator originalTimeRange types.QueryTimeRange memoryConsumptionTracker *limiter.MemoryConsumptionTracker - metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker } -func NewStepInvariantInstantVectorOperator(op types.InstantVectorOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker, metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker) *StepInvariantInstantVectorOperator { +func NewStepInvariantInstantVectorOperator(op types.InstantVectorOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) *StepInvariantInstantVectorOperator { return &StepInvariantInstantVectorOperator{ inner: op, originalTimeRange: originalTimeRange, memoryConsumptionTracker: memoryConsumptionTracker, - metricsTracker: metricsTracker, } } @@ -39,7 +36,6 @@ func (s *StepInvariantInstantVectorOperator) Close() { } func (s *StepInvariantInstantVectorOperator) Prepare(ctx context.Context, params *types.PrepareParams) error { - s.metricsTracker.OnStepInvariantNodeObserved() return s.inner.Prepare(ctx, params) } @@ -74,8 +70,6 @@ func (s *StepInvariantInstantVectorOperator) NextSeries(ctx context.Context) (ty return types.InstantVectorSeriesData{}, err } - s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.FPoint, s.originalTimeRange.StepCount) - // Fill the expected steps with the same point. for ts := s.originalTimeRange.StartT; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds { floats = append(floats, promql.FPoint{ @@ -95,8 +89,6 @@ func (s *StepInvariantInstantVectorOperator) NextSeries(ctx context.Context) (ty return types.InstantVectorSeriesData{}, err } - s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.HPoint, s.originalTimeRange.StepCount) - histograms = append(histograms, promql.HPoint{T: data.Histograms[0].T, H: data.Histograms[0].H}) // Note that we create a copy of the histogram for each step as we can not re-use the same *FloatHistogram in the slice from the pool. for ts := s.originalTimeRange.StartT + s.originalTimeRange.IntervalMilliseconds; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds { diff --git a/pkg/streamingpromql/operators/step_invariant_scalar_operator.go b/pkg/streamingpromql/operators/step_invariant_scalar_operator.go index b75cd746a52..877e755dd6c 100644 --- a/pkg/streamingpromql/operators/step_invariant_scalar_operator.go +++ b/pkg/streamingpromql/operators/step_invariant_scalar_operator.go @@ -9,7 +9,6 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser/posrange" - operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/limiter" ) @@ -18,15 +17,13 @@ type StepInvariantScalarOperator struct { inner types.ScalarOperator originalTimeRange types.QueryTimeRange memoryConsumptionTracker *limiter.MemoryConsumptionTracker - metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker } -func NewStepInvariantScalarOperator(op types.ScalarOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker, metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker) *StepInvariantScalarOperator { +func NewStepInvariantScalarOperator(op types.ScalarOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) *StepInvariantScalarOperator { return &StepInvariantScalarOperator{ inner: op, originalTimeRange: originalTimeRange, memoryConsumptionTracker: memoryConsumptionTracker, - metricsTracker: metricsTracker, } } @@ -39,7 +36,6 @@ func (s *StepInvariantScalarOperator) Close() { } func (s *StepInvariantScalarOperator) Prepare(ctx context.Context, params *types.PrepareParams) error { - s.metricsTracker.OnStepInvariantNodeObserved() return s.inner.Prepare(ctx, params) } @@ -68,8 +64,6 @@ func (s *StepInvariantScalarOperator) GetValues(ctx context.Context) (types.Scal return types.ScalarData{}, err } - s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.FPoint, s.originalTimeRange.StepCount) - floats = append(floats, data.Samples[0]) for ts := s.originalTimeRange.StartT + s.originalTimeRange.IntervalMilliseconds; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds { diff --git a/pkg/streamingpromql/planning.go b/pkg/streamingpromql/planning.go index 3524cb8b5fa..040746f0123 100644 --- a/pkg/streamingpromql/planning.go +++ b/pkg/streamingpromql/planning.go @@ -22,13 +22,13 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/streamingpromql/operators/functions" - operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/optimize" "github.com/grafana/mimir/pkg/streamingpromql/optimize/ast" "github.com/grafana/mimir/pkg/streamingpromql/optimize/plan" "github.com/grafana/mimir/pkg/streamingpromql/optimize/plan/commonsubexpressionelimination" "github.com/grafana/mimir/pkg/streamingpromql/planning" "github.com/grafana/mimir/pkg/streamingpromql/planning/core" + "github.com/grafana/mimir/pkg/streamingpromql/planning/metrics" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -62,10 +62,9 @@ type QueryPlanner struct { planOptimizationPasses []optimize.QueryPlanOptimizationPass planStageLatency *prometheus.HistogramVec generatedPlans *prometheus.CounterVec - operatorMetricsTracker *operatormetrics.MetricsTracker versionProvider QueryPlanVersionProvider - - logger log.Logger + planningMetricsTracker *planningmetrics.MetricsTracker + logger log.Logger // Replaced during testing to ensure timing produces consistent results. TimeSince func(time.Time) time.Duration @@ -141,7 +140,7 @@ func NewQueryPlannerWithoutOptimizationPasses(opts EngineOpts, versionProvider Q Name: "cortex_mimir_query_engine_plans_generated_total", Help: "Total number of query plans generated.", }, []string{"version"}), - operatorMetricsTracker: operatormetrics.NewOperatorMetricsTracker(opts.CommonOpts.Reg), + planningMetricsTracker: planningmetrics.NewMetricsTracker(opts.CommonOpts.Reg), versionProvider: versionProvider, logger: opts.Logger, @@ -149,10 +148,10 @@ func NewQueryPlannerWithoutOptimizationPasses(opts EngineOpts, versionProvider Q }, nil } -// RegisterOperatorMetrics replaces the operatorMetricsTracker with the given tracker. +// RegisterOperatorMetrics replaces the planningMetricsTracker with the given tracker. // This is only needed for unit tests -func (p *QueryPlanner) RegisterOperatorMetrics(tracker *operatormetrics.MetricsTracker) { - p.operatorMetricsTracker = tracker +func (p *QueryPlanner) RegisterOperatorMetrics(tracker *planningmetrics.MetricsTracker) { + p.planningMetricsTracker = tracker } // RegisterASTOptimizationPass registers an AST optimization pass used with this engine. @@ -248,7 +247,7 @@ func (p *QueryPlanner) NewQueryPlan(ctx context.Context, qs string, timeRange ty spanLogger.DebugLog("msg", "AST optimisation passes completed", "expression", expr) plan, err := p.runPlanningStage("Original plan", observer, func() (*planning.QueryPlan, error) { - root, err := p.nodeFromExpr(expr, timeRange) + root, err := p.nodeFromExpr(expr, timeRange, p.planningMetricsTracker) if err != nil { return nil, err } @@ -401,7 +400,7 @@ func (p *QueryPlanner) runPlanningStage(stageName string, observer PlanningObser return plan, nil } -func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeRange) (planning.Node, error) { +func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeRange, metricsTracker *planningmetrics.MetricsTracker) (planning.Node, error) { switch expr := expr.(type) { case *parser.VectorSelector: return &core.VectorSelector{ @@ -438,7 +437,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR }, nil case *parser.AggregateExpr: - inner, err := p.nodeFromExpr(expr.Expr, timeRange) + inner, err := p.nodeFromExpr(expr.Expr, timeRange, metricsTracker) if err != nil { return nil, err } @@ -446,7 +445,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR var param planning.Node if expr.Param != nil { - param, err = p.nodeFromExpr(expr.Param, timeRange) + param, err = p.nodeFromExpr(expr.Param, inner.ChildrenTimeRange(timeRange), metricsTracker) if err != nil { return nil, err } @@ -469,12 +468,12 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR }, nil case *parser.BinaryExpr: - lhs, err := p.nodeFromExpr(expr.LHS, timeRange) + lhs, err := p.nodeFromExpr(expr.LHS, timeRange, metricsTracker) if err != nil { return nil, err } - rhs, err := p.nodeFromExpr(expr.RHS, timeRange) + rhs, err := p.nodeFromExpr(expr.RHS, timeRange, metricsTracker) if err != nil { return nil, err } @@ -533,7 +532,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR args := make([]planning.Node, 0, len(expr.Args)) for _, arg := range expr.Args { - node, err := p.nodeFromExpr(arg, timeRange) + node, err := p.nodeFromExpr(arg, timeRange, metricsTracker) if err != nil { return nil, err } @@ -572,11 +571,15 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR // Note that the DeduplicateAndMerge still wraps the function call as the timestamp function returns true under functionNeedsDeduplication(). // This can be removed once https://github.com/prometheus/prometheus/pull/17313 is vendored into mimir stepInvariantExpression, ok := args[0].(*core.StepInvariantExpression) - if ok && timeRange.StepCount > 1 { + if ok { vectorSelector, ok := stepInvariantExpression.Inner.(*core.VectorSelector) if ok { vectorSelector.ReturnSampleTimestamps = true f.Args[0] = stepInvariantExpression.Inner + + // Note - we do not update the step invariant metrics tracker as this will already have been updated when the + // argument expression was created. + return &core.StepInvariantExpression{ Inner: &core.DeduplicateAndMerge{ Inner: f, @@ -602,10 +605,6 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR return f, nil case *parser.SubqueryExpr: - inner, err := p.nodeFromExpr(expr.Expr, timeRange) - if err != nil { - return nil, err - } step := expr.Step @@ -613,8 +612,12 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR step = time.Duration(p.noStepSubqueryIntervalFn(expr.Range.Milliseconds())) * time.Millisecond } - return &core.Subquery{ - Inner: inner, + // Construct the Subquery in 2 phases. + // The first step initializes the SubqueryDetails, which allows us to determine the children time range. + // The second step then creates the inner expression, passing in this child time range. + // This is needed by the step invariant expression handler which will require this child time range to + // accurately calculate the number of steps saved. + subquery := &core.Subquery{ SubqueryDetails: &core.SubqueryDetails{ Timestamp: core.TimeFromTimestamp(expr.Timestamp), Offset: expr.OriginalOffset, @@ -622,10 +625,20 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR Step: step, ExpressionPosition: core.PositionRangeFrom(expr.PositionRange()), }, - }, nil + } + + childTimeRange := subquery.ChildrenTimeRange(timeRange) + + inner, err := p.nodeFromExpr(expr.Expr, childTimeRange, metricsTracker) + if err != nil { + return nil, err + } + + subquery.Inner = inner + return subquery, nil case *parser.UnaryExpr: - inner, err := p.nodeFromExpr(expr.Expr, timeRange) + inner, err := p.nodeFromExpr(expr.Expr, timeRange, metricsTracker) if err != nil { return nil, err } @@ -675,19 +688,21 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR }, nil case *parser.ParenExpr: - return p.nodeFromExpr(expr.Expr, timeRange) + return p.nodeFromExpr(expr.Expr, timeRange, metricsTracker) case *parser.StepInvariantExpr: - inner, err := p.nodeFromExpr(expr.Expr, timeRange) + inner, err := p.nodeFromExpr(expr.Expr, timeRange, metricsTracker) if err != nil { return nil, err } - // There is no advantage to wrapping an instant query in a step invariant + // There is no advantage to wrapping an instant query in a step invariant. if timeRange.StepCount <= 1 { return inner, nil } + metricsTracker.StepInvariantTracker.OnStepInvariantExpressionAdded(timeRange.StepCount) + return &core.StepInvariantExpression{ Inner: inner, StepInvariantExpressionDetails: &core.StepInvariantExpressionDetails{}, diff --git a/pkg/streamingpromql/planning/core/step_invariant_expression.go b/pkg/streamingpromql/planning/core/step_invariant_expression.go index aa4ac2ffdcc..88fe47bbcee 100644 --- a/pkg/streamingpromql/planning/core/step_invariant_expression.go +++ b/pkg/streamingpromql/planning/core/step_invariant_expression.go @@ -100,9 +100,9 @@ func MaterializeStepInvariantExpression(s *StepInvariantExpression, materializer switch op := op.(type) { case types.InstantVectorOperator: - return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantInstantVectorOperator(op, timeRange, params.MemoryConsumptionTracker, params.OperatorMetricsTracker.StepInvariantTracker)), nil + return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantInstantVectorOperator(op, timeRange, params.MemoryConsumptionTracker)), nil case types.ScalarOperator: - return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantScalarOperator(op, timeRange, params.MemoryConsumptionTracker, params.OperatorMetricsTracker.StepInvariantTracker)), nil + return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantScalarOperator(op, timeRange, params.MemoryConsumptionTracker)), nil case types.RangeVectorOperator: // Notes on range vector handling: // diff --git a/pkg/streamingpromql/planning/metrics/planning_metrics_tracker.go b/pkg/streamingpromql/planning/metrics/planning_metrics_tracker.go new file mode 100644 index 00000000000..9f0640e78a2 --- /dev/null +++ b/pkg/streamingpromql/planning/metrics/planning_metrics_tracker.go @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package planningmetrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// MetricsTracker holds metrics trackers used by the query planner. +type MetricsTracker struct { + StepInvariantTracker *StepInvariantExpressionMetricsTracker +} + +func NewMetricsTracker(reg prometheus.Registerer) *MetricsTracker { + return &MetricsTracker{ + StepInvariantTracker: newStepInvariantExpressionMetricsTracker(reg), + } +} + +type StepInvariantExpressionMetricsTracker struct { + // total number of step invariant expression nodes created in planning + nodes prometheus.Counter + + // total number of steps which have been saved - the number of steps which did not need to be re-evaluated + steps prometheus.Counter +} + +// OnStepInvariantExpressionAdded is called when a step invariant expression planning node is added to a plan. +// It increments the nodes counter by 1, and the steps saved counter is incremented by the given stepCount-1. +// If the stepCount is less than or equal to 1 then this function is a no-op. +func (t *StepInvariantExpressionMetricsTracker) OnStepInvariantExpressionAdded(stepCount int) { + if stepCount <= 1 { + return + } + t.nodes.Inc() + t.steps.Add(float64(stepCount - 1)) +} + +// NodesCounter returns the counter tracking step invariant expression nodes. +// This is provided for unit tests and allowing the underlying counter to remain unexported. +func (t *StepInvariantExpressionMetricsTracker) NodesCounter() prometheus.Counter { + return t.nodes +} + +// StepsCounter returns the counter tracking step invariant step savings. +// This is provided for unit tests and allowing the underlying counter to remain unexported. +func (t *StepInvariantExpressionMetricsTracker) StepsCounter() prometheus.Counter { + return t.steps +} + +func newStepInvariantExpressionMetricsTracker(reg prometheus.Registerer) *StepInvariantExpressionMetricsTracker { + return &StepInvariantExpressionMetricsTracker{ + nodes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_mimir_query_engine_step_invariant_nodes_total", + Help: "Total number of step invariant nodes.", + }), + steps: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_mimir_query_engine_step_invariant_steps_saved_total", + Help: "Total number of steps which were saved from being queried / loaded due to step invariant handling.", + }), + } +} diff --git a/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go b/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go new file mode 100644 index 00000000000..7836ce0c375 --- /dev/null +++ b/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package planningmetrics + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestNewStepInvariantExpressionMetricsTrackerNodes(t *testing.T) { + tracker := NewMetricsTracker(prometheus.NewRegistry()) + require.Same(t, tracker.StepInvariantTracker.nodes, tracker.StepInvariantTracker.NodesCounter()) + + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) + + tracker.StepInvariantTracker.OnStepInvariantExpressionAdded(10) + require.Equal(t, float64(1), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) + require.Equal(t, float64(9), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) + + tracker.StepInvariantTracker.OnStepInvariantExpressionAdded(5) + require.Equal(t, float64(2), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) + require.Equal(t, float64(13), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) +} + +func TestNewStepInvariantExpressionMetricsTrackerNodesNotEnoughSteps(t *testing.T) { + tracker := NewMetricsTracker(prometheus.NewRegistry()) + require.Same(t, tracker.StepInvariantTracker.nodes, tracker.StepInvariantTracker.NodesCounter()) + + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) + + tracker.StepInvariantTracker.OnStepInvariantExpressionAdded(0) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) + + tracker.StepInvariantTracker.OnStepInvariantExpressionAdded(1) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) +} diff --git a/pkg/streamingpromql/planning/plan.go b/pkg/streamingpromql/planning/plan.go index 9bd0edf028e..1a51aeb58e4 100644 --- a/pkg/streamingpromql/planning/plan.go +++ b/pkg/streamingpromql/planning/plan.go @@ -19,7 +19,6 @@ import ( "github.com/prometheus/prometheus/util/annotations" apierror "github.com/grafana/mimir/pkg/api/error" - operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/limiter" ) @@ -220,7 +219,6 @@ func (t QueriedTimeRange) Union(other QueriedTimeRange) QueriedTimeRange { type OperatorParameters struct { Queryable storage.Queryable MemoryConsumptionTracker *limiter.MemoryConsumptionTracker - OperatorMetricsTracker *operatormetrics.MetricsTracker Annotations *annotations.Annotations QueryStats *types.QueryStats LookbackDelta time.Duration From 5be31e1fd32c97888ba10616893deb0060e8f919 Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Tue, 20 Jan 2026 12:19:20 +0800 Subject: [PATCH 06/12] Fix tests --- pkg/streamingpromql/engine_test.go | 2 +- .../optimize/plan/narrow_selectors_test.go | 22 ++++++++--------- pkg/streamingpromql/planning.go | 24 +++++++++---------- 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 6061100bb1c..f1d2a4f40b0 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -9,7 +9,6 @@ import ( "context" "errors" "fmt" - planningmetrics "github.com/grafana/mimir/pkg/streamingpromql/planning/metrics" "io" "io/fs" "os" @@ -45,6 +44,7 @@ import ( "github.com/grafana/mimir/pkg/storage/lazyquery" "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/streamingpromql/planning" + planningmetrics "github.com/grafana/mimir/pkg/streamingpromql/planning/metrics" mqetest "github.com/grafana/mimir/pkg/streamingpromql/testutils" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/globalerror" diff --git a/pkg/streamingpromql/optimize/plan/narrow_selectors_test.go b/pkg/streamingpromql/optimize/plan/narrow_selectors_test.go index 4d214b306fc..4fb4aead5a1 100644 --- a/pkg/streamingpromql/optimize/plan/narrow_selectors_test.go +++ b/pkg/streamingpromql/optimize/plan/narrow_selectors_test.go @@ -204,12 +204,11 @@ func TestNarrowSelectorsOptimizationPass(t *testing.T) { "binary expression with no selectors": { expr: `vector(1) + vector(0)`, expectedPlan: ` - - StepInvariantExpression - - BinaryExpression: LHS + RHS - - LHS: FunctionCall: vector(...) - - NumberLiteral: 1 - - RHS: FunctionCall: vector(...) - - NumberLiteral: 0 + - BinaryExpression: LHS + RHS + - LHS: FunctionCall: vector(...) + - NumberLiteral: 1 + - RHS: FunctionCall: vector(...) + - NumberLiteral: 0 `, expectedAttempts: 0, expectedModified: 0, @@ -217,12 +216,11 @@ func TestNarrowSelectorsOptimizationPass(t *testing.T) { "binary expression on with no selectors": { expr: `vector(1) + on (region) vector(0)`, expectedPlan: ` - - StepInvariantExpression - - BinaryExpression: LHS + on (region) RHS - - LHS: FunctionCall: vector(...) - - NumberLiteral: 1 - - RHS: FunctionCall: vector(...) - - NumberLiteral: 0 + - BinaryExpression: LHS + on (region) RHS + - LHS: FunctionCall: vector(...) + - NumberLiteral: 1 + - RHS: FunctionCall: vector(...) + - NumberLiteral: 0 `, expectedAttempts: 0, expectedModified: 0, diff --git a/pkg/streamingpromql/planning.go b/pkg/streamingpromql/planning.go index d4e0dffacca..e611e246f9b 100644 --- a/pkg/streamingpromql/planning.go +++ b/pkg/streamingpromql/planning.go @@ -247,7 +247,7 @@ func (p *QueryPlanner) NewQueryPlan(ctx context.Context, qs string, timeRange ty spanLogger.DebugLog("msg", "AST optimisation passes completed", "expression", expr) plan, err := p.runPlanningStage("Original plan", observer, func() (*planning.QueryPlan, error) { - root, err := p.nodeFromExpr(expr, timeRange, p.planningMetricsTracker) + root, err := p.nodeFromExpr(expr, timeRange) if err != nil { return nil, err } @@ -400,7 +400,7 @@ func (p *QueryPlanner) runPlanningStage(stageName string, observer PlanningObser return plan, nil } -func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeRange, metricsTracker *planningmetrics.MetricsTracker) (planning.Node, error) { +func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeRange) (planning.Node, error) { switch expr := expr.(type) { case *parser.VectorSelector: return &core.VectorSelector{ @@ -437,7 +437,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR }, nil case *parser.AggregateExpr: - inner, err := p.nodeFromExpr(expr.Expr, timeRange, metricsTracker) + inner, err := p.nodeFromExpr(expr.Expr, timeRange) if err != nil { return nil, err } @@ -445,7 +445,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR var param planning.Node if expr.Param != nil { - param, err = p.nodeFromExpr(expr.Param, inner.ChildrenTimeRange(timeRange), metricsTracker) + param, err = p.nodeFromExpr(expr.Param, timeRange) if err != nil { return nil, err } @@ -468,12 +468,12 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR }, nil case *parser.BinaryExpr: - lhs, err := p.nodeFromExpr(expr.LHS, timeRange, metricsTracker) + lhs, err := p.nodeFromExpr(expr.LHS, timeRange) if err != nil { return nil, err } - rhs, err := p.nodeFromExpr(expr.RHS, timeRange, metricsTracker) + rhs, err := p.nodeFromExpr(expr.RHS, timeRange) if err != nil { return nil, err } @@ -532,7 +532,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR args := make([]planning.Node, 0, len(expr.Args)) for _, arg := range expr.Args { - node, err := p.nodeFromExpr(arg, timeRange, metricsTracker) + node, err := p.nodeFromExpr(arg, timeRange) if err != nil { return nil, err } @@ -629,7 +629,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR childTimeRange := subquery.ChildrenTimeRange(timeRange) - inner, err := p.nodeFromExpr(expr.Expr, childTimeRange, metricsTracker) + inner, err := p.nodeFromExpr(expr.Expr, childTimeRange) if err != nil { return nil, err } @@ -638,7 +638,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR return subquery, nil case *parser.UnaryExpr: - inner, err := p.nodeFromExpr(expr.Expr, timeRange, metricsTracker) + inner, err := p.nodeFromExpr(expr.Expr, timeRange) if err != nil { return nil, err } @@ -688,10 +688,10 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR }, nil case *parser.ParenExpr: - return p.nodeFromExpr(expr.Expr, timeRange, metricsTracker) + return p.nodeFromExpr(expr.Expr, timeRange) case *parser.StepInvariantExpr: - inner, err := p.nodeFromExpr(expr.Expr, timeRange, metricsTracker) + inner, err := p.nodeFromExpr(expr.Expr, timeRange) if err != nil { return nil, err } @@ -701,7 +701,7 @@ func (p *QueryPlanner) nodeFromExpr(expr parser.Expr, timeRange types.QueryTimeR return inner, nil } - metricsTracker.StepInvariantTracker.OnStepInvariantExpressionAdded(timeRange.StepCount) + p.planningMetricsTracker.StepInvariantTracker.OnStepInvariantExpressionAdded(timeRange.StepCount) return &core.StepInvariantExpression{ Inner: inner, From 4e499c72cd21068d7c2efeefd8d0b1c3a6f73e94 Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Tue, 20 Jan 2026 12:35:44 +0800 Subject: [PATCH 07/12] PR feedback --- pkg/streamingpromql/engine_test.go | 19 ++++++++++++++----- pkg/streamingpromql/planning.go | 6 ------ .../metrics/planning_metrics_tracker.go | 19 ++----------------- .../metrics/planning_metrics_tracker_test.go | 2 -- 4 files changed, 16 insertions(+), 30 deletions(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index f1d2a4f40b0..b70cb285e2f 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -4827,7 +4827,7 @@ func TestExtendedRangeSelectorsIrregular(t *testing.T) { } } -func TestStepInvariantMetricsTracker(t *testing.T) { +func TestStepInvariantMetrics(t *testing.T) { storage := promqltest.LoadedStorage(t, ` load 1m metric 0 1 2 3 4 5 @@ -4947,13 +4947,13 @@ func TestStepInvariantMetricsTracker(t *testing.T) { for _, tc := range tc { t.Run(tc.query, func(t *testing.T) { - tracker := planningmetrics.NewMetricsTracker(prometheus.NewRegistry()) + registry := prometheus.NewRegistry() opts := NewTestEngineOpts() planner, err := NewQueryPlannerWithoutOptimizationPasses(opts, NewMaximumSupportedVersionQueryPlanVersionProvider()) + planner.planningMetricsTracker = planningmetrics.NewMetricsTracker(registry) require.NoError(t, err) - planner.RegisterOperatorMetrics(tracker) engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner) require.NoError(t, err) @@ -4962,8 +4962,17 @@ func TestStepInvariantMetricsTracker(t *testing.T) { res := qry.Exec(context.Background()) require.NoError(t, res.Err) - require.Equal(t, float64(tc.expectedNodes), testutil.ToFloat64(tracker.StepInvariantTracker.NodesCounter())) - require.Equal(t, float64(tc.expectedStepsSaved), testutil.ToFloat64(tracker.StepInvariantTracker.StepsCounter())) + metrics, err := registry.Gather() + require.NoError(t, err) + for _, m := range metrics { + if m.GetName() == "cortex_mimir_query_engine_step_invariant_nodes_total" { + require.Equal(t, float64(tc.expectedNodes), m.GetMetric()[0].Counter.GetValue()) + } else if m.GetName() == "cortex_mimir_query_engine_step_invariant_steps_saved_total" { + require.Equal(t, float64(tc.expectedStepsSaved), m.GetMetric()[0].Counter.GetValue()) + } else { + require.Fail(t, "unexpected metric name", m.GetName()) + } + } // Extra assertions to ensure we do not introduce bad test data if tc.expectedNodes == 0 { diff --git a/pkg/streamingpromql/planning.go b/pkg/streamingpromql/planning.go index e611e246f9b..abf0d3d4d20 100644 --- a/pkg/streamingpromql/planning.go +++ b/pkg/streamingpromql/planning.go @@ -148,12 +148,6 @@ func NewQueryPlannerWithoutOptimizationPasses(opts EngineOpts, versionProvider Q }, nil } -// RegisterOperatorMetrics replaces the planningMetricsTracker with the given tracker. -// This is only needed for unit tests -func (p *QueryPlanner) RegisterOperatorMetrics(tracker *planningmetrics.MetricsTracker) { - p.planningMetricsTracker = tracker -} - // RegisterASTOptimizationPass registers an AST optimization pass used with this engine. // // This method is not thread-safe and must not be called concurrently with any other method on this type. diff --git a/pkg/streamingpromql/planning/metrics/planning_metrics_tracker.go b/pkg/streamingpromql/planning/metrics/planning_metrics_tracker.go index 9f0640e78a2..c20c49c05c1 100644 --- a/pkg/streamingpromql/planning/metrics/planning_metrics_tracker.go +++ b/pkg/streamingpromql/planning/metrics/planning_metrics_tracker.go @@ -28,27 +28,12 @@ type StepInvariantExpressionMetricsTracker struct { // OnStepInvariantExpressionAdded is called when a step invariant expression planning node is added to a plan. // It increments the nodes counter by 1, and the steps saved counter is incremented by the given stepCount-1. -// If the stepCount is less than or equal to 1 then this function is a no-op. +// It is the caller's responsibility to ensure that the step count is > 1. func (t *StepInvariantExpressionMetricsTracker) OnStepInvariantExpressionAdded(stepCount int) { - if stepCount <= 1 { - return - } t.nodes.Inc() t.steps.Add(float64(stepCount - 1)) } -// NodesCounter returns the counter tracking step invariant expression nodes. -// This is provided for unit tests and allowing the underlying counter to remain unexported. -func (t *StepInvariantExpressionMetricsTracker) NodesCounter() prometheus.Counter { - return t.nodes -} - -// StepsCounter returns the counter tracking step invariant step savings. -// This is provided for unit tests and allowing the underlying counter to remain unexported. -func (t *StepInvariantExpressionMetricsTracker) StepsCounter() prometheus.Counter { - return t.steps -} - func newStepInvariantExpressionMetricsTracker(reg prometheus.Registerer) *StepInvariantExpressionMetricsTracker { return &StepInvariantExpressionMetricsTracker{ nodes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -57,7 +42,7 @@ func newStepInvariantExpressionMetricsTracker(reg prometheus.Registerer) *StepIn }), steps: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_mimir_query_engine_step_invariant_steps_saved_total", - Help: "Total number of steps which were saved from being queried / loaded due to step invariant handling.", + Help: "Total number of steps which were saved from being evaluated due to step invariant handling.", }), } } diff --git a/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go b/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go index 7836ce0c375..ea2a0bb4d43 100644 --- a/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go +++ b/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go @@ -12,7 +12,6 @@ import ( func TestNewStepInvariantExpressionMetricsTrackerNodes(t *testing.T) { tracker := NewMetricsTracker(prometheus.NewRegistry()) - require.Same(t, tracker.StepInvariantTracker.nodes, tracker.StepInvariantTracker.NodesCounter()) require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) @@ -28,7 +27,6 @@ func TestNewStepInvariantExpressionMetricsTrackerNodes(t *testing.T) { func TestNewStepInvariantExpressionMetricsTrackerNodesNotEnoughSteps(t *testing.T) { tracker := NewMetricsTracker(prometheus.NewRegistry()) - require.Same(t, tracker.StepInvariantTracker.nodes, tracker.StepInvariantTracker.NodesCounter()) require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) From 90b81eb7f750349290a00c5905f80f815793b30f Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Tue, 20 Jan 2026 12:48:26 +0800 Subject: [PATCH 08/12] Update planning_metrics_tracker_test.go --- .../metrics/planning_metrics_tracker_test.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go b/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go index ea2a0bb4d43..24f200524d3 100644 --- a/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go +++ b/pkg/streamingpromql/planning/metrics/planning_metrics_tracker_test.go @@ -24,18 +24,3 @@ func TestNewStepInvariantExpressionMetricsTrackerNodes(t *testing.T) { require.Equal(t, float64(2), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) require.Equal(t, float64(13), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) } - -func TestNewStepInvariantExpressionMetricsTrackerNodesNotEnoughSteps(t *testing.T) { - tracker := NewMetricsTracker(prometheus.NewRegistry()) - - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) - - tracker.StepInvariantTracker.OnStepInvariantExpressionAdded(0) - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) - - tracker.StepInvariantTracker.OnStepInvariantExpressionAdded(1) - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) - require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.steps)) -} From c26378a5e781db059d50acb06ead28b06fd4020c Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Tue, 20 Jan 2026 12:49:18 +0800 Subject: [PATCH 09/12] Update engine_test.go --- pkg/streamingpromql/engine_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index b70cb285e2f..6e6e1a48af2 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -4951,8 +4951,8 @@ func TestStepInvariantMetrics(t *testing.T) { opts := NewTestEngineOpts() planner, err := NewQueryPlannerWithoutOptimizationPasses(opts, NewMaximumSupportedVersionQueryPlanVersionProvider()) - planner.planningMetricsTracker = planningmetrics.NewMetricsTracker(registry) require.NoError(t, err) + planner.planningMetricsTracker = planningmetrics.NewMetricsTracker(registry) engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner) require.NoError(t, err) From ad2372e0dc3171150f22804c86770f7eab27f25d Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Tue, 20 Jan 2026 13:01:23 +0800 Subject: [PATCH 10/12] Update engine_test.go --- pkg/streamingpromql/engine_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 6e6e1a48af2..f3d1604d16b 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -4962,17 +4962,17 @@ func TestStepInvariantMetrics(t *testing.T) { res := qry.Exec(context.Background()) require.NoError(t, res.Err) - metrics, err := registry.Gather() - require.NoError(t, err) - for _, m := range metrics { - if m.GetName() == "cortex_mimir_query_engine_step_invariant_nodes_total" { - require.Equal(t, float64(tc.expectedNodes), m.GetMetric()[0].Counter.GetValue()) - } else if m.GetName() == "cortex_mimir_query_engine_step_invariant_steps_saved_total" { - require.Equal(t, float64(tc.expectedStepsSaved), m.GetMetric()[0].Counter.GetValue()) - } else { - require.Fail(t, "unexpected metric name", m.GetName()) - } - } + require.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(fmt.Sprintf(` + # HELP cortex_mimir_query_engine_step_invariant_nodes_total Total number of step invariant nodes. + # TYPE cortex_mimir_query_engine_step_invariant_nodes_total counter + cortex_mimir_query_engine_step_invariant_nodes_total %d + `, tc.expectedNodes)), "cortex_mimir_query_engine_step_invariant_nodes_total")) + + require.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(fmt.Sprintf(` + # HELP cortex_mimir_query_engine_step_invariant_steps_saved_total Total number of steps which were saved from being evaluated due to step invariant handling. + # TYPE cortex_mimir_query_engine_step_invariant_steps_saved_total counter + cortex_mimir_query_engine_step_invariant_steps_saved_total %d + `, tc.expectedStepsSaved)), "cortex_mimir_query_engine_step_invariant_steps_saved_total")) // Extra assertions to ensure we do not introduce bad test data if tc.expectedNodes == 0 { From 43ae8707480e5a4a4822d18b9bb59e5d5586271c Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Tue, 20 Jan 2026 14:48:17 +0800 Subject: [PATCH 11/12] Update engine_test.go --- pkg/streamingpromql/engine_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index f3d1604d16b..075ef87e967 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -44,7 +44,6 @@ import ( "github.com/grafana/mimir/pkg/storage/lazyquery" "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/streamingpromql/planning" - planningmetrics "github.com/grafana/mimir/pkg/streamingpromql/planning/metrics" mqetest "github.com/grafana/mimir/pkg/streamingpromql/testutils" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/globalerror" @@ -4950,9 +4949,10 @@ func TestStepInvariantMetrics(t *testing.T) { registry := prometheus.NewRegistry() opts := NewTestEngineOpts() + opts.CommonOpts.Reg = registry + planner, err := NewQueryPlannerWithoutOptimizationPasses(opts, NewMaximumSupportedVersionQueryPlanVersionProvider()) require.NoError(t, err) - planner.planningMetricsTracker = planningmetrics.NewMetricsTracker(registry) engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner) require.NoError(t, err) From 48928e9ac021633dd920faf26bc04e327829677d Mon Sep 17 00:00:00 2001 From: Andrew Hall Date: Tue, 20 Jan 2026 14:57:42 +0800 Subject: [PATCH 12/12] Update analysis_test.go --- .../planning/analysis/analysis_test.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pkg/streamingpromql/planning/analysis/analysis_test.go b/pkg/streamingpromql/planning/analysis/analysis_test.go index 7f1b41c6e2f..078c06fcfc1 100644 --- a/pkg/streamingpromql/planning/analysis/analysis_test.go +++ b/pkg/streamingpromql/planning/analysis/analysis_test.go @@ -136,12 +136,10 @@ func TestHandler(t *testing.T) { "outputPlan": { "timeRange": {"startT": 1640995200000, "endT": 1640995200000, "intervalMilliseconds": 1, "isInstant": true}, "nodes": [ - {"type": "VectorSelector", "description": "{__name__=\"up\"} @ 1640995200000 (2022-01-01T00:00:00Z)"}, - {"type": "StepInvariantExpression", "children": [0], "childrenLabels": [""]} + {"type": "VectorSelector", "description": "{__name__=\"up\"} @ 1640995200000 (2022-01-01T00:00:00Z)"} ], "originalExpression": "up @ start()", - "rootNode": 1, - "version": 1 + "version": 0 } }, { @@ -150,12 +148,10 @@ func TestHandler(t *testing.T) { "outputPlan": { "timeRange": {"startT": 1640995200000, "endT": 1640995200000, "intervalMilliseconds": 1, "isInstant": true}, "nodes": [ - {"type": "VectorSelector", "description": "{__name__=\"up\"} @ 1640995200000 (2022-01-01T00:00:00Z)"}, - {"type": "StepInvariantExpression", "children": [0], "childrenLabels": [""]} + {"type": "VectorSelector", "description": "{__name__=\"up\"} @ 1640995200000 (2022-01-01T00:00:00Z)"} ], "originalExpression": "up @ start()", - "rootNode": 1, - "version": 1 + "version": 0 } } ],