Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions flyteidl2/core/execution.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ message TaskLog {
message LogContext {
repeated PodLogContext pods = 1;
string primary_pod_name = 2;

// Optional. Set by the connector plugin when an action is served by a connector. The dataplane
// dataproxy uses this endpoint to proxy log requests to the connector's GetTaskLogs RPC instead
// of streaming pod logs.
ConnectorLogContext connector = 3;
Comment thread
pingsutw marked this conversation as resolved.
}

// ConnectorLogContext describes how to reach a connector that owns an action's logs.
message ConnectorLogContext {
// gRPC endpoint of the connector deployment (e.g.
// "batch-job-connector.flytesnacks-development.svc.cluster.local:80").
string endpoint = 1;
Comment thread
pingsutw marked this conversation as resolved.
}

// Contains metadata required to identify logs produces by a single pod
Expand Down
5 changes: 5 additions & 0 deletions flyteidl2/dataproxy/dataproxy_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ message TailLogsRequest {

// The pod name to tail logs from. If not provided, attempt to find the primary pod, else assume the first pod.
string pod_name = 3;

// +optional. Connector endpoint copied from ActionAttempt.log_context.connector.endpoint when the
// action is served by a connector. The dataproxy uses this to stream logs from the connector via
// AsyncConnectorService.GetTaskLogs instead of pod logs.
string connector_endpoint = 4;
Comment thread
pingsutw marked this conversation as resolved.
}

// Reponse message for tailing logs.
Expand Down
40 changes: 28 additions & 12 deletions flyteplugins/go/tasks/plugins/webapi/connector/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ import (
"google.golang.org/protobuf/types/known/structpb"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/secret"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/utils"

pluginErrors "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/logs"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core/template"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/secret"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/utils"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/webapi"
"github.com/flyteorg/flyte/v2/flytestdlib/logger"
"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
Expand Down Expand Up @@ -80,13 +79,14 @@ type Plugin struct {
}

type ResourceWrapper struct {
Phase flyteIdl.TaskExecution_Phase
Outputs *task.Outputs
Message string
LogLinks []*flyteIdl.TaskLog
CustomInfo *structpb.Struct
ConnectorID string
IsConnectorApp bool
Phase flyteIdl.TaskExecution_Phase
Outputs *task.Outputs
Message string
LogLinks []*flyteIdl.TaskLog
CustomInfo *structpb.Struct
ConnectorID string
IsConnectorApp bool
ConnectorEndpoint string
}

// IsTerminal is used to avoid making network calls to the connector service if the resource is already in a terminal state.
Expand Down Expand Up @@ -244,15 +244,19 @@ func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest web
return nil, fmt.Errorf("failed to get task from connector with %v", err)
}

return ResourceWrapper{
wrapper := ResourceWrapper{
Phase: res.GetResource().GetPhase(),
Outputs: res.GetResource().GetOutputs(),
Message: res.GetResource().GetMessage(),
LogLinks: res.GetResource().GetLogLinks(),
CustomInfo: res.GetResource().GetCustomInfo(),
ConnectorID: connector.ConnectorID,
IsConnectorApp: connector.IsConnectorApp,
}, nil
}
if connector.ConnectorDeployment != nil {
wrapper.ConnectorEndpoint = connector.ConnectorDeployment.Endpoint
}
Comment thread
pingsutw marked this conversation as resolved.
Outdated
return wrapper, nil
}

func (p *Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error {
Expand Down Expand Up @@ -319,6 +323,18 @@ func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phas
}

taskInfo := &core.TaskInfo{Logs: logLinks, CustomInfo: resource.CustomInfo}

if resource.ConnectorEndpoint != "" {
taskInfo.LogContext = &flyteIdl.LogContext{
Connector: &flyteIdl.ConnectorLogContext{
Endpoint: resource.ConnectorEndpoint,
},
}
logger.Infof(ctx, "Recorded connector endpoint %q on LogContext for connector %q", resource.ConnectorEndpoint, resource.ConnectorID)
Comment thread
pingsutw marked this conversation as resolved.
Outdated
} else {
logger.Debugf(ctx, "Connector endpoint missing on resource for connector %q; LogContext.connector not set", resource.ConnectorID)
Comment thread
pingsutw marked this conversation as resolved.
Outdated
}

errorCode := pluginErrors.TaskFailedWithError

switch resource.Phase {
Expand Down
Loading
Loading