Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* [FEATURE] Ingester: Add experimental `blocks-storage.tsdb.index-lookup-planning-enabled` flag to configure use of a cost-based index lookup planner. #12530
* [FEATURE] MQE: Add support for applying extra selectors to one side of a binary operation to reduce data fetched. #12577
* [FEATURE] Query-frontend: Add a native histogram presenting the length of query expressions handled by the query-frontend #12571
* [FEATURE] Query-frontend and querier: Add experimental support for performing query planning in query-frontends and distributing portions of the plan to queriers for execution. #12302 #12551 #12665 #12687 #12745 #12757 #12798 #12809
* [FEATURE] Query-frontend and querier: Add experimental support for performing query planning in query-frontends and distributing portions of the plan to queriers for execution. #12302 #12551 #12665 #12687 #12745 #12757 #12798 #12808 #12809
* [FEATURE] Alertmanager: add Microsoft Teams V2 as a supported integration. #12680
* [FEATURE] Distributor: Add experimental flag `-validation.label-value-length-over-limit-strategy` to configure how to handle label values over the length limit. #12627
* [FEATURE] Ingester: Introduce metric `cortex_ingester_owned_target_info_series` for counting the number of owned `target_info` series by tenant. #12681
Expand Down
2 changes: 2 additions & 0 deletions pkg/streamingpromql/planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,8 @@ type AnalysisResult struct {

ASTStages []ASTStage `json:"astStages"`
PlanningStages []PlanningStage `json:"planningStages"`

PlanVersion int64 `json:"planVersion"`
}

type ASTStage struct {
Expand Down
17 changes: 16 additions & 1 deletion pkg/streamingpromql/planning/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/streamingpromql/types"
"github.com/grafana/mimir/pkg/util/limiter"
)

var MaximumSupportedQueryPlanVersion = int64(0)

type QueryPlan struct {
TimeRange types.QueryTimeRange
Root Node

OriginalExpression string
EnableDelayedNameRemoval bool

// The version of this query plan.
//
// Queriers use this to ensure they do not attempt to execute a query plan that contains features they
// cannot safely or correctly execute (eg. new nodes or new meaning for existing node details).
Version int64
}

// Node represents a node in the query plan graph.
Expand Down Expand Up @@ -166,6 +175,7 @@ func (p *QueryPlan) ToEncodedPlan(includeDescriptions bool, includeDetails bool)
RootNode: rootNode,
OriginalExpression: p.OriginalExpression,
EnableDelayedNameRemoval: p.EnableDelayedNameRemoval,
Version: p.Version,
}

return encoded, nil
Expand Down Expand Up @@ -261,8 +271,12 @@ func NodeTypeName(n Node) string {
// ToDecodedPlan converts this encoded plan to its decoded form.
// It returns references to the specified nodeIndices.
func (p *EncodedQueryPlan) ToDecodedPlan(nodeIndices ...int64) (*QueryPlan, []Node, error) {
if p.Version > MaximumSupportedQueryPlanVersion {
return nil, nil, apierror.Newf(apierror.TypeBadData, "query plan has version %v, but the maximum supported query plan version is %v", p.Version, MaximumSupportedQueryPlanVersion)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the use of apierror here? It seems odd to use an error specific to the Prometheus HTTP API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I've been using apierror wrong this whole time - I view it as a way to hint what kind of error it is, so that the API endpoint logic doesn't need to know about every kind of error and how to translate that to an error code / HTTP status code.

Is there something else you had in mind instead?

}

if p.RootNode < 0 || p.RootNode >= int64(len(p.Nodes)) {
return nil, nil, fmt.Errorf("root node index %v out of range with %v nodes in plan", p.RootNode, len(p.Nodes))
return nil, nil, apierror.Newf(apierror.TypeBadData, "root node index %v out of range with %v nodes in plan", p.RootNode, len(p.Nodes))
}

decoder := newQueryPlanDecoder(p.Nodes)
Expand All @@ -286,6 +300,7 @@ func (p *EncodedQueryPlan) ToDecodedPlan(nodeIndices ...int64) (*QueryPlan, []No
Root: root,
OriginalExpression: p.OriginalExpression,
EnableDelayedNameRemoval: p.EnableDelayedNameRemoval,
Version: p.Version,
}, nodes, nil
}

Expand Down
119 changes: 80 additions & 39 deletions pkg/streamingpromql/planning/plan.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/streamingpromql/planning/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ message EncodedQueryPlan {
string originalExpression = 4;

bool enableDelayedNameRemoval = 5;

int64 version = 6;
}

message EncodedQueryTimeRange {
Expand Down
47 changes: 45 additions & 2 deletions pkg/streamingpromql/planning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package streamingpromql

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -1163,6 +1164,31 @@ func TestPlanCreationEncodingAndDecoding(t *testing.T) {
}
}

func TestPlanVersioning(t *testing.T) {
originalMaximumPlanVersion := planning.MaximumSupportedQueryPlanVersion
planning.MaximumSupportedQueryPlanVersion = 9001
t.Cleanup(func() { planning.MaximumSupportedQueryPlanVersion = originalMaximumPlanVersion })

plan := &planning.QueryPlan{
TimeRange: types.NewInstantQueryTimeRange(time.Now()),
Root: &core.NumberLiteral{
NumberLiteralDetails: &core.NumberLiteralDetails{
Value: 123,
},
},
OriginalExpression: "123",
Version: 9000,
}

encoded, err := plan.ToEncodedPlan(false, true)
require.NoError(t, err)
require.Equal(t, int64(9000), encoded.Version)

decoded, _, err := encoded.ToDecodedPlan()
require.NoError(t, err)
require.Equal(t, plan, decoded)
}

func TestDeduplicateAndMergePlanning(t *testing.T) {
testCases := map[string]struct {
expr string
Expand Down Expand Up @@ -1432,7 +1458,8 @@ func TestAnalysisHandler(t *testing.T) {
"originalExpression": "up"
}
}
]
],
"planVersion": 0
}`,
expectedStatusCode: http.StatusOK,
},
Expand Down Expand Up @@ -1475,7 +1502,8 @@ func TestAnalysisHandler(t *testing.T) {
"originalExpression": "up"
}
}
]
],
"planVersion": 0
}`,
expectedStatusCode: http.StatusOK,
},
Expand Down Expand Up @@ -1802,6 +1830,21 @@ func TestDecodingInvalidPlan(t *testing.T) {
},
expectedError: "node of type BinaryExpression expects 2 children, but got 1",
},
"query plan version is too high": {
input: &planning.EncodedQueryPlan{
OriginalExpression: "123",
Nodes: []*planning.EncodedNode{
{
NodeType: planning.NODE_TYPE_NUMBER_LITERAL,
Details: marshalDetails(&core.NumberLiteralDetails{
Value: 123,
}),
},
},
Version: planning.MaximumSupportedQueryPlanVersion + 1,
},
expectedError: fmt.Sprintf("query plan has version %v, but the maximum supported query plan version is %v", planning.MaximumSupportedQueryPlanVersion+1, planning.MaximumSupportedQueryPlanVersion),
},
}

for name, testCase := range testCases {
Expand Down
Loading