diff --git a/CHANGELOG.md b/CHANGELOG.md index f0658517cb2..71ffee8d230 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -126,6 +126,7 @@ * [BUGFIX] Query-frontend: Fix silent panic when executing a remote read API request if the request has no matchers. #13745 * [BUGFIX] Ruler: Fixed `-ruler.max-rule-groups-per-tenant-by-namespace` to only count rule groups in the specified namespace instead of all namespaces. #13743 * [BUGFIX] Query-frontend: Fix race condition that could sometimes cause unnecessary resharding of queriers if querier shuffle sharding and remote execution is enabled. #13794 #13838 +* [ENHANCEMENT] MQE: Add metrics to track step-invariant expression usage and data point reuse savings: `cortex_mimir_query_engine_step_invariant_nodes_total` and `cortex_mimir_query_engine_step_invariant_points_saved_total`. #13911 ### Mixin diff --git a/pkg/streamingpromql/engine.go b/pkg/streamingpromql/engine.go index 4c5dc0ff780..39cd88d264c 100644 --- a/pkg/streamingpromql/engine.go +++ b/pkg/streamingpromql/engine.go @@ -273,6 +273,7 @@ func (e *Engine) materializeAndCreateEvaluator(ctx context.Context, queryable st operatorParams := &planning.OperatorParameters{ Queryable: queryable, MemoryConsumptionTracker: memoryConsumptionTracker, + OperatorMetricsTracker: e.planner.operatorMetricsTracker, Annotations: annotations.New(), QueryStats: types.NewQueryStats(), LookbackDelta: lookbackDelta, diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 8cdf235a0f2..1a5249a0cde 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -43,6 +43,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/storage/lazyquery" "github.com/grafana/mimir/pkg/streamingpromql/compat" + operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/planning" "github.com/grafana/mimir/pkg/streamingpromql/testutils" "github.com/grafana/mimir/pkg/streamingpromql/types" @@ -4771,6 +4772,151 @@ func TestExtendedRangeSelectorsIrregular(t *testing.T) { } } +func TestStepInvariantMetricsTracker(t *testing.T) { + storage := promqltest.LoadedStorage(t, ` + load 1m + metric 0 1 2 3 4 5 + nodata _ _ _ _ _ _ + histogram {{schema:3 sum:4 count:4 buckets:[1 2 1]}}+{{schema:5 sum:2 count:1 buckets:[1] offset:1}}x6 + `) + t.Cleanup(func() { storage.Close() }) + + tc := []struct { + query string + start time.Time + end time.Time + interval time.Duration + + expectedNodes int + expectedPoints map[operatormetrics.StepInvariantPointType]int + }{ + { + query: "metric @ 20", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 5, // 6 steps + operatormetrics.HPoint: 0, + }, + }, + { + query: "metric @ 20", + start: time.Unix(0, 0).Add(time.Second * 50), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 0, // step invariant operation has been removed + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 0, + operatormetrics.HPoint: 0, + }, + }, + { + query: "nodata @ 20", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 0, + operatormetrics.HPoint: 0, + }, + }, + { + query: "metric @ 20 + metric @ 30", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, // the left and right are all wrapped into a single step invariant + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 5, + operatormetrics.HPoint: 0, + }, + }, + { + query: "metric @ 20 * metric + metric @ 30", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 2, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 10, + operatormetrics.HPoint: 0, + }, + }, + { + query: "abs(metric @ 20 * metric + metric @ 30)", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 2, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 10, + operatormetrics.HPoint: 0, + }, + }, + { + query: "scalar(metric @ 20)", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 5, + operatormetrics.HPoint: 0, + }, + }, + { + query: "histogram @ 20", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 1, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 0, + operatormetrics.HPoint: 5, + }, + }, + { + query: "(metric @ 20 or metric) or (histogram @ 20 or histogram) or (vector(scalar(metric @ 30)) or metric)", + start: time.Unix(0, 0), + end: time.Unix(0, 0).Add(time.Second * 50), + interval: time.Second * 10, + expectedNodes: 3, + expectedPoints: map[operatormetrics.StepInvariantPointType]int{ + operatormetrics.FPoint: 10, + operatormetrics.HPoint: 5, + }, + }, + } + + for _, tc := range tc { + t.Run(tc.query, func(t *testing.T) { + + tracker := operatormetrics.NewOperatorMetricsTracker(prometheus.NewRegistry()) + + opts := NewTestEngineOpts() + planner, err := NewQueryPlannerWithoutOptimizationPasses(opts, NewMaximumSupportedVersionQueryPlanVersionProvider()) + require.NoError(t, err) + + planner.RegisterOperatorMetrics(tracker) + engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner) + require.NoError(t, err) + + qry, err := engine.NewRangeQuery(context.Background(), storage, nil, tc.query, tc.start, tc.end, tc.interval) + require.NoError(t, err) + res := qry.Exec(context.Background()) + require.NoError(t, res.Err) + require.Equal(t, float64(tc.expectedNodes), testutil.ToFloat64(tracker.StepInvariantTracker.NodeCounter())) + + for pointType, expectedCount := range tc.expectedPoints { + require.Equal(t, float64(expectedCount), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(pointType))) + } + }) + } +} + type dummyMaterializer struct{} func (d dummyMaterializer) Materialize(n planning.Node, materializer *planning.Materializer, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) { diff --git a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go new file mode 100644 index 00000000000..4ebb57e95da --- /dev/null +++ b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker.go @@ -0,0 +1,81 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operatormetrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// MetricsTracker holds metrics trackers used by operators. +// These trackers can be passed into operators to elicit metrics +// during their lifecycle. +type MetricsTracker struct { + StepInvariantTracker *StepInvariantExpressionMetricsTracker +} + +func NewOperatorMetricsTracker(reg prometheus.Registerer) *MetricsTracker { + return &MetricsTracker{ + StepInvariantTracker: newStepInvariantExpressionMetricsTracker(reg), + } +} + +type StepInvariantPointType int + +const ( + FPoint StepInvariantPointType = iota + HPoint StepInvariantPointType = iota +) + +var stepInvariantPointTypeLabels = []string{ + "fpoint", "hpoint", +} + +type StepInvariantExpressionMetricsTracker struct { + // total number of observed step invariant nodes + nodes prometheus.Counter + + // total number of points which were saved from being retrieved from the inner operation + points *prometheus.CounterVec +} + +// OnStepInvariantNodeObserved is called when a step invariant node is observed. +// It increments the nodes counter by 1. +func (t *StepInvariantExpressionMetricsTracker) OnStepInvariantNodeObserved() { + t.nodes.Inc() +} + +// NodeCounter returns the counter tracking step invariant nodes. +// This is provided for unit tests and allowing the underlying counter to remain unexported. +func (t *StepInvariantExpressionMetricsTracker) NodeCounter() prometheus.Counter { + return t.nodes +} + +// Counter returns the counter tracking step invariant point savings for a given type. +// This is provided for unit tests and allowing the underlying counter to remain unexported. +func (t *StepInvariantExpressionMetricsTracker) Counter(pointType StepInvariantPointType) prometheus.Counter { + return t.points.WithLabelValues(stepInvariantPointTypeLabels[pointType]) +} + +// OnStepInvariantStepsSaved increments the counter related to tracking step invariant point savings for the given point type. +// The actual counter will be incremented by stepCount-1, reflecting that the first step loaded data. +// If the stepCount is less than or equal to 1 then this function is a no-op. +func (t *StepInvariantExpressionMetricsTracker) OnStepInvariantStepsSaved(pointType StepInvariantPointType, stepCount int) { + if stepCount <= 1 { + return + } + t.points.WithLabelValues(stepInvariantPointTypeLabels[pointType]).Add(float64(stepCount - 1)) +} + +func newStepInvariantExpressionMetricsTracker(reg prometheus.Registerer) *StepInvariantExpressionMetricsTracker { + return &StepInvariantExpressionMetricsTracker{ + nodes: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_mimir_query_engine_step_invariant_nodes_total", + Help: "Total number of step invariant nodes.", + }), + points: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_mimir_query_engine_step_invariant_points_saved_total", + Help: "Total number of samples which were saved from being queried / loaded due to step invariant handling.", + }, []string{"type"}), + } +} diff --git a/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go new file mode 100644 index 00000000000..517b06f3b44 --- /dev/null +++ b/pkg/streamingpromql/operators/metrics/operator_metrics_tracker_test.go @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package operatormetrics + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestNewStepInvariantExpressionMetricsTrackerNodes(t *testing.T) { + tracker := NewOperatorMetricsTracker(prometheus.NewRegistry()) + require.Same(t, tracker.StepInvariantTracker.nodes, tracker.StepInvariantTracker.NodeCounter()) + + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) + tracker.StepInvariantTracker.OnStepInvariantNodeObserved() + require.Equal(t, float64(1), testutil.ToFloat64(tracker.StepInvariantTracker.nodes)) +} + +func TestNewStepInvariantExpressionMetricsTrackerFloatPoints(t *testing.T) { + tracker := NewOperatorMetricsTracker(prometheus.NewRegistry()) + + // Passing in a step count <= 1 has no effect + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 0) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 1) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) + + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 2) + require.Equal(t, float64(1), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(FPoint, 12) + require.Equal(t, float64(1+11), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(FPoint))) +} + +func TestNewStepInvariantExpressionMetricsTrackerHistogramPoints(t *testing.T) { + tracker := NewOperatorMetricsTracker(prometheus.NewRegistry()) + + // Passing in a step count <= 1 has no effect + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 0) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 1) + require.Equal(t, float64(0), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) + + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 2) + require.Equal(t, float64(1), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) + tracker.StepInvariantTracker.OnStepInvariantStepsSaved(HPoint, 12) + require.Equal(t, float64(1+11), testutil.ToFloat64(tracker.StepInvariantTracker.Counter(HPoint))) +} diff --git a/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go b/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go index b6fde89fd3b..0fa33463dd7 100644 --- a/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go +++ b/pkg/streamingpromql/operators/step_invariant_instant_vector_operator.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser/posrange" + operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/limiter" ) @@ -17,13 +18,15 @@ type StepInvariantInstantVectorOperator struct { inner types.InstantVectorOperator originalTimeRange types.QueryTimeRange memoryConsumptionTracker *limiter.MemoryConsumptionTracker + metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker } -func NewStepInvariantInstantVectorOperator(op types.InstantVectorOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) *StepInvariantInstantVectorOperator { +func NewStepInvariantInstantVectorOperator(op types.InstantVectorOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker, metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker) *StepInvariantInstantVectorOperator { return &StepInvariantInstantVectorOperator{ inner: op, originalTimeRange: originalTimeRange, memoryConsumptionTracker: memoryConsumptionTracker, + metricsTracker: metricsTracker, } } @@ -36,6 +39,7 @@ func (s *StepInvariantInstantVectorOperator) Close() { } func (s *StepInvariantInstantVectorOperator) Prepare(ctx context.Context, params *types.PrepareParams) error { + s.metricsTracker.OnStepInvariantNodeObserved() return s.inner.Prepare(ctx, params) } @@ -70,6 +74,8 @@ func (s *StepInvariantInstantVectorOperator) NextSeries(ctx context.Context) (ty return types.InstantVectorSeriesData{}, err } + s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.FPoint, s.originalTimeRange.StepCount) + // Fill the expected steps with the same point. for ts := s.originalTimeRange.StartT; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds { floats = append(floats, promql.FPoint{ @@ -89,6 +95,8 @@ func (s *StepInvariantInstantVectorOperator) NextSeries(ctx context.Context) (ty return types.InstantVectorSeriesData{}, err } + s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.HPoint, s.originalTimeRange.StepCount) + histograms = append(histograms, promql.HPoint{T: data.Histograms[0].T, H: data.Histograms[0].H}) // Note that we create a copy of the histogram for each step as we can not re-use the same *FloatHistogram in the slice from the pool. for ts := s.originalTimeRange.StartT + s.originalTimeRange.IntervalMilliseconds; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds { diff --git a/pkg/streamingpromql/operators/step_invariant_scalar_operator.go b/pkg/streamingpromql/operators/step_invariant_scalar_operator.go index 877e755dd6c..b75cd746a52 100644 --- a/pkg/streamingpromql/operators/step_invariant_scalar_operator.go +++ b/pkg/streamingpromql/operators/step_invariant_scalar_operator.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser/posrange" + operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/limiter" ) @@ -17,13 +18,15 @@ type StepInvariantScalarOperator struct { inner types.ScalarOperator originalTimeRange types.QueryTimeRange memoryConsumptionTracker *limiter.MemoryConsumptionTracker + metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker } -func NewStepInvariantScalarOperator(op types.ScalarOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) *StepInvariantScalarOperator { +func NewStepInvariantScalarOperator(op types.ScalarOperator, originalTimeRange types.QueryTimeRange, memoryConsumptionTracker *limiter.MemoryConsumptionTracker, metricsTracker *operatormetrics.StepInvariantExpressionMetricsTracker) *StepInvariantScalarOperator { return &StepInvariantScalarOperator{ inner: op, originalTimeRange: originalTimeRange, memoryConsumptionTracker: memoryConsumptionTracker, + metricsTracker: metricsTracker, } } @@ -36,6 +39,7 @@ func (s *StepInvariantScalarOperator) Close() { } func (s *StepInvariantScalarOperator) Prepare(ctx context.Context, params *types.PrepareParams) error { + s.metricsTracker.OnStepInvariantNodeObserved() return s.inner.Prepare(ctx, params) } @@ -64,6 +68,8 @@ func (s *StepInvariantScalarOperator) GetValues(ctx context.Context) (types.Scal return types.ScalarData{}, err } + s.metricsTracker.OnStepInvariantStepsSaved(operatormetrics.FPoint, s.originalTimeRange.StepCount) + floats = append(floats, data.Samples[0]) for ts := s.originalTimeRange.StartT + s.originalTimeRange.IntervalMilliseconds; ts <= s.originalTimeRange.EndT; ts += s.originalTimeRange.IntervalMilliseconds { diff --git a/pkg/streamingpromql/planning.go b/pkg/streamingpromql/planning.go index cef5f8c3b02..723f44240f7 100644 --- a/pkg/streamingpromql/planning.go +++ b/pkg/streamingpromql/planning.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/streamingpromql/operators/functions" + operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/optimize" "github.com/grafana/mimir/pkg/streamingpromql/optimize/ast" "github.com/grafana/mimir/pkg/streamingpromql/optimize/plan" @@ -61,6 +62,7 @@ type QueryPlanner struct { planOptimizationPasses []optimize.QueryPlanOptimizationPass planStageLatency *prometheus.HistogramVec generatedPlans *prometheus.CounterVec + operatorMetricsTracker *operatormetrics.MetricsTracker versionProvider QueryPlanVersionProvider logger log.Logger @@ -139,14 +141,20 @@ func NewQueryPlannerWithoutOptimizationPasses(opts EngineOpts, versionProvider Q Name: "cortex_mimir_query_engine_plans_generated_total", Help: "Total number of query plans generated.", }, []string{"version"}), - - versionProvider: versionProvider, + operatorMetricsTracker: operatormetrics.NewOperatorMetricsTracker(opts.CommonOpts.Reg), + versionProvider: versionProvider, logger: opts.Logger, TimeSince: time.Since, }, nil } +// RegisterOperatorMetrics replaces the operatorMetricsTracker with the given tracker. +// This is only needed for unit tests +func (p *QueryPlanner) RegisterOperatorMetrics(tracker *operatormetrics.MetricsTracker) { + p.operatorMetricsTracker = tracker +} + // RegisterASTOptimizationPass registers an AST optimization pass used with this engine. // // This method is not thread-safe and must not be called concurrently with any other method on this type. diff --git a/pkg/streamingpromql/planning/core/step_invariant_expression.go b/pkg/streamingpromql/planning/core/step_invariant_expression.go index 88fe47bbcee..7ec767bf516 100644 --- a/pkg/streamingpromql/planning/core/step_invariant_expression.go +++ b/pkg/streamingpromql/planning/core/step_invariant_expression.go @@ -98,11 +98,16 @@ func MaterializeStepInvariantExpression(s *StepInvariantExpression, materializer return nil, err } + // There is no advantage to wrapping an instant query in a step invariant + if timeRange.StepCount <= 1 { + return planning.NewSingleUseOperatorFactory(op), nil + } + switch op := op.(type) { case types.InstantVectorOperator: - return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantInstantVectorOperator(op, timeRange, params.MemoryConsumptionTracker)), nil + return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantInstantVectorOperator(op, timeRange, params.MemoryConsumptionTracker, params.OperatorMetricsTracker.StepInvariantTracker)), nil case types.ScalarOperator: - return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantScalarOperator(op, timeRange, params.MemoryConsumptionTracker)), nil + return planning.NewSingleUseOperatorFactory(operators.NewStepInvariantScalarOperator(op, timeRange, params.MemoryConsumptionTracker, params.OperatorMetricsTracker.StepInvariantTracker)), nil case types.RangeVectorOperator: // Notes on range vector handling: // diff --git a/pkg/streamingpromql/planning/plan.go b/pkg/streamingpromql/planning/plan.go index 1a51aeb58e4..9bd0edf028e 100644 --- a/pkg/streamingpromql/planning/plan.go +++ b/pkg/streamingpromql/planning/plan.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/prometheus/util/annotations" apierror "github.com/grafana/mimir/pkg/api/error" + operatormetrics "github.com/grafana/mimir/pkg/streamingpromql/operators/metrics" "github.com/grafana/mimir/pkg/streamingpromql/types" "github.com/grafana/mimir/pkg/util/limiter" ) @@ -219,6 +220,7 @@ func (t QueriedTimeRange) Union(other QueriedTimeRange) QueriedTimeRange { type OperatorParameters struct { Queryable storage.Queryable MemoryConsumptionTracker *limiter.MemoryConsumptionTracker + OperatorMetricsTracker *operatormetrics.MetricsTracker Annotations *annotations.Annotations QueryStats *types.QueryStats LookbackDelta time.Duration