Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions flyteidl2/core/execution.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ message TaskLog {
message LogContext {
repeated PodLogContext pods = 1;
string primary_pod_name = 2;

// Optional. Set by the connector plugin when an action is served by a connector. The dataplane
// dataproxy uses this endpoint to proxy log requests to the connector's GetTaskLogs RPC instead
// of streaming pod logs.
ConnectorLogContext connector = 3;
Comment thread
pingsutw marked this conversation as resolved.
}

// ConnectorLogContext describes how to reach a connector that owns an action's logs.
message ConnectorLogContext {
// gRPC endpoint of the connector deployment (e.g.
// "batch-job-connector.flytesnacks-development.svc.cluster.local:80").
string endpoint = 1;
Comment thread
pingsutw marked this conversation as resolved.
}

// Contains metadata required to identify logs produces by a single pod
Expand Down
33 changes: 30 additions & 3 deletions flyteplugins/go/tasks/plugins/webapi/connector/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type ResourceWrapper struct {
CustomInfo *structpb.Struct
ConnectorID string
IsConnectorApp bool
// ConnectorEndpoint is the gRPC endpoint of the connector deployment serving this task. It is
// recorded on the LogContext emitted by Status() so the dataplane dataproxy can stream logs
// from the connector's pods.
Comment thread
pingsutw marked this conversation as resolved.
Outdated
ConnectorEndpoint string
}

// IsTerminal is used to avoid making network calls to the connector service if the resource is already in a terminal state.
Expand Down Expand Up @@ -210,13 +214,20 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext
return nil, nil, fmt.Errorf("failed to create task from connector with %v", err)
}

resource := ResourceWrapper{
ConnectorID: connector.ConnectorID,
IsConnectorApp: connector.IsConnectorApp,
}
if connector.ConnectorDeployment != nil {
resource.ConnectorEndpoint = connector.ConnectorDeployment.Endpoint
}
return ResourceMetaWrapper{
OutputPrefix: outputPrefix,
ConnectorResourceMeta: res.GetResourceMeta(),
TaskCategory: &taskCategory,
Connection: &connection,
Domain: taskTemplate.GetId().GetDomain(),
}, nil, nil
}, resource, nil
}
Comment thread
pingsutw marked this conversation as resolved.
Outdated

func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) {
Expand Down Expand Up @@ -244,15 +255,19 @@ func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest web
return nil, fmt.Errorf("failed to get task from connector with %v", err)
}

return ResourceWrapper{
wrapper := ResourceWrapper{
Phase: res.GetResource().GetPhase(),
Outputs: res.GetResource().GetOutputs(),
Message: res.GetResource().GetMessage(),
LogLinks: res.GetResource().GetLogLinks(),
CustomInfo: res.GetResource().GetCustomInfo(),
ConnectorID: connector.ConnectorID,
IsConnectorApp: connector.IsConnectorApp,
}, nil
}
if connector.ConnectorDeployment != nil {
wrapper.ConnectorEndpoint = connector.ConnectorDeployment.Endpoint
}
Comment thread
pingsutw marked this conversation as resolved.
Outdated
return wrapper, nil
}

func (p *Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error {
Expand Down Expand Up @@ -319,6 +334,18 @@ func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phas
}

taskInfo := &core.TaskInfo{Logs: logLinks, CustomInfo: resource.CustomInfo}

// Record the connector endpoint on the LogContext so the dataplane dataproxy can stream logs
// from the connector deployment's pods. The endpoint was resolved at Get() time and stashed
// on the ResourceWrapper, so we don't have to re-resolve it here.
Comment thread
pingsutw marked this conversation as resolved.
Outdated
if resource.ConnectorEndpoint != "" {
taskInfo.LogContext = &flyteIdl.LogContext{
Connector: &flyteIdl.ConnectorLogContext{
Endpoint: resource.ConnectorEndpoint,
},
}
}

errorCode := pluginErrors.TaskFailedWithError

switch resource.Phase {
Expand Down
324 changes: 203 additions & 121 deletions gen/go/flyteidl2/core/execution.pb.go

Large diffs are not rendered by default.

133 changes: 133 additions & 0 deletions gen/go/flyteidl2/core/execution.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 17 additions & 15 deletions gen/python/flyteidl2/core/execution_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading