Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
* [BUGFIX] Query-frontend: Fix silent panic when executing a remote read API request if the request has no matchers. #13745
* [BUGFIX] Ruler: Fixed `-ruler.max-rule-groups-per-tenant-by-namespace` to only count rule groups in the specified namespace instead of all namespaces. #13743
* [BUGFIX] Update to Go v1.25.5 to address [CVE-2025-61729](https://pkg.go.dev/vuln/GO-2025-4155). #13755
* [BUGFIX] MQE: Map remote execution storage errors correctly. #13947

### Mixin

Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var errNoNodesInRequest = errors.New("request contains no nodes to evaluate")

type Dispatcher struct {
engine *streamingpromql.Engine
queryable storage.Queryable
queryable storage.SampleAndChunkQueryable
extractor propagation.Extractor
logger log.Logger

Expand All @@ -63,7 +63,9 @@ type Dispatcher struct {
timeNow func() time.Time
}

func NewDispatcher(engine *streamingpromql.Engine, queryable storage.Queryable, querierMetrics *RequestMetrics, serverMetrics *server.Metrics, extractor propagation.Extractor, logger log.Logger) *Dispatcher {
func NewDispatcher(engine *streamingpromql.Engine, queryable storage.SampleAndChunkQueryable, querierMetrics *RequestMetrics, serverMetrics *server.Metrics, extractor propagation.Extractor, logger log.Logger) *Dispatcher {
queryable = NewErrorTranslateSampleAndChunkQueryable(queryable)

return &Dispatcher{
engine: engine,
queryable: queryable,
Expand Down
110 changes: 110 additions & 0 deletions pkg/querier/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
prototypes "github.com/gogo/protobuf/types"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/server"
"github.com/grafana/dskit/user"
"github.com/grafana/regexp"
Expand Down Expand Up @@ -1501,3 +1502,112 @@ func newRangeVectorStepDataMessage(data querierpb.EvaluateQueryResponseRangeVect
},
}
}

func TestDispatcher_RingErrorTranslation(t *testing.T) {
opts := streamingpromql.NewTestEngineOpts()
opts.Pedantic = true
planner, err := streamingpromql.NewQueryPlanner(opts, streamingpromql.NewMaximumSupportedVersionQueryPlanVersionProvider())
require.NoError(t, err)
engine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner)
require.NoError(t, err)

startT := timestamp.Time(0)

testCases := map[string]struct {
storageError error
expectedErrorMessage string
expectedErrorType mimirpb.QueryErrorType
}{
"ring error - too many unhealthy instances": {
storageError: ring.ErrTooManyUnhealthyInstances,
expectedErrorMessage: "too many unhealthy instances in the ring",
expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL,
},
"ring error - too many unhealthy instances (wrapped)": {
storageError: fmt.Errorf("partition 1: %w", ring.ErrTooManyUnhealthyInstances),
expectedErrorMessage: "partition 1: too many unhealthy instances in the ring",
expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL,
},
"ring error - empty ring": {
storageError: ring.ErrEmptyRing,
expectedErrorMessage: "empty ring",
expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL,
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
tenantID := "tenant-1"
ctx := user.InjectOrgID(context.Background(), tenantID)
_, ctx = stats.ContextWithEmptyStats(ctx)

errorStorage := &errorReturningStorage{err: testCase.storageError}

plan, err := planner.NewQueryPlan(context.Background(), `my_series`, types.NewInstantQueryTimeRange(startT), streamingpromql.NoopPlanningObserver{})
require.NoError(t, err)

encodedPlan, nodeIndices, err := plan.ToEncodedPlan(false, true, plan.Root)
require.NoError(t, err)

body := &querierpb.EvaluateQueryRequest{
Plan: *encodedPlan,
Nodes: []querierpb.EvaluationNode{
{
TimeRange: encodedPlan.TimeRange,
NodeIndex: nodeIndices[0],
},
},
BatchSize: 1,
}

req, err := prototypes.MarshalAny(body)
require.NoError(t, err)

route, err := prototypes.AnyMessageName(req)
require.NoError(t, err)

reg, requestMetrics, serverMetrics := newMetrics()
stream := &mockQueryResultStream{t: t, route: route, reg: reg}
dispatcher := NewDispatcher(engine, errorStorage, requestMetrics, serverMetrics, &propagation.NoopExtractor{}, opts.Logger)

dispatcher.HandleProtobuf(ctx, req, propagation.MapCarrier{}, stream)

require.Len(t, stream.messages, 1)
require.NotNil(t, stream.messages[0].GetError())
require.Equal(t, testCase.expectedErrorType, stream.messages[0].GetError().Type)
require.Contains(t, stream.messages[0].GetError().Message, testCase.expectedErrorMessage)
})
}
}

type errorReturningStorage struct {
err error
}

func (e *errorReturningStorage) Querier(mint, maxt int64) (storage.Querier, error) {
return &errorReturningQuerier{err: e.err}, nil
}

func (e *errorReturningStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
return nil, errors.New("chunk querier not supported")
}

type errorReturningQuerier struct {
err error
}

func (e *errorReturningQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
return storage.ErrSeriesSet(e.err)
}

func (e *errorReturningQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, e.err
}

func (e *errorReturningQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, e.err
}

func (e *errorReturningQuerier) Close() error {
return nil
}
Loading