diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d19b0dd6dd..e7292f0713a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -127,6 +127,7 @@ * [BUGFIX] Query-frontend: Fix silent panic when executing a remote read API request if the request has no matchers. #13745 * [BUGFIX] Ruler: Fixed `-ruler.max-rule-groups-per-tenant-by-namespace` to only count rule groups in the specified namespace instead of all namespaces. #13743 * [BUGFIX] Query-frontend: Fix race condition that could sometimes cause unnecessary resharding of queriers if querier shuffle sharding and remote execution is enabled. #13794 #13838 +* [BUGFIX] MQE: Map remote execution storage errors correctly. #13949 ### Mixin diff --git a/pkg/querier/dispatcher.go b/pkg/querier/dispatcher.go index b70bf16266d..2ab7e194e9e 100644 --- a/pkg/querier/dispatcher.go +++ b/pkg/querier/dispatcher.go @@ -49,7 +49,7 @@ var errNoNodesInRequest = errors.New("request contains no nodes to evaluate") type Dispatcher struct { engine *streamingpromql.Engine - queryable storage.Queryable + queryable storage.SampleAndChunkQueryable extractor propagation.Extractor logger log.Logger @@ -63,7 +63,9 @@ type Dispatcher struct { timeNow func() time.Time } -func NewDispatcher(engine *streamingpromql.Engine, queryable storage.Queryable, querierMetrics *RequestMetrics, serverMetrics *server.Metrics, extractor propagation.Extractor, logger log.Logger) *Dispatcher { +func NewDispatcher(engine *streamingpromql.Engine, queryable storage.SampleAndChunkQueryable, querierMetrics *RequestMetrics, serverMetrics *server.Metrics, extractor propagation.Extractor, logger log.Logger) *Dispatcher { + queryable = NewErrorTranslateSampleAndChunkQueryable(queryable) + return &Dispatcher{ engine: engine, queryable: queryable, diff --git a/pkg/querier/dispatcher_test.go b/pkg/querier/dispatcher_test.go index 543745a126c..19f466fb284 100644 --- a/pkg/querier/dispatcher_test.go +++ b/pkg/querier/dispatcher_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" prototypes "github.com/gogo/protobuf/types" + "github.com/grafana/dskit/ring" "github.com/grafana/dskit/server" "github.com/grafana/dskit/user" "github.com/grafana/regexp" @@ -1501,3 +1502,112 @@ func newRangeVectorStepDataMessage(data querierpb.EvaluateQueryResponseRangeVect }, } } + +func TestDispatcher_RingErrorTranslation(t *testing.T) { + opts := streamingpromql.NewTestEngineOpts() + opts.Pedantic = true + planner, err := streamingpromql.NewQueryPlanner(opts, streamingpromql.NewMaximumSupportedVersionQueryPlanVersionProvider()) + require.NoError(t, err) + engine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner) + require.NoError(t, err) + + startT := timestamp.Time(0) + + testCases := map[string]struct { + storageError error + expectedErrorMessage string + expectedErrorType mimirpb.QueryErrorType + }{ + "ring error - too many unhealthy instances": { + storageError: ring.ErrTooManyUnhealthyInstances, + expectedErrorMessage: "too many unhealthy instances in the ring", + expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL, + }, + "ring error - too many unhealthy instances (wrapped)": { + storageError: fmt.Errorf("partition 1: %w", ring.ErrTooManyUnhealthyInstances), + expectedErrorMessage: "partition 1: too many unhealthy instances in the ring", + expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL, + }, + "ring error - empty ring": { + storageError: ring.ErrEmptyRing, + expectedErrorMessage: "empty ring", + expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + tenantID := "tenant-1" + ctx := user.InjectOrgID(context.Background(), tenantID) + _, ctx = stats.ContextWithEmptyStats(ctx) + + errorStorage := &errorReturningStorage{err: testCase.storageError} + + plan, err := planner.NewQueryPlan(context.Background(), `my_series`, types.NewInstantQueryTimeRange(startT), streamingpromql.NoopPlanningObserver{}) + require.NoError(t, err) + + encodedPlan, nodeIndices, err := plan.ToEncodedPlan(false, true, plan.Root) + require.NoError(t, err) + + body := &querierpb.EvaluateQueryRequest{ + Plan: *encodedPlan, + Nodes: []querierpb.EvaluationNode{ + { + TimeRange: encodedPlan.TimeRange, + NodeIndex: nodeIndices[0], + }, + }, + BatchSize: 1, + } + + req, err := prototypes.MarshalAny(body) + require.NoError(t, err) + + route, err := prototypes.AnyMessageName(req) + require.NoError(t, err) + + reg, requestMetrics, serverMetrics := newMetrics() + stream := &mockQueryResultStream{t: t, route: route, reg: reg} + dispatcher := NewDispatcher(engine, errorStorage, requestMetrics, serverMetrics, &propagation.NoopExtractor{}, opts.Logger) + + dispatcher.HandleProtobuf(ctx, req, propagation.MapCarrier{}, stream) + + require.Len(t, stream.messages, 1) + require.NotNil(t, stream.messages[0].GetError()) + require.Equal(t, testCase.expectedErrorType, stream.messages[0].GetError().Type) + require.Contains(t, stream.messages[0].GetError().Message, testCase.expectedErrorMessage) + }) + } +} + +type errorReturningStorage struct { + err error +} + +func (e *errorReturningStorage) Querier(mint, maxt int64) (storage.Querier, error) { + return &errorReturningQuerier{err: e.err}, nil +} + +func (e *errorReturningStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { + return nil, errors.New("chunk querier not supported") +} + +type errorReturningQuerier struct { + err error +} + +func (e *errorReturningQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return storage.ErrSeriesSet(e.err) +} + +func (e *errorReturningQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, e.err +} + +func (e *errorReturningQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, e.err +} + +func (e *errorReturningQuerier) Close() error { + return nil +}