Skip to content

Commit eab2099

Browse files
authored
allow logical plan fragment type for scheduler queue (#6968)
Signed-off-by: rubywtl <[email protected]>
1 parent bba3b9a commit eab2099

File tree

17 files changed

+1124
-89
lines changed

17 files changed

+1124
-89
lines changed

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3485,6 +3485,10 @@ grpc_client_config:
34853485
# using default gRPC client connect timeout 20s.
34863486
# CLI flag: -querier.frontend-client.connect-timeout
34873487
[connect_timeout: <duration> | default = 5s]
3488+
3489+
# Name of network interface to read address from.
3490+
# CLI flag: -querier.instance-interface-names
3491+
[instance_interface_names: <list of string> | default = [eth0 en0]]
34883492
```
34893493

34903494
### `ingester_config`

pkg/cortex/modules.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"flag"
66
"fmt"
7-
87
"log/slog"
98
"net/http"
109
"runtime"
@@ -414,6 +413,9 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
414413

415414
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
416415
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
416+
417+
t.Cfg.Worker.ListenPort = t.Cfg.Server.GRPCListenPort
418+
417419
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer)
418420
}
419421

@@ -815,7 +817,7 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) {
815817
tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
816818
}
817819

818-
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
820+
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.Querier.DistributedExecEnabled)
819821
if err != nil {
820822
return nil, errors.Wrap(err, "query-scheduler init")
821823
}
Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,32 @@
11
package distributed_execution
22

3+
// FragmentKey uniquely identifies a fragment of a distributed logical query plan.
4+
// It combines a queryID (to identify the overall query) and a fragmentID
5+
// (to identify the specific fragment within that query).
36
type FragmentKey struct {
4-
queryID uint64
7+
// queryID identifies the distributed query this fragment belongs to
8+
queryID uint64
9+
// fragmentID identifies this specific fragment within the query
510
fragmentID uint64
611
}
712

8-
func MakeFragmentKey(queryID uint64, fragmentID uint64) *FragmentKey {
9-
return &FragmentKey{
13+
// MakeFragmentKey creates a new FragmentKey with the given queryID and fragmentID.
14+
// It's used to track and identify fragments during distributed query execution.
15+
func MakeFragmentKey(queryID uint64, fragmentID uint64) FragmentKey {
16+
return FragmentKey{
1017
queryID: queryID,
1118
fragmentID: fragmentID,
1219
}
1320
}
1421

22+
// GetQueryID returns the queryID for the current key.
23+
// This ID is shared across all fragments of the same distributed query.
1524
func (f FragmentKey) GetQueryID() uint64 {
1625
return f.queryID
1726
}
1827

28+
// GetFragmentID returns the ID for this specific fragment
29+
// within its parent query.
1930
func (f FragmentKey) GetFragmentID() uint64 {
2031
return f.fragmentID
2132
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package plan_fragments
2+
3+
import "github.com/thanos-io/promql-engine/logicalplan"
4+
5+
// Fragmenter splits a logical query plan into fragments for scheduling.
6+
type Fragmenter interface {
7+
// Fragment splits the logical query plan into fragments and always returns them in child-to-root order;
8+
// in other words, the order of the fragments in the slice is the order in which they are scheduled.
9+
Fragment(node logicalplan.Node) ([]Fragment, error)
10+
}
11+
12+
type DummyFragmenter struct {
13+
}
14+
15+
func (f *DummyFragmenter) Fragment(node logicalplan.Node) ([]Fragment, error) {
16+
// Simple logic without the distributed optimizer: return the whole plan as a single root fragment.
17+
return []Fragment{
18+
{
19+
Node: node,
20+
FragmentID: uint64(1),
21+
ChildIDs: []uint64{},
22+
IsRoot: true,
23+
},
24+
}, nil
25+
}
26+
27+
type Fragment struct {
28+
Node logicalplan.Node
29+
FragmentID uint64
30+
ChildIDs []uint64
31+
IsRoot bool
32+
}
33+
34+
func (s *Fragment) IsEmpty() bool {
35+
if s.Node != nil {
36+
return false
37+
}
38+
if s.FragmentID != 0 {
39+
return false
40+
}
41+
if s.IsRoot {
42+
return false
43+
}
44+
if len(s.ChildIDs) != 0 {
45+
return false
46+
}
47+
return true
48+
}
49+
50+
func NewDummyFragmenter() Fragmenter {
51+
return &DummyFragmenter{}
52+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package plan_fragments
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/cortexproject/cortex/pkg/util/logical_plan"
10+
)
11+
12+
func TestFragmenter(t *testing.T) {
13+
type testCase struct {
14+
name string
15+
query string
16+
start time.Time
17+
end time.Time
18+
expectedFragments int
19+
}
20+
21+
now := time.Now()
22+
23+
// More tests will be added when the distributed optimizer and fragmenter are implemented.
24+
tests := []testCase{
25+
{
26+
name: "simple logical query plan - no fragmentation",
27+
query: "up",
28+
start: now,
29+
end: now,
30+
expectedFragments: 1,
31+
},
32+
}
33+
34+
for _, tc := range tests {
35+
t.Run(tc.name, func(t *testing.T) {
36+
lp, err := logical_plan.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0)
37+
require.NoError(t, err)
38+
39+
fragmenter := NewDummyFragmenter()
40+
res, err := fragmenter.Fragment((*lp).Root())
41+
42+
require.NoError(t, err)
43+
require.Equal(t, tc.expectedFragments, len(res))
44+
})
45+
}
46+
}

pkg/distributed_execution/remote_node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (r *Remote) UnmarshalJSON(data []byte) error {
6565
return err
6666
}
6767

68-
r.FragmentKey = *MakeFragmentKey(re.QueryID, re.FragmentID)
68+
r.FragmentKey = MakeFragmentKey(re.QueryID, re.FragmentID)
6969
r.FragmentAddr = re.FragmentAddr
7070
return nil
7171
}

pkg/querier/querier.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ type Config struct {
9696
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"`
9797
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"`
9898
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"`
99-
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
99+
100+
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
100101
}
101102

