Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions flyteidl2/connector/connector.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "flyteidl2/core/identifier.proto";
import "flyteidl2/core/metrics.proto";
import "flyteidl2/core/security.proto";
import "flyteidl2/core/tasks.proto";
import "flyteidl2/logs/dataplane/payload.proto";
import "flyteidl2/task/common.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";
Expand Down Expand Up @@ -208,8 +209,15 @@ message GetTaskLogsResponseHeader {
}

// GetTaskLogsResponseBody holds the log content of a task-log response, as
// plain strings (`results`) and/or structured log lines (`lines`).
message GetTaskLogsResponseBody {
// The execution log results.
repeated string results = 1;
// The execution log results. Deprecated — use `lines` for structured log
// lines that carry timestamps and originator metadata. Retained for
// backward compatibility with older connectors that emit only strings.
repeated string results = 1 [deprecated = true];

// Structured log lines (timestamp + message + originator). Preferred over
// `results`. Connectors should populate this field; the operator dataproxy
// falls back to `results` only when this is empty.
repeated flyteidl2.logs.dataplane.LogLine lines = 2;
Comment thread
pingsutw marked this conversation as resolved.
}

// A response containing the logs for a task execution.
Expand Down
8 changes: 8 additions & 0 deletions flyteidl2/core/execution.proto
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ message TaskLog {
// LogContext describes where an execution's logs can be found: the pods that
// produced them and, optionally, the connector that owns them.
message LogContext {
// Per-pod log contexts.
repeated PodLogContext pods = 1;
// Name of the primary pod — presumably one of the entries in `pods`; confirm.
string primary_pod_name = 2;
// Connector that owns the logs. The connector plugin sets this only when it
// recorded a non-empty connector endpoint for the task.
ConnectorLogContext connector = 3;
Comment thread
pingsutw marked this conversation as resolved.
}

// ConnectorLogContext describes how to reach a connector that owns an action's logs.
message ConnectorLogContext {
// gRPC endpoint of the connector deployment (e.g.
// "batch-job-connector.flytesnacks-development.svc.cluster.local:80").
// The connector plugin only emits this message when the endpoint is non-empty.
string endpoint = 1;
Comment thread
pingsutw marked this conversation as resolved.
}

// Contains metadata required to identify logs produced by a single pod
Expand Down
4 changes: 4 additions & 0 deletions flyteidl2/dataproxy/dataproxy_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ message TailLogsRequest {

// The pod name to tail logs from. If not provided, attempt to find the primary pod, else assume the first pod.
string pod_name = 3;

// +optional. The dataproxy uses this to stream logs from the connector via
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think with our version of proto3 we can still use the `optional` keyword, and then it will generate a HasConnectorEndpoint() function.

// AsyncConnectorService.GetTaskLogs instead of pod logs.
string connector_endpoint = 4;
Comment thread
pingsutw marked this conversation as resolved.
}

// Response message for tailing logs.
Expand Down
45 changes: 27 additions & 18 deletions flyteplugins/go/tasks/plugins/webapi/connector/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ import (
"google.golang.org/protobuf/types/known/structpb"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/secret"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/utils"

pluginErrors "github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/logs"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/core/template"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/secret"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/utils"
"github.com/flyteorg/flyte/v2/flyteplugins/go/tasks/pluginmachinery/webapi"
"github.com/flyteorg/flyte/v2/flytestdlib/logger"
"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
Expand Down Expand Up @@ -80,13 +79,14 @@ type Plugin struct {
}

// ResourceWrapper is the resource snapshot that Get builds from the connector's
// response and that Status later reads to derive the task phase and task info.
type ResourceWrapper struct {
Phase flyteIdl.TaskExecution_Phase
Outputs *task.Outputs
Message string
LogLinks []*flyteIdl.TaskLog
CustomInfo *structpb.Struct
ConnectorID string
IsConnectorApp bool
Phase flyteIdl.TaskExecution_Phase
Outputs *task.Outputs
Message string
LogLinks []*flyteIdl.TaskLog
CustomInfo *structpb.Struct
ConnectorID string
IsConnectorApp bool
// ConnectorEndpoint is the endpoint of the connector deployment that served
// the task; when non-empty, Status copies it into TaskInfo.LogContext.
ConnectorEndpoint string
}

// IsTerminal is used to avoid making network calls to the connector service if the resource is already in a terminal state.
Expand Down Expand Up @@ -243,15 +243,15 @@ func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest web
if err != nil {
return nil, fmt.Errorf("failed to get task from connector with %v", err)
}

return ResourceWrapper{
Phase: res.GetResource().GetPhase(),
Outputs: res.GetResource().GetOutputs(),
Message: res.GetResource().GetMessage(),
LogLinks: res.GetResource().GetLogLinks(),
CustomInfo: res.GetResource().GetCustomInfo(),
ConnectorID: connector.ConnectorID,
IsConnectorApp: connector.IsConnectorApp,
Phase: res.GetResource().GetPhase(),
Outputs: res.GetResource().GetOutputs(),
Message: res.GetResource().GetMessage(),
LogLinks: res.GetResource().GetLogLinks(),
CustomInfo: res.GetResource().GetCustomInfo(),
ConnectorID: connector.ConnectorID,
IsConnectorApp: connector.IsConnectorApp,
ConnectorEndpoint: connector.ConnectorDeployment.Endpoint,
Comment thread
pingsutw marked this conversation as resolved.
}, nil
}

Expand Down Expand Up @@ -319,6 +319,15 @@ func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phas
}

