Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions flyteplugins/go/tasks/plugins/webapi/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, but this isn't needed for the grpc-go version we use (v1.81.1). The core google.golang.org/grpc package already registers round_robin for us: clientconn.go (which is package grpc) has

// google.golang.org/grpc/clientconn.go:55
_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.

and roundrobin's init() calls balancer.Register(...). Since this file already imports google.golang.org/grpc, the balancer is registered as soon as the package loads — no separate blank import required. The manual blank import was only necessary in much older grpc-go releases (pre ~v1.36) before that import was folded into clientconn.go.

Also, {"loadBalancingConfig":[{"round_robin":{}}]} was already the prior default service config and is in use in production, so there's no silent fallback here.

"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/v2/flytestdlib/config"
Expand All @@ -20,6 +21,27 @@ import (

const defaultTaskTypeVersion = 0

// DefaultGRPCServiceConfig is the gRPC service config applied to a connector
// connection when its Deployment does not set DefaultServiceConfig. It enables
// round-robin load balancing across connector replicas and retries transient
// UNAVAILABLE failures so a single dropped connection does not fail the task.
// It is exported so callers that register connectors programmatically (e.g. the
// operator app controller) can set it explicitly.
const DefaultGRPCServiceConfig = `{
"loadBalancingConfig": [{"round_robin":{}}],
"methodConfig": [{
"name": [{"service": "flyteidl2.connector.AsyncConnectorService"}],
"retryPolicy": {
"maxAttempts": 4,
"initialBackoff": "0.2s",
"maxBackoff": "3s",
"backoffMultiplier": 2.0,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}],
"retryThrottling": {"maxTokens": 10, "tokenRatio": 0.1}
}`

type Connector struct {
// ConnectorDeployment is the connector deployment where this connector is running.
ConnectorDeployment *Deployment
Expand Down Expand Up @@ -50,8 +72,22 @@ func getGrpcConnection(ctx context.Context, connector *Deployment) (*grpc.Client
opts = append(opts, grpc.WithTransportCredentials(creds))
}

if len(connector.DefaultServiceConfig) != 0 {
opts = append(opts, grpc.WithDefaultServiceConfig(connector.DefaultServiceConfig))
serviceConfig := connector.DefaultServiceConfig
if len(serviceConfig) == 0 {
serviceConfig = DefaultGRPCServiceConfig
}
opts = append(opts, grpc.WithDefaultServiceConfig(serviceConfig))
Comment on lines +75 to +79

// Keepalive is opt-in per deployment. It is only safe for connectors fronted
// by an L7 gateway (e.g. Knative/Kourier Envoy) that reaps idle connections;
// a directly-reached grpc-go or grpcio (C-core) server rejects frequent idle
// pings with GOAWAY "too_many_pings".
if connector.Keepalive != nil {
opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: connector.Keepalive.Time.Duration,
Timeout: connector.Keepalive.Timeout.Duration,
PermitWithoutStream: connector.Keepalive.PermitWithoutStream,
}))
}

var err error
Expand Down
75 changes: 75 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/connector/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ package connector

import (
"context"
"encoding/json"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/flyteorg/flyte/v2/flytestdlib/config"
)