102103
var (

pkg/querier/worker/scheduler_processor.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"github.com/cortexproject/cortex/pkg/util/services"
3232
)
3333

34-
func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) {
34+
func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer, querierAddress string) (*schedulerProcessor, []services.Service) {
3535
p := &schedulerProcessor{
3636
log: log,
3737
handler: handler,
@@ -47,6 +47,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
4747
Help: "Time spend doing requests to frontend.",
4848
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
4949
}, []string{"operation", "status_code"}),
50+
querierAddress: querierAddress,
5051
}
5152

5253
frontendClientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
@@ -71,6 +72,7 @@ type schedulerProcessor struct {
7172
grpcConfig grpcclient.Config
7273
maxMessageSize int
7374
querierID string
75+
querierAddress string
7476

7577
frontendPool *client.Pool
7678
frontendClientRequestDuration *prometheus.HistogramVec
@@ -97,7 +99,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context,
9799
for backoff.Ongoing() {
98100
c, err := schedulerClient.QuerierLoop(ctx)
99101
if err == nil {
100-
err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID})
102+
err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID, QuerierAddress: sp.querierAddress})
101103
}
102104

103105
if err != nil {

pkg/querier/worker/scheduler_processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func Test_ToShowNotPanic_RelatedIssue6599(t *testing.T) {
144144
go stat.AddFetchedChunkBytes(10)
145145
}).Return(&httpgrpc.HTTPResponse{}, nil)
146146

147-
sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil)
147+
sp, _ := newSchedulerProcessor(cfg, requestHandler, log.NewNopLogger(), nil, "")
148148
schedulerClient := &mockSchedulerForQuerierClient{}
149149
schedulerClient.On("QuerierLoop", mock.Anything, mock.Anything).Return(querierLoopClient, nil)
150150

pkg/querier/worker/worker.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package worker
33
import (
44
"context"
55
"flag"
6+
"net"
67
"os"
8+
"strconv"
79
"sync"
810
"time"
911

@@ -14,7 +16,9 @@ import (
1416
"github.com/weaveworks/common/httpgrpc"
1517
"google.golang.org/grpc"
1618

19+
"github.com/cortexproject/cortex/pkg/ring"
1720
"github.com/cortexproject/cortex/pkg/util"
21+
"github.com/cortexproject/cortex/pkg/util/flagext"
1822
"github.com/cortexproject/cortex/pkg/util/grpcclient"
1923
"github.com/cortexproject/cortex/pkg/util/services"
2024
)
@@ -33,6 +37,10 @@ type Config struct {
3337
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
3438

3539
TargetHeaders []string `yaml:"-"` // Propagated by config.
40+
41+
InstanceInterfaceNames []string `yaml:"instance_interface_names"`
42+
ListenPort int `yaml:"-"`
43+
InstanceAddr string `yaml:"instance_addr" doc:"hidden"`
3644
}
3745

3846
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -46,6 +54,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4654
f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.")
4755

4856
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", "", f)
57+
58+
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
59+
f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "querier.instance-interface-names", "Name of network interface to read address from.")
60+
f.StringVar(&cfg.InstanceAddr, "querier.instance-addr", "", "IP address of the querier")
4961
}
5062

5163
func (cfg *Config) Validate(log log.Logger) error {
@@ -109,7 +121,14 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr
109121
level.Info(log).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress)
110122

111123
address = cfg.SchedulerAddress
112-
processor, servs = newSchedulerProcessor(cfg, handler, log, reg)
124+
125+
ipAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, log)
126+
if err != nil {
127+
return nil, err
128+
}
129+
querierAddr := net.JoinHostPort(ipAddr, strconv.Itoa(cfg.ListenPort))
130+
131+
processor, servs = newSchedulerProcessor(cfg, handler, log, reg, querierAddr)
113132

114133
case cfg.FrontendAddress != "":
115134
level.Info(log).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress)

0 commit comments

Comments
 (0)