diff --git a/backend/internal/agent/agent.go b/backend/internal/agent/agent.go index 35e6a5f..b03322a 100644 --- a/backend/internal/agent/agent.go +++ b/backend/internal/agent/agent.go @@ -31,13 +31,14 @@ type FatalConfigError struct { func (e *FatalConfigError) Error() string { return e.Msg } type Config struct { - LogLevel zerolog.Level - LogJSON bool - HubUrl string - AuthToken string - AgentID string - HubPublicKey ed25519.PublicKey - HealthPort string + LogLevel zerolog.Level + LogJSON bool + HubUrl string + AuthToken string + AgentID string + HubPublicKey ed25519.PublicKey + HealthPort string + DeploymentsDir string } func DefaultConfig() (Config, error) { @@ -76,13 +77,14 @@ func DefaultConfig() (Config, error) { } return Config{ - LogLevel: logLevel, - LogJSON: logJSON, - HubUrl: hubUrl, - AuthToken: authToken, - AgentID: agentID, - HubPublicKey: hubPublicKey, - HealthPort: healthPort, + LogLevel: logLevel, + LogJSON: logJSON, + HubUrl: hubUrl, + AuthToken: authToken, + AgentID: agentID, + HubPublicKey: hubPublicKey, + HealthPort: healthPort, + DeploymentsDir: "/deployments", }, nil } @@ -149,7 +151,7 @@ func Run(cfg Config) error { Log.Info().Str("version", version.Version).Msg("agent started") - dockerClient, err := docker.New(Log) + dockerClient, err := docker.New(Log, cfg.DeploymentsDir) if err != nil { return fmt.Errorf("docker init: %w", err) } @@ -174,6 +176,7 @@ func Run(cfg Config) error { return err } wsConnected.Store(true) + sender := newMessageSender(conn, session) for { _, data, readErr := conn.ReadMessage() @@ -193,6 +196,7 @@ func Run(cfg Config) error { return err } wsConnected.Store(true) + sender = newMessageSender(conn, session) continue } msg := &messages.ServerMessage{} @@ -200,6 +204,6 @@ func Run(cfg Config) error { Log.Error().Err(err).Msg("unmarshal error") continue } - handleServerMessage(msg, conn, session) + handleServerMessage(ctx, msg, session, sender, dockerClient) } } diff --git a/backend/internal/agent/agent_config_test.go b/backend/internal/agent/agent_config_test.go index 15c1d0a..6089873 100644 --- a/backend/internal/agent/agent_config_test.go +++ b/backend/internal/agent/agent_config_test.go @@ -68,6 +68,9 @@ func TestDefaultConfig_Valid(t *testing.T) { if len(cfg.HubPublicKey) != ed25519.PublicKeySize { t.Errorf("HubPublicKey length = %d, want %d", len(cfg.HubPublicKey), ed25519.PublicKeySize) } + if cfg.DeploymentsDir != "/deployments" { + t.Errorf("DeploymentsDir = %q, want %q", cfg.DeploymentsDir, "/deployments") + } } func TestDefaultConfig_Defaults(t *testing.T) { @@ -88,6 +91,9 @@ func TestDefaultConfig_Defaults(t *testing.T) { if cfg.LogJSON { t.Error("LogJSON = true, want false by default") } + if cfg.DeploymentsDir != "/deployments" { + t.Errorf("DeploymentsDir = %q, want %q", cfg.DeploymentsDir, "/deployments") + } } func TestDefaultConfig_LogLevels(t *testing.T) { diff --git a/backend/internal/agent/docker/deploy.go b/backend/internal/agent/docker/deploy.go new file mode 100644 index 0000000..fe79b19 --- /dev/null +++ b/backend/internal/agent/docker/deploy.go @@ -0,0 +1,109 @@ +package docker + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/OrcaCD/orca-cd/internal/shared/utils" + composetypes "github.com/compose-spec/compose-go/v2/types" + "github.com/docker/compose/v5/pkg/api" +) + +const ( + composeFileName = "compose.yaml" + deployWaitTimeout = 2 * time.Minute +) + +type DeployRequest struct { + ApplicationID string + ApplicationName string + ComposeFile string +} + +var loadProject = func(ctx context.Context, composeService api.Compose, options api.ProjectLoadOptions) (*composetypes.Project, error) { + return composeService.LoadProject(ctx, options) +} + +var upProject = func(ctx context.Context, composeService api.Compose, project *composetypes.Project, options api.UpOptions) error { + return composeService.Up(ctx, project, options) +} + +func (c *Client) Deploy(ctx context.Context, req DeployRequest) error { + if c.compose == nil { + return errors.New("docker compose service is not initialized") + } + if c.deploymentsDir == "" { + return errors.New("deployments directory is not configured") + } + if !c.Ready() { + return errors.New("docker daemon is not ready") + } + + if req.ComposeFile == "" { + return errors.New("compose file is empty") + } + + // Validate application name to prevent path traversal + if err := utils.DoesNotLookLikeFilePath(req.ApplicationName); err != nil { + return fmt.Errorf("invalid application name: %w", err) + } + + applicationDir := filepath.Join(c.deploymentsDir, req.ApplicationName) + composePath := filepath.Join(applicationDir, composeFileName) + + // Verify the final path stays within the deployments directory + if err := utils.IsPathWithinBase(c.deploymentsDir, composePath); err != nil { + return fmt.Errorf("invalid compose file path: %w", err) + } + + if err := os.MkdirAll(applicationDir, 0o750); err != nil { + return fmt.Errorf("create deployment directory: %w", err) + } + if err := os.WriteFile(composePath, []byte(req.ComposeFile), 0o600); err != nil { + return fmt.Errorf("write compose file: %w", err) + } + + project, err := loadProject(ctx, c.compose, api.ProjectLoadOptions{ + ProjectName: strings.ToLower(req.ApplicationName), + ConfigPaths: []string{composePath}, + WorkingDir: applicationDir, + }) + if err != nil { + return fmt.Errorf("load compose project: %w", err) + } + + // Add OrcaCD managed label to all services + for _, service := range project.Services { + if service.Labels == nil { + service.Labels = make(map[string]string) + } + service.Labels["managed_by"] = "orca-cd" + } + + if err := upProject(ctx, c.compose, project, api.UpOptions{ + Create: api.CreateOptions{ + RemoveOrphans: true, + Recreate: api.RecreateDiverged, + RecreateDependencies: api.RecreateDiverged, + }, + Start: api.StartOptions{ + Wait: true, + WaitTimeout: deployWaitTimeout, + }, + }); err != nil { + return fmt.Errorf("compose up: %w", err) + } + + c.log.Info(). + Str("application_id", req.ApplicationID). + Str("application_name", req.ApplicationName). + Str("compose_path", composePath). + Msg("deployment completed") + + return nil +} diff --git a/backend/internal/agent/docker/deploy_test.go b/backend/internal/agent/docker/deploy_test.go new file mode 100644 index 0000000..cece9ea --- /dev/null +++ b/backend/internal/agent/docker/deploy_test.go @@ -0,0 +1,111 @@ +package docker + +import ( + "context" + "os" + "path/filepath" + "testing" + + composetypes "github.com/compose-spec/compose-go/v2/types" + "github.com/docker/compose/v5/pkg/api" +) + +func TestDeploy_WritesComposeFileAndRunsComposeUp(t *testing.T) { + c := newTestClient(t) + c.deploymentsDir = t.TempDir() + + originalLoadProject := loadProject + originalUpProject := upProject + t.Cleanup(func() { + loadProject = originalLoadProject + upProject = originalUpProject + }) + + var gotLoadOptions api.ProjectLoadOptions + loadProject = func(_ context.Context, _ api.Compose, options api.ProjectLoadOptions) (*composetypes.Project, error) { + gotLoadOptions = options + return &composetypes.Project{Name: options.ProjectName}, nil + } + + var gotUpOptions api.UpOptions + upProject = func(_ context.Context, _ api.Compose, project *composetypes.Project, options api.UpOptions) error { + if project.Name != "billing" { + t.Fatalf("expected project name %q, got %q", "billing", project.Name) + } + gotUpOptions = options + return nil + } + + req := DeployRequest{ + ApplicationID: "app-123", + ApplicationName: "billing", + ComposeFile: "services:\n app:\n image: ghcr.io/orcacd/app:1.0.0\n", + } + + if err := c.Deploy(t.Context(), req); err != nil { + t.Fatalf("Deploy: %v", err) + } + + composePath := filepath.Join(c.deploymentsDir, req.ApplicationName, composeFileName) + //nolint:gosec // composePath is built from t.TempDir() and a fixed test application id + content, err := os.ReadFile(composePath) + if err != nil { + t.Fatalf("ReadFile: %v", err) + } + if string(content) != req.ComposeFile { + t.Fatalf("expected compose file to be written to deployment volume") + } + + if gotLoadOptions.ProjectName != "billing" { + t.Fatalf("expected load project name %q, got %q", "billing", gotLoadOptions.ProjectName) + } + if len(gotLoadOptions.ConfigPaths) != 1 || gotLoadOptions.ConfigPaths[0] != composePath { + t.Fatalf("unexpected config paths: %#v", gotLoadOptions.ConfigPaths) + } + if gotLoadOptions.WorkingDir != filepath.Join(c.deploymentsDir, req.ApplicationName) { + t.Fatalf("expected working dir %q, got %q", filepath.Join(c.deploymentsDir, req.ApplicationName), gotLoadOptions.WorkingDir) + } + + if !gotUpOptions.Start.Wait { + t.Fatal("expected compose up to wait for services to become ready") + } + if gotUpOptions.Start.WaitTimeout != deployWaitTimeout { + t.Fatalf("expected wait timeout %s, got %s", deployWaitTimeout, gotUpOptions.Start.WaitTimeout) + } + if !gotUpOptions.Create.RemoveOrphans { + t.Fatal("expected compose up to remove orphaned containers") + } + if gotUpOptions.Create.Recreate != api.RecreateDiverged { + t.Fatalf("expected recreate strategy %q, got %q", api.RecreateDiverged, gotUpOptions.Create.Recreate) + } +} + +func TestDeploy_RejectsUnsafeApplicationName(t *testing.T) { + c := newTestClient(t) + c.deploymentsDir = t.TempDir() + + err := c.Deploy(t.Context(), DeployRequest{ + ApplicationID: "019e1ce8-7938-71b8-be55-4b184f307a2d", + ApplicationName: "../bad", + ComposeFile: "services: {}\n", + }) + if err == nil { + t.Fatal("expected deploy to reject unsafe application names") + } + err2 := c.Deploy(t.Context(), DeployRequest{ + ApplicationID: "019e1ce8-7938-71b8-be55-4b184f307a2d", + ApplicationName: "test/orcacd-docs", + ComposeFile: "services: {}\n", + }) + if err2 == nil { + t.Fatal("expected deploy to reject unsafe application names") + } + err3 := c.Deploy(t.Context(), DeployRequest{ + ApplicationID: "019e1ce8-7938-71b8-be55-4b184f307a2d", + ApplicationName: "bad/../name", + ComposeFile: "services: {}\n", + }) + if err3 == nil { + t.Fatal("expected deploy to reject unsafe application names") + } +} diff --git a/backend/internal/agent/docker/docker.go b/backend/internal/agent/docker/docker.go index a89ee59..42fe42d 100644 --- a/backend/internal/agent/docker/docker.go +++ b/backend/internal/agent/docker/docker.go @@ -17,16 +17,17 @@ import ( const daemonCheckInterval = 5 * time.Second type Client struct { - mu sync.RWMutex // protects ready - log zerolog.Logger - cli command.Cli - compose api.Compose - ready bool - ctx context.Context - cancel context.CancelFunc + mu sync.RWMutex // protects ready + log zerolog.Logger + cli command.Cli + compose api.Compose + deploymentsDir string + ready bool + ctx context.Context + cancel context.CancelFunc } -func New(log zerolog.Logger) (*Client, error) { +func New(log zerolog.Logger, deploymentsDir string) (*Client, error) { dockerCLI, err := command.NewDockerCli() if err != nil { return nil, err @@ -43,11 +44,12 @@ func New(log zerolog.Logger) (*Client, error) { ctx, cancel := context.WithCancel(context.Background()) c := &Client{ - log: log, - cli: dockerCLI, - compose: composeSvc, - ctx: ctx, - cancel: cancel, + log: log, + cli: dockerCLI, + compose: composeSvc, + deploymentsDir: deploymentsDir, + ctx: ctx, + cancel: cancel, } if c.pingDaemon() { diff --git a/backend/internal/agent/docker/docker_test.go b/backend/internal/agent/docker/docker_test.go index f63fd9a..b8b73d2 100644 --- a/backend/internal/agent/docker/docker_test.go +++ b/backend/internal/agent/docker/docker_test.go @@ -8,7 +8,7 @@ import ( func newTestClient(t *testing.T) *Client { t.Helper() - c, err := New(zerolog.Nop()) + c, err := New(zerolog.Nop(), t.TempDir()) if err != nil { t.Fatalf("New: %v", err) } @@ -50,7 +50,7 @@ func TestPingDaemon(t *testing.T) { func TestNew_DaemonUnreachable(t *testing.T) { t.Setenv("DOCKER_HOST", "tcp://localhost:1") - c, err := New(zerolog.Nop()) + c, err := New(zerolog.Nop(), t.TempDir()) if err != nil { t.Fatalf("New: %v", err) } @@ -62,7 +62,7 @@ func TestNew_DaemonUnreachable(t *testing.T) { func TestPingDaemon_Unreachable(t *testing.T) { t.Setenv("DOCKER_HOST", "tcp://localhost:1") - c, err := New(zerolog.Nop()) + c, err := New(zerolog.Nop(), t.TempDir()) if err != nil { t.Fatalf("New: %v", err) } diff --git a/backend/internal/agent/websocket.go b/backend/internal/agent/websocket.go index a2c9507..d6d8cb3 100644 --- a/backend/internal/agent/websocket.go +++ b/backend/internal/agent/websocket.go @@ -7,8 +7,10 @@ import ( "fmt" "net/http" "strings" + "sync" "time" + "github.com/OrcaCD/orca-cd/internal/agent/docker" messages "github.com/OrcaCD/orca-cd/internal/proto" "github.com/OrcaCD/orca-cd/internal/shared/wscrypto" "github.com/gorilla/websocket" @@ -16,6 +18,36 @@ import ( ) const handshakeTimeout = 15 * time.Second +const deploymentTimeout = 5 * time.Minute + +type outboundSender interface { + Send(msg *messages.ClientMessage) error +} + +type deployExecutor interface { + Deploy(ctx context.Context, req docker.DeployRequest) error +} + +type messageSender struct { + conn *websocket.Conn + mu sync.Mutex + session *wscrypto.Session +} + +func newMessageSender(conn *websocket.Conn, session *wscrypto.Session) *messageSender { + return &messageSender{ + conn: conn, + session: session, + } +} + +// TODO: We should make this async by adding it to a goroutine +func (s *messageSender) Send(msg *messages.ClientMessage) error { + s.mu.Lock() + defer s.mu.Unlock() + + return sendMessage(s.conn, s.session, msg) +} func performHandshake(conn *websocket.Conn, agentID string, hubPubKey ed25519.PublicKey) (*wscrypto.Session, error) { if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil { @@ -95,7 +127,7 @@ func sendMessage(conn *websocket.Conn, session *wscrypto.Session, msg *messages. return conn.WriteMessage(websocket.BinaryMessage, data) } -func handleServerMessage(msg *messages.ServerMessage, conn *websocket.Conn, session *wscrypto.Session) { +func handleServerMessage(ctx context.Context, msg *messages.ServerMessage, session *wscrypto.Session, sender outboundSender, deployer deployExecutor) { _, isEncrypted := msg.Payload.(*messages.ServerMessage_EncryptedPayload) if !isEncrypted && !wscrypto.AllowedUnencrypted(msg) { Log.Warn().Msgf("dropping unencrypted message of type %T", msg.Payload) @@ -129,14 +161,59 @@ func handleServerMessage(msg *messages.ServerMessage, conn *websocket.Conn, sess }, }, } - if err := sendMessage(conn, session, pong); err != nil { + if err := sender.Send(pong); err != nil { Log.Error().Err(err).Msg("failed to send Pong response") } + case *messages.ServerMessage_DeployRequest: + go executeDeployment(ctx, sender, deployer, p.DeployRequest) default: Log.Warn().Msg("unknown message type received") } } +func executeDeployment(ctx context.Context, sender outboundSender, deployer deployExecutor, req *messages.DeployRequest) { + result := &messages.DeployResult{ + RequestId: req.RequestId, + ApplicationId: req.ApplicationId, + } + + if deployer == nil { + result.ErrorMessage = "deployment executor not initialized" + sendDeployResult(sender, result) + return + } + + deployCtx, cancel := context.WithTimeout(ctx, deploymentTimeout) + defer cancel() + + if err := deployer.Deploy(deployCtx, docker.DeployRequest{ + ApplicationID: req.ApplicationId, + ApplicationName: req.ApplicationName, + ComposeFile: req.ComposeFile, + }); err != nil { + result.ErrorMessage = err.Error() + sendDeployResult(sender, result) + return + } + + result.Success = true + sendDeployResult(sender, result) +} + +func sendDeployResult(sender outboundSender, result *messages.DeployResult) { + if err := sender.Send(&messages.ClientMessage{ + Payload: &messages.ClientMessage_DeployResult{ + DeployResult: result, + }, + }); err != nil { + Log.Error(). + Err(err). + Str("application_id", result.ApplicationId). + Str("request_id", result.RequestId). + Msg("failed to send deploy result") + } +} + func connectAndHandshake(ctx context.Context, cfg Config, tracker *connTracker) (*websocket.Conn, *wscrypto.Session, error) { conn, err := connectWithRetry(ctx, cfg.HubUrl, cfg.AuthToken) if err != nil || tracker.setAndCancelled(ctx, conn) { diff --git a/backend/internal/agent/websocket_test.go b/backend/internal/agent/websocket_test.go index 533f5ed..9769210 100644 --- a/backend/internal/agent/websocket_test.go +++ b/backend/internal/agent/websocket_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + agentdocker "github.com/OrcaCD/orca-cd/internal/agent/docker" messages "github.com/OrcaCD/orca-cd/internal/proto" "github.com/OrcaCD/orca-cd/internal/shared/wscrypto" "github.com/gorilla/websocket" @@ -21,6 +22,40 @@ var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } +type stubSender struct { + err error + sent chan *messages.ClientMessage +} + +func (s *stubSender) Send(msg *messages.ClientMessage) error { + if s.sent != nil { + s.sent <- msg + } + + return s.err +} + +type stubDeployer struct { + block chan struct{} + err error + reqCh chan agentdocker.DeployRequest +} + +func (d *stubDeployer) Deploy(ctx context.Context, req agentdocker.DeployRequest) error { + if d.reqCh != nil { + d.reqCh <- req + } + if d.block != nil { + select { + case <-d.block: + case <-ctx.Done(): + return ctx.Err() + } + } + + return d.err +} + // newTestServer starts a test WebSocket server. The handler is called for each // accepted connection. The server is closed when the test ends. func newTestServer(t *testing.T, handler func(*websocket.Conn)) *httptest.Server { @@ -115,7 +150,7 @@ func TestHandleServerMessage_Ping(t *testing.T) { if err := proto.Unmarshal(data, msg); err != nil { t.Fatalf("unmarshal ping: %v", err) } - handleServerMessage(msg, clientConn, nil) + handleServerMessage(context.Background(), msg, nil, newMessageSender(clientConn, nil), nil) select { case <-done: @@ -147,7 +182,7 @@ func TestHandleServerMessage_UnknownPayload(t *testing.T) { clientConn := dialServer(t, srv) defer clientConn.Close() //nolint:errcheck - handleServerMessage(&messages.ServerMessage{}, clientConn, nil) + handleServerMessage(context.Background(), &messages.ServerMessage{}, nil, newMessageSender(clientConn, nil), nil) select { case <-writeReceived: @@ -560,7 +595,7 @@ func TestHandleServerMessage_EncryptedPing(t *testing.T) { Payload: &messages.ServerMessage_EncryptedPayload{EncryptedPayload: env}, } - handleServerMessage(encryptedMsg, conn, session) + handleServerMessage(context.Background(), encryptedMsg, session, newMessageSender(conn, session), nil) select { case <-done: @@ -573,6 +608,66 @@ func TestHandleServerMessage_EncryptedPing(t *testing.T) { } } +func TestHandleServerMessage_DeployRequest(t *testing.T) { + sessionKey := make([]byte, 32) + session, err := wscrypto.NewSession(sessionKey) + if err != nil { + t.Fatalf("NewSession: %v", err) + } + + sender := &stubSender{sent: make(chan *messages.ClientMessage, 1)} + deployer := &stubDeployer{reqCh: make(chan agentdocker.DeployRequest, 1)} + + deployMsg := &messages.ServerMessage{ + Payload: &messages.ServerMessage_DeployRequest{ + DeployRequest: &messages.DeployRequest{ + RequestId: "req-1", + ApplicationId: "app-1", + ApplicationName: "billing", + ComposeFile: "services:\n app:\n image: ghcr.io/orcacd/billing:1.0.0\n", + }, + }, + } + env, err := session.Encrypt(deployMsg) + if err != nil { + t.Fatalf("Encrypt: %v", err) + } + + handleServerMessage(context.Background(), &messages.ServerMessage{ + Payload: &messages.ServerMessage_EncryptedPayload{ + EncryptedPayload: env, + }, + }, session, sender, deployer) + + select { + case req := <-deployer.reqCh: + if req.ApplicationID != "app-1" { + t.Fatalf("expected application id %q, got %q", "app-1", req.ApplicationID) + } + if req.ApplicationName != "billing" { + t.Fatalf("expected application name %q, got %q", "billing", req.ApplicationName) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for deploy request") + } + + select { + case msg := <-sender.sent: + result := msg.GetDeployResult() + if result == nil { + t.Fatal("expected deploy result payload") + } + if !result.Success { + t.Fatal("expected successful deploy result") + } + if result.RequestId != "req-1" { + t.Fatalf("expected request id %q, got %q", "req-1", result.RequestId) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for deploy result") + } +} + func TestHandleServerMessage_EncryptedDecryptError(t *testing.T) { sessionKey := make([]byte, 32) session, err := wscrypto.NewSession(sessionKey) @@ -588,7 +683,7 @@ func TestHandleServerMessage_EncryptedDecryptError(t *testing.T) { }, }, } - handleServerMessage(msg, nil, session) // must return without panic + handleServerMessage(context.Background(), msg, session, &stubSender{}, nil) // must return without panic } func TestHandleServerMessage_DoublyEncrypted(t *testing.T) { @@ -610,7 +705,7 @@ func TestHandleServerMessage_DoublyEncrypted(t *testing.T) { msg := &messages.ServerMessage{ Payload: &messages.ServerMessage_EncryptedPayload{EncryptedPayload: env}, } - handleServerMessage(msg, nil, session) // must drop without panic + handleServerMessage(context.Background(), msg, session, &stubSender{}, nil) // must drop without panic } func TestHandleServerMessage_DropsUnencryptedNonPing(t *testing.T) { @@ -618,7 +713,7 @@ func TestHandleServerMessage_DropsUnencryptedNonPing(t *testing.T) { msg := &messages.ServerMessage{ Payload: &messages.ServerMessage_KeyExchangeInit{KeyExchangeInit: &messages.KeyExchangeInit{}}, } - handleServerMessage(msg, nil, nil) // must return without panic + handleServerMessage(context.Background(), msg, nil, &stubSender{}, nil) // must return without panic } func TestConnTracker_CloseNilConn(t *testing.T) { diff --git a/backend/internal/hub/applications/applications_test.go b/backend/internal/hub/applications/applications_test.go index 13c5935..43ea1d6 100644 --- a/backend/internal/hub/applications/applications_test.go +++ b/backend/internal/hub/applications/applications_test.go @@ -13,6 +13,7 @@ import ( "github.com/OrcaCD/orca-cd/internal/hub/db" "github.com/OrcaCD/orca-cd/internal/hub/models" "github.com/OrcaCD/orca-cd/internal/hub/repositories" + messages "github.com/OrcaCD/orca-cd/internal/proto" "github.com/rs/zerolog" "gorm.io/driver/sqlite" "gorm.io/gorm" @@ -29,6 +30,51 @@ type mockProvider struct { onGetLatestCommit func() } +type stubDeploymentHandle struct { + err error + result *messages.DeployResult +} + +func (h stubDeploymentHandle) Await(_ context.Context) (*messages.DeployResult, error) { + return h.result, h.err +} + +func (h stubDeploymentHandle) Cancel() {} + +type stubDeploymentManager struct { + deployErr error + deployResult *messages.DeployResult + lastAppID string + lastCompose string + trackedAppID string +} + +func (m *stubDeploymentManager) StartDeploy(app *models.Application, composeFile string) (DeploymentHandle, error) { + m.lastAppID = app.Id + m.lastCompose = composeFile + + return stubDeploymentHandle{ + err: m.deployErr, + result: m.deployResult, + }, nil +} + +func (m *stubDeploymentManager) DeployAndWait(_ context.Context, app *models.Application, composeFile string) (*messages.DeployResult, error) { + m.lastAppID = app.Id + m.lastCompose = composeFile + return m.deployResult, m.deployErr +} + +func (m *stubDeploymentManager) TrackManualDeploy(app models.Application, _ DeploymentHandle) { + m.trackedAppID = app.Id +} + +func useTestDeployer(t *testing.T, deployer DeploymentManager) { + t.Helper() + DefaultDeployer = deployer + t.Cleanup(func() { DefaultDeployer = nil }) +} + func (m *mockProvider) ParseURL(url string) (string, string, error) { return "", "", nil } func (m *mockProvider) SupportedAuthMethods() []models.RepositoryAuthMethod { return nil } func (m *mockProvider) TestConnection(_ context.Context, _ *models.Repository) error { return nil } @@ -262,6 +308,9 @@ func TestProcessSyncJob_NoComposeChange_SetsStatusSynced(t *testing.T) { func TestProcessSyncJob_ComposeChanged_UpdatesComposeAndSetsStatusSynced(t *testing.T) { setupTestDB(t) + useTestDeployer(t, &stubDeploymentManager{ + deployResult: &messages.DeployResult{Success: true}, + }) repo := seedRepo(t) agent := seedAgent(t) const oldCompose = "services:\n app:\n image: myimage:1.0\n" @@ -481,6 +530,9 @@ func TestQueue_Enqueue_FullQueue_DropsJob(t *testing.T) { func TestQueue_Start_ProcessesJobs(t *testing.T) { setupTestDB(t) + useTestDeployer(t, &stubDeploymentManager{ + deployResult: &messages.DeployResult{Success: true}, + }) repo := seedRepo(t) agent := seedAgent(t) const compose = "services:\n app:\n image: myimage:1.0\n" diff --git a/backend/internal/hub/applications/deploy.go b/backend/internal/hub/applications/deploy.go new file mode 100644 index 0000000..66a6cc7 --- /dev/null +++ b/backend/internal/hub/applications/deploy.go @@ -0,0 +1,205 @@ +package applications + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/OrcaCD/orca-cd/internal/hub/db" + "github.com/OrcaCD/orca-cd/internal/hub/models" + "github.com/OrcaCD/orca-cd/internal/hub/sse" + hubws "github.com/OrcaCD/orca-cd/internal/hub/websocket" + messages "github.com/OrcaCD/orca-cd/internal/proto" + "github.com/google/uuid" + "github.com/rs/zerolog" + "gorm.io/gorm" +) + +const ( + applicationsSSEPath = "/api/v1/applications" + manualDeployTimeout = 5 * time.Minute +) + +var ErrAgentUnavailable = errors.New("agent is not connected") + +type DeploymentHandle interface { + Await(ctx context.Context) (*messages.DeployResult, error) + Cancel() +} + +type DeploymentManager interface { + StartDeploy(app *models.Application, composeFile string) (DeploymentHandle, error) + DeployAndWait(ctx context.Context, app *models.Application, composeFile string) (*messages.DeployResult, error) + TrackManualDeploy(app models.Application, handle DeploymentHandle) +} + +type deployTransport interface { + StartDeploy(agentID string, req *messages.DeployRequest) (*hubws.DeployHandle, error) +} + +type websocketTransport struct { + hub *hubws.Hub +} + +func (t websocketTransport) StartDeploy(agentID string, req *messages.DeployRequest) (*hubws.DeployHandle, error) { + return t.hub.StartDeploy(agentID, req) +} + +type Deployer struct { + log *zerolog.Logger + transport deployTransport +} + +var DefaultDeployer DeploymentManager + +func NewDeployer(hub *hubws.Hub, log *zerolog.Logger) *Deployer { + return &Deployer{ + log: log, + transport: websocketTransport{hub: hub}, + } +} + +func (d *Deployer) StartDeploy(app *models.Application, composeFile string) (DeploymentHandle, error) { + if app == nil { + return nil, errors.New("application is required") + } + if app.AgentId == "" { + return nil, errors.New("application is missing agent id") + } + if composeFile == "" { + return nil, errors.New("application compose file is empty") + } + + handle, err := d.transport.StartDeploy(app.AgentId, &messages.DeployRequest{ + RequestId: uuid.NewString(), + ApplicationId: app.Id, + ApplicationName: app.Name.String(), + ComposeFile: composeFile, + }) + if err != nil { + if errors.Is(err, hubws.ErrDeployUnavailable) { + return nil, ErrAgentUnavailable + } + return nil, fmt.Errorf("start deploy: %w", err) + } + + // Mark deployment as in progress + if err := markDeploymentInProgress(context.Background(), app.Id, d.log); err != nil { + return nil, fmt.Errorf("mark deployment in progress: %w", err) + } + + return handle, nil +} + +func (d *Deployer) DeployAndWait(ctx context.Context, app *models.Application, composeFile string) (*messages.DeployResult, error) { + handle, err := d.StartDeploy(app, composeFile) + if err != nil { + return nil, err + } + + result, err := handle.Await(ctx) + if err != nil { + return nil, err + } + + return result, nil +} + +func (d *Deployer) TrackManualDeploy(app models.Application, handle DeploymentHandle) { + go d.trackManualDeploy(app, handle) +} + +func (d *Deployer) trackManualDeploy(app models.Application, handle DeploymentHandle) { + ctx, cancel := context.WithTimeout(context.Background(), manualDeployTimeout) + defer cancel() + + result, err := handle.Await(ctx) + if err != nil { + d.log.Error().Err(err).Str("applicationId", app.Id).Msg("manual deployment failed") + markDeploymentTransportError(context.Background(), app.Id, d.log) + return + } + + if result == nil { + d.log.Error().Str("applicationId", app.Id).Msg("manual deployment finished without a result") + markDeploymentTransportError(context.Background(), app.Id, d.log) + return + } + + if !result.Success { + d.log.Error(). + Str("applicationId", app.Id). + Str("request_id", result.RequestId). + Str("error", result.ErrorMessage). + Msg("manual deployment failed on agent") + markDeploymentExecutionFailure(context.Background(), app.Id, d.log) + return + } + + markDeploymentSuccess(context.Background(), app.Id, nil, d.log) +} + +func markDeploymentInProgress(ctx context.Context, applicationID string, log *zerolog.Logger) error { + if _, err := gorm.G[models.Application](db.DB). + Where("id = ?", applicationID). + Updates(ctx, models.Application{ + SyncStatus: models.Syncing, + }); err != nil { + log.Error().Err(err).Str("applicationId", applicationID).Msg("failed to mark application deployment in progress") + sse.PublishUpdate(applicationsSSEPath) + return err + } + + sse.PublishUpdate(applicationsSSEPath) + return nil +} + +func markDeploymentSuccess(ctx context.Context, applicationID string, updateFn func(*models.Application), log *zerolog.Logger) { + now := time.Now() + updates := models.Application{ + SyncStatus: models.Synced, + HealthStatus: models.Healthy, + LastSyncedAt: &now, + } + + if updateFn != nil { + updateFn(&updates) + } + + if _, err := gorm.G[models.Application](db.DB). + Where("id = ?", applicationID). + Updates(ctx, updates); err != nil { + log.Error().Err(err).Str("applicationId", applicationID).Msg("failed to update application after deployment") + return + } + + sse.PublishUpdate(applicationsSSEPath) +} + +func markDeploymentExecutionFailure(ctx context.Context, applicationID string, log *zerolog.Logger) { + if _, err := gorm.G[models.Application](db.DB). + Where("id = ?", applicationID). + Updates(ctx, models.Application{ + SyncStatus: models.OutOfSync, + HealthStatus: models.Unhealthy, + }); err != nil { + log.Error().Err(err).Str("applicationId", applicationID).Msg("failed to mark application deployment failure") + return + } + + sse.PublishUpdate(applicationsSSEPath) +} + +func markDeploymentTransportError(ctx context.Context, applicationID string, log *zerolog.Logger) { + if _, err := gorm.G[models.Application](db.DB). + Where("id = ?", applicationID). + Updates(ctx, models.Application{ + SyncStatus: models.OutOfSync, + }); err != nil { + log.Error().Err(err).Str("applicationId", applicationID).Msg("failed to mark application deployment transport error") + return + } + + sse.PublishUpdate(applicationsSSEPath) +} diff --git a/backend/internal/hub/applications/deploy_test.go b/backend/internal/hub/applications/deploy_test.go new file mode 100644 index 0000000..02dcc3c --- /dev/null +++ b/backend/internal/hub/applications/deploy_test.go @@ -0,0 +1,394 @@ +package applications + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/OrcaCD/orca-cd/internal/hub/crypto" + "github.com/OrcaCD/orca-cd/internal/hub/db" + "github.com/OrcaCD/orca-cd/internal/hub/models" + hubws "github.com/OrcaCD/orca-cd/internal/hub/websocket" + messages "github.com/OrcaCD/orca-cd/internal/proto" + "github.com/rs/zerolog" + "gorm.io/gorm" +) + +// Mock deployment handle for testing +type mockDeploymentHandle struct { + result *messages.DeployResult + err error +} + +func (m mockDeploymentHandle) Await(context.Context) (*messages.DeployResult, error) { + return m.result, m.err +} + +func (m mockDeploymentHandle) Cancel() {} + +// Mock transport for testing +type mockDeployTransport struct { + startErr error +} + +func (m *mockDeployTransport) StartDeploy(agentID string, req *messages.DeployRequest) (*hubws.DeployHandle, error) { + // For testing, we return nil since we're testing at a higher level + // The actual hubws.DeployHandle would be created by the real hub + return nil, m.startErr +} + +func seedTestApp(t *testing.T, repoID string) models.Application { + t.Helper() + return seedApp(t, repoID, "agent-1", "version: '3.9'\n") +} + +func TestNewDeployer(t *testing.T) { + setupTestDB(t) + log := zerolog.Nop() + hub := hubws.NewHub(&log) + + deployer := NewDeployer(hub, &log) + + if deployer == nil { + t.Fatal("expected deployer to be created") + } + if deployer.log != &log { + t.Error("expected logger to be set") + } +} + +func TestStartDeploy_NilApplication(t *testing.T) { + setupTestDB(t) + log := zerolog.Nop() + + transport := &mockDeployTransport{} + deployer := &Deployer{ + log: &log, + transport: transport, + } + + handle, err := deployer.StartDeploy(nil, "compose") + + if err == nil { + t.Fatal("expected error for nil application") + } + if handle != nil { + t.Error("expected no handle to be returned") + } +} + +func TestStartDeploy_MissingAgentId(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + log := zerolog.Nop() + + app := models.Application{ + Name: crypto.EncryptedString("test-app"), + RepositoryId: repo.Id, + AgentId: "", // Missing + SyncStatus: models.UnknownSync, + HealthStatus: models.UnknownHealth, + Branch: "main", + Path: "docker-compose.yml", + ComposeFile: crypto.EncryptedString("version: '3.9'\n"), + PreviousComposeFile: crypto.EncryptedString(""), + } + + transport := &mockDeployTransport{} + deployer := &Deployer{ + log: &log, + transport: transport, + } + + handle, err := deployer.StartDeploy(&app, "compose") + + if err == nil { + t.Fatal("expected error for missing agent id") + } + if handle != nil { + t.Error("expected no handle to be returned") + } +} + +func TestStartDeploy_EmptyComposeFile(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + log := zerolog.Nop() + + app := models.Application{ + Name: crypto.EncryptedString("test-app"), + RepositoryId: repo.Id, + AgentId: "agent-1", + SyncStatus: models.UnknownSync, + HealthStatus: models.UnknownHealth, + Branch: "main", + Path: "docker-compose.yml", + ComposeFile: crypto.EncryptedString(""), // Empty + PreviousComposeFile: crypto.EncryptedString(""), + } + + transport := &mockDeployTransport{} + deployer := &Deployer{ + log: &log, + transport: transport, + } + + handle, err := deployer.StartDeploy(&app, "") + + if err == nil { + t.Fatal("expected error for empty compose file") + } + if handle != nil { + t.Error("expected no handle to be returned") + } +} + +func TestStartDeploy_AgentUnavailable(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + app := seedTestApp(t, repo.Id) + log := zerolog.Nop() + + transport := &mockDeployTransport{ + startErr: hubws.ErrDeployUnavailable, + } + deployer := &Deployer{ + log: &log, + transport: transport, + } + + handle, err := deployer.StartDeploy(&app, app.ComposeFile.String()) + + if err == nil { + t.Fatal("expected error for unavailable agent") + } + if !errors.Is(err, ErrAgentUnavailable) { + t.Errorf("expected ErrAgentUnavailable, got %v", err) + } + if handle != nil { + t.Error("expected no handle to be returned") + } +} + +func TestTrackManualDeploy_Success(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + app := seedTestApp(t, repo.Id) + log := zerolog.Nop() + + result := &messages.DeployResult{ + RequestId: "req-1", + Success: true, + ErrorMessage: "", + } + + mockHandle := &mockDeploymentHandle{result: result} + deployer := &Deployer{ + log: &log, + transport: &mockDeployTransport{}, + } + + // Start tracking in goroutine + deployer.TrackManualDeploy(app, mockHandle) + + // Give it time to complete + time.Sleep(100 * time.Millisecond) + + // Verify app status was updated to Synced + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(context.Background()) + if err != nil { + t.Fatalf("failed to fetch app: %v", err) + } + if updated.SyncStatus != models.Synced { + t.Errorf("expected sync status %q, got %q", models.Synced, updated.SyncStatus) + } + if updated.HealthStatus != models.Healthy { + t.Errorf("expected health status %q, got %q", models.Healthy, updated.HealthStatus) + } +} + +func TestTrackManualDeploy_ExecutionFailure(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + app := seedTestApp(t, repo.Id) + log := zerolog.Nop() + + result := &messages.DeployResult{ + RequestId: "req-1", + Success: false, + ErrorMessage: "deployment failed on agent", + } + + mockHandle := &mockDeploymentHandle{result: result} + deployer := &Deployer{ + log: &log, + transport: &mockDeployTransport{}, + } + + deployer.TrackManualDeploy(app, mockHandle) + time.Sleep(100 * time.Millisecond) + + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(context.Background()) + if err != nil { + t.Fatalf("failed to fetch app: %v", err) + } + if updated.SyncStatus != models.OutOfSync { + t.Errorf("expected sync status %q, got %q", models.OutOfSync, updated.SyncStatus) + } + if updated.HealthStatus != models.Unhealthy { + t.Errorf("expected health status %q, got %q", models.Unhealthy, updated.HealthStatus) + } +} + +func TestTrackManualDeploy_TransportError(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + app := seedTestApp(t, repo.Id) + log := zerolog.Nop() + + mockHandle := &mockDeploymentHandle{ + err: errors.New("transport error"), + } + deployer := &Deployer{ + log: &log, + transport: &mockDeployTransport{}, + } + + deployer.TrackManualDeploy(app, mockHandle) + time.Sleep(100 * time.Millisecond) + + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(context.Background()) + if err != nil { + t.Fatalf("failed to fetch app: %v", err) + } + if updated.SyncStatus != models.OutOfSync { + t.Errorf("expected sync status %q, got %q", models.OutOfSync, updated.SyncStatus) + } +} + +func TestTrackManualDeploy_NilResult(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + app := seedTestApp(t, repo.Id) + log := zerolog.Nop() + + mockHandle := &mockDeploymentHandle{result: nil} + deployer := &Deployer{ + log: &log, + transport: &mockDeployTransport{}, + } + + deployer.TrackManualDeploy(app, mockHandle) + time.Sleep(100 * time.Millisecond) + + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(context.Background()) + if err != nil { + t.Fatalf("failed to fetch app: %v", err) + } + if updated.SyncStatus != models.OutOfSync { + t.Errorf("expected sync status %q, got %q", models.OutOfSync, updated.SyncStatus) + } +} + +func TestMarkDeploymentInProgress(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + app := seedTestApp(t, repo.Id) + log := zerolog.Nop() + + err := markDeploymentInProgress(context.Background(), app.Id, &log) + + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(context.Background()) + if err != nil { + t.Fatalf("failed to fetch app: %v", err) + } + if updated.SyncStatus != models.Syncing { + t.Errorf("expected sync status %q, got %q", models.Syncing, updated.SyncStatus) + } +} + +func TestMarkDeploymentSuccess(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + app := seedTestApp(t, repo.Id) + log := zerolog.Nop() + + markDeploymentSuccess(context.Background(), app.Id, nil, &log) + + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(context.Background()) + if err != nil { + t.Fatalf("failed to fetch app: %v", err) + } + if updated.SyncStatus != models.Synced { + t.Errorf("expected sync status %q, got %q", models.Synced, updated.SyncStatus) + } + if updated.HealthStatus != models.Healthy { + t.Errorf("expected health status %q, got %q", models.Healthy, updated.HealthStatus) + } + if updated.LastSyncedAt == nil { + t.Error("expected LastSyncedAt to be set") + } +} + +func TestMarkDeploymentSuccess_WithUpdateFn(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + app := seedTestApp(t, repo.Id) + log := zerolog.Nop() + + updateFn := func(a *models.Application) { + a.Commit = "new-commit-hash" + } + + markDeploymentSuccess(context.Background(), app.Id, updateFn, &log) + + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(context.Background()) + if err != nil { + t.Fatalf("failed to fetch app: %v", err) + } + if updated.Commit != "new-commit-hash" { + t.Errorf("expected commit %q, got %q", "new-commit-hash", updated.Commit) + } +} + +func TestMarkDeploymentExecutionFailure(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + app := seedTestApp(t, repo.Id) + log := zerolog.Nop() + + markDeploymentExecutionFailure(context.Background(), app.Id, &log) + + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(context.Background()) + if err != nil { + t.Fatalf("failed to fetch app: %v", err) + } + if updated.SyncStatus != models.OutOfSync { + t.Errorf("expected sync status %q, got %q", models.OutOfSync, updated.SyncStatus) + } + if updated.HealthStatus != models.Unhealthy { + t.Errorf("expected health status %q, got %q", models.Unhealthy, updated.HealthStatus) + } +} + +func TestMarkDeploymentTransportError(t *testing.T) { + setupTestDB(t) + repo := seedRepo(t) + app := seedTestApp(t, repo.Id) + log := zerolog.Nop() + + markDeploymentTransportError(context.Background(), app.Id, &log) + + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(context.Background()) + if err != nil { + t.Fatalf("failed to fetch app: %v", err) + } + if updated.SyncStatus != models.OutOfSync { + t.Errorf("expected sync status %q, got %q", models.OutOfSync, updated.SyncStatus) + } +} diff --git a/backend/internal/hub/applications/sync.go b/backend/internal/hub/applications/sync.go index 7ba14be..13f4ba3 100644 --- a/backend/internal/hub/applications/sync.go +++ b/backend/internal/hub/applications/sync.go @@ -26,28 +26,12 @@ func GetMatchingApplications(ctx context.Context, repository *models.Repository, } func processSyncJob(ctx context.Context, job syncJob, log *zerolog.Logger) { - if _, err := gorm.G[models.Application](db.DB). - Where("id = ?", job.Application.Id). - Updates(ctx, models.Application{ - SyncStatus: models.Syncing, - }); err != nil { - log.Error().Err(err).Str("applicationId", job.Application.Id). - Msg("failed to set application sync status to syncing") - } - sse.PublishUpdate("/api/v1/applications") + _ = markDeploymentInProgress(context.Background(), job.Application.Id, log) success := false defer func() { if !success { - if _, err := gorm.G[models.Application](db.DB). - Where("id = ?", job.Application.Id). - Updates(ctx, models.Application{ - SyncStatus: models.OutOfSync, - }); err != nil { - log.Error().Err(err).Str("applicationId", job.Application.Id). - Msg("failed to mark application as out_of_sync after failed sync") - } - sse.PublishUpdate("/api/v1/applications") + markDeploymentExecutionFailure(ctx, job.Application.Id, log) } }() @@ -78,23 +62,35 @@ func processSyncJob(ctx context.Context, job syncJob, log *zerolog.Logger) { return } - // TODO: trigger actual deployment to agent - // ... - success = true + if DefaultDeployer == nil { + log.Error().Str("applicationId", job.Application.Id).Msg("application deployer not initialized") + return + } - if _, err := gorm.G[models.Application](db.DB). - Where("id = ?", job.Application.Id). - Updates(ctx, models.Application{ - ComposeFile: crypto.EncryptedString(content), - PreviousComposeFile: job.Application.ComposeFile, - Commit: job.Commit, - CommitMessage: job.CommitMessage, - SyncStatus: models.Synced, - LastSyncedAt: &now, - }); err != nil { - log.Error().Err(err).Str("applicationId", job.Application.Id). - Msg("failed to update application after sync (compose changed)") + result, err := DefaultDeployer.DeployAndWait(ctx, &job.Application, content) + if err != nil { + log.Error().Err(err).Str("applicationId", job.Application.Id).Msg("failed to deploy compose file to agent") + return + } + if result == nil { + log.Error().Str("applicationId", job.Application.Id).Msg("agent deployment finished without a result") + return + } + if !result.Success { + log.Error(). + Str("applicationId", job.Application.Id). + Str("request_id", result.RequestId). + Str("error", result.ErrorMessage). + Msg("agent reported deployment failure") + return } - sse.PublishUpdate("/api/v1/applications") + success = true + markDeploymentSuccess(ctx, job.Application.Id, func(update *models.Application) { + update.ComposeFile = crypto.EncryptedString(content) + update.PreviousComposeFile = job.Application.ComposeFile + update.Commit = job.Commit + update.CommitMessage = job.CommitMessage + update.LastSyncedAt = &now + }, log) } diff --git a/backend/internal/hub/handlers.go b/backend/internal/hub/handlers.go index 46c1311..2138f95 100644 --- a/backend/internal/hub/handlers.go +++ b/backend/internal/hub/handlers.go @@ -3,6 +3,7 @@ package hub import ( "time" + "github.com/OrcaCD/orca-cd/internal/hub/applications" "github.com/OrcaCD/orca-cd/internal/hub/middleware" "github.com/OrcaCD/orca-cd/internal/hub/routes" "github.com/OrcaCD/orca-cd/internal/hub/sse" @@ -58,6 +59,7 @@ func RegisterRoutes(router *gin.Engine, cfg Config) error { protected.GET("/applications", routes.ListApplicationsHandler) protected.GET("/applications/:id", routes.GetApplicationHandler) protected.POST("/applications", routes.CreateApplicationHandler) + protected.POST("/applications/:id/deploy", routes.DeployApplicationHandler) protected.PUT("/applications/:id", routes.UpdateApplicationHandler) protected.DELETE("/applications/:id", routes.DeleteApplicationHandler) @@ -100,6 +102,7 @@ func RegisterRoutes(router *gin.Engine, cfg Config) error { sse.DefaultBroker = sse.NewBroker(&Log) h := websocket.NewHub(&Log) + applications.DefaultDeployer = applications.NewDeployer(h, &Log) w := websocket.NewWorker(h, &Log) w.Start() diff --git a/backend/internal/hub/routes/applications.go b/backend/internal/hub/routes/applications.go index 68529e6..0612dfa 100644 --- a/backend/internal/hub/routes/applications.go +++ b/backend/internal/hub/routes/applications.go @@ -5,6 +5,7 @@ import ( "net/http" "time" + hubApplications "github.com/OrcaCD/orca-cd/internal/hub/applications" "github.com/OrcaCD/orca-cd/internal/hub/crypto" "github.com/OrcaCD/orca-cd/internal/hub/db" "github.com/OrcaCD/orca-cd/internal/hub/models" @@ -271,6 +272,49 @@ func DeleteApplicationHandler(c *gin.Context) { sse.PublishUpdate(ApplicationsPath) } +func DeployApplicationHandler(c *gin.Context) { + if hubApplications.DefaultDeployer == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "application deployer not initialized"}) + return + } + + id := c.Param("id") + application, err := gorm.G[models.Application](db.DB). + Preload("Repository", nil). + Preload("Agent", nil). + Where("id = ?", id). + First(c.Request.Context()) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "application not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"}) + return + } + + handle, err := hubApplications.DefaultDeployer.StartDeploy(&application, application.ComposeFile.String()) + if err != nil { + if errors.Is(err, hubApplications.ErrAgentUnavailable) { + c.JSON(http.StatusConflict, gin.H{"error": "agent is not connected"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to start deployment"}) + return + } + + application.SyncStatus = models.Syncing + if err := db.DB.Save(&application).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update application"}) + return + } + + hubApplications.DefaultDeployer.TrackManualDeploy(application, handle) + + c.JSON(http.StatusAccepted, gin.H{"message": "deployment started"}) + sse.PublishUpdate(ApplicationsPath) +} + func toApplicationListResponse(app *models.Application) applicationListResponse { return applicationListResponse{ Id: app.Id, diff --git a/backend/internal/hub/routes/applications_test.go b/backend/internal/hub/routes/applications_test.go index f525faf..20f1269 100644 --- a/backend/internal/hub/routes/applications_test.go +++ b/backend/internal/hub/routes/applications_test.go @@ -2,6 +2,7 @@ package routes import ( "bytes" + "context" "encoding/base64" "encoding/json" "io" @@ -11,9 +12,11 @@ import ( "testing" "time" + hubapplications "github.com/OrcaCD/orca-cd/internal/hub/applications" "github.com/OrcaCD/orca-cd/internal/hub/crypto" "github.com/OrcaCD/orca-cd/internal/hub/db" "github.com/OrcaCD/orca-cd/internal/hub/models" + messages "github.com/OrcaCD/orca-cd/internal/proto" "github.com/OrcaCD/orca-cd/internal/shared/httpclient" "github.com/gin-gonic/gin" "gorm.io/gorm" @@ -29,6 +32,35 @@ const ( seedComposeFileContent = "version: \"3.8\"\nservices:\n app:\n image: ghcr.io/orcacd/app:old\n" ) +type stubRouteDeployHandle struct{} + +func (stubRouteDeployHandle) Await(context.Context) (*messages.DeployResult, error) { return nil, nil } +func (stubRouteDeployHandle) Cancel() {} + +type stubRouteDeployer struct { + startErr error + startedApp string + startedWith string + trackedApp string +} + +func (d *stubRouteDeployer) StartDeploy(app *models.Application, composeFile string) (hubapplications.DeploymentHandle, error) { + d.startedApp = app.Id + d.startedWith = composeFile + if d.startErr != nil { + return nil, d.startErr + } + return stubRouteDeployHandle{}, nil +} + +func (d *stubRouteDeployer) DeployAndWait(context.Context, *models.Application, string) (*messages.DeployResult, error) { + return nil, nil +} + +func (d *stubRouteDeployer) TrackManualDeploy(app models.Application, _ hubapplications.DeploymentHandle) { + d.trackedApp = app.Id +} + func setupTestDBWithApplications(t *testing.T) { t.Helper() setupTestDB(t) @@ -980,6 +1012,74 @@ func TestDeleteApplicationHandler_DBError(t *testing.T) { } } +func TestDeployApplicationHandler_Success(t *testing.T) { + setupTestDBWithApplications(t) + + repo := seedTestRepository(t, "https://github.com/owner/repo-deploy") + agent := seedTestAgent(t, "agent-deploy") + app := seedTestApplication(t, repo.Id, agent.Id, "Deploy Me") + + deployer := &stubRouteDeployer{} + hubapplications.DefaultDeployer = deployer + t.Cleanup(func() { hubapplications.DefaultDeployer = nil }) + + c, w := makeAuthContext(t, "user-1") + c.Request = httptest.NewRequest(http.MethodPost, "/api/v1/applications/"+app.Id+"/deploy", nil) + c.Params = gin.Params{{Key: "id", Value: app.Id}} + + DeployApplicationHandler(c) + + if w.Code != http.StatusAccepted { + t.Fatalf("expected 202, got %d: %s", w.Code, w.Body.String()) + } + if deployer.startedApp != app.Id { + t.Fatalf("expected deployer to start app %q, got %q", app.Id, deployer.startedApp) + } + if deployer.startedWith != seedComposeFileContent { + t.Fatalf("expected deployer compose file to match stored compose") + } + if deployer.trackedApp != app.Id { + t.Fatalf("expected background tracking for app %q, got %q", app.Id, deployer.trackedApp) + } + + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(t.Context()) + if err != nil { + t.Fatalf("failed to reload app: %v", err) + } + if updated.SyncStatus != models.Syncing { + t.Fatalf("expected app sync status %q, got %q", models.Syncing, updated.SyncStatus) + } +} + +func TestDeployApplicationHandler_AgentUnavailable(t *testing.T) { + setupTestDBWithApplications(t) + + repo := seedTestRepository(t, "https://github.com/owner/repo-deploy-offline") + agent := seedTestAgent(t, "agent-deploy-offline") + app := seedTestApplication(t, repo.Id, agent.Id, "Deploy Me") + + hubapplications.DefaultDeployer = &stubRouteDeployer{startErr: hubapplications.ErrAgentUnavailable} + t.Cleanup(func() { hubapplications.DefaultDeployer = nil }) + + c, w := makeAuthContext(t, "user-1") + c.Request = httptest.NewRequest(http.MethodPost, "/api/v1/applications/"+app.Id+"/deploy", nil) + c.Params = gin.Params{{Key: "id", Value: app.Id}} + + DeployApplicationHandler(c) + + if w.Code != http.StatusConflict { + t.Fatalf("expected 409, got %d: %s", w.Code, w.Body.String()) + } + + updated, err := gorm.G[models.Application](db.DB).Where("id = ?", app.Id).First(t.Context()) + if err != nil { + t.Fatalf("failed to reload app: %v", err) + } + if updated.SyncStatus != models.UnknownSync { + t.Fatalf("expected sync status to remain %q, got %q", models.UnknownSync, updated.SyncStatus) + } +} + func TestParseRFC3339Timestamp(t *testing.T) { t.Run("nil value", func(t *testing.T) { parsed, ok := parseRFC3339Timestamp(nil) diff --git a/backend/internal/hub/websocket/deploy.go b/backend/internal/hub/websocket/deploy.go new file mode 100644 index 0000000..9a86164 --- /dev/null +++ b/backend/internal/hub/websocket/deploy.go @@ -0,0 +1,128 @@ +package websocket + +import ( + "context" + "errors" + "sync" + + messages "github.com/OrcaCD/orca-cd/internal/proto" +) + +type DeployHandle struct { + deployManager *DeployManager + requestID string + outcome chan deployOutcome +} + +type deployOutcome struct { + err error + result *messages.DeployResult +} + +type pendingDeploy struct { + agentID string + outcome chan deployOutcome +} + +var ErrAgentDisconnected = errors.New("agent disconnected before deployment completed") +var ErrDeployUnavailable = errors.New("agent is not connected or unable to receive deploy requests") + +// Await waits for the deployment to complete or the context to be cancelled. +func (h *DeployHandle) Await(ctx context.Context) (*messages.DeployResult, error) { + select { + case outcome, ok := <-h.outcome: + if !ok { + return nil, ErrAgentDisconnected + } + return outcome.result, outcome.err + case <-ctx.Done(): + h.Cancel() + return nil, ctx.Err() + } +} + +// Cancel cancels the deployment. +func (h *DeployHandle) Cancel() { + h.deployManager.CancelDeploy(h.requestID) +} + +// DeployManager handles deployment state management. +type DeployManager struct { + mu sync.Mutex + pendingDeploys map[string]pendingDeploy +} + +// NewDeployManager creates a new deployment manager. +func NewDeployManager() *DeployManager { + return &DeployManager{ + pendingDeploys: make(map[string]pendingDeploy), + } +} + +// StartDeploy initiates a new deployment and returns a handle to await its result. +func (dm *DeployManager) StartDeploy(agentID string, req *messages.DeployRequest) pendingDeploy { + dm.mu.Lock() + defer dm.mu.Unlock() + + pending := pendingDeploy{ + agentID: agentID, + outcome: make(chan deployOutcome, 1), + } + dm.pendingDeploys[req.RequestId] = pending + return pending +} + +// ResolveDeploy completes a pending deployment with a result. +func (dm *DeployManager) ResolveDeploy(result *messages.DeployResult) bool { + dm.mu.Lock() + pending, ok := dm.pendingDeploys[result.RequestId] + if ok { + delete(dm.pendingDeploys, result.RequestId) + } + dm.mu.Unlock() + + if !ok { + return false + } + + pending.outcome <- deployOutcome{result: result} + close(pending.outcome) + + return true +} + +// CancelDeploy cancels a pending deployment. +func (dm *DeployManager) CancelDeploy(requestID string) { + dm.mu.Lock() + pending, ok := dm.pendingDeploys[requestID] + if ok { + delete(dm.pendingDeploys, requestID) + } + dm.mu.Unlock() + + if ok { + close(pending.outcome) + } +} + +// FailPendingDeploys fails all pending deployments for a given agent. +func (dm *DeployManager) FailPendingDeploys(agentID string, err error) { + dm.mu.Lock() + requestIDs := make([]string, 0) + pending := make([]pendingDeploy, 0) + for requestID, waiter := range dm.pendingDeploys { + if waiter.agentID == agentID { + requestIDs = append(requestIDs, requestID) + pending = append(pending, waiter) + } + } + for _, requestID := range requestIDs { + delete(dm.pendingDeploys, requestID) + } + dm.mu.Unlock() + + for _, waiter := range pending { + waiter.outcome <- deployOutcome{err: err} + close(waiter.outcome) + } +} diff --git a/backend/internal/hub/websocket/handler.go b/backend/internal/hub/websocket/handler.go index 7dfa2fc..6b475cd 100644 --- a/backend/internal/hub/websocket/handler.go +++ b/backend/internal/hub/websocket/handler.go @@ -18,6 +18,15 @@ import ( "gorm.io/gorm" ) +// WSConn is an interface for WebSocket connections, allowing for testing. +type WSConn interface { + ReadMessage() (messageType int, data []byte, err error) + WriteMessage(messageType int, data []byte) error + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error + Close() error +} + const pongWait = 90 * time.Second // must be greater than the worker ping interval (60s) const handshakeTimeout = 15 * time.Second @@ -130,12 +139,12 @@ func WsHandler(h *Hub, log *zerolog.Logger) gin.HandlerFunc { continue } - go handleClientMessage(client, msg, log) + go handleClientMessage(h, client, msg, log) } } } -func handleClientMessage(client *Client, msg *messages.ClientMessage, log *zerolog.Logger) { +func handleClientMessage(h *Hub, client *Client, msg *messages.ClientMessage, log *zerolog.Logger) { _, isEncrypted := msg.Payload.(*messages.ClientMessage_EncryptedPayload) if !isEncrypted && !wscrypto.AllowedUnencrypted(msg) { log.Warn().Str("client", client.Id).Msgf("dropping unencrypted message of type %T", msg.Payload) @@ -167,12 +176,19 @@ func handleClientMessage(client *Client, msg *messages.ClientMessage, log *zerol if err != nil { log.Error().Err(err).Str("client", client.Id).Msg("Failed to update last_seen") } + case *messages.ClientMessage_DeployResult: + if !h.ResolveDeploy(p.DeployResult) { + log.Warn(). + Str("client", client.Id). + Str("request_id", p.DeployResult.RequestId). + Msg("received deploy result for unknown request") + } default: log.Warn().Str("client", client.Id).Msg("Unknown message type received") } } -func performHandshake(conn *websocket.Conn, agentID string, log *zerolog.Logger) (*wscrypto.Session, error) { +func performHandshake(conn WSConn, agentID string, log *zerolog.Logger) (*wscrypto.Session, error) { hubKeys, err := wscrypto.GenerateHubKeys() if err != nil { return nil, err diff --git a/backend/internal/hub/websocket/handler_test.go b/backend/internal/hub/websocket/handler_test.go index fc4cabb..251a017 100644 --- a/backend/internal/hub/websocket/handler_test.go +++ b/backend/internal/hub/websocket/handler_test.go @@ -1,6 +1,7 @@ package websocket import ( + "errors" "log" "net/http" "net/http/httptest" @@ -418,6 +419,7 @@ func TestWsHandler_HandlesPong(t *testing.T) { func TestHandleClientMessage_DropsUnencryptedNonPong(t *testing.T) { log := testLogger() + h := NewHub(&log) client := &Client{Id: "test", Send: make(chan *messages.ServerMessage, 1)} // KeyExchangeResponse is neither encrypted nor a Pong — must be dropped silently. msg := &messages.ClientMessage{ @@ -425,11 +427,12 @@ func TestHandleClientMessage_DropsUnencryptedNonPong(t *testing.T) { KeyExchangeResponse: &messages.KeyExchangeResponse{}, }, } - handleClientMessage(client, msg, &log) // must not panic or block + handleClientMessage(h, client, msg, &log) // must not panic or block } func TestHandleClientMessage_DecryptError(t *testing.T) { log := testLogger() + h := NewHub(&log) sessionKey := make([]byte, 32) session, err := wscrypto.NewSession(sessionKey) if err != nil { @@ -444,11 +447,12 @@ func TestHandleClientMessage_DecryptError(t *testing.T) { }, }, } - handleClientMessage(client, msg, &log) // must return without panic + handleClientMessage(h, client, msg, &log) // must return without panic } func TestHandleClientMessage_DoublyEncrypted(t *testing.T) { log := testLogger() + h := NewHub(&log) sessionKey := make([]byte, 32) session, err := wscrypto.NewSession(sessionKey) if err != nil { @@ -469,12 +473,13 @@ func TestHandleClientMessage_DoublyEncrypted(t *testing.T) { msg := &messages.ClientMessage{ Payload: &messages.ClientMessage_EncryptedPayload{EncryptedPayload: env}, } - handleClientMessage(client, msg, &log) // doubly-encrypted — must be dropped + handleClientMessage(h, client, msg, &log) // doubly-encrypted — must be dropped } func TestHandleClientMessage_EncryptedUnknownPayload(t *testing.T) { setupHandlerTestEnv(t) log := testLogger() + h := NewHub(&log) sessionKey := make([]byte, 32) session, err := wscrypto.NewSession(sessionKey) if err != nil { @@ -495,7 +500,7 @@ func TestHandleClientMessage_EncryptedUnknownPayload(t *testing.T) { msg := &messages.ClientMessage{ Payload: &messages.ClientMessage_EncryptedPayload{EncryptedPayload: env}, } - handleClientMessage(client, msg, &log) // hits the "unknown message type" default case + handleClientMessage(h, client, msg, &log) // hits the "unknown message type" default case } func TestWsHandler_AgentMarkedOfflineOnDisconnect(t *testing.T) { @@ -544,3 +549,232 @@ func TestWsHandler_AgentMarkedOfflineOnDisconnect(t *testing.T) { } t.Error("expected agent status to be Offline after disconnect") } + +// MockConn implements a subset of websocket.Conn for testing handshake errors. +type MockConn struct { + readErr error + writeErr error + readDeadErr error + writeDeadErr error + messages [][]byte + readIndex int +} + +func (m *MockConn) ReadMessage() (messageType int, data []byte, err error) { + if m.readErr != nil { + return 0, nil, m.readErr + } + if m.readIndex >= len(m.messages) { + return 0, nil, websocket.ErrCloseSent + } + data = m.messages[m.readIndex] + m.readIndex++ + return websocket.BinaryMessage, data, nil +} + +func (m *MockConn) WriteMessage(messageType int, data []byte) error { + return m.writeErr +} + +func (m *MockConn) SetReadDeadline(t time.Time) error { + return m.readDeadErr +} + +func (m *MockConn) SetWriteDeadline(t time.Time) error { + return m.writeDeadErr +} + +func (m *MockConn) Close() error { + return nil +} + +func TestPerformHandshake_WriteDeadlineError(t *testing.T) { + setupHandlerTestEnv(t) + log := testLogger() + conn := &MockConn{writeDeadErr: errors.New("failed to set write deadline")} + + session, err := performHandshake(conn, "test-agent-id", &log) + if err == nil { + t.Error("expected error from SetWriteDeadline, got nil") + } + if session != nil { + t.Error("expected nil session on error") + } + if !strings.Contains(err.Error(), "write deadline") { + t.Errorf("expected deadline error message, got: %v", err) + } +} + +func TestPerformHandshake_WriteMessageError(t *testing.T) { + setupHandlerTestEnv(t) + log := testLogger() + conn := &MockConn{writeErr: errors.New("failed to write message")} + + session, err := performHandshake(conn, "test-agent-id", &log) + if err == nil { + t.Error("expected error from WriteMessage, got nil") + } + if session != nil { + t.Error("expected nil session on error") + } + if !strings.Contains(err.Error(), "write") { + t.Errorf("expected write error, got: %v", err) + } +} + +func TestPerformHandshake_ReadDeadlineError(t *testing.T) { + setupHandlerTestEnv(t) + log := testLogger() + conn := &MockConn{readDeadErr: errors.New("failed to set read deadline")} + + session, err := performHandshake(conn, "test-agent-id", &log) + if err == nil { + t.Error("expected error from SetReadDeadline, got nil") + } + if session != nil { + t.Error("expected nil session on error") + } + if !strings.Contains(err.Error(), "read deadline") { + t.Errorf("expected deadline error message, got: %v", err) + } +} + +func TestPerformHandshake_ReadMessageError(t *testing.T) { + log := testLogger() + conn := &MockConn{readErr: errors.New("connection closed")} + + session, err := performHandshake(conn, "test-agent-id", &log) + if err == nil { + t.Error("expected error from ReadMessage, got nil") + } + if session != nil { + t.Error("expected nil session on error") + } +} + +func TestPerformHandshake_UnmarshalError(t *testing.T) { + setupHandlerTestEnv(t) + log := testLogger() + // Send invalid protobuf data + conn := &MockConn{ + messages: [][]byte{{0xFF, 0xFF, 0xFF, 0xFF}}, + } + + session, err := performHandshake(conn, "test-agent-id", &log) + if err == nil { + t.Error("expected error from proto.Unmarshal, got nil") + } + if session != nil { + t.Error("expected nil session on error") + } +} + +func TestPerformHandshake_InvalidResponseType(t *testing.T) { + setupHandlerTestEnv(t) + log := testLogger() + + // Send a valid ClientMessage with wrong payload type (Pong instead of KeyExchangeResponse) + wrongMsg := &messages.ClientMessage{ + Payload: &messages.ClientMessage_Pong{ + Pong: &messages.PongResponse{Timestamp: 123}, + }, + } + data, err := proto.Marshal(wrongMsg) + if err != nil { + t.Fatalf("failed to marshal test message: %v", err) + } + + conn := &MockConn{ + messages: [][]byte{data}, + } + + session, err := performHandshake(conn, "test-agent-id", &log) + if err == nil { + t.Error("expected error for invalid response type, got nil") + } + if session != nil { + t.Error("expected nil session on error") + } + if !strings.Contains(err.Error(), "KeyExchangeResponse") { + t.Errorf("expected error mentioning KeyExchangeResponse, got: %v", err) + } +} + +func TestPerformHandshake_DerivationError(t *testing.T) { + setupHandlerTestEnv(t) + log := testLogger() + + // Create valid handshake init, but respond with invalid derivation data + // Send KeyExchangeResponse with invalid ciphertext (too short, will fail MLKEM decapsulation) + resp := &messages.ClientMessage{ + Payload: &messages.ClientMessage_KeyExchangeResponse{ + KeyExchangeResponse: &messages.KeyExchangeResponse{ + MlkemCiphertext: []byte{0x00}, // invalid MLKEM ciphertext + AgentX25519PublicKey: make([]byte, 32), + }, + }, + } + respData, err := proto.Marshal(resp) + if err != nil { + t.Fatalf("failed to marshal response: %v", err) + } + + conn := &MockConn{ + messages: [][]byte{respData}, + } + + session, err := performHandshake(conn, "test-agent-id", &log) + if err == nil { + t.Error("expected error from key derivation, got nil") + } + if session != nil { + t.Error("expected nil session on error") + } +} + +func TestPerformHandshake_Success(t *testing.T) { + setupHandlerTestEnv(t) + log := testLogger() + + agentID := "test-agent-id" + hubKeys, err := wscrypto.GenerateHubKeys() + if err != nil { + t.Fatalf("failed to generate hub keys: %v", err) + } + + // Client performs handshake + mlkemCiphertext, agentX25519Pub, _, err := wscrypto.AgentHandshake( + hubKeys.MLKEMEncapKey, + hubKeys.X25519PublicKey, + agentID, + ) + if err != nil { + t.Fatalf("AgentHandshake failed: %v", err) + } + + // Server reads this response + resp := &messages.ClientMessage{ + Payload: &messages.ClientMessage_KeyExchangeResponse{ + KeyExchangeResponse: &messages.KeyExchangeResponse{ + MlkemCiphertext: mlkemCiphertext, + AgentX25519PublicKey: agentX25519Pub, + }, + }, + } + respData, err := proto.Marshal(resp) + if err != nil { + t.Fatalf("failed to marshal response: %v", err) + } + + conn := &MockConn{ + messages: [][]byte{respData}, + } + + session, err := performHandshake(conn, agentID, &log) + if err != nil { + t.Fatalf("performHandshake failed: %v", err) + } + if session == nil { + t.Fatal("expected non-nil session") + } +} diff --git a/backend/internal/hub/websocket/hub.go b/backend/internal/hub/websocket/hub.go index 9da1a90..f1ad3b2 100644 --- a/backend/internal/hub/websocket/hub.go +++ b/backend/internal/hub/websocket/hub.go @@ -25,13 +25,18 @@ func (c *Client) Close() { } type Hub struct { - mu sync.RWMutex - clients map[string]*Client - log *zerolog.Logger + mu sync.RWMutex + clients map[string]*Client + deployManager *DeployManager + log *zerolog.Logger } func NewHub(log *zerolog.Logger) *Hub { - return &Hub{clients: make(map[string]*Client), log: log} + return &Hub{ + clients: make(map[string]*Client), + deployManager: NewDeployManager(), + log: log, + } } func (h *Hub) Register(id string, conn *websocket.Conn) (*Client, error) { @@ -54,6 +59,7 @@ func (h *Hub) Unregister(id string) { h.mu.Lock() delete(h.clients, id) h.mu.Unlock() + h.deployManager.FailPendingDeploys(id, ErrAgentDisconnected) h.log.Debug().Str("client", id).Msg("Client unregistered") } @@ -88,6 +94,33 @@ func (h *Hub) Broadcast(msg *messages.ServerMessage) { } } +func (h *Hub) StartDeploy(agentID string, req *messages.DeployRequest) (*DeployHandle, error) { + pending := h.deployManager.StartDeploy(agentID, req) + + msg := &messages.ServerMessage{ + Payload: &messages.ServerMessage_DeployRequest{ + DeployRequest: req, + }, + } + + if !h.Send(agentID, msg) { + h.deployManager.CancelDeploy(req.RequestId) + return nil, ErrDeployUnavailable + } + + handle := &DeployHandle{ + deployManager: h.deployManager, + requestID: req.RequestId, + outcome: pending.outcome, + } + + return handle, nil +} + +func (h *Hub) ResolveDeploy(result *messages.DeployResult) bool { + return h.deployManager.ResolveDeploy(result) +} + const writeWait = 10 * time.Second // WritePump writes outgoing messages to the WebSocket connection. diff --git a/backend/internal/hub/websocket/hub_test.go b/backend/internal/hub/websocket/hub_test.go index 448e704..23b7c5f 100644 --- a/backend/internal/hub/websocket/hub_test.go +++ b/backend/internal/hub/websocket/hub_test.go @@ -1,6 +1,8 @@ package websocket import ( + "context" + "errors" "net/http" "net/http/httptest" "os" @@ -199,6 +201,93 @@ func TestHub_Send_FullBuffer(t *testing.T) { } } +func TestHub_StartDeployAndResolve(t *testing.T) { + log := testLogger() + h := NewHub(&log) + + conn := newTestWSConn(t) + defer conn.Close() //nolint:errcheck + + client, err := h.Register("agent-1", conn) + if err != nil { + t.Fatalf("Register: %v", err) + } + + handle, err := h.StartDeploy("agent-1", &messages.DeployRequest{ + RequestId: "req-1", + ApplicationId: "app-1", + ApplicationName: "billing", + ComposeFile: "services: {}\n", + }) + if err != nil { + t.Fatalf("StartDeploy: %v", err) + } + + select { + case msg := <-client.Send: + req := msg.GetDeployRequest() + if req == nil || req.RequestId != "req-1" { + t.Fatalf("expected deploy request to be queued") + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for queued deploy request") + } + + go h.ResolveDeploy(&messages.DeployResult{ + RequestId: "req-1", + ApplicationId: "app-1", + Success: true, + }) + + result, err := handle.Await(context.Background()) + if err != nil { + t.Fatalf("Await: %v", err) + } + if result == nil || !result.Success { + t.Fatal("expected successful deploy result") + } +} + +func TestHub_StartDeploy_UnavailableAgent(t *testing.T) { + log := testLogger() + h := NewHub(&log) + + _, err := h.StartDeploy("missing-agent", &messages.DeployRequest{ + RequestId: "req-1", + ApplicationId: "app-1", + }) + if !errors.Is(err, ErrDeployUnavailable) { + t.Fatalf("expected ErrDeployUnavailable, got %v", err) + } +} + +func TestHub_StartDeploy_FailsPendingWhenAgentDisconnects(t *testing.T) { + log := testLogger() + h := NewHub(&log) + + conn := newTestWSConn(t) + defer conn.Close() //nolint:errcheck + + if _, err := h.Register("agent-1", conn); err != nil { + t.Fatalf("Register: %v", err) + } + + handle, err := h.StartDeploy("agent-1", &messages.DeployRequest{ + RequestId: "req-1", + ApplicationId: "app-1", + }) + if err != nil { + t.Fatalf("StartDeploy: %v", err) + } + + h.Unregister("agent-1") + + _, err = handle.Await(context.Background()) + if !errors.Is(err, ErrAgentDisconnected) { + t.Fatalf("expected ErrAgentDisconnected, got %v", err) + } +} + func TestHub_Broadcast(t *testing.T) { log := testLogger() h := NewHub(&log) diff --git a/backend/internal/proto/messages.proto b/backend/internal/proto/messages.proto index a6027f7..5d4dfa6 100644 --- a/backend/internal/proto/messages.proto +++ b/backend/internal/proto/messages.proto @@ -9,6 +9,7 @@ message ClientMessage { PongResponse pong = 1; KeyExchangeResponse key_exchange_response = 2; EncryptedPayload encrypted_payload = 3; + DeployResult deploy_result = 4; } } @@ -17,6 +18,7 @@ message ServerMessage { PingRequest ping = 1; KeyExchangeInit key_exchange_init = 2; EncryptedPayload encrypted_payload = 3; + DeployRequest deploy_request = 4; } } @@ -43,3 +45,17 @@ message EncryptedPayload { bytes nonce = 1; bytes ciphertext = 2; } + +message DeployRequest { + string request_id = 1; + string application_id = 2; + string application_name = 3; + string compose_file = 4; +} + +message DeployResult { + string request_id = 1; + string application_id = 2; + bool success = 3; + string error_message = 4; +} diff --git a/backend/internal/shared/utils/fileUtils.go b/backend/internal/shared/utils/fileUtils.go new file mode 100644 index 0000000..dc0c622 --- /dev/null +++ b/backend/internal/shared/utils/fileUtils.go @@ -0,0 +1,53 @@ +package utils + +import ( + "errors" + "fmt" + "path/filepath" + "strings" +) + +type FileUtils struct{} + +// DoesNotLookLikeFilePath validates that a path is safe and doesn't contain path traversal sequences. +func DoesNotLookLikeFilePath(name string) error { + if strings.TrimSpace(name) == "" { + return errors.New("file path cannot be empty") + } + + // Prevent path traversal attacks + if strings.Contains(name, "..") { + return errors.New("file path cannot contain '..'") + } + + // Check for invalid characters that shouldn't appear in file paths + invalidChars := []string{"<", ">", ":", "\"", "|", "?", "*"} + for _, char := range invalidChars { + if strings.Contains(name, char) { + return errors.New("file path contains invalid character: " + char) + } + } + + return nil +} + +// IsPathWithinBase ensures that a file path stays within a base directory. +// This prevents directory traversal attacks even with symlinks and relative paths. +func IsPathWithinBase(basePath, filePath string) error { + absBase, err := filepath.Abs(basePath) + if err != nil { + return fmt.Errorf("invalid base path: %w", err) + } + + absFile, err := filepath.Abs(filePath) + if err != nil { + return fmt.Errorf("invalid file path: %w", err) + } + + // Ensure the file path is within the base directory + if !strings.HasPrefix(absFile, absBase) { + return errors.New("path traversal detected: file path is outside base directory") + } + + return nil +} diff --git a/backend/internal/shared/utils/fileUtils_test.go b/backend/internal/shared/utils/fileUtils_test.go new file mode 100644 index 0000000..be5abed --- /dev/null +++ b/backend/internal/shared/utils/fileUtils_test.go @@ -0,0 +1,233 @@ +package utils + +import ( + "os" + "path/filepath" + "testing" +) + +func TestDoesNotLookLikeFilePath(t *testing.T) { + tests := []struct { + name string + input string + wantErr bool + errMsg string + }{ + { + name: "valid simple path", + input: "file.txt", + wantErr: false, + }, + { + name: "valid nested path", + input: "dir/subdir/file.txt", + wantErr: false, + }, + { + name: "valid path with hyphens", + input: "my-app-file.yaml", + wantErr: false, + }, + { + name: "valid path with underscores", + input: "my_app_file.yaml", + wantErr: false, + }, + { + name: "empty string", + input: "", + wantErr: true, + errMsg: "file path cannot be empty", + }, + { + name: "whitespace only", + input: " ", + wantErr: true, + errMsg: "file path cannot be empty", + }, + { + name: "path with parent directory traversal", + input: "../etc/passwd", + wantErr: true, + errMsg: "file path cannot contain '..'", + }, + { + name: "path with double dots in middle", + input: "dir/../file.txt", + wantErr: true, + errMsg: "file path cannot contain '..'", + }, + { + name: "path with double dots at start", + input: "../../etc/shadow", + wantErr: true, + errMsg: "file path cannot contain '..'", + }, + { + name: "path with less than character", + input: "file.txt", + wantErr: true, + errMsg: "file path contains invalid character: <", + }, + { + name: "path with greater than character", + input: "file>name.txt", + wantErr: true, + errMsg: "file path contains invalid character: >", + }, + { + name: "path with colon", + input: "file:name.txt", + wantErr: true, + errMsg: "file path contains invalid character: :", + }, + { + name: "path with double quote", + input: "file\"name\".txt", + wantErr: true, + errMsg: "file path contains invalid character: \"", + }, + { + name: "path with pipe", + input: "file|name.txt", + wantErr: true, + errMsg: "file path contains invalid character: |", + }, + { + name: "path with question mark", + input: "file?name.txt", + wantErr: true, + errMsg: "file path contains invalid character: ?", + }, + { + name: "path with asterisk", + input: "file*name.txt", + wantErr: true, + errMsg: "file path contains invalid character: *", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := DoesNotLookLikeFilePath(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("DoesNotLookLikeFilePath() error = %v, wantErr %v", err, tt.wantErr) + } + if tt.wantErr && tt.errMsg != "" && err.Error() != tt.errMsg { + t.Errorf("DoesNotLookLikeFilePath() error message = %q, want %q", err.Error(), tt.errMsg) + } + }) + } +} + +func TestIsPathWithinBase(t *testing.T) { + // Create temporary directories for testing + tmpDir := t.TempDir() + baseDir := filepath.Join(tmpDir, "base") + if err := os.MkdirAll(baseDir, 0o750); err != nil { + t.Fatalf("failed to create base directory: %v", err) + } + + tests := []struct { + name string + baseDir string + filePath string + wantErr bool + errMsg string + }{ + { + name: "file directly in base directory", + baseDir: baseDir, + filePath: filepath.Join(baseDir, "file.txt"), + wantErr: false, + }, + { + name: "file in subdirectory of base", + baseDir: baseDir, + filePath: filepath.Join(baseDir, "subdir", "file.txt"), + wantErr: false, + }, + { + name: "file deeply nested in base", + baseDir: baseDir, + filePath: filepath.Join(baseDir, "a", "b", "c", "file.txt"), + wantErr: false, + }, + { + name: "relative path within base (with ./)", + baseDir: baseDir, + filePath: filepath.Join(baseDir, ".", "file.txt"), + wantErr: false, + }, + { + name: "file outside base directory", + baseDir: baseDir, + filePath: filepath.Join(tmpDir, "other", "file.txt"), + wantErr: true, + errMsg: "path traversal detected: file path is outside base directory", + }, + { + name: "file in parent directory of base", + baseDir: baseDir, + filePath: filepath.Join(tmpDir, "file.txt"), + wantErr: true, + errMsg: "path traversal detected: file path is outside base directory", + }, + { + name: "relative path traversing out of base", + baseDir: baseDir, + filePath: filepath.Join(baseDir, "subdir", "..", "..", "file.txt"), + wantErr: true, + errMsg: "path traversal detected: file path is outside base directory", + }, + { + name: "base directory itself", + baseDir: baseDir, + filePath: baseDir, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := IsPathWithinBase(tt.baseDir, tt.filePath) + if (err != nil) != tt.wantErr { + t.Errorf("IsPathWithinBase() error = %v, wantErr %v", err, tt.wantErr) + } + if tt.wantErr && tt.errMsg != "" && err.Error() != tt.errMsg { + t.Errorf("IsPathWithinBase() error message = %q, want %q", err.Error(), tt.errMsg) + } + }) + } +} + +func TestIsPathWithinBaseInvalidInput(t *testing.T) { + tests := []struct { + name string + baseDir string + filePath string + wantErr bool + }{ + { + name: "invalid base directory path", + baseDir: "/nonexistent/base/path/that/is/invalid\x00", + filePath: "/some/file.txt", + wantErr: true, + }, + { + name: "invalid file path", + baseDir: "/tmp", + filePath: "/some/file/path\x00", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := IsPathWithinBase(tt.baseDir, tt.filePath) + if (err != nil) != tt.wantErr { + t.Errorf("IsPathWithinBase() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 80dece7..1c66618 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -7,7 +7,7 @@ services: dockerfile: hub.Dockerfile container_name: orca-hub-dev ports: - - '127.0.0.1:8080:8080' + - "127.0.0.1:8080:8080" volumes: - ./data/hub:/app/data environment: @@ -20,6 +20,8 @@ services: dockerfile: agent.Dockerfile container_name: orca-agent-dev use_api_socket: true # Allows container to access Docker API on host + volumes: + - ./data/agent/deployments:/deployments depends_on: - hub environment: diff --git a/docker-compose.yml b/docker-compose.yml index 29950ac..21b22b9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: cap_drop: - ALL ports: - - '127.0.0.1:8080:8080' + - "127.0.0.1:8080:8080" volumes: - ./data/hub:/app/data @@ -23,5 +23,7 @@ services: cap_drop: - ALL use_api_socket: true # Allows container to access Docker API on host + volumes: + - ./data/agent/deployments:/deployments depends_on: # Remove this if you are only deploying the agent on a host without the hub - hub diff --git a/frontend/messages/de.json b/frontend/messages/de.json index 10efefa..eb618f3 100644 --- a/frontend/messages/de.json +++ b/frontend/messages/de.json @@ -23,6 +23,10 @@ "updatingDots": "Aktualisieren...", "refresh": "Aktualisieren", "details": "Details", + "deploy": "Bereitstellen", + "deploying": "Wird bereitgestellt...", + "deploymentStarted": "Deployment gestartet", + "failedDeployApplication": "Deployment der Anwendung fehlgeschlagen", "sync": "Synchronisieren", "syncing": "Synchronisiere...", "syncTriggered": "Synchronisierung gestartet", diff --git a/frontend/messages/en.json b/frontend/messages/en.json index 39ac7cb..303c8d6 100644 --- a/frontend/messages/en.json +++ b/frontend/messages/en.json @@ -23,6 +23,10 @@ "updatingDots": "Updating...", "refresh": "Refresh", "details": "Details", + "deploy": "Deploy", + "deploying": "Deploying...", + "deploymentStarted": "Deployment started", + "failedDeployApplication": "Failed to deploy application", "sync": "Sync", "syncing": "Syncing...", "syncTriggered": "Sync triggered", diff --git a/frontend/src/lib/applications.ts b/frontend/src/lib/applications.ts index 42218f2..1c4ab93 100644 --- a/frontend/src/lib/applications.ts +++ b/frontend/src/lib/applications.ts @@ -70,6 +70,10 @@ interface UpdateApplicationRequest { path: string; } +interface DeployApplicationResponse { + message: string; +} + export function createApplication(data: CreateApplicationRequest): Promise { return fetcher("/applications", "POST", data); } @@ -84,3 +88,7 @@ export function updateApplication( export function deleteApplication(id: string): Promise { return fetcher(`/applications/${id}`, "DELETE"); } + +export function deployApplication(id: string): Promise { + return fetcher(`/applications/${id}/deploy`, "POST"); +} diff --git a/frontend/src/routes/_authenticated/applications/$id.index.tsx b/frontend/src/routes/_authenticated/applications/$id.index.tsx index 71fdaea..0150354 100644 --- a/frontend/src/routes/_authenticated/applications/$id.index.tsx +++ b/frontend/src/routes/_authenticated/applications/$id.index.tsx @@ -1,4 +1,10 @@ -import { deleteApplication, HealthStatus, SyncStatus, type Application } from "@/lib/applications"; +import { + deleteApplication, + deployApplication, + HealthStatus, + SyncStatus, + type Application, +} from "@/lib/applications"; import { createFileRoute, Link, useNavigate } from "@tanstack/react-router"; import { Breadcrumb, @@ -150,14 +156,22 @@ function ApplicationDetailsPage() { const { data } = useFetch("/applications/" + id); - const [syncing, setSyncing] = useState(false); + const [deploying, setDeploying] = useState(false); - const handleSync = async () => { - setSyncing(true); - await new Promise((resolve) => setTimeout(resolve, 2000)); - setSyncing(false); + const handleDeploy = async () => { + setDeploying(true); + try { + await deployApplication(id); + toast.success(m.deploymentStarted()); + } catch (err) { + toast.error(err instanceof Error ? err.message : m.failedDeployApplication()); + } finally { + setDeploying(false); + } }; + const deploymentInProgress = deploying || data?.syncStatus === SyncStatus.Syncing; + const manifestHtml = useMemo(() => { return highlighter.codeToHtml(data?.composeFile ?? "", { lang: "yaml", @@ -217,9 +231,9 @@ function ApplicationDetailsPage() {
-