Skip to content

Commit 6924d0f

Browse files
committed
Merge branch 'v2' of github.com:flyteorg/flyte into app-events
Signed-off-by: machichima <nary12321@gmail.com>
2 parents 1848163 + 2d5e331 commit 6924d0f

40 files changed

Lines changed: 1206 additions & 645 deletions

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
> [!IMPORTANT]
2+
> ## Flyte 2 Devbox is now available!
3+
>
4+
> Check out the [Flyte 2 Devbox guide](https://www.union.ai/docs/v2/flyte/user-guide/run-modes/running-devbox/) to get started.
5+
>
6+
> Looking for Flyte 1? Go to the [master](https://github.com/flyteorg/flyte/tree/master) branch, where Flyte 1 is now maintained.
7+
8+
---
9+
110
# Flyte 2
211

312
**Reliably orchestrate ML pipelines, models, and agents at scale — in pure Python.**

actions/k8s/client.go

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,16 @@ type ActionUpdate struct {
4343
ErrorState *executorv1.ErrorState
4444
}
4545

46-
const labelTerminalStatusRecorded = "flyte.org/terminal-status-recorded"
46+
const (
47+
labelTerminalStatusRecorded = "flyte.org/terminal-status-recorded"
48+
// flyteNamespace is the single Kubernetes namespace used for all Flyte resources.
49+
flyteNamespace = "flyte"
50+
)
4751

4852
// ActionsClient handles all etcd/K8s TaskAction CR operations for the Actions service.
4953
type ActionsClient struct {
5054
k8sClient client.WithWatch
5155
sharedCache ctrlcache.Cache
52-
namespace string
5356
bufferSize int
5457
runClient workflowconnect.InternalRunServiceClient
5558
// recordedFilter deduplicates RecordAction calls across watch reconnects.
@@ -71,14 +74,13 @@ type ActionsClient struct {
7174
}
7275

7376
// NewActionsClient creates a new Kubernetes-based actions client.
74-
func NewActionsClient(k8sClient client.WithWatch, sharedCache ctrlcache.Cache, namespace string, bufferSize int, numWorkers int, runClient workflowconnect.InternalRunServiceClient, recordFilterSize int, scope promutils.Scope) *ActionsClient {
77+
func NewActionsClient(k8sClient client.WithWatch, sharedCache ctrlcache.Cache, bufferSize int, numWorkers int, runClient workflowconnect.InternalRunServiceClient, recordFilterSize int, scope promutils.Scope) *ActionsClient {
7578
if numWorkers <= 0 {
7679
numWorkers = 1
7780
}
7881
c := &ActionsClient{
7982
k8sClient: k8sClient,
8083
sharedCache: sharedCache,
81-
namespace: namespace,
8284
bufferSize: bufferSize,
8385
numWorkers: numWorkers,
8486
runClient: runClient,
@@ -109,14 +111,13 @@ func (c *ActionsClient) Enqueue(ctx context.Context, action *actions.Action, run
109111
switch action.GetSpec().(type) {
110112
case *actions.Action_Task:
111113
taskActionName := buildTaskActionName(actionID)
112-
namespace := buildNamespace(actionID.Run)
113-
if err := k8sutil.EnsureNamespaceExists(ctx, c.k8sClient, namespace); err != nil {
114-
return fmt.Errorf("failed to ensure namespace %s: %w", namespace, err)
114+
if err := k8sutil.EnsureNamespaceExists(ctx, c.k8sClient, flyteNamespace); err != nil {
115+
return fmt.Errorf("failed to ensure namespace %s: %w", flyteNamespace, err)
115116
}
116117
taskAction := &executorv1.TaskAction{
117118
ObjectMeta: metav1.ObjectMeta{
118119
Name: taskActionName,
119-
Namespace: namespace,
120+
Namespace: flyteNamespace,
120121
Labels: map[string]string{
121122
"flyte.org/project": actionID.Run.Project,
122123
"flyte.org/domain": actionID.Run.Domain,
@@ -138,7 +139,7 @@ func (c *ActionsClient) Enqueue(ctx context.Context, action *actions.Action, run
138139
parentName := buildTaskActionName(parentID)
139140

140141
parent := &executorv1.TaskAction{}
141-
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: parentName, Namespace: namespace}, parent); err != nil {
142+
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: parentName, Namespace: flyteNamespace}, parent); err != nil {
142143
return fmt.Errorf("failed to get parent TaskAction %s: %w", parentName, err)
143144
}
144145
parentTaskAction = parent
@@ -214,7 +215,7 @@ func (c *ActionsClient) AbortAction(ctx context.Context, actionID *common.Action
214215
logger.Infof(ctx, "Aborting action %s (reason: %v)", taskActionName, reason)
215216

216217
taskAction := &executorv1.TaskAction{}
217-
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: taskActionName, Namespace: buildNamespace(actionID.Run)}, taskAction); err != nil {
218+
if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: taskActionName, Namespace: flyteNamespace}, taskAction); err != nil {
218219
return fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err)
219220
}
220221

@@ -233,7 +234,7 @@ func (c *ActionsClient) GetState(ctx context.Context, actionID *common.ActionIde
233234
taskAction := &executorv1.TaskAction{}
234235
if err := c.k8sClient.Get(ctx, client.ObjectKey{
235236
Name: taskActionName,
236-
Namespace: buildNamespace(actionID.Run),
237+
Namespace: flyteNamespace,
237238
}, taskAction); err != nil {
238239
return "", fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err)
239240
}
@@ -249,7 +250,7 @@ func (c *ActionsClient) PutState(ctx context.Context, actionID *common.ActionIde
249250
taskAction := &executorv1.TaskAction{}
250251
if err := c.k8sClient.Get(ctx, client.ObjectKey{
251252
Name: taskActionName,
252-
Namespace: buildNamespace(actionID.Run),
253+
Namespace: flyteNamespace,
253254
}, taskAction); err != nil {
254255
return fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err)
255256
}
@@ -279,7 +280,7 @@ func (c *ActionsClient) PutState(ctx context.Context, actionID *common.ActionIde
279280
func (c *ActionsClient) ListRunActions(ctx context.Context, runID *common.RunIdentifier) ([]*executorv1.TaskAction, error) {
280281
taskActionList := &executorv1.TaskActionList{}
281282
listOpts := []client.ListOption{
282-
client.InNamespace(buildNamespace(runID)),
283+
client.InNamespace(flyteNamespace),
283284
client.MatchingLabels{
284285
"flyte.org/project": runID.Project,
285286
"flyte.org/domain": runID.Domain,
@@ -303,7 +304,7 @@ func (c *ActionsClient) ListChildActions(ctx context.Context, parentActionID *co
303304
// List all TaskActions in the same run
304305
taskActionList := &executorv1.TaskActionList{}
305306
listOpts := []client.ListOption{
306-
client.InNamespace(buildNamespace(parentActionID.Run)),
307+
client.InNamespace(flyteNamespace),
307308
client.MatchingLabels{
308309
"flyte.org/project": parentActionID.Run.Project,
309310
"flyte.org/domain": parentActionID.Run.Domain,
@@ -340,7 +341,7 @@ func (c *ActionsClient) GetTaskAction(ctx context.Context, actionID *common.Acti
340341
taskAction := &executorv1.TaskAction{}
341342
if err := c.k8sClient.Get(ctx, client.ObjectKey{
342343
Name: taskActionName,
343-
Namespace: buildNamespace(actionID.Run),
344+
Namespace: flyteNamespace,
344345
}, taskAction); err != nil {
345346
return nil, fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err)
346347
}
@@ -393,7 +394,7 @@ func (c *ActionsClient) StartWatching(ctx context.Context) error {
393394
}
394395
c.mu.Unlock()
395396

396-
logger.Infof(ctx, "Starting TaskAction watcher for namespace: %s (workers: %d)", c.namespace, c.numWorkers)
397+
logger.Infof(ctx, "Starting TaskAction watcher for namespace: %s (workers: %d)", flyteNamespace, c.numWorkers)
397398

398399
if c.sharedCache == nil {
399400
return fmt.Errorf("shared cache is required for TaskAction informer")
@@ -430,7 +431,7 @@ func (c *ActionsClient) setupInformer(ctx context.Context) error {
430431
obj = tombstone.Obj
431432
}
432433
taskAction, ok := obj.(*executorv1.TaskAction)
433-
if !ok {
434+
if !ok || c.shouldSkipTaskAction(taskAction) {
434435
return
435436
}
436437
c.dispatchEvent(taskAction, watch.Deleted)
@@ -508,7 +509,10 @@ func buildActionUpdate(ctx context.Context, taskAction *executorv1.TaskAction, e
508509
}
509510

510511
phase := GetPhaseFromConditions(taskAction)
511-
if eventType == watch.Deleted {
512+
if eventType == watch.Deleted && !isTerminalPhase(phase) {
513+
// Only force ABORTED if the action wasn't already in a terminal phase.
514+
// Otherwise a missed-delete tombstone or post-terminal CR cleanup would
515+
// overwrite a recorded Succeeded/Failed status with Aborted.
512516
phase = common.ActionPhase_ACTION_PHASE_ABORTED
513517
}
514518

@@ -724,11 +728,6 @@ func buildTaskActionName(actionID *common.ActionIdentifier) string {
724728
return fmt.Sprintf("%s-%s", actionID.Run.Name, actionID.Name)
725729
}
726730

727-
// buildNamespace returns the Kubernetes namespace for a run: "<project>-<domain>".
728-
func buildNamespace(runID *common.RunIdentifier) string {
729-
return fmt.Sprintf("%s-%s", runID.Project, runID.Domain)
730-
}
731-
732731
// buildOutputUri computes the action-specific output URI from the TaskAction spec.
733732
// It uses the same path structure as the executor's ComputeActionOutputPath so that
734733
// the SDK can find outputs written by the executor.

actions/k8s/client_test.go

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -186,22 +186,8 @@ func TestBuildTaskActionName(t *testing.T) {
186186
})
187187
}
188188

189-
func TestBuildNamespace(t *testing.T) {
190-
t.Run("combines project and domain", func(t *testing.T) {
191-
runID := &common.RunIdentifier{
192-
Project: "flytesnacks",
193-
Domain: "development",
194-
}
195-
assert.Equal(t, "flytesnacks-development", buildNamespace(runID))
196-
})
197-
198-
t.Run("different project and domain", func(t *testing.T) {
199-
runID := &common.RunIdentifier{
200-
Project: "myproject",
201-
Domain: "production",
202-
}
203-
assert.Equal(t, "myproject-production", buildNamespace(runID))
204-
})
189+
func TestFlyteNamespace(t *testing.T) {
190+
assert.Equal(t, "flyte", flyteNamespace)
205191
}
206192

207193
func TestExtractTaskCacheKey(t *testing.T) {
@@ -755,3 +741,62 @@ func TestBuildActionUpdate_NilErrorStateWhenAbsent(t *testing.T) {
755741

756742
assert.Nil(t, upd.ErrorState)
757743
}
744+
745+
// A delete event (real or DeletedFinalStateUnknown tombstone delivered after
746+
// informer resync) on a TaskAction that already reached a terminal phase must
747+
// NOT overwrite the recorded phase with ABORTED.
748+
func TestBuildActionUpdate_DeleteAfterTerminalPreservesPhase(t *testing.T) {
749+
cases := []struct {
750+
name string
751+
condition string
752+
want common.ActionPhase
753+
}{
754+
{"succeeded", string(executorv1.ConditionTypeSucceeded), common.ActionPhase_ACTION_PHASE_SUCCEEDED},
755+
{"failed", string(executorv1.ConditionTypeFailed), common.ActionPhase_ACTION_PHASE_FAILED},
756+
}
757+
for _, tc := range cases {
758+
t.Run(tc.name, func(t *testing.T) {
759+
ta := &executorv1.TaskAction{
760+
Spec: executorv1.TaskActionSpec{
761+
Project: "p", Domain: "d", RunName: "r", ActionName: "a",
762+
},
763+
Status: executorv1.TaskActionStatus{
764+
Conditions: []metav1.Condition{
765+
{Type: tc.condition, Status: metav1.ConditionTrue},
766+
},
767+
},
768+
}
769+
770+
upd := buildActionUpdate(context.Background(), ta, watch.Deleted)
771+
772+
require.NotNil(t, upd)
773+
assert.Equal(t, tc.want, upd.Phase, "delete event must not overwrite a terminal phase with ABORTED")
774+
assert.True(t, upd.IsDeleted)
775+
})
776+
}
777+
}
778+
779+
// A delete event on a TaskAction that is still running (no terminal condition)
780+
// should still mark the action as ABORTED.
781+
func TestBuildActionUpdate_DeleteOnNonTerminalForcesAborted(t *testing.T) {
782+
ta := &executorv1.TaskAction{
783+
Spec: executorv1.TaskActionSpec{
784+
Project: "p", Domain: "d", RunName: "r", ActionName: "a",
785+
},
786+
Status: executorv1.TaskActionStatus{
787+
Conditions: []metav1.Condition{
788+
{
789+
Type: string(executorv1.ConditionTypeProgressing),
790+
Status: metav1.ConditionTrue,
791+
Reason: string(executorv1.ConditionReasonExecuting),
792+
},
793+
},
794+
},
795+
}
796+
797+
upd := buildActionUpdate(context.Background(), ta, watch.Deleted)
798+
799+
require.NotNil(t, upd)
800+
assert.Equal(t, common.ActionPhase_ACTION_PHASE_ABORTED, upd.Phase)
801+
assert.True(t, upd.IsDeleted)
802+
}

actions/setup.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,13 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
3232
actionsClient := actionsk8s.NewActionsClient(
3333
sc.K8sClient,
3434
sc.K8sCache,
35-
sc.Namespace,
3635
cfg.WatchBufferSize,
3736
cfg.WatchWorkers,
3837
runClient,
3938
cfg.RecordFilterSize,
4039
sc.Scope,
4140
)
42-
logger.Infof(ctx, "Actions K8s client initialized for namespace: %s", sc.Namespace)
41+
logger.Infof(ctx, "Actions K8s client initialized")
4342

4443
if err := actionsClient.StartWatching(ctx); err != nil {
4544
return fmt.Errorf("actions: failed to start TaskAction watcher: %w", err)

0 commit comments

Comments
 (0)