diff --git a/backend/internal/hub/handlers.go b/backend/internal/hub/handlers.go index 9737c591..2d8009eb 100644 --- a/backend/internal/hub/handlers.go +++ b/backend/internal/hub/handlers.go @@ -5,6 +5,7 @@ import ( "github.com/OrcaCD/orca-cd/internal/hub/middleware" "github.com/OrcaCD/orca-cd/internal/hub/routes" + "github.com/OrcaCD/orca-cd/internal/hub/sse" "github.com/OrcaCD/orca-cd/internal/hub/websocket" "github.com/OrcaCD/orca-cd/internal/version" "github.com/gin-gonic/gin" @@ -67,6 +68,8 @@ func RegisterRoutes(router *gin.Engine, cfg Config) error { protected.GET("/agents/:id", routes.GetAgentHandler) protected.PUT("/agents/:id", routes.UpdateAgentHandler) protected.DELETE("/agents/:id", routes.DeleteAgentHandler) + + protected.GET("/events", routes.SSEHandler) } // Admin routes (authentication + admin role required) @@ -87,6 +90,8 @@ func RegisterRoutes(router *gin.Engine, cfg Config) error { admin.DELETE("/oidc-providers/:id", routes.AdminDeleteOIDCProviderHandler) } + sse.DefaultBroker = sse.NewBroker(&Log) + h := websocket.NewHub(&Log) w := websocket.NewWorker(h, &Log) w.Start() diff --git a/backend/internal/hub/middleware/timeout.go b/backend/internal/hub/middleware/timeout.go index 570606d8..796ef92a 100644 --- a/backend/internal/hub/middleware/timeout.go +++ b/backend/internal/hub/middleware/timeout.go @@ -8,10 +8,15 @@ import ( "github.com/gin-gonic/gin" ) +const sseRoutePath = "/api/v1/events" + func TimeoutMiddleware(timeout time.Duration) gin.HandlerFunc { return func(c *gin.Context) { - // WebSocket connections are long-lived; skip the timeout. - if strings.EqualFold(c.Request.Header.Get("Upgrade"), "websocket") { + // WebSocket and SSE connections are long-lived; skip the timeout. + // The SSE exemption is tied to the registered route path, not the + // client-supplied Accept header, to prevent bypass via a crafted header. + if strings.EqualFold(c.Request.Header.Get("Upgrade"), "websocket") || + c.FullPath() == sseRoutePath { c.Next() return } diff --git a/backend/internal/hub/middleware/timeout_test.go b/backend/internal/hub/middleware/timeout_test.go index cf92eebe..e8978242 100644 --- a/backend/internal/hub/middleware/timeout_test.go +++ b/backend/internal/hub/middleware/timeout_test.go @@ -96,6 +96,51 @@ func TestTimeoutMiddleware_SkipsWebSocketRequests(t *testing.T) { } } +func TestTimeoutMiddleware_SkipsSSERoute(t *testing.T) { + t.Parallel() + + router := gin.New() + router.Use(TimeoutMiddleware(100 * time.Millisecond)) + router.GET("/api/v1/events", func(c *gin.Context) { + _, ok := c.Request.Context().Deadline() + if ok { + t.Fatal("expected no deadline for SSE route") + } + c.Status(http.StatusOK) + }) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/v1/events", nil) + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } +} + +func TestTimeoutMiddleware_AcceptHeaderAloneDoesNotBypassTimeout(t *testing.T) { + t.Parallel() + + router := gin.New() + router.Use(TimeoutMiddleware(100 * time.Millisecond)) + router.GET("/other", func(c *gin.Context) { + _, ok := c.Request.Context().Deadline() + if !ok { + t.Fatal("expected deadline to be set despite Accept: text/event-stream header") + } + c.Status(http.StatusOK) + }) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/other", nil) + req.Header.Set("Accept", "text/event-stream") + router.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } +} + func TestTimeoutMiddleware_CancelsContextAfterHandlerReturns(t *testing.T) { t.Parallel() diff --git a/backend/internal/hub/routes/admin_oidc.go b/backend/internal/hub/routes/admin_oidc.go index 71198470..0d5e9185 100644 --- a/backend/internal/hub/routes/admin_oidc.go +++ b/backend/internal/hub/routes/admin_oidc.go @@ -9,11 +9,14 @@ import ( "github.com/OrcaCD/orca-cd/internal/hub/crypto" "github.com/OrcaCD/orca-cd/internal/hub/db" "github.com/OrcaCD/orca-cd/internal/hub/models" + "github.com/OrcaCD/orca-cd/internal/hub/sse" gooidc "github.com/coreos/go-oidc/v3/oidc" "github.com/gin-gonic/gin" "gorm.io/gorm" ) +const AdminOIDCProvidersPath = "/api/v1/admin/oidc-providers" + type createOIDCProviderRequest struct { Name string `json:"name" binding:"required,min=1,max=100"` IssuerURL string `json:"issuerUrl" binding:"required,http_url"` @@ -140,6 +143,7 @@ func AdminCreateOIDCProviderHandler(c *gin.Context) { } c.JSON(http.StatusCreated, toOIDCProviderResponse(&provider)) + sse.PublishUpdate(AdminOIDCProvidersPath) } func AdminUpdateOIDCProviderHandler(c *gin.Context) { @@ -190,6 +194,7 @@ func AdminUpdateOIDCProviderHandler(c *gin.Context) { } c.JSON(http.StatusOK, toOIDCProviderResponse(&existing)) + sse.PublishUpdate(AdminOIDCProvidersPath) } func AdminDeleteOIDCProviderHandler(c *gin.Context) { @@ -206,4 +211,5 @@ func AdminDeleteOIDCProviderHandler(c *gin.Context) { } c.JSON(http.StatusOK, gin.H{"message": "provider deleted"}) + sse.PublishUpdate(AdminOIDCProvidersPath) } diff --git a/backend/internal/hub/routes/admin_users.go b/backend/internal/hub/routes/admin_users.go index 457dd7a7..c2c304e0 100644 --- a/backend/internal/hub/routes/admin_users.go +++ b/backend/internal/hub/routes/admin_users.go @@ -10,10 +10,13 @@ import ( "github.com/OrcaCD/orca-cd/internal/hub/auth" "github.com/OrcaCD/orca-cd/internal/hub/db" "github.com/OrcaCD/orca-cd/internal/hub/models" + "github.com/OrcaCD/orca-cd/internal/hub/sse" "github.com/gin-gonic/gin" "gorm.io/gorm" ) +const AdminUsersPath = "/api/v1/admin/users" + type adminUserResponse struct { Id string `json:"id"` Name string `json:"name"` @@ -155,6 +158,7 @@ func AdminCreateUserHandler(c *gin.Context) { } c.JSON(http.StatusCreated, toAdminUserWithGeneratedPasswordResponse(&user, nil, generatedPassword)) + sse.PublishUpdate(AdminUsersPath) } func AdminUpdateUserHandler(c *gin.Context) { @@ -233,6 +237,7 @@ func AdminUpdateUserHandler(c *gin.Context) { } c.JSON(http.StatusOK, toAdminUserWithGeneratedPasswordResponse(&user, oidcProviderNamesByUserId[user.Id], generatedPassword)) + sse.PublishUpdate(AdminUsersPath) } func AdminDeleteUserHandler(c *gin.Context) { @@ -266,6 +271,7 @@ func AdminDeleteUserHandler(c *gin.Context) { } c.JSON(http.StatusOK, gin.H{"message": "user deleted"}) + sse.PublishUpdate(AdminUsersPath) } func countAdminUsers(c *gin.Context) (int64, error) { diff --git a/backend/internal/hub/routes/agents.go b/backend/internal/hub/routes/agents.go index ca3566b7..1869cd64 100644 --- a/backend/internal/hub/routes/agents.go +++ b/backend/internal/hub/routes/agents.go @@ -9,10 +9,13 @@ import ( "github.com/OrcaCD/orca-cd/internal/hub/crypto" "github.com/OrcaCD/orca-cd/internal/hub/db" "github.com/OrcaCD/orca-cd/internal/hub/models" + "github.com/OrcaCD/orca-cd/internal/hub/sse" "github.com/gin-gonic/gin" "gorm.io/gorm" ) +const AgentsPath = "/api/v1/agents" + type agentResponse struct { Id string `json:"id"` Name string `json:"name"` @@ -148,6 +151,7 @@ func CreateAgentHandler(c *gin.Context) { agentResponse: toAgentResponse(&agent), AuthToken: authToken, }) + sse.PublishUpdate(AgentsPath) } func UpdateAgentHandler(c *gin.Context) { @@ -182,6 +186,7 @@ func UpdateAgentHandler(c *gin.Context) { } c.JSON(http.StatusOK, toAgentResponse(&agent)) + sse.PublishUpdate(AgentsPath) } func DeleteAgentHandler(c *gin.Context) { @@ -198,4 +203,5 @@ func DeleteAgentHandler(c *gin.Context) { } c.JSON(http.StatusOK, gin.H{"message": "agent deleted"}) + sse.PublishUpdate(AgentsPath) } diff --git a/backend/internal/hub/routes/applications.go b/backend/internal/hub/routes/applications.go index d36463d8..75010b9a 100644 --- a/backend/internal/hub/routes/applications.go +++ b/backend/internal/hub/routes/applications.go @@ -8,10 +8,13 @@ import ( "github.com/OrcaCD/orca-cd/internal/hub/crypto" "github.com/OrcaCD/orca-cd/internal/hub/db" "github.com/OrcaCD/orca-cd/internal/hub/models" + "github.com/OrcaCD/orca-cd/internal/hub/sse" "github.com/gin-gonic/gin" "gorm.io/gorm" ) +const ApplicationsPath = "/api/v1/applications" + type createApplicationRequest struct { Name string `json:"name" binding:"required"` RepositoryId string `json:"repositoryId" binding:"required"` @@ -158,6 +161,7 @@ func CreateApplicationHandler(c *gin.Context) { } c.JSON(http.StatusCreated, toApplicationResponse(&createdApplication)) + sse.PublishUpdate(ApplicationsPath) } func UpdateApplicationHandler(c *gin.Context) { @@ -225,6 +229,7 @@ func UpdateApplicationHandler(c *gin.Context) { } c.JSON(http.StatusOK, toApplicationResponse(&updatedApplication)) + sse.PublishUpdate(ApplicationsPath) } func DeleteApplicationHandler(c *gin.Context) { @@ -241,6 +246,7 @@ func DeleteApplicationHandler(c *gin.Context) { } c.JSON(http.StatusOK, gin.H{"message": "application deleted"}) + sse.PublishUpdate(ApplicationsPath) } func toApplicationListResponse(app *models.Application) applicationListResponse { diff --git a/backend/internal/hub/routes/repositories.go b/backend/internal/hub/routes/repositories.go index cb4350d2..91ca1edc 100644 --- a/backend/internal/hub/routes/repositories.go +++ b/backend/internal/hub/routes/repositories.go @@ -12,10 +12,13 @@ import ( "github.com/OrcaCD/orca-cd/internal/hub/db" "github.com/OrcaCD/orca-cd/internal/hub/models" "github.com/OrcaCD/orca-cd/internal/hub/repositories" + "github.com/OrcaCD/orca-cd/internal/hub/sse" "github.com/gin-gonic/gin" "gorm.io/gorm" ) +const RepositoriesPath = "/api/v1/repositories" + var appUrl string func SetRepositoriesConfig(url string) { @@ -188,6 +191,7 @@ func CreateRepositoryHandler(c *gin.Context) { } c.JSON(http.StatusCreated, toRepositoryResponse(&repo, true)) + sse.PublishUpdate(RepositoriesPath) } type testConnectionRequest struct { @@ -255,6 +259,7 @@ func DeleteRepositoryHandler(c *gin.Context) { } c.JSON(http.StatusOK, gin.H{"message": "repository deleted"}) + sse.PublishUpdate(RepositoriesPath) } type updateRepositoryRequest struct { @@ -363,6 +368,7 @@ func UpdateRepositoryHandler(c *gin.Context) { newWebhookSecret := req.SyncType == models.SyncTypeWebhook && prevSyncType != models.SyncTypeWebhook c.JSON(http.StatusOK, toRepositoryResponse(&repo, newWebhookSecret)) + sse.PublishUpdate(RepositoriesPath) } // resolveProvider validates the provider enum, URL, and authMethod, returning the diff --git a/backend/internal/hub/routes/repositories_test.go b/backend/internal/hub/routes/repositories_test.go index a98882fc..4b04d473 100644 --- a/backend/internal/hub/routes/repositories_test.go +++ b/backend/internal/hub/routes/repositories_test.go @@ -8,11 +8,13 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/OrcaCD/orca-cd/internal/hub/auth" "github.com/OrcaCD/orca-cd/internal/hub/crypto" "github.com/OrcaCD/orca-cd/internal/hub/db" "github.com/OrcaCD/orca-cd/internal/hub/models" + "github.com/OrcaCD/orca-cd/internal/hub/sse" "github.com/OrcaCD/orca-cd/internal/shared/httpclient" "github.com/gin-gonic/gin" "github.com/golang-jwt/jwt/v5" @@ -907,6 +909,133 @@ func TestUpdateRepositoryHandler_SwitchFromWebhook(t *testing.T) { } } +func TestCreateRepositoryHandler_PublishesSSEEvent(t *testing.T) { + setupTestDBWithRepos(t) + broker := newTestSSEBroker(t) + connID, ch := broker.Subscribe() + defer broker.Unsubscribe(connID) + + reqBody, _ := json.Marshal(map[string]any{ + "url": "https://github.com/owner/sse-repo", + "provider": "github", + "authMethod": "none", + "syncType": "manual", + }) + + c, w := makeAuthContext(t, "user-1") + c.Request = httptest.NewRequest(http.MethodPost, "/api/v1/repositories", bytes.NewReader(reqBody)) + c.Request.Header.Set("Content-Type", "application/json") + + CreateRepositoryHandler(c) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } + + select { + case event := <-ch: + if event.Type != sse.EventTypeUpdate { + t.Errorf("expected event type %q, got %q", sse.EventTypeUpdate, event.Type) + } + if event.URL != RepositoriesPath { + t.Errorf("expected URL %q, got %q", RepositoriesPath, event.URL) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for SSE event") + } +} + +func TestUpdateRepositoryHandler_PublishesSSEEvent(t *testing.T) { + setupTestDBWithRepos(t) + broker := newTestSSEBroker(t) + connID, ch := broker.Subscribe() + defer broker.Unsubscribe(connID) + + repo := models.Repository{ + Name: "owner/repo", + Url: "https://github.com/owner/repo", + Provider: models.GitHub, + AuthMethod: models.AuthMethodNone, + SyncType: models.SyncTypeManual, + SyncStatus: models.SyncStatusUnknown, + CreatedBy: "user-1", + } + if err := db.DB.Select("*").Create(&repo).Error; err != nil { + t.Fatalf("failed to seed repo: %v", err) + } + + reqBody, _ := json.Marshal(map[string]any{ + "url": "https://github.com/owner/repo", + "authMethod": "none", + "syncType": "manual", + }) + + c, w := makeAuthContext(t, "user-1") + c.Request = httptest.NewRequest(http.MethodPut, "/api/v1/repositories/"+repo.Id, bytes.NewReader(reqBody)) + c.Request.Header.Set("Content-Type", "application/json") + c.Params = gin.Params{{Key: "id", Value: repo.Id}} + + UpdateRepositoryHandler(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + select { + case event := <-ch: + if event.Type != sse.EventTypeUpdate { + t.Errorf("expected event type %q, got %q", sse.EventTypeUpdate, event.Type) + } + if event.URL != RepositoriesPath { + t.Errorf("expected URL %q, got %q", RepositoriesPath, event.URL) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for SSE event") + } +} + +func TestDeleteRepositoryHandler_PublishesSSEEvent(t *testing.T) { + setupTestDBWithRepos(t) + broker := newTestSSEBroker(t) + connID, ch := broker.Subscribe() + defer broker.Unsubscribe(connID) + + repo := models.Repository{ + Name: "To Delete SSE", + Url: "https://github.com/owner/sse-delete", + Provider: models.GitHub, + AuthMethod: models.AuthMethodNone, + SyncType: models.SyncTypeManual, + SyncStatus: models.SyncStatusUnknown, + CreatedBy: "user-1", + } + if err := db.DB.Select("*").Create(&repo).Error; err != nil { + t.Fatalf("failed to seed repo: %v", err) + } + + c, w := makeAuthContext(t, "user-1") + c.Request = httptest.NewRequest(http.MethodDelete, "/api/v1/repositories/"+repo.Id, nil) + c.Params = gin.Params{{Key: "id", Value: repo.Id}} + + DeleteRepositoryHandler(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + select { + case event := <-ch: + if event.Type != sse.EventTypeUpdate { + t.Errorf("expected event type %q, got %q", sse.EventTypeUpdate, event.Type) + } + if event.URL != RepositoriesPath { + t.Errorf("expected URL %q, got %q", RepositoriesPath, event.URL) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for SSE event") + } +} + func TestTestConnectionHandler_Success(t *testing.T) { setupTestDBWithRepos(t) restore := mockHTTPClient(http.StatusOK) diff --git a/backend/internal/hub/routes/sse.go b/backend/internal/hub/routes/sse.go new file mode 100644 index 00000000..10870107 --- /dev/null +++ b/backend/internal/hub/routes/sse.go @@ -0,0 +1,74 @@ +package routes + +import ( + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/OrcaCD/orca-cd/internal/hub/auth" + "github.com/OrcaCD/orca-cd/internal/hub/sse" + "github.com/gin-gonic/gin" +) + +func SSEHandler(c *gin.Context) { + claims, ok := auth.GetClaims(c) + if !ok { + c.JSON(http.StatusUnauthorized, gin.H{"error": "missing authentication"}) + return + } + + // Clear the write deadline so the server's WriteTimeout doesn't kill long-lived SSE connections. + // Ignore errors: some ResponseWriter implementations (e.g. httptest.ResponseRecorder) don't support deadlines. + _ = http.NewResponseController(c.Writer).SetWriteDeadline(time.Time{}) + + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache, no-transform") + c.Header("Connection", "keep-alive") + c.Header("X-Accel-Buffering", "no") + + connID, ch := sse.DefaultBroker.Subscribe() + defer sse.DefaultBroker.Unsubscribe(connID) + + // Flush initial comment to establish the stream + if _, err := fmt.Fprint(c.Writer, ": connected\n\n"); err != nil { + return + } + c.Writer.Flush() + + timer := time.NewTimer(time.Until(claims.ExpiresAt.Time)) + defer timer.Stop() + + keepAlive := time.NewTicker(30 * time.Second) + defer keepAlive.Stop() + + for { + select { + case <-timer.C: + if _, err := fmt.Fprint(c.Writer, "event: unauthorized\ndata: {}\n\n"); err != nil { + return + } + c.Writer.Flush() + return + case event, open := <-ch: + if !open { + return + } + data, err := json.Marshal(event) + if err != nil { + continue + } + if _, err := fmt.Fprintf(c.Writer, "event: %s\ndata: %s\n\n", event.Type, data); err != nil { + return + } + c.Writer.Flush() + case <-keepAlive.C: + if _, err := fmt.Fprint(c.Writer, ": ping\n\n"); err != nil { + return + } + c.Writer.Flush() + case <-c.Request.Context().Done(): + return + } + } +} diff --git a/backend/internal/hub/routes/sse_test.go b/backend/internal/hub/routes/sse_test.go new file mode 100644 index 00000000..15d150fb --- /dev/null +++ b/backend/internal/hub/routes/sse_test.go @@ -0,0 +1,173 @@ +package routes + +import ( + "bufio" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "github.com/OrcaCD/orca-cd/internal/hub/auth" + "github.com/OrcaCD/orca-cd/internal/hub/sse" + "github.com/gin-gonic/gin" + "github.com/golang-jwt/jwt/v5" + "github.com/rs/zerolog" +) + +func newTestSSEBroker(t *testing.T) *sse.Broker { + t.Helper() + log := zerolog.New(os.Stderr).Level(zerolog.Disabled) + broker := sse.NewBroker(&log) + sse.DefaultBroker = broker + t.Cleanup(func() { sse.DefaultBroker = nil }) + return broker +} + +// injectClaimsMiddleware sets valid auth claims so SSEHandler can proceed. +func injectClaimsMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + claims := &auth.UserClaims{ + RegisteredClaims: jwt.RegisteredClaims{ + Subject: "test-user", + ExpiresAt: jwt.NewNumericDate(time.Now().Add(time.Hour)), + }, + Name: "Test User", + Email: "test@example.com", + Role: "admin", + } + auth.SetClaims(c, claims) + c.Next() + } +} + +func newSSETestServer(t *testing.T) *httptest.Server { + t.Helper() + router := gin.New() + router.GET("/api/v1/events", injectClaimsMiddleware(), SSEHandler) + server := httptest.NewServer(router) + t.Cleanup(server.Close) + return server +} + +// connectSSE opens an SSE connection and returns a line scanner and cancel func. +// Body cleanup is registered via t.Cleanup. +func connectSSE(t *testing.T, server *httptest.Server) (*bufio.Scanner, context.CancelFunc) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL+"/api/v1/events", nil) + if err != nil { + cancel() + t.Fatalf("failed to create SSE request: %v", err) + } + resp, err := http.DefaultClient.Do(req) //nolint:bodyclose + if err != nil { + cancel() + t.Fatalf("failed to connect to SSE endpoint: %v", err) + } + t.Cleanup(func() { _ = resp.Body.Close() }) + return bufio.NewScanner(resp.Body), cancel +} + +// waitForInitialComment reads lines until the ": connected" SSE comment is seen. +func waitForInitialComment(t *testing.T, scanner *bufio.Scanner) { + t.Helper() + for scanner.Scan() { + if scanner.Text() == ": connected" { + return + } + } + t.Fatal("did not receive initial SSE comment") +} + +// TestSSEHandler_Headers uses httptest.ResponseRecorder so hop-by-hop headers +// like Connection are not stripped by the HTTP transport layer. +func TestSSEHandler_Headers(t *testing.T) { + newTestSSEBroker(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + router := gin.New() + router.GET("/api/v1/events", injectClaimsMiddleware(), SSEHandler) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/events", nil).WithContext(ctx) + w := httptest.NewRecorder() + + done := make(chan struct{}) + go func() { + defer close(done) + router.ServeHTTP(w, req) + }() + + time.Sleep(50 * time.Millisecond) + cancel() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("SSEHandler did not exit after context cancel") + } + + if ct := w.Header().Get("Content-Type"); ct != "text/event-stream" { + t.Errorf("expected Content-Type %q, got %q", "text/event-stream", ct) + } + if cc := w.Header().Get("Cache-Control"); cc != "no-cache, no-transform" { + t.Errorf("expected Cache-Control %q, got %q", "no-cache, no-transform", cc) + } + if conn := w.Header().Get("Connection"); conn != "keep-alive" { + t.Errorf("expected Connection %q, got %q", "keep-alive", conn) + } + if buf := w.Header().Get("X-Accel-Buffering"); buf != "no" { + t.Errorf("expected X-Accel-Buffering %q, got %q", "no", buf) + } +} + +func TestSSEHandler_InitialComment(t *testing.T) { + newTestSSEBroker(t) + server := newSSETestServer(t) + + scanner, cancel := connectSSE(t, server) + defer cancel() + + waitForInitialComment(t, scanner) +} + +func TestSSEHandler_ReceivesEvent(t *testing.T) { + broker := newTestSSEBroker(t) + server := newSSETestServer(t) + + scanner, cancel := connectSSE(t, server) + defer cancel() + + waitForInitialComment(t, scanner) + + broker.Publish(sse.Event{Type: sse.EventTypeUpdate, URL: "http://example.com"}) + + var eventType, eventData string + for scanner.Scan() { + line := scanner.Text() + if v, ok := strings.CutPrefix(line, "event: "); ok { + eventType = v + } + if v, ok := strings.CutPrefix(line, "data: "); ok { + eventData = v + break + } + } + + if eventType != string(sse.EventTypeUpdate) { + t.Errorf("expected event type %q, got %q", sse.EventTypeUpdate, eventType) + } + + var event sse.Event + if err := json.Unmarshal([]byte(eventData), &event); err != nil { + t.Fatalf("failed to unmarshal SSE event data: %v", err) + } + if event.URL != "http://example.com" { + t.Errorf("expected URL %q, got %q", "http://example.com", event.URL) + } +} diff --git a/backend/internal/hub/sse/broker.go b/backend/internal/hub/sse/broker.go new file mode 100644 index 00000000..e872bcfe --- /dev/null +++ b/backend/internal/hub/sse/broker.go @@ -0,0 +1,88 @@ +package sse + +// SSE are used to push real-time updates to the frontend + +import ( + "sync" + + "github.com/google/uuid" + "github.com/rs/zerolog" +) + +type EventType string + +const ( + EventTypeUpdate EventType = "update" +) + +type Event struct { + Type EventType `json:"type"` + URL string `json:"url,omitempty"` +} + +type Broker struct { + mu sync.RWMutex + clients map[string]chan Event + log *zerolog.Logger +} + +var DefaultBroker *Broker + +func NewBroker(log *zerolog.Logger) *Broker { + return &Broker{ + clients: make(map[string]chan Event), + log: log, + } +} + +// Subscribe registers a new SSE client. +// Returns the connection ID and a receive-only event channel. +func (b *Broker) Subscribe() (string, <-chan Event) { + id, err := uuid.NewV7() + if err != nil { + id = uuid.Must(uuid.NewRandom()) + } + connID := id.String() + ch := make(chan Event, 16) + b.mu.Lock() + b.clients[connID] = ch + b.mu.Unlock() + b.log.Debug().Str("connID", connID).Msg("SSE client subscribed") + return connID, ch +} + +// Unsubscribe removes a client and closes its channel. +func (b *Broker) Unsubscribe(connID string) { + b.mu.Lock() + ch, ok := b.clients[connID] + if ok { + delete(b.clients, connID) + close(ch) + } + b.mu.Unlock() + if ok { + b.log.Debug().Str("connID", connID).Msg("SSE client unsubscribed") + } +} + +// PublishUpdate sends an update event to all connected SSE clients. +// It is a no-op if the DefaultBroker has not been initialized. +func PublishUpdate(url string) { + if DefaultBroker == nil { + return + } + DefaultBroker.Publish(Event{Type: EventTypeUpdate, URL: url}) +} + +// Publish sends an event to all connected SSE clients. +func (b *Broker) Publish(event Event) { + b.mu.RLock() + defer b.mu.RUnlock() + for connID, ch := range b.clients { + select { + case ch <- event: + default: + b.log.Warn().Str("connID", connID).Msg("SSE client buffer full, dropping event") + } + } +} diff --git a/backend/internal/hub/sse/broker_test.go b/backend/internal/hub/sse/broker_test.go new file mode 100644 index 00000000..e0933412 --- /dev/null +++ b/backend/internal/hub/sse/broker_test.go @@ -0,0 +1,153 @@ +package sse + +import ( + "os" + "testing" + "time" + + "github.com/rs/zerolog" +) + +func newTestLogger() *zerolog.Logger { + log := zerolog.New(os.Stderr).Level(zerolog.Disabled) + return &log +} + +func TestBrokerSubscribe(t *testing.T) { + b := NewBroker(newTestLogger()) + connID, ch := b.Subscribe() + defer b.Unsubscribe(connID) + + if connID == "" { + t.Fatal("expected non-empty connID") + } + if ch == nil { + t.Fatal("expected non-nil channel") + } +} + +func TestBrokerUnsubscribe_ClosesChannel(t *testing.T) { + b := NewBroker(newTestLogger()) + connID, ch := b.Subscribe() + b.Unsubscribe(connID) + + select { + case _, open := <-ch: + if open { + t.Fatal("expected channel to be closed after unsubscribe") + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for channel to close") + } +} + +func TestBrokerUnsubscribe_NonExistent(t *testing.T) { + b := NewBroker(newTestLogger()) + b.Unsubscribe("nonexistent-id") // must not panic +} + +func TestBrokerPublish_DeliveredToSubscriber(t *testing.T) { + b := NewBroker(newTestLogger()) + connID, ch := b.Subscribe() + defer b.Unsubscribe(connID) + + b.Publish(Event{Type: EventTypeUpdate, URL: "http://example.com"}) + + select { + case received := <-ch: + if received.Type != EventTypeUpdate { + t.Errorf("expected type %q, got %q", EventTypeUpdate, received.Type) + } + if received.URL != "http://example.com" { + t.Errorf("expected URL %q, got %q", "http://example.com", received.URL) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for event") + } +} + +func TestBrokerPublish_DeliveredToAllSubscribers(t *testing.T) { + b := NewBroker(newTestLogger()) + + ids := make([]string, 3) + chs := make([]<-chan Event, 3) + for i := range 3 { + ids[i], chs[i] = b.Subscribe() + } + defer func() { + for _, id := range ids { + b.Unsubscribe(id) + } + }() + + b.Publish(Event{Type: EventTypeUpdate}) + + for i, ch := range chs { + select { + case received := <-ch: + if received.Type != EventTypeUpdate { + t.Errorf("subscriber %d: expected type %q, got %q", i, EventTypeUpdate, received.Type) + } + case <-time.After(time.Second): + t.Fatalf("subscriber %d: timed out waiting for event", i) + } + } +} + +func TestPublishUpdate_NilBroker(t *testing.T) { + DefaultBroker = nil + PublishUpdate("/api/v1/repositories") // must not panic +} + +func TestPublishUpdate_SendsUpdateEvent(t *testing.T) { + b := NewBroker(newTestLogger()) + DefaultBroker = b + t.Cleanup(func() { DefaultBroker = nil }) + + connID, ch := b.Subscribe() + defer b.Unsubscribe(connID) + + PublishUpdate("/api/v1/repositories") + + select { + case received := <-ch: + if received.Type != EventTypeUpdate { + t.Errorf("expected type %q, got %q", EventTypeUpdate, received.Type) + } + if received.URL != "/api/v1/repositories" { + t.Errorf("expected URL %q, got %q", "/api/v1/repositories", received.URL) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for event") + } +} + +func TestBrokerPublish_DropsWhenBufferFull(t *testing.T) { + b := NewBroker(newTestLogger()) + connID, ch := b.Subscribe() + defer b.Unsubscribe(connID) + + // Publish more than the channel buffer capacity (16) + for range 20 { + b.Publish(Event{Type: EventTypeUpdate}) + } + + count := 0 + deadline := time.After(100 * time.Millisecond) +drain: + for { + select { + case <-ch: + count++ + case <-deadline: + break drain + } + } + + if count > 16 { + t.Errorf("expected at most 16 events (buffer capacity), received %d", count) + } + if count == 0 { + t.Error("expected at least some events to be received") + } +} diff --git a/frontend/src/components/dialogs/upsert-oidc-provider-dialog.tsx b/frontend/src/components/dialogs/upsert-oidc-provider-dialog.tsx index da26a551..c11b6a23 100644 --- a/frontend/src/components/dialogs/upsert-oidc-provider-dialog.tsx +++ b/frontend/src/components/dialogs/upsert-oidc-provider-dialog.tsx @@ -37,11 +37,9 @@ const providerSchema = z.object({ export default function UpsertOIDCProviderDialog({ provider, - onSave, asDropdownItem = false, }: { provider: OIDCProviderDetail | null; - onSave: () => void; asDropdownItem?: boolean; }) { const isEditing = !!provider; @@ -95,7 +93,6 @@ export default function UpsertOIDCProviderDialog({ }); toast.success("Provider created"); } - onSave(); setOpen(false); } catch (err) { toast.error(err instanceof Error ? err.message : "Failed to save provider"); diff --git a/frontend/src/components/dialogs/upsert-user-dialog.tsx b/frontend/src/components/dialogs/upsert-user-dialog.tsx index 7c0dfd6a..a78aa4a9 100644 --- a/frontend/src/components/dialogs/upsert-user-dialog.tsx +++ b/frontend/src/components/dialogs/upsert-user-dialog.tsx @@ -20,8 +20,6 @@ import { Label } from "../ui/label"; import { Input } from "../ui/input"; import { RadioGroup, RadioGroupItem } from "../ui/radio-group"; import { Checkbox } from "../ui/checkbox"; -import { mutate } from "swr"; -import { API_BASE } from "@/lib/api"; import CopyValueDialog from "./copy-value-dialog"; const baseSchema = z.object({ @@ -85,7 +83,6 @@ export default function UpsertUserDialog({ setIsGeneratedPasswordOpen(true); } - await mutate(`${API_BASE}/admin/users`); setOpen(false); } catch (err) { toast.error(err instanceof Error ? err.message : "Failed to save user"); diff --git a/frontend/src/components/tables/users/columns.tsx b/frontend/src/components/tables/users/columns.tsx index 141d9431..46393620 100644 --- a/frontend/src/components/tables/users/columns.tsx +++ b/frontend/src/components/tables/users/columns.tsx @@ -14,8 +14,6 @@ import { deleteUser, type UserDetail } from "@/lib/users"; import { useAuth } from "@/lib/auth"; import { Badge } from "@/components/ui/badge"; import UpsertUserDialog from "@/components/dialogs/upsert-user-dialog"; -import { API_BASE } from "@/lib/api"; -import { mutate } from "swr"; export const columns: ColumnDef[] = [ { @@ -87,7 +85,6 @@ export const columns: ColumnDef[] = [ } catch (err) { toast.error(err instanceof Error ? err.message : "Failed to delete user"); } - await mutate(`${API_BASE}/admin/users`); } return ( diff --git a/frontend/src/lib/agents.ts b/frontend/src/lib/agents.ts index 785d3b16..caba795b 100644 --- a/frontend/src/lib/agents.ts +++ b/frontend/src/lib/agents.ts @@ -1,5 +1,4 @@ -import { mutate } from "swr"; -import { API_BASE, fetcher } from "./api"; +import { fetcher } from "./api"; export enum AgentStatus { Offline = "offline", @@ -29,19 +28,14 @@ export interface UpdateAgentRequest { name: string; } -export async function createAgent(data: CreateAgentRequest): Promise { - const res = await fetcher("/agents", "POST", data); - await mutate(`${API_BASE}/agents`); - return res; +export function createAgent(data: CreateAgentRequest): Promise { + return fetcher("/agents", "POST", data); } -export async function updateAgent(id: string, data: UpdateAgentRequest): Promise { - const res = await fetcher(`/agents/${id}`, "PUT", data); - await mutate(`${API_BASE}/agents`); - return res; +export function updateAgent(id: string, data: UpdateAgentRequest): Promise { + return fetcher(`/agents/${id}`, "PUT", data); } -export async function deleteAgent(id: string): Promise { - await fetcher(`/agents/${id}`, "DELETE"); - await mutate(`${API_BASE}/agents`); +export function deleteAgent(id: string): Promise { + return fetcher(`/agents/${id}`, "DELETE"); } diff --git a/frontend/src/lib/applications.ts b/frontend/src/lib/applications.ts index c791f646..b50f4c84 100644 --- a/frontend/src/lib/applications.ts +++ b/frontend/src/lib/applications.ts @@ -1,5 +1,4 @@ -import { mutate } from "swr"; -import { API_BASE, fetcher } from "./api"; +import { fetcher } from "./api"; export interface Application { id: string; @@ -68,22 +67,17 @@ interface UpdateApplicationRequest { path: string; } -export async function createApplication(data: CreateApplicationRequest): Promise { - const res = await fetcher("/applications", "POST", data); - await mutate(`${API_BASE}/applications`); - return res; +export function createApplication(data: CreateApplicationRequest): Promise { + return fetcher("/applications", "POST", data); } -export async function updateApplication( +export function updateApplication( id: string, data: UpdateApplicationRequest, ): Promise { - const res = await fetcher(`/applications/${id}`, "PUT", data); - await mutate(`${API_BASE}/applications`); - return res; + return fetcher(`/applications/${id}`, "PUT", data); } -export async function deleteApplication(id: string): Promise { - await fetcher(`/applications/${id}`, "DELETE"); - await mutate(`${API_BASE}/applications`); +export function deleteApplication(id: string): Promise { + return fetcher(`/applications/${id}`, "DELETE"); } diff --git a/frontend/src/lib/auth.tsx b/frontend/src/lib/auth.tsx index f493e2dc..d4496ecb 100644 --- a/frontend/src/lib/auth.tsx +++ b/frontend/src/lib/auth.tsx @@ -1,5 +1,6 @@ -import { createContext, useCallback, useContext } from "react"; +import { createContext, useCallback, useContext, useEffect } from "react"; import { API_BASE, fetcher, useFetch } from "./api"; +import { connect, disconnect } from "./sse"; export interface AuthState { isAuthenticated: boolean; @@ -44,6 +45,18 @@ export function AuthProvider({ children }: { children: React.ReactNode }) { await mutate(); }, [mutate]); + useEffect(() => { + if (isLoading) { + return; + } + if (auth.isAuthenticated) { + connect(refreshAuth); + } else { + disconnect(); + } + return disconnect; + }, [isLoading, auth.isAuthenticated, refreshAuth]); + const logout = useCallback(async () => { await fetch(`${API_BASE}/auth/logout`, { method: "POST" }); await mutate(undefined, false); diff --git a/frontend/src/lib/repsitories.ts b/frontend/src/lib/repsitories.ts index 06a96014..3c950099 100644 --- a/frontend/src/lib/repsitories.ts +++ b/frontend/src/lib/repsitories.ts @@ -1,5 +1,4 @@ -import { mutate } from "swr"; -import { API_BASE, fetcher } from "./api"; +import { fetcher } from "./api"; export type RepositoryProvider = | "github" @@ -53,28 +52,20 @@ export interface TestConnectionRequest { authToken?: string; } -export async function createRepository(data: CreateRepositoryRequest): Promise { - const res = await fetcher("/repositories", "POST", data); - await mutate(`${API_BASE}/repositories`); - return res; +export function createRepository(data: CreateRepositoryRequest): Promise { + return fetcher("/repositories", "POST", data); } export function testRepositoryConnection(data: TestConnectionRequest): Promise { return fetcher("/repositories/test-connection", "POST", data); } -export async function deleteRepository(id: string): Promise { - await fetcher(`/repositories/${id}`, "DELETE"); - await mutate(`${API_BASE}/repositories`); +export function deleteRepository(id: string): Promise { + return fetcher(`/repositories/${id}`, "DELETE"); } -export async function updateRepository( - id: string, - data: UpdateRepositoryRequest, -): Promise { - const res = await fetcher(`/repositories/${id}`, "PUT", data); - await mutate(`${API_BASE}/repositories`); - return res; +export function updateRepository(id: string, data: UpdateRepositoryRequest): Promise { + return fetcher(`/repositories/${id}`, "PUT", data); } export function getGitProviderIconPath(provider: RepositoryProvider): string { diff --git a/frontend/src/lib/sse.ts b/frontend/src/lib/sse.ts new file mode 100644 index 00000000..fd3d97fe --- /dev/null +++ b/frontend/src/lib/sse.ts @@ -0,0 +1,27 @@ +import { mutate } from "swr"; +import { API_BASE } from "./api"; + +let es: EventSource | undefined = undefined; + +export function connect(onUnauthorized?: () => void): void { + if (es) { + return; + } + + es = new EventSource(`${API_BASE}/events`); + + es.addEventListener("update", async (e) => { + const event = JSON.parse(e.data) as { url: string }; + await mutate((key) => typeof key === "string" && key.startsWith(event.url)); + }); + + es.addEventListener("unauthorized", () => { + disconnect(); + onUnauthorized?.(); + }); +} + +export function disconnect(): void { + es?.close(); + es = undefined; +} diff --git a/frontend/src/routes/_authenticated/admin/oidc-providers.tsx b/frontend/src/routes/_authenticated/admin/oidc-providers.tsx index 86935655..926d8278 100644 --- a/frontend/src/routes/_authenticated/admin/oidc-providers.tsx +++ b/frontend/src/routes/_authenticated/admin/oidc-providers.tsx @@ -38,26 +38,17 @@ export const Route = createFileRoute("/_authenticated/admin/oidc-providers")({ }); function OIDCProvidersPage() { - const { - data: providers, - mutate, - isLoading, - } = useFetch("/admin/oidc-providers"); + const { data: providers, isLoading } = useFetch("/admin/oidc-providers"); async function handleDelete(provider: OIDCProviderDetail) { try { await deleteOIDCProvider(provider.id); toast.success("Provider deleted"); - await mutate(); } catch (err) { toast.error(err instanceof Error ? err.message : "Failed to delete provider"); } } - async function handleSave() { - await mutate(); - } - return (
@@ -68,7 +59,7 @@ function OIDCProvidersPage() {

- +
{isLoading &&

Loading providers...

} @@ -98,11 +89,7 @@ function OIDCProvidersPage() { - + handleDelete(provider)}