Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions flyteidl2/core/execution.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ message TaskLog {
message LogContext {
repeated PodLogContext pods = 1;
string primary_pod_name = 2;

// Optional. Set by the connector plugin when an action is served by a connector. The dataplane
// dataproxy uses this endpoint to proxy log requests to the connector's GetTaskLogs RPC instead
// of streaming pod logs.
ConnectorLogContext connector = 3;
Comment thread
pingsutw marked this conversation as resolved.
}

// ConnectorLogContext describes how to reach a connector that owns an action's logs.
message ConnectorLogContext {
// gRPC endpoint of the connector deployment (e.g.
// "batch-job-connector.flytesnacks-development.svc.cluster.local:80").
string endpoint = 1;
Comment thread
pingsutw marked this conversation as resolved.
}

// Contains metadata required to identify logs produces by a single pod
Expand Down
14 changes: 14 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/connector/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,20 @@ func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phas
}

taskInfo := &core.TaskInfo{Logs: logLinks, CustomInfo: resource.CustomInfo}

// Record the connector endpoint on the LogContext so the dataplane dataproxy can stream logs
// straight from the connector via AsyncConnectorService.GetTaskLogs (resource_meta is fetched
// separately from PluginStateService at log-stream time).
if metadata, ok := taskCtx.ResourceMeta().(ResourceMetaWrapper); ok {
if connector, err := p.getFinalConnector(metadata.TaskCategory, p.cfg, metadata.Domain); err == nil && connector != nil && connector.ConnectorDeployment != nil {
taskInfo.LogContext = &flyteIdl.LogContext{
Connector: &flyteIdl.ConnectorLogContext{
Endpoint: connector.ConnectorDeployment.Endpoint,
},
Comment thread
pingsutw marked this conversation as resolved.
Outdated
}
}
}
Comment thread
pingsutw marked this conversation as resolved.
Outdated

errorCode := pluginErrors.TaskFailedWithError

switch resource.Phase {
Expand Down
Loading
Loading