diff --git a/.mockery.yaml b/.mockery.yaml index 96d14d8de22..c84ae1de929 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -103,6 +103,10 @@ packages: dir: actions/service/mocks interfaces: ActionsClientInterface: + github.com/flyteorg/flyte/v2/app/internal/repository/interfaces: + config: + all: true + dir: app/internal/repository/mocks github.com/eko/gocache/lib/v4/cache: config: all: false diff --git a/app/config/config.go b/app/config/config.go index a330e5723e3..9bef6a444f1 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -6,7 +6,7 @@ import ( "github.com/flyteorg/flyte/v2/flytestdlib/config" ) -// AppConfig holds configuration for the control plane AppService. +// AppConfig holds configuration for the AppService. type AppConfig struct { // InternalAppServiceURL is the base URL of the InternalAppService (data plane). // In unified mode this is overridden by the shared mux BaseURL. @@ -42,7 +42,7 @@ type InternalAppConfig struct { Enabled bool `json:"enabled" pflag:",Enable app deployment controller"` // BaseDomain is the base domain used to generate public URLs for apps. - // Apps are exposed at "{name}-{project}-{domain}.{base_domain}". + // Apps are exposed at "{name}-{project}-{domain}-{namespace}.{base_domain}". BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"` // Scheme is the URL scheme used for public app URLs ("http" or "https"). @@ -65,6 +65,10 @@ type InternalAppConfig struct { // processes need to connect back to the Flyte manager. DefaultEnvVars map[string]string `json:"defaultEnvVars" pflag:"-,Default env vars injected into every app pod"` + // MaxConditions is the maximum number of conditions to retain per app. + // Oldest entries are trimmed when this limit is exceeded. Defaults to 40. + MaxConditions int `json:"maxConditions" pflag:",Maximum number of conditions to retain per app"` + // WatchBufferSize is the buffer size for each subscriber's event channel. // A larger value reduces the chance of dropped events under burst load. WatchBufferSize int `json:"watchBufferSize" pflag:",Buffer size for watch subscriber channels"` @@ -73,6 +77,7 @@ type InternalAppConfig struct { var defaultInternalAppConfig = &InternalAppConfig{ DefaultRequestTimeout: 300 * time.Second, MaxRequestTimeout: 3600 * time.Second, + MaxConditions: 40, WatchBufferSize: 100, } diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index 6dc8f7ffe51..6a0338079c4 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/flyteorg/flyte/v2/app/internal/config" + "github.com/flyteorg/flyte/v2/app/internal/repository/interfaces" "github.com/flyteorg/flyte/v2/flytestdlib/k8s" "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" @@ -99,13 +100,19 @@ type AppK8sClientInterface interface { // Unsubscribe removes a subscription channel previously returned by Subscribe. Unsubscribe(appName string, ch chan *flyteapp.WatchResponse) + + // PublicIngress returns the deterministic public Ingress URL for the given app, + // matching Knative's domain-template so Kourier routes traffic correctly. + // Returns nil if BaseDomain is not configured. + PublicIngress(id *flyteapp.Identifier) *flyteapp.Ingress } // AppK8sClient implements AppK8sClientInterface using controller-runtime. type AppK8sClient struct { - k8sClient client.WithWatch - cache ctrlcache.Cache - cfg *config.InternalAppConfig + k8sClient client.WithWatch + cache ctrlcache.Cache + cfg *config.InternalAppConfig + conditionRepo interfaces.AppConditionsRepo // Watch management mu sync.RWMutex @@ -115,12 +122,15 @@ type AppK8sClient struct { } // NewAppK8sClient creates a new AppK8sClient. -func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.InternalAppConfig) *AppK8sClient { +// conditionRepo is used to persist condition history on every KService event; +// pass nil to disable condition persistence (e.g. in tests that do not need it). +func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.InternalAppConfig, conditionRepo interfaces.AppConditionsRepo) *AppK8sClient { return &AppK8sClient{ - k8sClient: k8sClient, - cache: cache, - cfg: cfg, - subscribers: make(map[string]map[chan *flyteapp.WatchResponse]struct{}), + k8sClient: k8sClient, + cache: cache, + cfg: cfg, + conditionRepo: conditionRepo, + subscribers: make(map[string]map[chan *flyteapp.WatchResponse]struct{}), } } @@ -288,14 +298,37 @@ func isManagedKService(ksvc *servingv1.Service) bool { return ksvc.Labels[labelAppManaged] == "true" } -// handleKServiceEvent converts a KService event into a WatchResponse and -// notifies all matching subscribers. +// handleKServiceEvent converts a KService event into a WatchResponse, +// persists the current deployment status as a condition, and notifies subscribers. func (c *AppK8sClient) handleKServiceEvent(ctx context.Context, ksvc *servingv1.Service, eventType k8swatch.EventType) { app, err := c.kserviceToApp(ctx, ksvc) if err != nil { return } + appID := app.GetMetadata().GetId() + + // Persist the current status as a condition on every K8s state change so that + // the full PENDING -> DEPLOYING -> ACTIVE/FAILED transition history is recorded, + // regardless of whether any Watch client is connected. + // Dedup: read the latest stored condition first; only append if status or message changed. + if c.conditionRepo != nil { + if eventType == k8swatch.Deleted { + // Ensure DB is clean even if a race between Delete() RPC and a late + // Modified event caused conditions to be re-written after the RPC cleared them. + if err := c.conditionRepo.DeleteConditions(ctx, appID); err != nil { + logger.Errorf(ctx, "Failed to delete conditions for app %s on watch deleted event: %v", appID.GetName(), err) + } + } else if status := app.GetStatus(); status != nil && len(status.GetConditions()) > 0 { + cond := status.GetConditions()[0] + if c.shouldAppendCondition(ctx, appID, cond) { + if err := c.conditionRepo.AppendCondition(ctx, appID, cond, c.cfg.MaxConditions); err != nil { + logger.Errorf(ctx, "Failed to persist condition for app %s on watch event: %v", appID.GetName(), err) + } + } + } + } + var resp *flyteapp.WatchResponse switch eventType { case k8swatch.Added: @@ -320,8 +353,29 @@ func (c *AppK8sClient) handleKServiceEvent(ctx context.Context, ksvc *servingv1. return } - appName := app.GetMetadata().GetId().GetName() - c.notifySubscribers(ctx, appName, resp) + c.notifySubscribers(ctx, appID.GetName(), resp) +} + +// shouldAppendCondition returns true if cond should be persisted. +// It skips two classes of noise: +// 1. PENDING with no message — the transient first Knative event before the +// Ready condition is populated; it carries no actionable information. +// 2. Exact duplicates (same DeploymentStatus + Message as the last stored entry). +func (c *AppK8sClient) shouldAppendCondition(ctx context.Context, appID *flyteapp.Identifier, cond *flyteapp.Condition) bool { + if cond.GetDeploymentStatus() == flyteapp.Status_DEPLOYMENT_STATUS_PENDING && cond.GetMessage() == "" { + return false + } + existing, err := c.conditionRepo.GetConditions(ctx, appID) + if err != nil { + // On read error, append to avoid missing transitions. + logger.Warnf(ctx, "Failed to read conditions for app %s, will append: %v", appID.GetName(), err) + return true + } + if len(existing) == 0 { + return true + } + last := existing[len(existing)-1] + return last.GetDeploymentStatus() != cond.GetDeploymentStatus() || last.GetMessage() != cond.GetMessage() } // notifySubscribers sends a WatchResponse to all subscribers for the given app name. @@ -448,9 +502,10 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain string, limit u return apps, list.Continue, nil } -// publicIngress returns the deterministic public URL for an app using the same -// logic as the service layer so GetApp/List/Watch are consistent with Create. -func (c *AppK8sClient) publicIngress(id *flyteapp.Identifier) *flyteapp.Ingress { +// PublicIngress returns the deterministic public URL for an app. +// The host follows Knative's domain-template "{kservice-name}-{namespace}.{domain}" +// so Kourier routes traffic correctly without extra ingress rules. +func (c *AppK8sClient) PublicIngress(id *flyteapp.Identifier) *flyteapp.Ingress { if c.cfg.BaseDomain == "" { return nil } @@ -458,8 +513,8 @@ func (c *AppK8sClient) publicIngress(id *flyteapp.Identifier) *flyteapp.Ingress if scheme == "" { scheme = "https" } - host := strings.ToLower(fmt.Sprintf("%s.%s", - KServiceName(id), c.cfg.BaseDomain)) + host := strings.ToLower(fmt.Sprintf("%s-%s.%s", + KServiceName(id), AppNamespace, c.cfg.BaseDomain)) url := scheme + "://" + host if c.cfg.IngressAppsPort != 0 { url += fmt.Sprintf(":%d", c.cfg.IngressAppsPort) @@ -717,18 +772,28 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser } if phase == flyteapp.Status_DEPLOYMENT_STATUS_UNSPECIFIED { + readyCond := ksvc.Status.GetCondition(servingv1.ServiceConditionReady) switch { case ksvc.IsReady(): phase = flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE + message = "Service is ready" case ksvc.IsFailed(): phase = flyteapp.Status_DEPLOYMENT_STATUS_FAILED - if c := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); c != nil { - message = c.Message + if readyCond != nil { + message = readyCond.Message } - case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName: + case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName && + ksvc.Status.LatestCreatedRevisionName != "": phase = flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING + message = fmt.Sprintf("Deploying revision: [%s]", ksvc.Status.LatestCreatedRevisionName) + if readyCond != nil && readyCond.Reason != "" { + message += fmt.Sprintf("\n%s: %s", readyCond.Reason, readyCond.Message) + } default: phase = flyteapp.Status_DEPLOYMENT_STATUS_PENDING + if readyCond != nil && readyCond.Reason != "" { + message = fmt.Sprintf("%s: %s", readyCond.Reason, readyCond.Message) + } } } @@ -740,7 +805,7 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser parts := strings.SplitN(appIDStr, "/", 3) if len(parts) == 3 { appID := &flyteapp.Identifier{Project: parts[0], Domain: parts[1], Name: parts[2]} - status.Ingress = c.publicIngress(appID) + status.Ingress = c.PublicIngress(appID) } } diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 040afca5141..1281f437090 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -19,6 +20,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/flyteorg/flyte/v2/app/internal/config" + "github.com/flyteorg/flyte/v2/app/internal/repository/mocks" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" ) @@ -58,7 +60,7 @@ func testClient(t *testing.T, objs ...client.Object) *AppK8sClient { MaxRequestTimeout: time.Hour, WatchBufferSize: 100, } - return NewAppK8sClient(fc, nil, cfg) + return NewAppK8sClient(fc, nil, cfg, nil) } // testApp builds a minimal flyteapp.App for use in tests. @@ -626,6 +628,20 @@ func TestSubscribe_MultipleSubscribers(t *testing.T) { } } +// testClientWithRepo builds an AppK8sClient with a real conditionRepo mock. +func testClientWithRepo(t *testing.T, repo *mocks.AppConditionsRepo, objs ...client.Object) *AppK8sClient { + t.Helper() + s := testScheme(t) + fc := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).Build() + cfg := &config.InternalAppConfig{ + DefaultRequestTimeout: 5 * time.Minute, + MaxRequestTimeout: time.Hour, + WatchBufferSize: 100, + MaxConditions: 40, + } + return NewAppK8sClient(fc, nil, cfg, repo) +} + // testKsvc builds a minimal KService that kserviceToApp can parse. func testKsvc(name, ns, rv string) *servingv1.Service { return &servingv1.Service{ @@ -638,3 +654,108 @@ func testKsvc(name, ns, rv string) *servingv1.Service { }, } } + +// --- Condition dedup and message format tests --- + +func TestHandleKServiceEvent_DeduplicatesConditions(t *testing.T) { + repo := mocks.NewAppConditionsRepo(t) + c := testClientWithRepo(t, repo) + + ksvc := testKsvc("myapp", appNamespace, "1") + appID := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} + + existingCond := &flyteapp.Condition{ + DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING, + Message: "", + } + + // First call: no existing conditions → should append. + repo.On("GetConditions", mock.Anything, appID).Return([]*flyteapp.Condition{}, nil).Once() + repo.On("AppendCondition", mock.Anything, appID, mock.Anything, mock.Anything).Return(nil).Once() + + c.handleKServiceEvent(context.Background(), ksvc, k8swatch.Modified) + + // Second call: same status+message already stored → should NOT append. + repo.On("GetConditions", mock.Anything, appID).Return([]*flyteapp.Condition{existingCond}, nil).Once() + + c.handleKServiceEvent(context.Background(), ksvc, k8swatch.Modified) + + repo.AssertExpectations(t) +} + +func TestHandleKServiceEvent_AppendsOnStatusChange(t *testing.T) { + repo := mocks.NewAppConditionsRepo(t) + c := testClientWithRepo(t, repo) + + ksvc := testKsvc("myapp", appNamespace, "1") + appID := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"} + + // Existing condition is DEPLOYING, new event is PENDING → different → should append. + existingCond := &flyteapp.Condition{ + DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING, + Message: "Deploying revision: [myapp-00001]", + } + repo.On("GetConditions", mock.Anything, appID).Return([]*flyteapp.Condition{existingCond}, nil).Once() + repo.On("AppendCondition", mock.Anything, appID, mock.Anything, mock.Anything).Return(nil).Once() + + c.handleKServiceEvent(context.Background(), ksvc, k8swatch.Modified) + + repo.AssertExpectations(t) +} + +func TestKserviceToStatus_Messages(t *testing.T) { + tests := []struct { + name string + ksvc func() *servingv1.Service + wantPhase flyteapp.Status_DeploymentStatus + wantMessage string + }{ + { + name: "active", + ksvc: func() *servingv1.Service { + ksvc := testKsvc("myapp", appNamespace, "1") + ksvc.Status.MarkRouteReady() + ksvc.Status.MarkConfigurationReady() + return ksvc + }, + wantPhase: flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE, + wantMessage: "Service is ready", + }, + { + name: "deploying with knative reason", + ksvc: func() *servingv1.Service { + ksvc := testKsvc("myapp", appNamespace, "1") + ksvc.Status.LatestCreatedRevisionName = "myapp-00002" + ksvc.Status.LatestReadyRevisionName = "myapp-00001" + ksvc.Status.MarkRouteNotYetReady() + return ksvc + }, + wantPhase: flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING, + wantMessage: "Deploying revision: [myapp-00002]", + }, + { + name: "stopped", + ksvc: func() *servingv1.Service { + ksvc := testKsvc("myapp", appNamespace, "1") + if ksvc.Spec.Template.Annotations == nil { + ksvc.Spec.Template.Annotations = map[string]string{} + } + ksvc.Spec.Template.Annotations["autoscaling.knative.dev/max-scale"] = "0" + return ksvc + }, + wantPhase: flyteapp.Status_DEPLOYMENT_STATUS_STOPPED, + wantMessage: "App scaled to zero", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := testClient(t) + status := c.kserviceToStatus(context.Background(), tt.ksvc()) + require.NotNil(t, status) + require.NotEmpty(t, status.Conditions) + assert.Equal(t, tt.wantPhase, status.Conditions[0].DeploymentStatus) + assert.Equal(t, tt.wantMessage, status.Conditions[0].Message) + }) + } +} diff --git a/app/internal/migrations/migrations.go b/app/internal/migrations/migrations.go new file mode 100644 index 00000000000..698d39aab8a --- /dev/null +++ b/app/internal/migrations/migrations.go @@ -0,0 +1,18 @@ +package migrations + +import ( + "context" + "embed" + + "github.com/jmoiron/sqlx" + + "github.com/flyteorg/flyte/v2/flytestdlib/database" +) + +//go:embed sql/*.sql +var migrationFS embed.FS + +// RunMigrations applies all pending app migrations. +func RunMigrations(ctx context.Context, db *sqlx.DB) error { + return database.Migrate(ctx, db, "app", migrationFS) +} diff --git a/app/internal/migrations/sql/20260423000000_app_conditions.sql b/app/internal/migrations/sql/20260423000000_app_conditions.sql new file mode 100644 index 00000000000..ddaeac237ab --- /dev/null +++ b/app/internal/migrations/sql/20260423000000_app_conditions.sql @@ -0,0 +1,13 @@ +-- App conditions history table. +-- Stores the append-only condition log for each app. +-- KService CRD remains the source of truth for live status (spec, replicas, ingress). +-- This table only persists the conditions array for historical display. + +CREATE TABLE IF NOT EXISTS app_conditions ( + project TEXT NOT NULL, + domain TEXT NOT NULL, + name TEXT NOT NULL, + conditions BYTEA NOT NULL DEFAULT '', + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (project, domain, name) +); diff --git a/app/internal/migrations/sql/20260423000000_app_conditions_down.sql b/app/internal/migrations/sql/20260423000000_app_conditions_down.sql new file mode 100644 index 00000000000..816bf6fc8f9 --- /dev/null +++ b/app/internal/migrations/sql/20260423000000_app_conditions_down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS app_conditions; diff --git a/app/internal/repository/impl/app_conditions.go b/app/internal/repository/impl/app_conditions.go new file mode 100644 index 00000000000..d2f780ccaab --- /dev/null +++ b/app/internal/repository/impl/app_conditions.go @@ -0,0 +1,136 @@ +package impl + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/jmoiron/sqlx" + "google.golang.org/protobuf/proto" + + "github.com/flyteorg/flyte/v2/app/internal/repository/interfaces" + "github.com/flyteorg/flyte/v2/flytestdlib/logger" + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" +) + +var _ interfaces.AppConditionsRepo = (*AppConditionsRepo)(nil) + +type AppConditionsRepo struct { + db *sqlx.DB +} + +func NewAppConditionsRepo(db *sqlx.DB) *AppConditionsRepo { + return &AppConditionsRepo{db: db} +} + +// AppendCondition appends cond to the stored conditions for the given app within a +// single transaction. If the total exceeds maxConditions, the oldest entries are +// trimmed. Creates the row if it does not exist. +func (r *AppConditionsRepo) AppendCondition(ctx context.Context, appID *flyteapp.Identifier, cond *flyteapp.Condition, maxConditions int) error { + tx, err := r.db.BeginTxx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction for app conditions: %w", err) + } + defer func() { + if err != nil { + _ = tx.Rollback() + } + }() + + // Read existing conditions inside the transaction. + var raw []byte + err = tx.QueryRowContext(ctx, + "SELECT conditions FROM app_conditions WHERE project = $1 AND domain = $2 AND name = $3 FOR UPDATE", + appID.GetProject(), appID.GetDomain(), appID.GetName(), + ).Scan(&raw) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("failed to read app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + conditions, err := unmarshalConditions(raw) + if err != nil { + return fmt.Errorf("failed to unmarshal app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + // Append and trim. + conditions = append(conditions, cond) + if maxConditions > 0 && len(conditions) > maxConditions { + logger.Debugf(ctx, "Trimming conditions for app %s/%s/%s from %d to %d", + appID.GetProject(), appID.GetDomain(), appID.GetName(), len(conditions), maxConditions) + conditions = conditions[len(conditions)-maxConditions:] + } + + raw, err = marshalConditions(conditions) + if err != nil { + return fmt.Errorf("failed to marshal app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + _, err = tx.ExecContext(ctx, + `INSERT INTO app_conditions (project, domain, name, conditions, updated_at) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (project, domain, name) DO UPDATE SET + conditions = EXCLUDED.conditions, + updated_at = EXCLUDED.updated_at`, + appID.GetProject(), appID.GetDomain(), appID.GetName(), raw, time.Now().UTC(), + ) + if err != nil { + return fmt.Errorf("failed to upsert app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + return tx.Commit() +} + +// GetConditions returns the stored conditions for the given app. +// Returns nil if no conditions have been recorded yet. +func (r *AppConditionsRepo) GetConditions(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Condition, error) { + var raw []byte + err := r.db.QueryRowContext(ctx, + "SELECT conditions FROM app_conditions WHERE project = $1 AND domain = $2 AND name = $3", + appID.GetProject(), appID.GetDomain(), appID.GetName(), + ).Scan(&raw) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("failed to get app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + + conditions, err := unmarshalConditions(raw) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + return conditions, nil +} + +// DeleteConditions removes the conditions row for the given app. +// No-ops if the row does not exist. +func (r *AppConditionsRepo) DeleteConditions(ctx context.Context, appID *flyteapp.Identifier) error { + _, err := r.db.ExecContext(ctx, + "DELETE FROM app_conditions WHERE project = $1 AND domain = $2 AND name = $3", + appID.GetProject(), appID.GetDomain(), appID.GetName(), + ) + if err != nil { + return fmt.Errorf("failed to delete app conditions for %s/%s/%s: %w", appID.GetProject(), appID.GetDomain(), appID.GetName(), err) + } + return nil +} + +// marshalConditions serializes a slice of Condition using Status as a proto wrapper. +func marshalConditions(conditions []*flyteapp.Condition) ([]byte, error) { + return proto.Marshal(&flyteapp.Status{Conditions: conditions}) +} + +// unmarshalConditions deserializes conditions from bytes produced by marshalConditions. +// Returns nil for empty/nil input. +func unmarshalConditions(raw []byte) ([]*flyteapp.Condition, error) { + if len(raw) == 0 { + return nil, nil + } + status := &flyteapp.Status{} + if err := proto.Unmarshal(raw, status); err != nil { + return nil, err + } + return status.GetConditions(), nil +} diff --git a/app/internal/repository/interfaces/app_conditions.go b/app/internal/repository/interfaces/app_conditions.go new file mode 100644 index 00000000000..3be525d9cdd --- /dev/null +++ b/app/internal/repository/interfaces/app_conditions.go @@ -0,0 +1,24 @@ +package interfaces + +import ( + "context" + + flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" +) + +// AppConditionsRepo persists and retrieves the condition history for apps. +// Each app has at most one row; conditions are stored as a serialized proto array. +type AppConditionsRepo interface { + // AppendCondition appends cond to the stored conditions for the given app, + // trimming the oldest entries if the total exceeds maxConditions. + // Creates the row if it does not exist. + AppendCondition(ctx context.Context, appID *flyteapp.Identifier, cond *flyteapp.Condition, maxConditions int) error + + // GetConditions returns the stored conditions for the given app. + // Returns nil (not an error) if no conditions have been recorded yet. + GetConditions(ctx context.Context, appID *flyteapp.Identifier) ([]*flyteapp.Condition, error) + + // DeleteConditions removes the conditions row for the given app. + // No-ops if the row does not exist. + DeleteConditions(ctx context.Context, appID *flyteapp.Identifier) error +} diff --git a/app/internal/repository/mocks/mocks.go b/app/internal/repository/mocks/mocks.go new file mode 100644 index 00000000000..94a71849644 --- /dev/null +++ b/app/internal/repository/mocks/mocks.go @@ -0,0 +1,233 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package mocks + +import ( + "context" + + "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" + mock "github.com/stretchr/testify/mock" +) + +// NewAppConditionsRepo creates a new instance of AppConditionsRepo. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewAppConditionsRepo(t interface { + mock.TestingT + Cleanup(func()) +}) *AppConditionsRepo { + mock := &AppConditionsRepo{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// AppConditionsRepo is an autogenerated mock type for the AppConditionsRepo type +type AppConditionsRepo struct { + mock.Mock +} + +type AppConditionsRepo_Expecter struct { + mock *mock.Mock +} + +func (_m *AppConditionsRepo) EXPECT() *AppConditionsRepo_Expecter { + return &AppConditionsRepo_Expecter{mock: &_m.Mock} +} + +// AppendCondition provides a mock function for the type AppConditionsRepo +func (_mock *AppConditionsRepo) AppendCondition(ctx context.Context, appID *app.Identifier, cond *app.Condition, maxConditions int) error { + ret := _mock.Called(ctx, appID, cond, maxConditions) + + if len(ret) == 0 { + panic("no return value specified for AppendCondition") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, *app.Identifier, *app.Condition, int) error); ok { + r0 = returnFunc(ctx, appID, cond, maxConditions) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// AppConditionsRepo_AppendCondition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AppendCondition' +type AppConditionsRepo_AppendCondition_Call struct { + *mock.Call +} + +// AppendCondition is a helper method to define mock.On call +// - ctx context.Context +// - appID *app.Identifier +// - cond *app.Condition +// - maxConditions int +func (_e *AppConditionsRepo_Expecter) AppendCondition(ctx interface{}, appID interface{}, cond interface{}, maxConditions interface{}) *AppConditionsRepo_AppendCondition_Call { + return &AppConditionsRepo_AppendCondition_Call{Call: _e.mock.On("AppendCondition", ctx, appID, cond, maxConditions)} +} + +func (_c *AppConditionsRepo_AppendCondition_Call) Run(run func(ctx context.Context, appID *app.Identifier, cond *app.Condition, maxConditions int)) *AppConditionsRepo_AppendCondition_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 *app.Identifier + if args[1] != nil { + arg1 = args[1].(*app.Identifier) + } + var arg2 *app.Condition + if args[2] != nil { + arg2 = args[2].(*app.Condition) + } + var arg3 int + if args[3] != nil { + arg3 = args[3].(int) + } + run( + arg0, + arg1, + arg2, + arg3, + ) + }) + return _c +} + +func (_c *AppConditionsRepo_AppendCondition_Call) Return(err error) *AppConditionsRepo_AppendCondition_Call { + _c.Call.Return(err) + return _c +} + +func (_c *AppConditionsRepo_AppendCondition_Call) RunAndReturn(run func(ctx context.Context, appID *app.Identifier, cond *app.Condition, maxConditions int) error) *AppConditionsRepo_AppendCondition_Call { + _c.Call.Return(run) + return _c +} + +// DeleteConditions provides a mock function for the type AppConditionsRepo +func (_mock *AppConditionsRepo) DeleteConditions(ctx context.Context, appID *app.Identifier) error { + ret := _mock.Called(ctx, appID) + + if len(ret) == 0 { + panic("no return value specified for DeleteConditions") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, *app.Identifier) error); ok { + r0 = returnFunc(ctx, appID) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// AppConditionsRepo_DeleteConditions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteConditions' +type AppConditionsRepo_DeleteConditions_Call struct { + *mock.Call +} + +// DeleteConditions is a helper method to define mock.On call +// - ctx context.Context +// - appID *app.Identifier +func (_e *AppConditionsRepo_Expecter) DeleteConditions(ctx interface{}, appID interface{}) *AppConditionsRepo_DeleteConditions_Call { + return &AppConditionsRepo_DeleteConditions_Call{Call: _e.mock.On("DeleteConditions", ctx, appID)} +} + +func (_c *AppConditionsRepo_DeleteConditions_Call) Run(run func(ctx context.Context, appID *app.Identifier)) *AppConditionsRepo_DeleteConditions_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 *app.Identifier + if args[1] != nil { + arg1 = args[1].(*app.Identifier) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *AppConditionsRepo_DeleteConditions_Call) Return(err error) *AppConditionsRepo_DeleteConditions_Call { + _c.Call.Return(err) + return _c +} + +func (_c *AppConditionsRepo_DeleteConditions_Call) RunAndReturn(run func(ctx context.Context, appID *app.Identifier) error) *AppConditionsRepo_DeleteConditions_Call { + _c.Call.Return(run) + return _c +} + +// GetConditions provides a mock function for the type AppConditionsRepo +func (_mock *AppConditionsRepo) GetConditions(ctx context.Context, appID *app.Identifier) ([]*app.Condition, error) { + ret := _mock.Called(ctx, appID) + + if len(ret) == 0 { + panic("no return value specified for GetConditions") + } + + var r0 []*app.Condition + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, *app.Identifier) ([]*app.Condition, error)); ok { + return returnFunc(ctx, appID) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, *app.Identifier) []*app.Condition); ok { + r0 = returnFunc(ctx, appID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*app.Condition) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, *app.Identifier) error); ok { + r1 = returnFunc(ctx, appID) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// AppConditionsRepo_GetConditions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetConditions' +type AppConditionsRepo_GetConditions_Call struct { + *mock.Call +} + +// GetConditions is a helper method to define mock.On call +// - ctx context.Context +// - appID *app.Identifier +func (_e *AppConditionsRepo_Expecter) GetConditions(ctx interface{}, appID interface{}) *AppConditionsRepo_GetConditions_Call { + return &AppConditionsRepo_GetConditions_Call{Call: _e.mock.On("GetConditions", ctx, appID)} +} + +func (_c *AppConditionsRepo_GetConditions_Call) Run(run func(ctx context.Context, appID *app.Identifier)) *AppConditionsRepo_GetConditions_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 *app.Identifier + if args[1] != nil { + arg1 = args[1].(*app.Identifier) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *AppConditionsRepo_GetConditions_Call) Return(conditions []*app.Condition, err error) *AppConditionsRepo_GetConditions_Call { + _c.Call.Return(conditions, err) + return _c +} + +func (_c *AppConditionsRepo_GetConditions_Call) RunAndReturn(run func(ctx context.Context, appID *app.Identifier) ([]*app.Condition, error)) *AppConditionsRepo_GetConditions_Call { + _c.Call.Return(run) + return _c +} diff --git a/app/internal/repository/models/app_conditions.go b/app/internal/repository/models/app_conditions.go new file mode 100644 index 00000000000..1234fb8938c --- /dev/null +++ b/app/internal/repository/models/app_conditions.go @@ -0,0 +1,12 @@ +package models + +import "time" + +// AppConditions stores the serialized condition history for a single app. +type AppConditions struct { + Project string `db:"project"` + Domain string `db:"domain"` + Name string `db:"name"` + Conditions []byte `db:"conditions"` // proto-serialized flyteidl2.app.ConditionList + UpdatedAt time.Time `db:"updated_at"` +} diff --git a/app/internal/service/internal_app_service.go b/app/internal/service/internal_app_service.go index feb1b3695eb..85d9b6aa8e2 100644 --- a/app/internal/service/internal_app_service.go +++ b/app/internal/service/internal_app_service.go @@ -3,31 +3,30 @@ package service import ( "context" "fmt" - "strings" "connectrpc.com/connect" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" k8serrors "k8s.io/apimachinery/pkg/api/errors" - appconfig "github.com/flyteorg/flyte/v2/app/internal/config" appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" + "github.com/flyteorg/flyte/v2/app/internal/repository/interfaces" "github.com/flyteorg/flyte/v2/flytestdlib/logger" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" ) // InternalAppService is the data plane implementation of the AppService. -// It has direct K8s access via AppK8sClientInterface and no database dependency — -// all app state lives in KService CRDs. +// It has direct K8s access via AppK8sClientInterface. +// conditionRepo is used to read condition history; conditions are written by +// AppK8sClient.handleKServiceEvent on every KService informer event. type InternalAppService struct { appconnect.UnimplementedAppServiceHandler - k8s appk8s.AppK8sClientInterface - cfg *appconfig.InternalAppConfig + k8s appk8s.AppK8sClientInterface + conditionRepo interfaces.AppConditionsRepo } // NewInternalAppService creates a new InternalAppService. -func NewInternalAppService(k8s appk8s.AppK8sClientInterface, cfg *appconfig.InternalAppConfig) *InternalAppService { - return &InternalAppService{k8s: k8s, cfg: cfg} +func NewInternalAppService(k8s appk8s.AppK8sClientInterface, conditionRepo interfaces.AppConditionsRepo) *InternalAppService { + return &InternalAppService{k8s: k8s, conditionRepo: conditionRepo} } // Ensure InternalAppService satisfies the generated handler interface. @@ -55,38 +54,12 @@ func (s *InternalAppService) Create( } app.Status = &flyteapp.Status{ - Conditions: []*flyteapp.Condition{ - { - DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING, - LastTransitionTime: timestamppb.Now(), - }, - }, - Ingress: publicIngress(app.GetMetadata().GetId(), s.cfg), + Ingress: s.k8s.PublicIngress(app.GetMetadata().GetId()), } return connect.NewResponse(&flyteapp.CreateResponse{App: app}), nil } -// publicIngress builds the deterministic public URL for an app using -// BaseDomain — which must match Knative's domain-template so Kourier -// serves the URL directly. Returns nil if BaseDomain is unset. -func publicIngress(id *flyteapp.Identifier, cfg *appconfig.InternalAppConfig) *flyteapp.Ingress { - if cfg.BaseDomain == "" { - return nil - } - scheme := cfg.Scheme - if scheme == "" { - scheme = "https" - } - host := strings.ToLower(fmt.Sprintf("%s.%s", - appk8s.KServiceName(id), cfg.BaseDomain)) - url := scheme + "://" + host - if cfg.IngressAppsPort != 0 { - url += fmt.Sprintf(":%d", cfg.IngressAppsPort) - } - return &flyteapp.Ingress{PublicUrl: url} -} - // Get retrieves an app and its live status from the KService CRD. // Note: App.Spec is not populated — status and ingress URL are the authoritative fields. func (s *InternalAppService) Get( @@ -98,7 +71,18 @@ func (s *InternalAppService) Get( return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("app_id is required")) } - app, err := s.k8s.GetApp(ctx, appID.AppId) + app, err := s.getWithConditions(ctx, appID.AppId) + if err != nil { + return nil, err + } + + return connect.NewResponse(&flyteapp.GetResponse{App: app}), nil +} + +// getWithConditions fetches an app from K8s and merges its stored DB condition +// history into the status. Both Get and Update use this so their responses are consistent. +func (s *InternalAppService) getWithConditions(ctx context.Context, appID *flyteapp.Identifier) (*flyteapp.App, error) { + app, err := s.k8s.GetApp(ctx, appID) if err != nil { if k8serrors.IsNotFound(err) { return nil, connect.NewError(connect.CodeNotFound, err) @@ -106,7 +90,14 @@ func (s *InternalAppService) Get( return nil, connect.NewError(connect.CodeInternal, err) } - return connect.NewResponse(&flyteapp.GetResponse{App: app}), nil + conditions, err := s.conditionRepo.GetConditions(ctx, appID) + if err != nil { + logger.Errorf(ctx, "Failed to get conditions for app %s: %v", appID.GetName(), err) + } else { + app.Status.Conditions = conditions + } + + return app, nil } // Update modifies an app's spec or desired state. @@ -138,9 +129,11 @@ func (s *InternalAppService) Update( } } - freshApp, err := s.k8s.GetApp(ctx, appID) + // Return K8s live status merged with the full DB condition history, + // consistent with Get(). Condition recording is driven by handleKServiceEvent. + freshApp, err := s.getWithConditions(ctx, appID) if err != nil { - return nil, connect.NewError(connect.CodeInternal, err) + return nil, err } app.Status = freshApp.Status @@ -162,6 +155,10 @@ func (s *InternalAppService) Delete( return nil, connect.NewError(connect.CodeInternal, err) } + if err := s.conditionRepo.DeleteConditions(ctx, appID); err != nil { + logger.Errorf(ctx, "Failed to delete conditions for app %s: %v", appID.GetName(), err) + } + return connect.NewResponse(&flyteapp.DeleteResponse{}), nil } diff --git a/app/internal/service/internal_app_service_test.go b/app/internal/service/internal_app_service_test.go index 401aa87fe94..d068a87d272 100644 --- a/app/internal/service/internal_app_service_test.go +++ b/app/internal/service/internal_app_service_test.go @@ -5,14 +5,13 @@ import ( "net/http" "net/http/httptest" "testing" - "time" "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - appconfig "github.com/flyteorg/flyte/v2/app/internal/config" + "github.com/flyteorg/flyte/v2/app/internal/repository/mocks" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" @@ -81,16 +80,23 @@ func (m *mockAppK8sClient) Unsubscribe(appName string, ch chan *flyteapp.WatchRe m.Called(appName, ch) } +func (m *mockAppK8sClient) PublicIngress(id *flyteapp.Identifier) *flyteapp.Ingress { + args := m.Called(id) + if args.Get(0) == nil { + return nil + } + return args.Get(0).(*flyteapp.Ingress) +} + // --- helpers --- -func testCfg() *appconfig.InternalAppConfig { - return &appconfig.InternalAppConfig{ - Enabled: true, - BaseDomain: "example.com", - Scheme: "https", - DefaultRequestTimeout: 5 * time.Minute, - MaxRequestTimeout: time.Hour, - } +// newTestRepo returns a mock AppConditionsRepo that silently accepts any call. +func newTestRepo(t *testing.T) *mocks.AppConditionsRepo { + repo := mocks.NewAppConditionsRepo(t) + repo.On("AppendCondition", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + repo.On("GetConditions", mock.Anything, mock.Anything).Return(([]*flyteapp.Condition)(nil), nil).Maybe() + repo.On("DeleteConditions", mock.Anything, mock.Anything).Return(nil).Maybe() + return repo } func testAppID() *flyteapp.Identifier { @@ -120,7 +126,7 @@ func testAppWithStatus(phase flyteapp.Status_DeploymentStatus) *flyteapp.App { } func newTestClient(t *testing.T, k8s *mockAppK8sClient) appconnect.AppServiceClient { - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t)) path, handler := appconnect.NewAppServiceHandler(svc) mux := http.NewServeMux() mux.Handle("/internal"+path, http.StripPrefix("/internal", handler)) @@ -133,20 +139,23 @@ func newTestClient(t *testing.T, k8s *mockAppK8sClient) appconnect.AppServiceCli func TestCreate_Success(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t)) app := testApp() + ingress := &flyteapp.Ingress{PublicUrl: "https://myapp-3dcbfc92-flyte.example.com"} k8s.On("Deploy", mock.Anything, app).Return(nil) + k8s.On("PublicIngress", app.Metadata.Id).Return(ingress) resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) require.NoError(t, err) - assert.Equal(t, flyteapp.Status_DEPLOYMENT_STATUS_PENDING, resp.Msg.App.Status.Conditions[0].DeploymentStatus) - assert.Equal(t, "https://myapp-proj-dev.example.com", resp.Msg.App.Status.Ingress.PublicUrl) + // Conditions are written by handleKServiceEvent, not by Create directly. + assert.Empty(t, resp.Msg.App.Status.Conditions) + assert.Equal(t, ingress.PublicUrl, resp.Msg.App.Status.Ingress.PublicUrl) k8s.AssertExpectations(t) } func TestCreate_MissingID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ App: &flyteapp.App{Spec: testApp().Spec}, @@ -156,7 +165,7 @@ func TestCreate_MissingID(t *testing.T) { } func TestCreate_MissingSpec(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ App: &flyteapp.App{Metadata: &flyteapp.Meta{Id: testAppID()}}, @@ -166,7 +175,7 @@ func TestCreate_MissingSpec(t *testing.T) { } func TestCreate_MissingPayload(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{ App: &flyteapp.App{ @@ -180,27 +189,26 @@ func TestCreate_MissingPayload(t *testing.T) { func TestCreate_IngressWithPort(t *testing.T) { k8s := &mockAppK8sClient{} - cfg := testCfg() - cfg.IngressAppsPort = 30081 - svc := NewInternalAppService(k8s, cfg) + svc := NewInternalAppService(k8s, newTestRepo(t)) app := testApp() + ingress := &flyteapp.Ingress{PublicUrl: "https://myapp-3dcbfc92-flyte.example.com:30081"} k8s.On("Deploy", mock.Anything, app).Return(nil) + k8s.On("PublicIngress", app.Metadata.Id).Return(ingress) resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) require.NoError(t, err) - assert.Equal(t, "https://myapp-proj-dev.example.com:30081", resp.Msg.App.Status.Ingress.PublicUrl) + assert.Equal(t, ingress.PublicUrl, resp.Msg.App.Status.Ingress.PublicUrl) k8s.AssertExpectations(t) } func TestCreate_NoBaseDomain_NoIngress(t *testing.T) { k8s := &mockAppK8sClient{} - cfg := testCfg() - cfg.BaseDomain = "" - svc := NewInternalAppService(k8s, cfg) + svc := NewInternalAppService(k8s, newTestRepo(t)) app := testApp() k8s.On("Deploy", mock.Anything, app).Return(nil) + k8s.On("PublicIngress", app.Metadata.Id).Return((*flyteapp.Ingress)(nil)) resp, err := svc.Create(context.Background(), connect.NewRequest(&flyteapp.CreateRequest{App: app})) require.NoError(t, err) @@ -212,10 +220,13 @@ func TestCreate_NoBaseDomain_NoIngress(t *testing.T) { func TestGet_Success(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + repo := mocks.NewAppConditionsRepo(t) + svc := NewInternalAppService(k8s, repo) appID := testAppID() k8s.On("GetApp", mock.Anything, appID).Return(testAppWithStatus(flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE), nil) + dbConditions := []*flyteapp.Condition{{DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE}} + repo.On("GetConditions", mock.Anything, appID).Return(dbConditions, nil) resp, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{ Identifier: &flyteapp.GetRequest_AppId{AppId: appID}, @@ -226,7 +237,7 @@ func TestGet_Success(t *testing.T) { } func TestGet_MissingAppID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Get(context.Background(), connect.NewRequest(&flyteapp.GetRequest{})) require.Error(t, err) @@ -237,11 +248,14 @@ func TestGet_MissingAppID(t *testing.T) { func TestUpdate_Deploy(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + repo := mocks.NewAppConditionsRepo(t) + svc := NewInternalAppService(k8s, repo) app := testApp() + dbConditions := []*flyteapp.Condition{{DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING}} k8s.On("Deploy", mock.Anything, app).Return(nil) k8s.On("GetApp", mock.Anything, app.Metadata.Id).Return(testAppWithStatus(flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING), nil) + repo.On("GetConditions", mock.Anything, app.Metadata.Id).Return(dbConditions, nil) resp, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{App: app})) require.NoError(t, err) @@ -251,12 +265,15 @@ func TestUpdate_Deploy(t *testing.T) { func TestUpdate_Stop(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + repo := mocks.NewAppConditionsRepo(t) + svc := NewInternalAppService(k8s, repo) app := testApp() app.Spec.DesiredState = flyteapp.Spec_DESIRED_STATE_STOPPED + dbConditions := []*flyteapp.Condition{{DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_STOPPED}} k8s.On("Stop", mock.Anything, app.Metadata.Id).Return(nil) k8s.On("GetApp", mock.Anything, app.Metadata.Id).Return(testAppWithStatus(flyteapp.Status_DEPLOYMENT_STATUS_STOPPED), nil) + repo.On("GetConditions", mock.Anything, app.Metadata.Id).Return(dbConditions, nil) resp, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{App: app})) require.NoError(t, err) @@ -265,7 +282,7 @@ func TestUpdate_Stop(t *testing.T) { } func TestUpdate_MissingID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Update(context.Background(), connect.NewRequest(&flyteapp.UpdateRequest{ App: &flyteapp.App{}, @@ -278,7 +295,7 @@ func TestUpdate_MissingID(t *testing.T) { func TestDelete_Success(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t)) appID := testAppID() k8s.On("Delete", mock.Anything, appID).Return(nil) @@ -289,7 +306,7 @@ func TestDelete_Success(t *testing.T) { } func TestDelete_MissingID(t *testing.T) { - svc := NewInternalAppService(&mockAppK8sClient{}, testCfg()) + svc := NewInternalAppService(&mockAppK8sClient{}, newTestRepo(t)) _, err := svc.Delete(context.Background(), connect.NewRequest(&flyteapp.DeleteRequest{})) require.Error(t, err) @@ -300,7 +317,7 @@ func TestDelete_MissingID(t *testing.T) { func TestList_ByProject(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t)) apps := []*flyteapp.App{testApp()} k8s.On("List", mock.Anything, "proj", "dev", uint32(10), "tok").Return(apps, "nexttok", nil) @@ -319,7 +336,7 @@ func TestList_ByProject(t *testing.T) { func TestList_NoFilter(t *testing.T) { k8s := &mockAppK8sClient{} - svc := NewInternalAppService(k8s, testCfg()) + svc := NewInternalAppService(k8s, newTestRepo(t)) k8s.On("List", mock.Anything, "", "", uint32(0), "").Return([]*flyteapp.App{}, "", nil) diff --git a/app/internal/setup.go b/app/internal/setup.go index 6a15ac9ad05..86e3bc2ccd8 100644 --- a/app/internal/setup.go +++ b/app/internal/setup.go @@ -10,9 +10,10 @@ import ( appconfig "github.com/flyteorg/flyte/v2/app/internal/config" appk8s "github.com/flyteorg/flyte/v2/app/internal/k8s" + "github.com/flyteorg/flyte/v2/app/internal/migrations" + repoimpl "github.com/flyteorg/flyte/v2/app/internal/repository/impl" "github.com/flyteorg/flyte/v2/app/internal/service" "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app/appconnect" - knativeapp "github.com/flyteorg/flyte/v2/flytestdlib/app" ) // Setup registers the InternalAppService handler on the SetupContext mux. @@ -24,11 +25,17 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.Inter return nil } - if err := knativeapp.InitAppScheme(); err != nil { + if err := stdlibapp.InitAppScheme(); err != nil { return fmt.Errorf("internalapp: failed to register Knative scheme: %w", err) } - appK8sClient := appk8s.NewAppK8sClient(sc.K8sClient, sc.K8sCache, cfg) + if err := migrations.RunMigrations(ctx, sc.DB); err != nil { + return fmt.Errorf("internalapp: failed to run migrations: %w", err) + } + + conditionRepo := repoimpl.NewAppConditionsRepo(sc.DB) + appK8sClient := appk8s.NewAppK8sClient(sc.K8sClient, sc.K8sCache, cfg, conditionRepo) + internalAppSvc := service.NewInternalAppService(appK8sClient, conditionRepo) if err := appK8sClient.StartWatching(ctx); err != nil { return fmt.Errorf("internalapp: failed to start KService watcher: %w", err) @@ -39,8 +46,6 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.Inter return nil }) - internalAppSvc := service.NewInternalAppService(appK8sClient, cfg) - path, handler := appconnect.NewAppServiceHandler(internalAppSvc) sc.Mux.Handle("/internal"+path, http.StripPrefix("/internal", handler)) logger.Infof(ctx, "Mounted InternalAppService at /internal%s", path)