Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ packages:
dir: actions/service/mocks
interfaces:
ActionsClientInterface:
github.com/flyteorg/flyte/v2/app/internal/repository/interfaces:
config:
all: true
dir: app/internal/repository/mocks
github.com/eko/gocache/lib/v4/cache:
config:
all: false
Expand Down
9 changes: 7 additions & 2 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/flyteorg/flyte/v2/flytestdlib/config"
)

// AppConfig holds configuration for the control plane AppService.
// AppConfig holds configuration for the AppService.
type AppConfig struct {
// InternalAppServiceURL is the base URL of the InternalAppService (data plane).
// In unified mode this is overridden by the shared mux BaseURL.
Expand Down Expand Up @@ -42,7 +42,7 @@ type InternalAppConfig struct {
Enabled bool `json:"enabled" pflag:",Enable app deployment controller"`

// BaseDomain is the base domain used to generate public URLs for apps.
// Apps are exposed at "{name}-{project}-{domain}.{base_domain}".
// Apps are exposed at "{name}-{project}-{domain}-{namespace}.{base_domain}".
BaseDomain string `json:"baseDomain" pflag:",Base domain for app public URLs"`

// Scheme is the URL scheme used for public app URLs ("http" or "https").
Expand All @@ -65,6 +65,10 @@ type InternalAppConfig struct {
// processes need to connect back to the Flyte manager.
DefaultEnvVars map[string]string `json:"defaultEnvVars" pflag:"-,Default env vars injected into every app pod"`

// MaxConditions is the maximum number of conditions to retain per app.
// Oldest entries are trimmed when this limit is exceeded. Defaults to 40.
MaxConditions int `json:"maxConditions" pflag:",Maximum number of conditions to retain per app"`

// WatchBufferSize is the buffer size for each subscriber's event channel.
// A larger value reduces the chance of dropped events under burst load.
WatchBufferSize int `json:"watchBufferSize" pflag:",Buffer size for watch subscriber channels"`
Expand All @@ -73,6 +77,7 @@ type InternalAppConfig struct {
var defaultInternalAppConfig = &InternalAppConfig{
DefaultRequestTimeout: 300 * time.Second,
MaxRequestTimeout: 3600 * time.Second,
MaxConditions: 40,
WatchBufferSize: 100,
}

Expand Down
107 changes: 86 additions & 21 deletions app/internal/k8s/app_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/v2/app/internal/config"
"github.com/flyteorg/flyte/v2/app/internal/repository/interfaces"
"github.com/flyteorg/flyte/v2/flytestdlib/k8s"
"github.com/flyteorg/flyte/v2/flytestdlib/logger"
flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app"
Expand Down Expand Up @@ -99,13 +100,19 @@ type AppK8sClientInterface interface {

// Unsubscribe removes a subscription channel previously returned by Subscribe.
Unsubscribe(appName string, ch chan *flyteapp.WatchResponse)

// PublicIngress returns the deterministic public Ingress URL for the given app,
// matching Knative's domain-template so Kourier routes traffic correctly.
// Returns nil if BaseDomain is not configured.
PublicIngress(id *flyteapp.Identifier) *flyteapp.Ingress
}

// AppK8sClient implements AppK8sClientInterface using controller-runtime.
type AppK8sClient struct {
k8sClient client.WithWatch
cache ctrlcache.Cache
cfg *config.InternalAppConfig
k8sClient client.WithWatch
cache ctrlcache.Cache
cfg *config.InternalAppConfig
conditionRepo interfaces.AppConditionsRepo

// Watch management
mu sync.RWMutex
Expand All @@ -115,12 +122,15 @@ type AppK8sClient struct {
}

// NewAppK8sClient creates a new AppK8sClient.
func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.InternalAppConfig) *AppK8sClient {
// conditionRepo is used to persist condition history on every KService event;
// pass nil to disable condition persistence (e.g. in tests that do not need it).
func NewAppK8sClient(k8sClient client.WithWatch, cache ctrlcache.Cache, cfg *config.InternalAppConfig, conditionRepo interfaces.AppConditionsRepo) *AppK8sClient {
return &AppK8sClient{
k8sClient: k8sClient,
cache: cache,
cfg: cfg,
subscribers: make(map[string]map[chan *flyteapp.WatchResponse]struct{}),
k8sClient: k8sClient,
cache: cache,
cfg: cfg,
conditionRepo: conditionRepo,
subscribers: make(map[string]map[chan *flyteapp.WatchResponse]struct{}),
}
}

Expand Down Expand Up @@ -288,14 +298,37 @@ func isManagedKService(ksvc *servingv1.Service) bool {
return ksvc.Labels[labelAppManaged] == "true"
}

// handleKServiceEvent converts a KService event into a WatchResponse and
// notifies all matching subscribers.
// handleKServiceEvent converts a KService event into a WatchResponse,
// persists the current deployment status as a condition, and notifies subscribers.
func (c *AppK8sClient) handleKServiceEvent(ctx context.Context, ksvc *servingv1.Service, eventType k8swatch.EventType) {
app, err := c.kserviceToApp(ctx, ksvc)
if err != nil {
return
}

appID := app.GetMetadata().GetId()

// Persist the current status as a condition on every K8s state change so that
// the full PENDING -> DEPLOYING -> ACTIVE/FAILED transition history is recorded,
// regardless of whether any Watch client is connected.
// Dedup: read the latest stored condition first; only append if status or message changed.
if c.conditionRepo != nil {
if eventType == k8swatch.Deleted {
// Ensure DB is clean even if a race between Delete() RPC and a late
// Modified event caused conditions to be re-written after the RPC cleared them.
if err := c.conditionRepo.DeleteConditions(ctx, appID); err != nil {
logger.Errorf(ctx, "Failed to delete conditions for app %s on watch deleted event: %v", appID.GetName(), err)
}
} else if status := app.GetStatus(); status != nil && len(status.GetConditions()) > 0 {
cond := status.GetConditions()[0]
if c.shouldAppendCondition(ctx, appID, cond) {
if err := c.conditionRepo.AppendCondition(ctx, appID, cond, c.cfg.MaxConditions); err != nil {
logger.Errorf(ctx, "Failed to persist condition for app %s on watch event: %v", appID.GetName(), err)
}
}
}
}

var resp *flyteapp.WatchResponse
switch eventType {
case k8swatch.Added:
Expand All @@ -320,8 +353,29 @@ func (c *AppK8sClient) handleKServiceEvent(ctx context.Context, ksvc *servingv1.
return
}

appName := app.GetMetadata().GetId().GetName()
c.notifySubscribers(ctx, appName, resp)
c.notifySubscribers(ctx, appID.GetName(), resp)
}

// shouldAppendCondition returns true if cond should be persisted.
// It skips two classes of noise:
// 1. PENDING with no message — the transient first Knative event before the
// Ready condition is populated; it carries no actionable information.
// 2. Exact duplicates (same DeploymentStatus + Message as the last stored entry).
func (c *AppK8sClient) shouldAppendCondition(ctx context.Context, appID *flyteapp.Identifier, cond *flyteapp.Condition) bool {
if cond.GetDeploymentStatus() == flyteapp.Status_DEPLOYMENT_STATUS_PENDING && cond.GetMessage() == "" {
return false
}
existing, err := c.conditionRepo.GetConditions(ctx, appID)
if err != nil {
// On read error, append to avoid missing transitions.
logger.Warnf(ctx, "Failed to read conditions for app %s, will append: %v", appID.GetName(), err)
return true
}
if len(existing) == 0 {
return true
}
last := existing[len(existing)-1]
return last.GetDeploymentStatus() != cond.GetDeploymentStatus() || last.GetMessage() != cond.GetMessage()
}

// notifySubscribers sends a WatchResponse to all subscribers for the given app name.
Expand Down Expand Up @@ -448,18 +502,19 @@ func (c *AppK8sClient) List(ctx context.Context, project, domain string, limit u
return apps, list.Continue, nil
}

// publicIngress returns the deterministic public URL for an app using the same
// logic as the service layer so GetApp/List/Watch are consistent with Create.
func (c *AppK8sClient) publicIngress(id *flyteapp.Identifier) *flyteapp.Ingress {
// PublicIngress returns the deterministic public URL for an app.
// The host follows Knative's domain-template "{kservice-name}-{namespace}.{domain}"
// so Kourier routes traffic correctly without extra ingress rules.
func (c *AppK8sClient) PublicIngress(id *flyteapp.Identifier) *flyteapp.Ingress {
if c.cfg.BaseDomain == "" {
return nil
}
scheme := c.cfg.Scheme
if scheme == "" {
scheme = "https"
}
host := strings.ToLower(fmt.Sprintf("%s.%s",
KServiceName(id), c.cfg.BaseDomain))
host := strings.ToLower(fmt.Sprintf("%s-%s.%s",
KServiceName(id), AppNamespace, c.cfg.BaseDomain))
url := scheme + "://" + host
if c.cfg.IngressAppsPort != 0 {
url += fmt.Sprintf(":%d", c.cfg.IngressAppsPort)
Expand Down Expand Up @@ -717,18 +772,28 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser
}

if phase == flyteapp.Status_DEPLOYMENT_STATUS_UNSPECIFIED {
readyCond := ksvc.Status.GetCondition(servingv1.ServiceConditionReady)
switch {
case ksvc.IsReady():
phase = flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE
message = "Service is ready"
case ksvc.IsFailed():
phase = flyteapp.Status_DEPLOYMENT_STATUS_FAILED
if c := ksvc.Status.GetCondition(servingv1.ServiceConditionReady); c != nil {
message = c.Message
if readyCond != nil {
message = readyCond.Message
}
case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName:
case ksvc.Status.LatestCreatedRevisionName != ksvc.Status.LatestReadyRevisionName &&
ksvc.Status.LatestCreatedRevisionName != "":
phase = flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING
message = fmt.Sprintf("Deploying revision: [%s]", ksvc.Status.LatestCreatedRevisionName)
if readyCond != nil && readyCond.Reason != "" {
message += fmt.Sprintf("\n%s: %s", readyCond.Reason, readyCond.Message)
}
default:
phase = flyteapp.Status_DEPLOYMENT_STATUS_PENDING
if readyCond != nil && readyCond.Reason != "" {
message = fmt.Sprintf("%s: %s", readyCond.Reason, readyCond.Message)
}
}
}

Expand All @@ -740,7 +805,7 @@ func (c *AppK8sClient) kserviceToStatus(ctx context.Context, ksvc *servingv1.Ser
parts := strings.SplitN(appIDStr, "/", 3)
if len(parts) == 3 {
appID := &flyteapp.Identifier{Project: parts[0], Domain: parts[1], Name: parts[2]}
status.Ingress = c.publicIngress(appID)
status.Ingress = c.PublicIngress(appID)
}
}

Expand Down
123 changes: 122 additions & 1 deletion app/internal/k8s/app_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -19,6 +20,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/flyteorg/flyte/v2/app/internal/config"
"github.com/flyteorg/flyte/v2/app/internal/repository/mocks"
flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app"
flytecoreapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
)
Expand Down Expand Up @@ -58,7 +60,7 @@ func testClient(t *testing.T, objs ...client.Object) *AppK8sClient {
MaxRequestTimeout: time.Hour,
WatchBufferSize: 100,
}
return NewAppK8sClient(fc, nil, cfg)
return NewAppK8sClient(fc, nil, cfg, nil)
}

// testApp builds a minimal flyteapp.App for use in tests.
Expand Down Expand Up @@ -626,6 +628,20 @@ func TestSubscribe_MultipleSubscribers(t *testing.T) {
}
}

// testClientWithRepo builds an AppK8sClient with a real conditionRepo mock.
func testClientWithRepo(t *testing.T, repo *mocks.AppConditionsRepo, objs ...client.Object) *AppK8sClient {
t.Helper()
s := testScheme(t)
fc := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).Build()
cfg := &config.InternalAppConfig{
DefaultRequestTimeout: 5 * time.Minute,
MaxRequestTimeout: time.Hour,
WatchBufferSize: 100,
MaxConditions: 40,
}
return NewAppK8sClient(fc, nil, cfg, repo)
}

// testKsvc builds a minimal KService that kserviceToApp can parse.
func testKsvc(name, ns, rv string) *servingv1.Service {
return &servingv1.Service{
Expand All @@ -638,3 +654,108 @@ func testKsvc(name, ns, rv string) *servingv1.Service {
},
}
}

// --- Condition dedup and message format tests ---

func TestHandleKServiceEvent_DeduplicatesConditions(t *testing.T) {
repo := mocks.NewAppConditionsRepo(t)
c := testClientWithRepo(t, repo)

ksvc := testKsvc("myapp", appNamespace, "1")
appID := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"}

existingCond := &flyteapp.Condition{
DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_PENDING,
Message: "",
}

// First call: no existing conditions → should append.
repo.On("GetConditions", mock.Anything, appID).Return([]*flyteapp.Condition{}, nil).Once()
repo.On("AppendCondition", mock.Anything, appID, mock.Anything, mock.Anything).Return(nil).Once()

c.handleKServiceEvent(context.Background(), ksvc, k8swatch.Modified)

// Second call: same status+message already stored → should NOT append.
repo.On("GetConditions", mock.Anything, appID).Return([]*flyteapp.Condition{existingCond}, nil).Once()

c.handleKServiceEvent(context.Background(), ksvc, k8swatch.Modified)

repo.AssertExpectations(t)
}

func TestHandleKServiceEvent_AppendsOnStatusChange(t *testing.T) {
repo := mocks.NewAppConditionsRepo(t)
c := testClientWithRepo(t, repo)

ksvc := testKsvc("myapp", appNamespace, "1")
appID := &flyteapp.Identifier{Project: "proj", Domain: "dev", Name: "myapp"}

// Existing condition is DEPLOYING, new event is PENDING → different → should append.
existingCond := &flyteapp.Condition{
DeploymentStatus: flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING,
Message: "Deploying revision: [myapp-00001]",
}
repo.On("GetConditions", mock.Anything, appID).Return([]*flyteapp.Condition{existingCond}, nil).Once()
repo.On("AppendCondition", mock.Anything, appID, mock.Anything, mock.Anything).Return(nil).Once()

c.handleKServiceEvent(context.Background(), ksvc, k8swatch.Modified)

repo.AssertExpectations(t)
}

func TestKserviceToStatus_Messages(t *testing.T) {
tests := []struct {
name string
ksvc func() *servingv1.Service
wantPhase flyteapp.Status_DeploymentStatus
wantMessage string
}{
{
name: "active",
ksvc: func() *servingv1.Service {
ksvc := testKsvc("myapp", appNamespace, "1")
ksvc.Status.MarkRouteReady()
ksvc.Status.MarkConfigurationReady()
return ksvc
},
wantPhase: flyteapp.Status_DEPLOYMENT_STATUS_ACTIVE,
wantMessage: "Service is ready",
},
{
name: "deploying with knative reason",
ksvc: func() *servingv1.Service {
ksvc := testKsvc("myapp", appNamespace, "1")
ksvc.Status.LatestCreatedRevisionName = "myapp-00002"
ksvc.Status.LatestReadyRevisionName = "myapp-00001"
ksvc.Status.MarkRouteNotYetReady()
return ksvc
},
wantPhase: flyteapp.Status_DEPLOYMENT_STATUS_DEPLOYING,
wantMessage: "Deploying revision: [myapp-00002]",
},
{
name: "stopped",
ksvc: func() *servingv1.Service {
ksvc := testKsvc("myapp", appNamespace, "1")
if ksvc.Spec.Template.Annotations == nil {
ksvc.Spec.Template.Annotations = map[string]string{}
}
ksvc.Spec.Template.Annotations["autoscaling.knative.dev/max-scale"] = "0"
return ksvc
},
wantPhase: flyteapp.Status_DEPLOYMENT_STATUS_STOPPED,
wantMessage: "App scaled to zero",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := testClient(t)
status := c.kserviceToStatus(context.Background(), tt.ksvc())
require.NotNil(t, status)
require.NotEmpty(t, status.Conditions)
assert.Equal(t, tt.wantPhase, status.Conditions[0].DeploymentStatus)
assert.Equal(t, tt.wantMessage, status.Conditions[0].Message)
})
}
}
Loading
Loading