Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/querier/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,11 @@ func TestDispatcher_HandleProtobuf(t *testing.T) {
req: createQueryRequest(`sum by (idx) (rate(my_series{idx="0"}[11s])) + quantile by (idx) (2, my_series{idx="0"})`, types.NewInstantQueryTimeRange(startT.Add(30*time.Second))),
expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{
newSeriesMetadataMessage(
7,
6,
querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "0"))},
),
newInstantVectorSeriesDataMessage(
7,
6,
querierpb.InstantVectorSeriesData{
Floats: []mimirpb.Sample{
{TimestampMs: 30_000, Value: math.Inf(1)},
Expand Down Expand Up @@ -649,8 +649,8 @@ func TestDispatcher_HandleProtobuf(t *testing.T) {
`max_over_time(my_series[11s:10s]) + min_over_time(my_other_series[11s:10s])`,
types.NewRangeQueryTimeRange(startT, startT.Add(20*time.Second), 10*time.Second),
1,
[]string{"BinaryExpression: LHS + RHS", "LHS: DeduplicateAndMerge", "FunctionCall: max_over_time(...)", "Subquery: [11s:10s]"},
[]string{"BinaryExpression: LHS + RHS", "RHS: DeduplicateAndMerge", "FunctionCall: min_over_time(...)", "Subquery: [11s:10s]"},
[]string{"BinaryExpression: LHS + RHS", "LHS: FunctionCall: max_over_time(...)", "Subquery: [11s:10s]"},
[]string{"BinaryExpression: LHS + RHS", "RHS: FunctionCall: min_over_time(...)", "Subquery: [11s:10s]"},
),
expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{
newSeriesMetadataMessage(
Expand Down Expand Up @@ -810,7 +810,7 @@ func TestDispatcher_HandleProtobuf(t *testing.T) {
1,
[]string{"BinaryExpression: LHS + RHS", "LHS: DeduplicateAndMerge", "BinaryExpression: LHS + RHS", "LHS: NumberLiteral: 12"},
[]string{"BinaryExpression: LHS + RHS", "LHS: DeduplicateAndMerge", "BinaryExpression: LHS + RHS", `RHS: VectorSelector: {__name__="my_series"}`},
[]string{"BinaryExpression: LHS + RHS", "RHS: DeduplicateAndMerge", "FunctionCall: min_over_time(...)", "Subquery: [11s:10s]"},
[]string{"BinaryExpression: LHS + RHS", "RHS: FunctionCall: min_over_time(...)", "Subquery: [11s:10s]"},
),
expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{
newScalarMessage(0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ package plan

import (
"context"
"slices"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

Expand All @@ -13,6 +16,21 @@ import (
"github.com/grafana/mimir/pkg/streamingpromql/planning/core"
)

// selectorType classifies a plan node by whether it is a selector and, if it is,
// whether one of its matchers pins the metric name with an equality match.
// Values are produced by getSelectorType.
type selectorType int

const (
// notSelector is returned for nodes that are neither a VectorSelector nor a MatrixSelector.
notSelector selectorType = iota
// selectorWithoutExactName is a selector with no equality matcher on the metric name label.
selectorWithoutExactName
// selectorWithExactName is a selector with an equality matcher on the metric name label.
selectorWithExactName
)

// dedupNodeInfo describes one DeduplicateAndMerge node found while walking the plan:
// the node itself, where it hangs in the tree, and whether it has been marked to be kept.
type dedupNodeInfo struct {
// node is the DeduplicateAndMerge node this entry describes.
node *core.DeduplicateAndMerge
// parent is the node's parent in the plan; the walk starts at the plan root with a nil parent.
parent planning.Node
childIndex int // childIndex is the index of a node in its parent's children array.
// keep is set to true for nodes that must be retained; entries left false are
// treated by Apply as nodes that are not being kept.
keep bool
}

// EliminateDeduplicateAndMergeOptimizationPass removes redundant DeduplicateAndMerge nodes from the plan.
// By default, DeduplicateAndMerge nodes are wrapped around operations which manipulate labels (name-dropping functions, some binary operations, etc.) and therefore could produce duplicate series.
// These nodes are unnecessary if it can be proven that each input series produces a unique output series.
Expand All @@ -22,37 +40,41 @@ import (
// Primary goal of this optimization is to unlock "labels projection" - ability to load only needed labels into memory.
type EliminateDeduplicateAndMergeOptimizationPass struct {
enableDelayedNameRemoval bool
}

type SelectorType int

const (
NotSelector SelectorType = iota
SelectorWithoutExactName
SelectorWithExactName
)

type dedupNodeInfo struct {
node *core.DeduplicateAndMerge
parent planning.Node
childIndex int // childIndex is the index of a node in its parent's children array.
keep bool
attempts prometheus.Counter
modified prometheus.Counter
}

func NewEliminateDeduplicateAndMergeOptimizationPass(enableDelayedNameRemoval bool) *EliminateDeduplicateAndMergeOptimizationPass {
// NewEliminateDeduplicateAndMergeOptimizationPass builds the optimization pass and
// creates its two counters through promauto against reg:
//   - an "attempted" counter, incremented on every Apply call, and
//   - a "modified" counter, incremented when at least one DeduplicateAndMerge node is not kept.
//
// enableDelayedNameRemoval is stored on the pass and selects which DeduplicateAndMerge
// node is retained during collection.
func NewEliminateDeduplicateAndMergeOptimizationPass(enableDelayedNameRemoval bool, reg prometheus.Registerer) *EliminateDeduplicateAndMergeOptimizationPass {
	factory := promauto.With(reg)

	// Counters are created in the same order as before: attempts first, then modified.
	attemptsCounter := factory.NewCounter(prometheus.CounterOpts{
		Name: "cortex_mimir_query_engine_eliminate_dedupe_attempted_total",
		Help: "Total number of queries that the optimization pass has attempted to eliminate DeduplicateAndMerge nodes for.",
	})
	modifiedCounter := factory.NewCounter(prometheus.CounterOpts{
		Name: "cortex_mimir_query_engine_eliminate_dedupe_modified_total",
		Help: "Total number of queries where the optimization pass has been able to eliminate DeduplicateAndMerge nodes for.",
	})

	pass := &EliminateDeduplicateAndMergeOptimizationPass{
		enableDelayedNameRemoval: enableDelayedNameRemoval,
		attempts:                 attemptsCounter,
		modified:                 modifiedCounter,
	}
	return pass
}

// Name returns the fixed display name of this optimization pass.
func (e *EliminateDeduplicateAndMergeOptimizationPass) Name() string {
return "Eliminate DeduplicateAndMerge"
}

func (e *EliminateDeduplicateAndMergeOptimizationPass) Apply(ctx context.Context, plan *planning.QueryPlan, _ planning.QueryPlanVersion) (*planning.QueryPlan, error) {
func (e *EliminateDeduplicateAndMergeOptimizationPass) Apply(_ context.Context, plan *planning.QueryPlan, _ planning.QueryPlanVersion) (*planning.QueryPlan, error) {
e.attempts.Inc()

// nodes is a list of DeduplicateAndMerge nodes in the order of their appearance in the plan.
var nodes []dedupNodeInfo
e.collect(plan.Root, nil, -1, &nodes)

// If there are any DeduplicateAndMerge nodes we are not keeping, increment the modified counter.
if slices.ContainsFunc(nodes, func(n dedupNodeInfo) bool { return !n.keep }) {
e.modified.Inc()
}

newRoot, err := e.eliminate(nodes)
if err != nil {
return nil, err
Expand All @@ -65,14 +87,19 @@ func (e *EliminateDeduplicateAndMergeOptimizationPass) Apply(ctx context.Context

// collect collects DeduplicateAndMerge nodes from the plan and marks them for removal or keeping.
func (e *EliminateDeduplicateAndMergeOptimizationPass) collect(node planning.Node, parent planning.Node, childIndex int, nodes *[]dedupNodeInfo) {
// Binary operations are not supported yet. When we encounter a binary operation, we stop the elimination
// and keep all DeduplicateAndMerge nodes. It's done just to keep initial implementation simple.
// TODO:
// 1. Remove DeduplicateAndMerge nodes provided they don't contain binary operations - rate(foo[5m]) / rate(bar[5m])
// 2. Handle all binary operations by inspecting whether each side produces series with a __name__ label that could cause duplicates.
// If this is a binary expression, we need to retain the top-level (delayed name removal)
// or closest (non-delayed name removal) deduplicate and merge node. However, we can potentially
// remove deduplicate and merge nodes on either side of the binary expression.
if _, isBinaryOp := node.(*core.BinaryExpression); isBinaryOp {
*nodes = nil
return
if e.enableDelayedNameRemoval {
if len(*nodes) > 0 {
(*nodes)[0].keep = true
}
} else {
if len(*nodes) >= 1 {
(*nodes)[len(*nodes)-1].keep = true
}
}
}

if dedupNode, isDedup := node.(*core.DeduplicateAndMerge); isDedup {
Expand All @@ -83,12 +110,12 @@ func (e *EliminateDeduplicateAndMergeOptimizationPass) collect(node planning.Nod
})
}

selectorType := getSelectorType(node)
switch selectorType {
case SelectorWithExactName:
selector := getSelectorType(node)
switch selector {
case selectorWithExactName:
// Series with the same name are guaranteed to have the same labels, so we can eliminate all DeduplicateAndMerge nodes.
return
case SelectorWithoutExactName:
case selectorWithoutExactName:
if e.enableDelayedNameRemoval {
// With delayed name removal, the name is dropped at the very end of query execution.
// Keep the DeduplicateAndMerge closest to root to handle final deduplication.
Expand Down Expand Up @@ -149,24 +176,24 @@ func (e *EliminateDeduplicateAndMergeOptimizationPass) eliminate(dedupNodes []de
}

// getSelectorType determines if node is a selector and whether it has an exact name matcher.
func getSelectorType(node planning.Node) SelectorType {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Label replace handling incorrectly marks sibling branch nodes

Medium Severity

The label_replace handling at lines 148-150 assumes nodes[len-2] is always an ancestor of the label_replace function. However, with the new binary expression traversal (lines 93-103), the nodes list can now contain DeduplicateAndMerge nodes from sibling branches. For a query like rate(foo[5m]) or label_replace(rate(bar[5m]), ...), when processing label_replace, nodes[len-2] incorrectly points to foo_rate's dedup node (from the LHS branch) rather than an ancestor of label_replace. This causes the LHS dedup node to be incorrectly retained when it should be eliminated (since foo has an exact name matcher). Before this PR, binary expressions caused early return so this code path was never exercised.

Fix in Cursor Fix in Web

Copy link
Contributor Author

@56quarters 56quarters Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This results in keeping an extra DeduplicateAndMerge node, which is less efficient than dropping it but doesn't cause incorrect results. I'll handle this in a follow-up PR.

// getSelectorType reports whether node is a selector and, if so, whether any of its
// matchers is an equality match on the metric name label.
//
// Nodes that are neither *core.VectorSelector nor *core.MatrixSelector are reported as
// "not a selector". For selectors, the first matcher with Name == model.MetricNameLabel
// and Type == labels.MatchEqual makes the result "with exact name"; otherwise the result
// is "without exact name".
//
// NOTE(review): each pair of consecutive return statements below looks like the removed
// (exported) and added (unexported) sides of a constant rename — confirm that only the
// unexported form is meant to remain.
func getSelectorType(node planning.Node) selectorType {
var matchers []*core.LabelMatcher

// Only vector and matrix selectors carry matchers; anything else is not a selector.
if vs, isVectorSelector := node.(*core.VectorSelector); isVectorSelector {
matchers = vs.Matchers
} else if ms, isMatrixSelector := node.(*core.MatrixSelector); isMatrixSelector {
matchers = ms.Matchers
} else {
return NotSelector
return notSelector
}

// A single equality matcher on the metric name is enough to classify the selector as exact.
for _, matcher := range matchers {
if matcher.Name == model.MetricNameLabel && matcher.Type == labels.MatchEqual {
return SelectorWithExactName
return selectorWithExactName
}
}

// No equality matcher on the metric name was found.
return SelectorWithoutExactName
return selectorWithoutExactName
}

func isLabelReplaceOrJoinFunction(node planning.Node) bool {
Expand Down
Loading
Loading