Skip to content

Support query analyze for thanos engine #6790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/spf13/afero v1.11.0
github.com/stretchr/testify v1.10.0
github.com/thanos-io/objstore v0.0.0-20250317105316-a0136a6f898d
github.com/thanos-io/promql-engine v0.0.0-20250522103302-dd83bd8fdb50
github.com/thanos-io/promql-engine v0.0.0-20250605204400-0c7d03a38861
github.com/thanos-io/thanos v0.37.3-0.20250603135757-4ad45948cd10
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5
Expand Down Expand Up @@ -136,7 +136,6 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/edsrzf/mmap-go v1.2.0 // indirect
github.com/efficientgo/tools/extkingpin v0.0.0-20230505153745-6b7392939a60 // indirect
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
github.com/fatih/color v1.18.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1695,8 +1695,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1DkncwDHFvrpd12/2TLfgYNRmEQA48ikp+0=
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw=
github.com/thanos-io/promql-engine v0.0.0-20250522103302-dd83bd8fdb50 h1:RGdaDAyFOjrFJSjaPT2z8robLvQ3KxNiNEN3DojpLOs=
github.com/thanos-io/promql-engine v0.0.0-20250522103302-dd83bd8fdb50/go.mod h1:agUazAk1yHLYSL87MdEcRbjN12DJ9OZfSUcfFLqy+F8=
github.com/thanos-io/promql-engine v0.0.0-20250605204400-0c7d03a38861 h1:vatMMwRfIBjVK8Jkq2hdufIbmXGzCv0mUfPzF6SvO4Q=
github.com/thanos-io/promql-engine v0.0.0-20250605204400-0c7d03a38861/go.mod h1:agUazAk1yHLYSL87MdEcRbjN12DJ9OZfSUcfFLqy+F8=
github.com/thanos-io/thanos v0.37.3-0.20250603135757-4ad45948cd10 h1:mtmcivEm0EoXeHTJAgjXTthyQSTLNFWrPTzpiovau3Y=
github.com/thanos-io/thanos v0.37.3-0.20250603135757-4ad45948cd10/go.mod h1:2NvA8ZJtoGcOTriumDnJQzDmbxJz1ISGPovVAGGYDbg=
github.com/tjhop/slog-gokit v0.1.4 h1:uj/vbDt3HaF0Py8bHPV4ti/s0utnO0miRbO277FLBKM=
Expand Down
5 changes: 0 additions & 5 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,6 @@ func NewQuerierHandler(
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm),
}

// Install codecs
for _, c := range codecs {
api.InstallCodec(c)
}

router := mux.NewRouter()

// Use a separate metric for the querier in order to differentiate requests from the query-frontend when
Expand Down
66 changes: 64 additions & 2 deletions pkg/api/queryapi/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,30 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/regexp"
"github.com/munnerz/goautoneg"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/httputil"
"github.com/prometheus/prometheus/util/stats"
v1 "github.com/prometheus/prometheus/web/api/v1"
thanosengine "github.com/thanos-io/promql-engine/engine"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/analysis"
"github.com/cortexproject/cortex/pkg/util/api"
)

// QueryData is the response payload built by the range and instant query
// handlers in this package. It carries the result type, the result value and
// the rendered query stats, plus an optional operator-level analysis tree.
type QueryData struct {
// ResultType is the type of Result; the handlers set it from res.Value.Type().
ResultType parser.ValueType `json:"resultType"`
// Result is the evaluated query value.
Result parser.Value `json:"result"`
// Stats holds the output of the handler's stats renderer.
Stats stats.QueryStats `json:"stats,omitempty"`
// Analysis is the per-operator telemetry tree. It is a pointer tagged with
// omitempty, so a nil value is left out of the JSON encoding.
Analysis *analysis.QueryTelemetry `json:"analysis,omitempty"`
}

type QueryAPI struct {
queryable storage.SampleAndChunkQueryable
queryEngine promql.QueryEngine
Expand Down Expand Up @@ -117,14 +129,20 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
if res.Err != nil {
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}
var queryAnalysis analysis.QueryTelemetry
if q.parseQueryAnalyzeParam(r) {
engineType := engine.GetEngineType(ctx)
queryAnalysis, err = analyzeQueryOutput(qry, engineType)
}

warnings := res.Warnings
qs := q.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))