taskInfo := &core.TaskInfo{Logs: logLinks, CustomInfo: resource.CustomInfo}

if resource.ConnectorEndpoint != "" {
taskInfo.LogContext = &flyteIdl.LogContext{
Connector: &flyteIdl.ConnectorLogContext{
Endpoint: resource.ConnectorEndpoint,
},
}
}

errorCode := pluginErrors.TaskFailedWithError

switch resource.Phase {
Expand Down
51 changes: 51 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,57 @@ func TestPlugin(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhasePermanentFailure, phase.Phase())
})

// Verifies that when the connector plugin recorded an endpoint on the
// ResourceWrapper, Status surfaces it through TaskInfo.LogContext.Connector
// so downstream consumers (action events → dataproxy log streaming) can
// reach the connector directly. See PR flyteorg/flyte#7317.
t.Run("Status records connector endpoint on LogContext when present", func(t *testing.T) {
taskContext := new(webapiPlugin.StatusContext)
taskContext.On("Resource").Return(ResourceWrapper{
Phase: flyteIdlCore.TaskExecution_RUNNING,
LogLinks: []*flyteIdlCore.TaskLog{{Uri: "http://localhost:3000/log", Name: "Log Link"}},
ConnectorEndpoint: "batch-job-connector.flytesnacks-development.svc.cluster.local:80",
})

mockTaskMetadata := &pluginCoreMocks.TaskExecutionMetadata{}
mockTaskMetadata.On("GetTaskExecutionID").Return(&pluginCoreMocks.TaskExecutionID{})
taskContext.On("TaskExecutionMetadata").Return(mockTaskMetadata)

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRunning, phase.Phase())

info := phase.Info()
assert.NotNil(t, info)
assert.NotNil(t, info.LogContext)
assert.NotNil(t, info.LogContext.GetConnector())
assert.Equal(t,
"batch-job-connector.flytesnacks-development.svc.cluster.local:80",
info.LogContext.GetConnector().GetEndpoint())
})

// Pod-backed tasks (no connector endpoint) must not get a synthetic
// connector LogContext — the executor's pod-event flow owns LogContext for
// those.
t.Run("Status leaves LogContext unset when no connector endpoint", func(t *testing.T) {
taskContext := new(webapiPlugin.StatusContext)
taskContext.On("Resource").Return(ResourceWrapper{
Phase: flyteIdlCore.TaskExecution_RUNNING,
LogLinks: []*flyteIdlCore.TaskLog{{Uri: "http://localhost:3000/log", Name: "Log Link"}},
// ConnectorEndpoint intentionally empty.
})

mockTaskMetadata := &pluginCoreMocks.TaskExecutionMetadata{}
mockTaskMetadata.On("GetTaskExecutionID").Return(&pluginCoreMocks.TaskExecutionID{})
taskContext.On("TaskExecutionMetadata").Return(mockTaskMetadata)

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
info := phase.Info()
assert.NotNil(t, info)
assert.Nil(t, info.LogContext)
})
}

func getMockMetadataServiceClient() *connectorMocks.ConnectorMetadataServiceClient {
Expand Down
Loading
Loading