Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 17 additions & 21 deletions actions/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ type ActionUpdate struct {
ErrorState *executorv1.ErrorState
}

const labelTerminalStatusRecorded = "flyte.org/terminal-status-recorded"
const (
labelTerminalStatusRecorded = "flyte.org/terminal-status-recorded"
// flyteNamespace is the single Kubernetes namespace used for all Flyte resources.
flyteNamespace = "flyte"
)

// ActionsClient handles all etcd/K8s TaskAction CR operations for the Actions service.
type ActionsClient struct {
k8sClient client.WithWatch
sharedCache ctrlcache.Cache
namespace string
bufferSize int
runClient workflowconnect.InternalRunServiceClient
// recordedFilter deduplicates RecordAction calls across watch reconnects.
Expand All @@ -71,14 +74,13 @@ type ActionsClient struct {
}

// NewActionsClient creates a new Kubernetes-based actions client.
func NewActionsClient(k8sClient client.WithWatch, sharedCache ctrlcache.Cache, namespace string, bufferSize int, numWorkers int, runClient workflowconnect.InternalRunServiceClient, recordFilterSize int, scope promutils.Scope) *ActionsClient {
func NewActionsClient(k8sClient client.WithWatch, sharedCache ctrlcache.Cache, bufferSize int, numWorkers int, runClient workflowconnect.InternalRunServiceClient, recordFilterSize int, scope promutils.Scope) *ActionsClient {
if numWorkers <= 0 {
numWorkers = 1
}
c := &ActionsClient{
k8sClient: k8sClient,
sharedCache: sharedCache,
namespace: namespace,
bufferSize: bufferSize,
numWorkers: numWorkers,
runClient: runClient,
Expand Down Expand Up @@ -109,14 +111,13 @@ func (c *ActionsClient) Enqueue(ctx context.Context, action *actions.Action, run
switch action.GetSpec().(type) {
case *actions.Action_Task:
taskActionName := buildTaskActionName(actionID)
namespace := buildNamespace(actionID.Run)
if err := k8sutil.EnsureNamespaceExists(ctx, c.k8sClient, namespace); err != nil {
return fmt.Errorf("failed to ensure namespace %s: %w", namespace, err)
if err := k8sutil.EnsureNamespaceExists(ctx, c.k8sClient, flyteNamespace); err != nil {
return fmt.Errorf("failed to ensure namespace %s: %w", flyteNamespace, err)
}
taskAction := &executorv1.TaskAction{
ObjectMeta: metav1.ObjectMeta{
Name: taskActionName,
Namespace: namespace,
Namespace: flyteNamespace,
Labels: map[string]string{
"flyte.org/project": actionID.Run.Project,
"flyte.org/domain": actionID.Run.Domain,
Expand All @@ -138,7 +139,7 @@ func (c *ActionsClient) Enqueue(ctx context.Context, action *actions.Action, run
parentName := buildTaskActionName(parentID)

parent := &executorv1.TaskAction{}
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: parentName, Namespace: namespace}, parent); err != nil {
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: parentName, Namespace: flyteNamespace}, parent); err != nil {
return fmt.Errorf("failed to get parent TaskAction %s: %w", parentName, err)
}
parentTaskAction = parent
Expand Down Expand Up @@ -214,7 +215,7 @@ func (c *ActionsClient) AbortAction(ctx context.Context, actionID *common.Action
logger.Infof(ctx, "Aborting action %s (reason: %v)", taskActionName, reason)

taskAction := &executorv1.TaskAction{}
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: taskActionName, Namespace: buildNamespace(actionID.Run)}, taskAction); err != nil {
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: taskActionName, Namespace: flyteNamespace}, taskAction); err != nil {
return fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err)
}

Expand All @@ -233,7 +234,7 @@ func (c *ActionsClient) GetState(ctx context.Context, actionID *common.ActionIde
taskAction := &executorv1.TaskAction{}
if err := c.k8sClient.Get(ctx, client.ObjectKey{
Name: taskActionName,
Namespace: buildNamespace(actionID.Run),
Namespace: flyteNamespace,
}, taskAction); err != nil {
return "", fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err)
}
Expand All @@ -249,7 +250,7 @@ func (c *ActionsClient) PutState(ctx context.Context, actionID *common.ActionIde
taskAction := &executorv1.TaskAction{}
if err := c.k8sClient.Get(ctx, client.ObjectKey{
Name: taskActionName,
Namespace: buildNamespace(actionID.Run),
Namespace: flyteNamespace,
}, taskAction); err != nil {
return fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err)
}
Expand Down Expand Up @@ -279,7 +280,7 @@ func (c *ActionsClient) PutState(ctx context.Context, actionID *common.ActionIde
func (c *ActionsClient) ListRunActions(ctx context.Context, runID *common.RunIdentifier) ([]*executorv1.TaskAction, error) {
taskActionList := &executorv1.TaskActionList{}
listOpts := []client.ListOption{
client.InNamespace(buildNamespace(runID)),
client.InNamespace(flyteNamespace),
client.MatchingLabels{
"flyte.org/project": runID.Project,
"flyte.org/domain": runID.Domain,
Expand All @@ -303,7 +304,7 @@ func (c *ActionsClient) ListChildActions(ctx context.Context, parentActionID *co
// List all TaskActions in the same run
taskActionList := &executorv1.TaskActionList{}
listOpts := []client.ListOption{
client.InNamespace(buildNamespace(parentActionID.Run)),
client.InNamespace(flyteNamespace),
client.MatchingLabels{
"flyte.org/project": parentActionID.Run.Project,
"flyte.org/domain": parentActionID.Run.Domain,
Expand Down Expand Up @@ -340,7 +341,7 @@ func (c *ActionsClient) GetTaskAction(ctx context.Context, actionID *common.Acti
taskAction := &executorv1.TaskAction{}
if err := c.k8sClient.Get(ctx, client.ObjectKey{
Name: taskActionName,
Namespace: buildNamespace(actionID.Run),
Namespace: flyteNamespace,
}, taskAction); err != nil {
return nil, fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err)
}
Expand Down Expand Up @@ -393,7 +394,7 @@ func (c *ActionsClient) StartWatching(ctx context.Context) error {
}
c.mu.Unlock()

logger.Infof(ctx, "Starting TaskAction watcher for namespace: %s (workers: %d)", c.namespace, c.numWorkers)
logger.Infof(ctx, "Starting TaskAction watcher for namespace: %s (workers: %d)", flyteNamespace, c.numWorkers)

if c.sharedCache == nil {
return fmt.Errorf("shared cache is required for TaskAction informer")
Expand Down Expand Up @@ -724,11 +725,6 @@ func buildTaskActionName(actionID *common.ActionIdentifier) string {
return fmt.Sprintf("%s-%s", actionID.Run.Name, actionID.Name)
}

// buildNamespace returns the Kubernetes namespace for a run: "<project>-<domain>".
func buildNamespace(runID *common.RunIdentifier) string {
return fmt.Sprintf("%s-%s", runID.Project, runID.Domain)
}

// buildOutputUri computes the action-specific output URI from the TaskAction spec.
// It uses the same path structure as the executor's ComputeActionOutputPath so that
// the SDK can find outputs written by the executor.
Expand Down
18 changes: 2 additions & 16 deletions actions/k8s/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,22 +186,8 @@ func TestBuildTaskActionName(t *testing.T) {
})
}

func TestBuildNamespace(t *testing.T) {
t.Run("combines project and domain", func(t *testing.T) {
runID := &common.RunIdentifier{
Project: "flytesnacks",
Domain: "development",
}
assert.Equal(t, "flytesnacks-development", buildNamespace(runID))
})

t.Run("different project and domain", func(t *testing.T) {
runID := &common.RunIdentifier{
Project: "myproject",
Domain: "production",
}
assert.Equal(t, "myproject-production", buildNamespace(runID))
})
func TestFlyteNamespace(t *testing.T) {
assert.Equal(t, "flyte", flyteNamespace)
}

func TestExtractTaskCacheKey(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions actions/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
actionsClient := actionsk8s.NewActionsClient(
sc.K8sClient,
sc.K8sCache,
sc.Namespace,
cfg.WatchBufferSize,
cfg.WatchWorkers,
runClient,
cfg.RecordFilterSize,
sc.Scope,
)
logger.Infof(ctx, "Actions K8s client initialized for namespace: %s", sc.Namespace)
logger.Infof(ctx, "Actions K8s client initialized")

if err := actionsClient.StartWatching(ctx); err != nil {
return fmt.Errorf("actions: failed to start TaskAction watcher: %w", err)
Expand Down
Loading