diff --git a/CHANGELOG.md b/CHANGELOG.md index 969e5fe837c..65faeed6732 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,6 +85,7 @@ * [BUGFIX] Query-frontend: Fix excessive CPU and memory consumption when running sharding inside MQE. #13580 * [BUGFIX] Query-frontend: Fix incorrect query results when running sharding inside MQE is enabled and the query contains a subquery eligible for subquery spin-off wrapped in a shardable aggregation. #13619 * [BUGFIX] Query-frontend: Fix silent panic when executing a remote read API request if the request has no matchers. #13745 +* [BUGFIX] MQE: Map remote execution storage errors correctly. #13945 ### Mixin diff --git a/pkg/querier/dispatcher.go b/pkg/querier/dispatcher.go index d93c08e2a13..99a5101a35d 100644 --- a/pkg/querier/dispatcher.go +++ b/pkg/querier/dispatcher.go @@ -46,7 +46,7 @@ var errZeroBatchSize = errors.New("requested batch size cannot be 0 for an insta type Dispatcher struct { engine *streamingpromql.Engine - queryable storage.Queryable + queryable storage.SampleAndChunkQueryable extractor propagation.Extractor logger log.Logger @@ -60,7 +60,9 @@ type Dispatcher struct { timeNow func() time.Time } -func NewDispatcher(engine *streamingpromql.Engine, queryable storage.Queryable, querierMetrics *RequestMetrics, serverMetrics *server.Metrics, extractor propagation.Extractor, logger log.Logger) *Dispatcher { +func NewDispatcher(engine *streamingpromql.Engine, queryable storage.SampleAndChunkQueryable, querierMetrics *RequestMetrics, serverMetrics *server.Metrics, extractor propagation.Extractor, logger log.Logger) *Dispatcher { + queryable = NewErrorTranslateSampleAndChunkQueryable(queryable) + return &Dispatcher{ engine: engine, queryable: queryable, diff --git a/pkg/querier/dispatcher_test.go b/pkg/querier/dispatcher_test.go index c2ca486c30b..c6b4e3f2885 100644 --- a/pkg/querier/dispatcher_test.go +++ b/pkg/querier/dispatcher_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" prototypes "github.com/gogo/protobuf/types" + "github.com/grafana/dskit/ring" "github.com/grafana/dskit/server" "github.com/grafana/dskit/user" "github.com/grafana/regexp" @@ -1402,3 +1403,111 @@ func TestQueryResponseWriter_WriteError(t *testing.T) { }) } } + +func TestDispatcher_RingErrorTranslation(t *testing.T) { + opts := streamingpromql.NewTestEngineOpts() + opts.Pedantic = true + planner, err := streamingpromql.NewQueryPlanner(opts, streamingpromql.NewMaximumSupportedVersionQueryPlanVersionProvider()) + require.NoError(t, err) + engine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner) + require.NoError(t, err) + + startT := timestamp.Time(0) + + testCases := map[string]struct { + storageError error + expectedErrorMessage string + expectedErrorType mimirpb.QueryErrorType + }{ + "ring error - too many unhealthy instances": { + storageError: ring.ErrTooManyUnhealthyInstances, + expectedErrorMessage: "too many unhealthy instances in the ring", + expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL, + }, + "ring error - too many unhealthy instances (wrapped)": { + storageError: fmt.Errorf("partition 1: %w", ring.ErrTooManyUnhealthyInstances), + expectedErrorMessage: "partition 1: too many unhealthy instances in the ring", + expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL, + }, + "ring error - empty ring": { + storageError: ring.ErrEmptyRing, + expectedErrorMessage: "empty ring", + expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + tenantID := "tenant-1" + ctx := user.InjectOrgID(context.Background(), tenantID) + _, ctx = stats.ContextWithEmptyStats(ctx) + + errorStorage := &errorReturningStorage{err: testCase.storageError} + + plan, err := planner.NewQueryPlan(context.Background(), `my_series`, types.NewInstantQueryTimeRange(startT), streamingpromql.NoopPlanningObserver{}) + require.NoError(t, err) + + encodedPlan, err := plan.ToEncodedPlan(false, true) + require.NoError(t, err) + + body := &querierpb.EvaluateQueryRequest{ + Plan: *encodedPlan, + Nodes: []querierpb.EvaluationNode{ + { + TimeRange: encodedPlan.TimeRange, + }, + }, + BatchSize: 1, + } + + req, err := prototypes.MarshalAny(body) + require.NoError(t, err) + + route, err := prototypes.AnyMessageName(req) + require.NoError(t, err) + + reg, requestMetrics, serverMetrics := newMetrics() + stream := &mockQueryResultStream{t: t, route: route, reg: reg} + dispatcher := NewDispatcher(engine, errorStorage, requestMetrics, serverMetrics, &propagation.NoopExtractor{}, opts.Logger) + + dispatcher.HandleProtobuf(ctx, req, propagation.MapCarrier{}, stream) + + require.Len(t, stream.messages, 1) + require.NotNil(t, stream.messages[0].GetError()) + require.Equal(t, testCase.expectedErrorType, stream.messages[0].GetError().Type) + require.Contains(t, stream.messages[0].GetError().Message, testCase.expectedErrorMessage) + }) + } +} + +type errorReturningStorage struct { + err error +} + +func (e *errorReturningStorage) Querier(mint, maxt int64) (storage.Querier, error) { + return &errorReturningQuerier{err: e.err}, nil +} + +func (e *errorReturningStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { + return nil, errors.New("chunk querier not supported") +} + +type errorReturningQuerier struct { + err error +} + +func (e *errorReturningQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return storage.ErrSeriesSet(e.err) +} + +func (e *errorReturningQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, e.err +} + +func (e *errorReturningQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + return nil, nil, e.err +} + +func (e *errorReturningQuerier) Close() error { + return nil +}