func TestInitializeClients(t *testing.T) {
Expand All @@ -26,3 +32,72 @@ func TestInitializeClients(t *testing.T) {
_, ok = cs.asyncConnectorClients["x"]
assert.True(t, ok)
}

func TestDefaultGRPCServiceConfig(t *testing.T) {
// Must be valid JSON — grpc.WithDefaultServiceConfig silently ignores a
// malformed config, so a typo here would disable LB/retry without any error.
assert.True(t, json.Valid([]byte(DefaultGRPCServiceConfig)), "DefaultGRPCServiceConfig must be valid JSON")

var parsed struct {
LoadBalancingConfig []map[string]any `json:"loadBalancingConfig"`
MethodConfig []struct {
Name []struct {
Service string `json:"service"`
} `json:"name"`
RetryPolicy struct {
MaxAttempts int `json:"maxAttempts"`
RetryableStatusCodes []string `json:"retryableStatusCodes"`
} `json:"retryPolicy"`
} `json:"methodConfig"`
RetryThrottling map[string]any `json:"retryThrottling"`
}
require.NoError(t, json.Unmarshal([]byte(DefaultGRPCServiceConfig), &parsed))

require.Len(t, parsed.LoadBalancingConfig, 1)
_, hasRoundRobin := parsed.LoadBalancingConfig[0]["round_robin"]
assert.True(t, hasRoundRobin, "expected round_robin load balancing")

require.Len(t, parsed.MethodConfig, 1)
mc := parsed.MethodConfig[0]
require.Len(t, mc.Name, 1)
assert.Equal(t, "flyteidl2.connector.AsyncConnectorService", mc.Name[0].Service)
assert.Greater(t, mc.RetryPolicy.MaxAttempts, 1)
assert.Equal(t, []string{"UNAVAILABLE"}, mc.RetryPolicy.RetryableStatusCodes)
assert.NotEmpty(t, parsed.RetryThrottling, "expected retryThrottling to bound retry storms")
}

func TestGetGrpcConnection(t *testing.T) {
ctx := context.Background()

Comment on lines +69 to +71
// Empty DefaultServiceConfig must fall back to DefaultGRPCServiceConfig
// (round-robin + retry) rather than no service config at all.
conn, err := getGrpcConnection(ctx, &Deployment{Endpoint: "x", Insecure: true})
require.NoError(t, err)
require.NotNil(t, conn)
assert.NoError(t, conn.Close())

// A deployment-specific DefaultServiceConfig still takes precedence.
custom := `{"loadBalancingConfig": [{"round_robin":{}}]}`
conn, err = getGrpcConnection(ctx, &Deployment{Endpoint: "y", Insecure: true, DefaultServiceConfig: custom})
require.NoError(t, err)
require.NotNil(t, conn)
assert.NoError(t, conn.Close())

// Keepalive is opt-in: a deployment may enable it (gateway-fronted connectors).
conn, err = getGrpcConnection(ctx, &Deployment{
Endpoint: "z", Insecure: true,
Keepalive: &KeepaliveConfig{
Time: config.Duration{Duration: 30 * time.Second},
Timeout: config.Duration{Duration: 10 * time.Second},
PermitWithoutStream: true,
},
})
require.NoError(t, err)
require.NotNil(t, conn)
assert.NoError(t, conn.Close())
}

// ensure the constant referenced from config.go and the LB choice stay in sync
func TestDefaultGRPCServiceConfigMentionsRoundRobin(t *testing.T) {
assert.True(t, strings.Contains(DefaultGRPCServiceConfig, "round_robin"))
}
36 changes: 32 additions & 4 deletions flyteplugins/go/tasks/plugins/webapi/connector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ var (
},
},
DefaultConnector: Deployment{
Endpoint: "",
Insecure: true,
DefaultTimeout: config.Duration{Duration: 10 * time.Second},
DefaultServiceConfig: `{"loadBalancingConfig": [{"round_robin":{}}]}`,
Endpoint: "",
Insecure: true,
DefaultTimeout: config.Duration{Duration: 10 * time.Second},
// DefaultServiceConfig is left empty so getGrpcConnection falls back to
// DefaultGRPCServiceConfig (round-robin LB + UNAVAILABLE retry). Set this
// per-deployment only to override that default.
DefaultServiceConfig: "",
},
ConnectorDeployments: map[string]*Deployment{},
ConnectorForTaskTypes: map[string]string{},
Expand Down Expand Up @@ -95,6 +98,14 @@ type Deployment struct {
// DefaultServiceConfig sets default gRPC service config; check https://github.com/grpc/grpc/blob/master/doc/service_config.md for more details
DefaultServiceConfig string `json:"defaultServiceConfig,omitempty" yaml:"defaultServiceConfig,omitempty"`

// Keepalive configures gRPC client keepalive pings for this connector. Leave
// unset for connectors reached directly: a grpc-go or grpcio (C-core) server
// rejects frequent idle pings with GOAWAY "too_many_pings". Set it for
// connectors fronted by an L7 gateway (e.g. Knative/Kourier Envoy) that reaps
// idle connections, so the connection is kept warm and dead connections are
// detected proactively instead of on the next RPC.
Keepalive *KeepaliveConfig `json:"keepalive,omitempty" yaml:"keepalive,omitempty"`

// Timeouts defines various RPC timeout values for different plugin operations: CreateTask, GetTask, DeleteTask; if not configured, defaults to DefaultTimeout
Timeouts map[string]config.Duration `json:"timeouts,omitempty" yaml:"timeouts,omitempty"`

Expand All @@ -108,6 +119,23 @@ type Deployment struct {
Domain string `json:"domain,omitempty" yaml:"domain,omitempty"`
}

// KeepaliveConfig configures gRPC client keepalive pings for a connector
// connection. See https://github.com/grpc/grpc/blob/master/doc/keepalive.md.
type KeepaliveConfig struct {
// Time is the interval between keepalive pings on an otherwise idle
// connection. Keep it above the server/gateway enforcement threshold to
// avoid GOAWAY "too_many_pings".
Time config.Duration `json:"time,omitempty" yaml:"time,omitempty"`

// Timeout is how long the client waits for a keepalive ping ack before
// considering the connection dead and closing it.
Timeout config.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`

// PermitWithoutStream sends keepalive pings even when there are no active
// RPCs. Required to keep an idle connection warm through a gateway.
PermitWithoutStream bool `json:"permitWithoutStream,omitempty" yaml:"permitWithoutStream,omitempty"`
}

func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func TestDefaultAgentConfig(t *testing.T) {
assert.Equal(t, "", cfg.DefaultConnector.Endpoint)
assert.True(t, cfg.DefaultConnector.Insecure)
assert.Equal(t, 10*time.Second, cfg.DefaultConnector.DefaultTimeout.Duration)
assert.Equal(t, `{"loadBalancingConfig": [{"round_robin":{}}]}`, cfg.DefaultConnector.DefaultServiceConfig)
// DefaultServiceConfig defaults to empty; getGrpcConnection falls back to
// DefaultGRPCServiceConfig when a deployment does not override it.
assert.Empty(t, cfg.DefaultConnector.DefaultServiceConfig)

assert.Empty(t, cfg.DefaultConnector.Timeouts)

Expand Down
Loading