return apiFuncResult{&v1.QueryData{
return apiFuncResult{&QueryData{
ResultType: res.Value.Type(),
Result: res.Value,
Stats: qs,
Analysis: &queryAnalysis,
}, nil, warnings, qry.Close}
}

Expand Down Expand Up @@ -173,14 +191,20 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
if res.Err != nil {
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}
var queryAnalysis analysis.QueryTelemetry
if q.parseQueryAnalyzeParam(r) {
engineType := engine.GetEngineType(ctx)
queryAnalysis, err = analyzeQueryOutput(qry, engineType)
}

warnings := res.Warnings
qs := q.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))

return apiFuncResult{&v1.QueryData{
return apiFuncResult{&QueryData{
ResultType: res.Value.Type(),
Result: res.Value,
Stats: qs,
Analysis: &queryAnalysis,
}, nil, warnings, qry.Close}
}

Expand Down Expand Up @@ -252,3 +276,41 @@ func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Code

return defaultCodec, nil
}

// parseQueryAnalyzeParam reports whether the request asks for query analysis,
// i.e. whether its "analyze" form value is exactly the string "true". Any
// other value, including an absent parameter, yields false.
func (q *QueryAPI) parseQueryAnalyzeParam(r *http.Request) bool {
	const enabled = "true"

	requested := r.FormValue("analyze")
	return requested == enabled
}

// analyzeQueryOutput extracts the operator telemetry tree from an executed
// query.
//
// If the query implements thanosengine.ExplainableQuery and its Analyze method
// returns a non-nil node, the node is converted with processAnalysis and
// returned with a nil error. In every other case a zero QueryTelemetry is
// returned together with an error explaining why no analysis is available:
//   - the query is explainable but Analyze returned nil;
//   - the query is not explainable although engineType is engine.Thanos;
//   - the query is not explainable and another engine type was selected.
func analyzeQueryOutput(query promql.Query, engineType engine.Type) (analysis.QueryTelemetry, error) {
	explainable, ok := query.(thanosengine.ExplainableQuery)
	if !ok {
		// The message depends on which engine type the caller reported.
		msg := "Query not analyzable; change engine to 'thanos'."
		if engineType == engine.Thanos {
			msg = "Query fallback to prometheus engine; not analyzable."
		}
		return analysis.QueryTelemetry{}, errors.New(msg)
	}

	node := explainable.Analyze()
	if node == nil {
		return analysis.QueryTelemetry{}, errors.Errorf("Query: %v not analyzable", query)
	}

	return processAnalysis(node), nil
}

// processAnalysis converts an engine analysis node, together with all of its
// descendants, into a QueryTelemetry tree.
//
// The operator name and the three timing fields are stored in their string
// form; the series count and the peak/total sample counts are copied as
// returned by the node. Children are converted recursively in their original
// order.
func processAnalysis(a *thanosengine.AnalyzeOutputNode) analysis.QueryTelemetry {
	// Named "out" so the local does not shadow the imported analysis package.
	out := analysis.QueryTelemetry{
		OperatorName:     a.OperatorTelemetry.String(),
		Execution:        a.OperatorTelemetry.ExecutionTimeTaken().String(),
		SeriesExecution:  a.OperatorTelemetry.SeriesExecutionTime().String(),
		SamplesExecution: a.OperatorTelemetry.NextExecutionTime().String(),
		Series:           a.OperatorTelemetry.MaxSeriesCount(),
		PeakSamples:      a.PeakSamples(),
		TotalSamples:     a.TotalSamples(),
	}

	for _, child := range a.Children {
		out.Children = append(out.Children, processAnalysis(child))
	}

	return out
}
44 changes: 41 additions & 3 deletions pkg/querier/codec/protobuf_codec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package codec

