
Commit 80c89fd

query: distributed engine - allow querying overlapping intervals (commit signed) (thanos-io#8003)
* chore: add possibility to run individual e2e tests
* chore: add metric pointer for too old sample logs
* feat: add flag for distributed queries with overlapping intervals
* chore: add failing overlapping interval test
* chore: fix base branch diff

Signed-off-by: Abel Simon <[email protected]>
1 parent 1b58ed1 commit 80c89fd

File tree: 7 files changed, +245 −107 lines changed

Makefile

+5-1
@@ -340,7 +340,11 @@ test-e2e: docker-e2e $(GOTESPLIT)
 	# NOTE(GiedriusS):
 	# * If you want to limit CPU time available in e2e tests then pass E2E_DOCKER_CPUS environment variable. For example, E2E_DOCKER_CPUS=0.05 limits CPU time available
 	# to spawned Docker containers to 0.05 cores.
-	@$(GOTESPLIT) -total ${GH_PARALLEL} -index ${GH_INDEX} ./test/e2e/... -- ${GOTEST_OPTS}
+	@if [ -n "$(SINGLE_E2E_TEST)" ]; then \
+		$(GOTESPLIT) -total ${GH_PARALLEL} -index ${GH_INDEX} ./test/e2e -- -run $(SINGLE_E2E_TEST) ${GOTEST_OPTS}; \
+	else \
+		$(GOTESPLIT) -total ${GH_PARALLEL} -index ${GH_INDEX} ./test/e2e/... -- ${GOTEST_OPTS}; \
+	fi

 .PHONY: test-e2e-local
 test-e2e-local: ## Runs all thanos e2e tests locally.

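With this change a single e2e test can be run by name. A usage sketch, assuming gotesplit's grouping is collapsed by setting GH_PARALLEL=1 and GH_INDEX=0 (adjust to your environment):

    make test-e2e SINGLE_E2E_TEST=TestDistributedEngineWithOverlappingIntervalsEnabled GH_PARALLEL=1 GH_INDEX=0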
cmd/thanos/query.go

+12-6
@@ -128,6 +128,9 @@ func registerQuery(app *extkingpin.App) {
 		Strings()
 	queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations. If this is empty then all labels are used as partition labels.").Strings()

+	// Currently we choose the highest MinT of an engine when querying multiple engines. This flag allows changing that behavior to choose the lowest MinT.
+	queryDistributedWithOverlappingInterval := cmd.Flag("query.distributed-with-overlapping-interval", "Allow distributed queries to use an engine's lowest MinT.").Hidden().Default("false").Bool()
+
 	instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

 	defaultMetadataTimeRange := cmd.Flag("query.metadata.default-time-range", "The default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. The zero value means range covers the time since the beginning.").Default("0s").Duration()
@@ -372,12 +375,13 @@ func registerQuery(app *extkingpin.App) {
 			*tenantCertField,
 			*enforceTenancy,
 			*tenantLabel,
+			*queryDistributedWithOverlappingInterval,
 		)
 	})
 }

 // runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured
-// store nodes, merging and duplicating the data to satisfy user query.
+// store nodes, merging and deduplicating the data to satisfy user query.
 func runQuery(
 	g *run.Group,
 	logger log.Logger,
@@ -454,6 +458,7 @@ func runQuery(
 	tenantCertField string,
 	enforceTenancy bool,
 	tenantLabel string,
+	queryDistributedWithOverlappingInterval bool,
 ) error {
 	comp := component.Query
 	if alertQueryURL == "" {
@@ -694,11 +699,12 @@ func runQuery(
 		level.Info(logger).Log("msg", "Distributed query mode enabled, using Thanos as the default query engine.")
 		defaultEngine = string(apiv1.PromqlEngineThanos)
 		remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
-			AutoDownsample:        enableAutodownsampling,
-			ReplicaLabels:         queryReplicaLabels,
-			PartitionLabels:       queryPartitionLabels,
-			Timeout:               queryTimeout,
-			EnablePartialResponse: enableQueryPartialResponse,
+			AutoDownsample:                          enableAutodownsampling,
+			ReplicaLabels:                           queryReplicaLabels,
+			PartitionLabels:                         queryPartitionLabels,
+			Timeout:                                 queryTimeout,
+			EnablePartialResponse:                   enableQueryPartialResponse,
+			QueryDistributedWithOverlappingInterval: queryDistributedWithOverlappingInterval,
 		})
 	}

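For illustration, enabling the new behavior on a central querier would look roughly like this. A sketch only: the leaf addresses and the --endpoint flags are placeholders, while --query.mode, --query.promql-engine, and the new hidden flag are the ones this commit wires up:

    thanos query \
      --query.mode=distributed \
      --query.promql-engine=thanos \
      --query.distributed-with-overlapping-interval \
      --endpoint=querier-leaf-1:10901 \
      --endpoint=querier-leaf-2:10901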
pkg/query/remote_engine.go

+12-6
@@ -33,11 +33,12 @@ import (

 // Opts are the options for a PromQL query.
 type Opts struct {
-	AutoDownsample        bool
-	ReplicaLabels         []string
-	PartitionLabels       []string
-	Timeout               time.Duration
-	EnablePartialResponse bool
+	AutoDownsample                          bool
+	ReplicaLabels                           []string
+	PartitionLabels                         []string
+	Timeout                                 time.Duration
+	EnablePartialResponse                   bool
+	QueryDistributedWithOverlappingInterval bool
 }

 // Client is a query client that executes PromQL queries.
@@ -114,6 +115,7 @@ func NewRemoteEngine(logger log.Logger, queryClient Client, opts Opts) *remoteEn
 // a block due to retention before other replicas did the same.
 // See https://github.com/thanos-io/promql-engine/issues/187.
 func (r *remoteEngine) MinT() int64 {
+
 	r.mintOnce.Do(func() {
 		var (
 			hashBuf = make([]byte, 0, 128)
@@ -126,7 +128,11 @@ func (r *remoteEngine) MinT() int64 {
 				highestMintByLabelSet[key] = lset.MinTime
 				continue
 			}
-			if lset.MinTime > lsetMinT {
+			// If we are querying with overlapping intervals, we want to find the first available timestamp;
+			// otherwise we want to find the last available timestamp.
+			if r.opts.QueryDistributedWithOverlappingInterval && lset.MinTime < lsetMinT {
+				highestMintByLabelSet[key] = lset.MinTime
+			} else if !r.opts.QueryDistributedWithOverlappingInterval && lset.MinTime > lsetMinT {
 				highestMintByLabelSet[key] = lset.MinTime
 			}
 		}

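To see the toggled selection in isolation, here is a minimal runnable sketch of the per-label-set MinT logic above. labelSetInfo and selectMinT are illustrative stand-ins, not the Thanos types; the real code hashes the label set to build the map key:

package main

import "fmt"

// labelSetInfo is a stand-in for a TSDB info entry: a label-set key plus the
// minimum timestamp an engine can serve for it.
type labelSetInfo struct {
	key     string
	minTime int64
}

// selectMinT mirrors the branch added in remoteEngine.MinT: per label-set key,
// keep the lowest MinTime when overlapping intervals are allowed (first
// available sample), otherwise keep the highest (last available sample).
func selectMinT(infos []labelSetInfo, overlapping bool) map[string]int64 {
	minTByKey := make(map[string]int64)
	for _, info := range infos {
		cur, ok := minTByKey[info.key]
		if !ok {
			minTByKey[info.key] = info.minTime
			continue
		}
		if overlapping && info.minTime < cur {
			minTByKey[info.key] = info.minTime
		} else if !overlapping && info.minTime > cur {
			minTByKey[info.key] = info.minTime
		}
	}
	return minTByKey
}

func main() {
	// Two engines advertise the same series, one holding much older data.
	infos := []labelSetInfo{{"p1", 1000}, {"p1", 40000}}
	fmt.Println(selectMinT(infos, true))  // map[p1:1000]  -> start from the oldest sample
	fmt.Println(selectMinT(infos, false)) // map[p1:40000] -> default, skip older partial data
}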
pkg/receive/writer_errors.go

+3-1
@@ -65,7 +65,9 @@ func (a *writeErrorTracker) addSampleError(err error, tLogger log.Logger, lset l
 		level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", v, "timestamp", t)
 	case errors.Is(err, storage.ErrTooOldSample):
 		a.numSamplesTooOld++
+		// We could pass in the current head max time, but if that is not updated, maxTime would be < the current time,
+		// so we just point to the metric that shows the current head max time.
-		level.Debug(tLogger).Log("msg", "Sample is too old", "lset", lset, "value", v, "timestamp", t)
+		level.Debug(tLogger).Log("msg", "Sample is too old", "lset", lset, "value", v, "timestamp", t, "for current latest, check prometheus_tsdb_head_max_time metric")
 	default:
 		level.Debug(tLogger).Log("msg", "Error ingesting sample", "err", err)
 	}

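The log line now points operators at Prometheus's prometheus_tsdb_head_max_time metric (the TSDB head's maximum timestamp, in milliseconds), so a rejected sample's timestamp can be compared against the receiver's current head without plumbing the head max time through the error path.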
test/e2e/distributed_query_test.go

+202
@@ -5,14 +5,28 @@ package e2e_test

 import (
 	"context"
+	"os"
+	"path"
+	"path/filepath"
 	"testing"
 	"time"

 	"github.com/efficientgo/core/testutil"
 	"github.com/efficientgo/e2e"
+	e2edb "github.com/efficientgo/e2e/db"
+	"github.com/go-kit/log"
+	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/timestamp"

+	"github.com/thanos-io/objstore"
+	"github.com/thanos-io/objstore/client"
+	"github.com/thanos-io/objstore/providers/s3"
+	v1 "github.com/thanos-io/thanos/pkg/api/query"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/promclient"
+	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
 	"github.com/thanos-io/thanos/test/e2e/e2ethanos"
 )

@@ -82,3 +96,191 @@ func TestDistributedQueryExecution(t *testing.T) {
 		},
 	})
 }
+
+func TestDistributedEngineWithOverlappingIntervalsEnabled(t *testing.T) {
+	t.Parallel()
+
+	e, err := e2e.New(e2e.WithName("dist-disj-tsdbs"))
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	ctx := context.Background()
+	l := log.NewLogfmtLogger(os.Stdout)
+	now := time.Now()
+
+	bucket1 := "dist-disj-tsdbs-test1"
+	minio1 := e2edb.NewMinio(e, "1", bucket1, e2edb.WithMinioTLS())
+	testutil.Ok(t, e2e.StartAndWaitReady(minio1))
+
+	bkt1, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket1, minio1.Endpoint("http"), minio1.Dir()), "test", nil)
+	testutil.Ok(t, err)
+
+	// Set up a storage GW with 2 blocks that have a gap, to trigger the distributed query MinT bug.
+	dir1 := filepath.Join(e.SharedDir(), "tmp1")
+	testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir1), os.ModePerm))
+	blockID1, err := e2eutil.CreateBlockWithBlockDelay(ctx,
+		dir1,
+		[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
+		1000,
+		timestamp.FromTime(now.Add(-10*time.Hour)),
+		timestamp.FromTime(now.Add(-8*time.Hour)),
+		30*time.Minute,
+		labels.FromStrings("prometheus", "p1", "replica", "0"),
+		0,
+		metadata.NoneFunc,
+	)
+	testutil.Ok(t, err)
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID1.String()), blockID1.String()))
+
+	blockID2, err := e2eutil.CreateBlockWithBlockDelay(ctx,
+		dir1,
+		[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
+		1000,
+		timestamp.FromTime(now.Add(-4*time.Hour)),
+		timestamp.FromTime(now.Add(-2*time.Hour)),
+		30*time.Minute,
+		labels.FromStrings("prometheus", "p1", "replica", "0"),
+		0,
+		metadata.NoneFunc,
+	)
+	testutil.Ok(t, err)
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID2.String()), blockID2.String()))
+	store1 := e2ethanos.NewStoreGW(
+		e,
+		"s1",
+		client.BucketConfig{
+			Type:   client.S3,
+			Config: e2ethanos.NewS3Config(bucket1, minio1.InternalEndpoint("http"), minio1.InternalDir()),
+		},
+		"",
+		"",
+		nil,
+	)
+	testutil.Ok(t, e2e.StartAndWaitReady(store1))
+
+	querierLeaf1 := e2ethanos.NewQuerierBuilder(e, "1", store1.InternalEndpoint("grpc")).Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf1))
+	// We need another querier to circumvent the passthrough optimizer.
+	promConfig2 := e2ethanos.DefaultPromConfig("p2", 0, "", "", e2ethanos.LocalPrometheusTarget)
+	prom2, sidecar2 := e2ethanos.NewPrometheusWithSidecar(e, "p2", promConfig2, "", e2ethanos.DefaultPrometheusImage(), "")
+	testutil.Ok(t, e2e.StartAndWaitReady(prom2, sidecar2))
+	querierLeaf2 := e2ethanos.NewQuerierBuilder(e, "2", sidecar2.InternalEndpoint("grpc")).Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf2))
+	querierDistributed := e2ethanos.NewQuerierBuilder(e, "3",
+		querierLeaf1.InternalEndpoint("grpc"),
+		querierLeaf2.InternalEndpoint("grpc"),
+	).
+		WithEngine(v1.PromqlEngineThanos).
+		WithQueryMode("distributed").
+		WithDistributedOverlap(true).
+		Init()
+
+	testutil.Ok(t, e2e.StartAndWaitReady(querierDistributed))
+
+	// We would expect 2x2h ranges for the 2 blocks containing foo samples; that is around 240 expected sample pairs in the result matrix.
+	// We assert on more than 200 to reduce flakiness.
+	rangeQuery(t, ctx, querierDistributed.Endpoint("http"), func() string { return "foo" }, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now), 60, promclient.QueryOptions{}, func(res model.Matrix) error {
+		if res.Len() < 1 {
+			return errors.New("No result series returned")
+		}
+		if nvals := len(res[0].Values); nvals < 200 {
+			return errors.Errorf("Too few values in result matrix, got %d, expected > 200", nvals)
+		}
+		return nil
+	})
+}
+
+func TestDistributedEngineWithoutOverlappingIntervals(t *testing.T) {
+	t.Skip("skipping test as this replicates a bug")
+	t.Parallel()
+	e, err := e2e.New(e2e.WithName("dist-disj-tsdbs2"))
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	ctx := context.Background()
+	l := log.NewLogfmtLogger(os.Stdout)
+	now := time.Now()
+
+	bucket1 := "dist-disj-tsdbs2-test2"
+	minio1 := e2edb.NewMinio(e, "1", bucket1, e2edb.WithMinioTLS())
+	testutil.Ok(t, e2e.StartAndWaitReady(minio1))
+
+	bkt1, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket1, minio1.Endpoint("http"), minio1.Dir()), "test", nil)
+	testutil.Ok(t, err)
+
+	// Set up a storage GW with 2 blocks that have a gap, to trigger the distributed query MinT bug.
+	dir1 := filepath.Join(e.SharedDir(), "tmp1")
+	testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir1), os.ModePerm))
+	blockID1, err := e2eutil.CreateBlockWithBlockDelay(ctx,
+		dir1,
+		[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
+		1000,
+		timestamp.FromTime(now.Add(-14*time.Hour)),
+		timestamp.FromTime(now.Add(-12*time.Hour)),
+		30*time.Minute,
+		labels.FromStrings("prometheus", "p1", "replica", "0"),
+		0,
+		metadata.NoneFunc,
+	)
+	testutil.Ok(t, err)
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID1.String()), blockID1.String()))
+
+	blockID2, err := e2eutil.CreateBlockWithBlockDelay(ctx,
+		dir1,
+		[]labels.Labels{labels.FromStrings("__name__", "foo", "instance", "foo_1")},
+		1000,
+		timestamp.FromTime(now.Add(-4*time.Hour)),
+		timestamp.FromTime(now.Add(-2*time.Hour)),
+		30*time.Minute,
+		labels.FromStrings("prometheus", "p1", "replica", "0"),
+		0,
+		metadata.NoneFunc,
+	)
+	testutil.Ok(t, err)
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt1, path.Join(dir1, blockID2.String()), blockID2.String()))
+	store1 := e2ethanos.NewStoreGW(
+		e,
+		"s1",
+		client.BucketConfig{
+			Type:   client.S3,
+			Config: e2ethanos.NewS3Config(bucket1, minio1.InternalEndpoint("http"), minio1.InternalDir()),
+		},
+		"",
+		"",
+		nil,
+	)
+	testutil.Ok(t, e2e.StartAndWaitReady(store1))
+
+	querierLeaf1 := e2ethanos.NewQuerierBuilder(e, "1", store1.InternalEndpoint("grpc")).Init()
+
+	testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf1))
+	// We need another querier to circumvent the passthrough optimizer.
+	promConfig2 := e2ethanos.DefaultPromConfig("p2", 0, "", "", e2ethanos.LocalPrometheusTarget)
+	prom2, sidecar2 := e2ethanos.NewPrometheusWithSidecar(e, "p2", promConfig2, "", e2ethanos.DefaultPrometheusImage(), "")
+	testutil.Ok(t, e2e.StartAndWaitReady(prom2, sidecar2))
+	querierLeaf2 := e2ethanos.NewQuerierBuilder(e, "2", sidecar2.InternalEndpoint("grpc")).Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(querierLeaf2))
+
+	querierDistributed := e2ethanos.NewQuerierBuilder(e, "3",
+		querierLeaf1.InternalEndpoint("grpc"),
+		querierLeaf2.InternalEndpoint("grpc"),
+	).
+		WithEngine(v1.PromqlEngineThanos).
+		WithQueryMode("distributed").
+		Init()
+
+	testutil.Ok(t, e2e.StartAndWaitReady(querierDistributed))
+
+	// We would expect 2x2h ranges for the 2 blocks containing foo samples; that is around 240 expected sample pairs in the result matrix.
+	// We assert on more than 200 to reduce flakiness.
+	rangeQuery(t, ctx, querierDistributed.Endpoint("http"), func() string { return "foo" }, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now), 60, promclient.QueryOptions{}, func(res model.Matrix) error {
+		if res.Len() < 1 {
+			return errors.New("No result series returned")
+		}
+		if nvals := len(res[0].Values); nvals < 200 {
+			return errors.Errorf("Too few values in result matrix, got %d, expected > 200", nvals)
+		}

+		return nil
+	})
+}

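For context on the assertion: each block spans 2h and the range query uses a 60s step, so each block contributes roughly 120 sample pairs, about 240 in total; the 200 threshold leaves headroom for boundary effects. The second test is the failing reproduction the commit message mentions: without --query.distributed-with-overlapping-interval, the distributed engine picks the highest MinT per label set and drops the older block's samples, so the test is kept skipped as documentation of the bug.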
test/e2e/e2ethanos/services.go

+11-3
@@ -254,9 +254,10 @@ type QuerierBuilder struct {
 	endpoints       []string
 	strictEndpoints []string

-	engine           apiv1.PromqlEngineType
-	queryMode        string
-	enableXFunctions bool
+	engine                                  apiv1.PromqlEngineType
+	queryMode                               string
+	queryDistributedWithOverlappingInterval bool
+	enableXFunctions                        bool

 	replicaLabels []string
 	tracingConfig string
@@ -376,6 +377,10 @@ func (q *QuerierBuilder) WithQueryMode(mode string) *QuerierBuilder {
 	q.queryMode = mode
 	return q
 }
+func (q *QuerierBuilder) WithDistributedOverlap(overlap bool) *QuerierBuilder {
+	q.queryDistributedWithOverlappingInterval = overlap
+	return q
+}

 func (q *QuerierBuilder) WithEnableXFunctions() *QuerierBuilder {
 	q.enableXFunctions = true
@@ -513,6 +518,9 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
 	if q.queryMode != "" {
 		args = append(args, "--query.mode="+q.queryMode)
 	}
+	if q.queryDistributedWithOverlappingInterval {
+		args = append(args, "--query.distributed-with-overlapping-interval")
+	}
 	if q.engine != "" {
 		args = append(args, "--query.promql-engine="+string(q.engine))
 	}
