Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
* [BUGFIX] Query-frontend: Fix incorrect query results when running sharding inside MQE is enabled and the query contains a subquery eligible for subquery spin-off wrapped in a shardable aggregation. #13619
* [BUGFIX] Memberlist: Fix occasional nil pointer dereference panics. #13635
* [BUGFIX] Query-frontend: Fix silent panic when executing a remote read API request if the request has no matchers. #13745
* [BUGFIX] MQE: Map remote execution storage errors correctly. #13946

### Mixin

Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var errZeroBatchSize = errors.New("requested batch size cannot be 0 for an insta

type Dispatcher struct {
engine *streamingpromql.Engine
queryable storage.Queryable
queryable storage.SampleAndChunkQueryable
extractor propagation.Extractor
logger log.Logger

Expand All @@ -60,7 +60,9 @@ type Dispatcher struct {
timeNow func() time.Time
}

func NewDispatcher(engine *streamingpromql.Engine, queryable storage.Queryable, querierMetrics *RequestMetrics, serverMetrics *server.Metrics, extractor propagation.Extractor, logger log.Logger) *Dispatcher {
func NewDispatcher(engine *streamingpromql.Engine, queryable storage.SampleAndChunkQueryable, querierMetrics *RequestMetrics, serverMetrics *server.Metrics, extractor propagation.Extractor, logger log.Logger) *Dispatcher {
queryable = NewErrorTranslateSampleAndChunkQueryable(queryable)

return &Dispatcher{
engine: engine,
queryable: queryable,
Expand Down
109 changes: 109 additions & 0 deletions pkg/querier/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
prototypes "github.com/gogo/protobuf/types"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/server"
"github.com/grafana/dskit/user"
"github.com/grafana/regexp"
Expand Down Expand Up @@ -1309,3 +1310,111 @@ func TestQueryResponseWriter_WriteError(t *testing.T) {
})
}
}

func TestDispatcher_RingErrorTranslation(t *testing.T) {
opts := streamingpromql.NewTestEngineOpts()
opts.Pedantic = true
planner, err := streamingpromql.NewQueryPlanner(opts, streamingpromql.NewMaximumSupportedVersionQueryPlanVersionProvider())
require.NoError(t, err)
engine, err := streamingpromql.NewEngine(opts, streamingpromql.NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner)
require.NoError(t, err)

startT := timestamp.Time(0)

testCases := map[string]struct {
storageError error
expectedErrorMessage string
expectedErrorType mimirpb.QueryErrorType
}{
"ring error - too many unhealthy instances": {
storageError: ring.ErrTooManyUnhealthyInstances,
expectedErrorMessage: "too many unhealthy instances in the ring",
expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL,
},
"ring error - too many unhealthy instances (wrapped)": {
storageError: fmt.Errorf("partition 1: %w", ring.ErrTooManyUnhealthyInstances),
expectedErrorMessage: "partition 1: too many unhealthy instances in the ring",
expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL,
},
"ring error - empty ring": {
storageError: ring.ErrEmptyRing,
expectedErrorMessage: "empty ring",
expectedErrorType: mimirpb.QUERY_ERROR_TYPE_INTERNAL,
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
tenantID := "tenant-1"
ctx := user.InjectOrgID(context.Background(), tenantID)
_, ctx = stats.ContextWithEmptyStats(ctx)

errorStorage := &errorReturningStorage{err: testCase.storageError}

plan, err := planner.NewQueryPlan(context.Background(), `my_series`, types.NewInstantQueryTimeRange(startT), streamingpromql.NoopPlanningObserver{})
require.NoError(t, err)

encodedPlan, err := plan.ToEncodedPlan(false, true)
require.NoError(t, err)

body := &querierpb.EvaluateQueryRequest{
Plan: *encodedPlan,
Nodes: []querierpb.EvaluationNode{
{
TimeRange: encodedPlan.TimeRange,
},
},
BatchSize: 1,
}

req, err := prototypes.MarshalAny(body)
require.NoError(t, err)

route, err := prototypes.AnyMessageName(req)
require.NoError(t, err)

reg, requestMetrics, serverMetrics := newMetrics()
stream := &mockQueryResultStream{t: t, route: route, reg: reg}
dispatcher := NewDispatcher(engine, errorStorage, requestMetrics, serverMetrics, &propagation.NoopExtractor{}, opts.Logger)

dispatcher.HandleProtobuf(ctx, req, propagation.MapCarrier{}, stream)

require.Len(t, stream.messages, 1)
require.NotNil(t, stream.messages[0].GetError())
require.Equal(t, testCase.expectedErrorType, stream.messages[0].GetError().Type)
require.Contains(t, stream.messages[0].GetError().Message, testCase.expectedErrorMessage)
})
}
}

type errorReturningStorage struct {
err error
}

func (e *errorReturningStorage) Querier(mint, maxt int64) (storage.Querier, error) {
return &errorReturningQuerier{err: e.err}, nil
}

func (e *errorReturningStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
return nil, errors.New("chunk querier not supported")
}

type errorReturningQuerier struct {
err error
}

func (e *errorReturningQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
return storage.ErrSeriesSet(e.err)
}

func (e *errorReturningQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, e.err
}

func (e *errorReturningQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, e.err
}

func (e *errorReturningQuerier) Close() error {
return nil
}