import (
"time"

"github.com/gogo/protobuf/proto"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
Expand All @@ -9,8 +11,10 @@ import (
"github.com/prometheus/prometheus/util/stats"
v1 "github.com/prometheus/prometheus/web/api/v1"

"github.com/cortexproject/cortex/pkg/api/queryapi"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/util/analysis"
)

type ProtobufCodec struct {
Expand Down Expand Up @@ -48,7 +52,7 @@ func (p ProtobufCodec) Encode(resp *v1.Response) ([]byte, error) {
}

func createPrometheusQueryResponse(resp *v1.Response, cortexInternal bool) (*tripperware.PrometheusResponse, error) {
var data = resp.Data.(*v1.QueryData)
var data = resp.Data.(*queryapi.QueryData)

var queryResult tripperware.PrometheusQueryResult
switch string(data.ResultType) {
Expand Down Expand Up @@ -82,20 +86,54 @@ func createPrometheusQueryResponse(resp *v1.Response, cortexInternal bool) (*tri
stats = &tripperware.PrometheusResponseStats{Samples: getStats(&builtin)}
}

var analyze *tripperware.Analysis
if data.Analysis != nil {
analyze = queryTelemetryToAnalysis(data.Analysis)
}

return &tripperware.PrometheusResponse{
Status: string(resp.Status),
Data: tripperware.PrometheusData{
ResultType: string(data.ResultType),
Result: queryResult,
Stats: stats,
Analysis: analyze,
},
ErrorType: string(resp.ErrorType),
Error: resp.Error,
Warnings: resp.Warnings,
}, nil
}

func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
// queryTelemetryToAnalysis converts a QueryTelemetry tree into the
// tripperware Analysis representation, recursing into every child.
//
// A nil input yields a nil result. The three execution-time strings are parsed
// with time.ParseDuration; a string that fails to parse is stored as a zero
// duration rather than reported. Children is left nil when the telemetry node
// has no children.
func queryTelemetryToAnalysis(telemetry *analysis.QueryTelemetry) *tripperware.Analysis {
	if telemetry == nil {
		return nil
	}

	// toDuration deliberately drops the parse error: this function has no
	// error return, and ParseDuration yields 0 on failure.
	toDuration := func(s string) tripperware.Duration {
		d, _ := time.ParseDuration(s)
		return tripperware.Duration(d)
	}

	execution := toDuration(telemetry.Execution)
	seriesExecution := toDuration(telemetry.SeriesExecution)
	samplesExecution := toDuration(telemetry.SamplesExecution)

	converted := &tripperware.Analysis{
		Name:                 telemetry.OperatorName,
		ExecutionTime:        execution,
		SeriesExecutionTime:  seriesExecution,
		SamplesExecutionTime: samplesExecution,
		Series:               int64(telemetry.Series),
		PeakSamples:          telemetry.PeakSamples,
		TotalSamples:         telemetry.TotalSamples,
	}

	childCount := len(telemetry.Children)
	if childCount == 0 {
		return converted
	}

	converted.Children = make([]*tripperware.Analysis, childCount)
	for i := range telemetry.Children {
		converted.Children[i] = queryTelemetryToAnalysis(&telemetry.Children[i])
	}

	return converted
}

func getMatrixSampleStreams(data *queryapi.QueryData) *[]tripperware.SampleStream {
sampleStreamsLen := len(data.Result.(promql.Matrix))
sampleStreams := make([]tripperware.SampleStream, sampleStreamsLen)

Expand Down Expand Up @@ -150,7 +188,7 @@ func getMatrixSampleStreams(data *v1.QueryData) *[]tripperware.SampleStream {
return &sampleStreams
}

func getVectorSamples(data *v1.QueryData, cortexInternal bool) *[]tripperware.Sample {
func getVectorSamples(data *queryapi.QueryData, cortexInternal bool) *[]tripperware.Sample {
vectorSamplesLen := len(data.Result.(promql.Vector))
vectorSamples := make([]tripperware.Sample, vectorSamplesLen)

Expand Down
5 changes: 5 additions & 0 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for

result.Query = r.FormValue("query")
result.Stats = r.FormValue("stats")
result.Analyze = r.FormValue("analyze")
result.Path = r.URL.Path

isSourceRuler := strings.Contains(r.Header.Get("User-Agent"), tripperware.RulerUserAgent)
Expand Down Expand Up @@ -155,6 +156,10 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
params.Add("stats", promReq.Stats)
}

if promReq.Analyze != "" {
params.Add("analyze", promReq.Analyze)
}

u := &url.URL{
Path: promReq.Path,
RawQuery: params.Encode(),
Expand Down
51 changes: 48 additions & 3 deletions pkg/querier/tripperware/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,18 @@ func MergeResponse(ctx context.Context, sumStats bool, req Request, responses ..
promResponses := make([]*PrometheusResponse, 0, len(responses))
warnings := make([][]string, 0, len(responses))
infos := make([][]string, 0, len(responses))
for _, resp := range responses {
analyzes := make([]*Analysis, 0, len(responses))
for i, resp := range responses {
promResponses = append(promResponses, resp.(*PrometheusResponse))
if w := resp.(*PrometheusResponse).Warnings; w != nil {
warnings = append(warnings, w)
}
if i := resp.(*PrometheusResponse).Infos; i != nil {
infos = append(infos, i)
}
if promResponses[i].GetData().Analysis != nil {
analyzes = append(analyzes, promResponses[i].GetData().Analysis)
}
}

// Check if it is a range query. Range query passed req as nil since
Expand All @@ -73,7 +77,8 @@ func MergeResponse(ctx context.Context, sumStats bool, req Request, responses ..
Vector: v,
},
},
Stats: statsMerge(sumStats, promResponses),
Stats: statsMerge(sumStats, promResponses),
Analysis: AnalyzesMerge(analyzes...),
}
case model.ValMatrix.String():
sampleStreams, err := matrixMerge(ctx, promResponses)
Expand All @@ -90,7 +95,8 @@ func MergeResponse(ctx context.Context, sumStats bool, req Request, responses ..
},
},
},
Stats: statsMerge(sumStats, promResponses),
Stats: statsMerge(sumStats, promResponses),
Analysis: AnalyzesMerge(analyzes...),
}
default:
return nil, fmt.Errorf("unexpected result type: %s", promResponses[0].Data.ResultType)
Expand Down Expand Up @@ -258,6 +264,45 @@ func statsMerge(shouldSumStats bool, resps []*PrometheusResponse) *PrometheusRes
return result
}

// traverseAnalysis appends a and all of its descendants to *results in
// pre-order: a node first, then each of its children from first to last.
// Nil nodes, including a nil root, are skipped and contribute nothing.
func traverseAnalysis(a *Analysis, results *[]*Analysis) {
	// An explicit stack stands in for recursion; the top of the stack is the
	// next node to visit.
	pending := []*Analysis{a}

	for len(pending) > 0 {
		last := len(pending) - 1
		node := pending[last]
		pending = pending[:last]

		if node == nil {
			continue
		}

		*results = append(*results, node)

		// Push children right-to-left so the leftmost child is popped first.
		for i := len(node.Children) - 1; i >= 0; i-- {
			pending = append(pending, node.Children[i])
		}
	}
}

// AnalyzesMerge folds the given analysis trees into a single tree.
//
// The first tree acts as the accumulator: it is mutated in place and returned.
// Every tree is flattened in pre-order with traverseAnalysis and nodes are
// paired by their position in that flattening. For each pair, the execution
// time, series execution time, samples execution time, series count, peak
// samples and total samples of the other tree's node are added to the
// accumulator's node.
//
// NOTE(review): pairing by position assumes all trees share the same operator
// layout; nothing here verifies that. Nodes beyond the shorter of the two
// flattenings are left untouched.
//
// With no arguments it returns an empty, non-nil Analysis.
func AnalyzesMerge(analysis ...*Analysis) *Analysis {
	if len(analysis) == 0 {
		return &Analysis{}
	}

	root := analysis[0]

	var rootElements []*Analysis
	traverseAnalysis(root, &rootElements)

	for _, a := range analysis[1:] {
		var elements []*Analysis
		traverseAnalysis(a, &elements)

		// Pair nodes of the current tree with the accumulator's nodes. The
		// index must address the flattened nodes (elements), not the variadic
		// slice of trees: using the latter merges unrelated nodes and runs out
		// of range as soon as a tree has more nodes than there are trees.
		for i := 0; i < len(elements) && i < len(rootElements); i++ {
			dst, src := rootElements[i], elements[i]
			dst.ExecutionTime += src.ExecutionTime
			dst.SeriesExecutionTime += src.SeriesExecutionTime
			dst.SamplesExecutionTime += src.SamplesExecutionTime
			dst.Series += src.Series
			dst.PeakSamples += src.PeakSamples
			dst.TotalSamples += src.TotalSamples
		}
	}

	return root
}

type sortPlan int

const (
Expand Down
Loading
Loading