Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/internal/hub/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/OrcaCD/orca-cd/internal/hub/middleware"
"github.com/OrcaCD/orca-cd/internal/hub/routes"
"github.com/OrcaCD/orca-cd/internal/hub/sse"
"github.com/OrcaCD/orca-cd/internal/hub/websocket"
"github.com/OrcaCD/orca-cd/internal/version"
"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -67,6 +68,8 @@ func RegisterRoutes(router *gin.Engine, cfg Config) error {
protected.GET("/agents/:id", routes.GetAgentHandler)
protected.PUT("/agents/:id", routes.UpdateAgentHandler)
protected.DELETE("/agents/:id", routes.DeleteAgentHandler)

protected.GET("/events", routes.SSEHandler)
}

// Admin routes (authentication + admin role required)
Expand All @@ -87,6 +90,8 @@ func RegisterRoutes(router *gin.Engine, cfg Config) error {
admin.DELETE("/oidc-providers/:id", routes.AdminDeleteOIDCProviderHandler)
}

sse.DefaultBroker = sse.NewBroker(&Log)

h := websocket.NewHub(&Log)
w := websocket.NewWorker(h, &Log)
w.Start()
Expand Down
5 changes: 3 additions & 2 deletions backend/internal/hub/middleware/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (

func TimeoutMiddleware(timeout time.Duration) gin.HandlerFunc {
return func(c *gin.Context) {
// WebSocket connections are long-lived; skip the timeout.
if strings.EqualFold(c.Request.Header.Get("Upgrade"), "websocket") {
// WebSocket and SSE connections are long-lived; skip the timeout.
if strings.EqualFold(c.Request.Header.Get("Upgrade"), "websocket") ||
strings.Contains(c.Request.Header.Get("Accept"), "text/event-stream") {
c.Next()
Comment thread
timokoessler marked this conversation as resolved.
return
}
Expand Down
29 changes: 29 additions & 0 deletions backend/internal/hub/middleware/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,35 @@ func TestTimeoutMiddleware_SkipsWebSocketRequests(t *testing.T) {
}
}

func TestTimeoutMiddleware_SkipsSSERequests(t *testing.T) {
t.Parallel()

for _, acceptVal := range []string{"text/event-stream", "text/event-stream, text/html"} {
t.Run(acceptVal, func(t *testing.T) {
t.Parallel()

router := gin.New()
router.Use(TimeoutMiddleware(100 * time.Millisecond))
router.GET("/events", func(c *gin.Context) {
_, ok := c.Request.Context().Deadline()
if ok {
t.Fatal("expected no deadline for SSE request")
}
c.Status(http.StatusOK)
})

w := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/events", nil)
req.Header.Set("Accept", acceptVal)
router.ServeHTTP(w, req)

if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
})
}
}

func TestTimeoutMiddleware_CancelsContextAfterHandlerReturns(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions backend/internal/hub/routes/repositories.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/OrcaCD/orca-cd/internal/hub/db"
"github.com/OrcaCD/orca-cd/internal/hub/models"
"github.com/OrcaCD/orca-cd/internal/hub/repositories"
"github.com/OrcaCD/orca-cd/internal/hub/sse"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -188,6 +189,7 @@ func CreateRepositoryHandler(c *gin.Context) {
}

c.JSON(http.StatusCreated, toRepositoryResponse(&repo, true))
sse.PublishUpdate("/api/v1/repositories")
}
Comment thread
timokoessler marked this conversation as resolved.

type testConnectionRequest struct {
Expand Down Expand Up @@ -255,6 +257,7 @@ func DeleteRepositoryHandler(c *gin.Context) {
}

c.JSON(http.StatusOK, gin.H{"message": "repository deleted"})
sse.PublishUpdate("/api/v1/repositories")
}

type updateRepositoryRequest struct {
Expand Down Expand Up @@ -363,6 +366,7 @@ func UpdateRepositoryHandler(c *gin.Context) {

newWebhookSecret := req.SyncType == models.SyncTypeWebhook && prevSyncType != models.SyncTypeWebhook
c.JSON(http.StatusOK, toRepositoryResponse(&repo, newWebhookSecret))
sse.PublishUpdate("/api/v1/repositories")
}

// resolveProvider validates the provider enum, URL, and authMethod, returning the
Expand Down
131 changes: 131 additions & 0 deletions backend/internal/hub/routes/repositories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/OrcaCD/orca-cd/internal/hub/auth"
"github.com/OrcaCD/orca-cd/internal/hub/crypto"
"github.com/OrcaCD/orca-cd/internal/hub/db"
"github.com/OrcaCD/orca-cd/internal/hub/models"
"github.com/OrcaCD/orca-cd/internal/hub/sse"
"github.com/OrcaCD/orca-cd/internal/shared/httpclient"
"github.com/gin-gonic/gin"
"github.com/golang-jwt/jwt/v5"
"gorm.io/gorm"
)

const repositoriesAPIPath = "/api/v1/repositories"

func setupTestDBWithRepos(t *testing.T) {
t.Helper()
setupTestDB(t)
Expand Down Expand Up @@ -907,6 +911,133 @@ func TestUpdateRepositoryHandler_SwitchFromWebhook(t *testing.T) {
}
}

func TestCreateRepositoryHandler_PublishesSSEEvent(t *testing.T) {
setupTestDBWithRepos(t)
broker := newTestSSEBroker(t)
connID, ch := broker.Subscribe()
defer broker.Unsubscribe(connID)

reqBody, _ := json.Marshal(map[string]any{
"url": "https://github.com/owner/sse-repo",
"provider": "github",
"authMethod": "none",
"syncType": "manual",
})

c, w := makeAuthContext(t, "user-1")
c.Request = httptest.NewRequest(http.MethodPost, "/api/v1/repositories", bytes.NewReader(reqBody))
c.Request.Header.Set("Content-Type", "application/json")

CreateRepositoryHandler(c)

if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
}

select {
case event := <-ch:
if event.Type != sse.EventTypeUpdate {
t.Errorf("expected event type %q, got %q", sse.EventTypeUpdate, event.Type)
}
if event.URL != repositoriesAPIPath {
t.Errorf("expected URL %q, got %q", repositoriesAPIPath, event.URL)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for SSE event")
}
}

func TestUpdateRepositoryHandler_PublishesSSEEvent(t *testing.T) {
setupTestDBWithRepos(t)
broker := newTestSSEBroker(t)
connID, ch := broker.Subscribe()
defer broker.Unsubscribe(connID)

repo := models.Repository{
Name: "owner/repo",
Url: "https://github.com/owner/repo",
Provider: models.GitHub,
AuthMethod: models.AuthMethodNone,
SyncType: models.SyncTypeManual,
SyncStatus: models.SyncStatusUnknown,
CreatedBy: "user-1",
}
if err := db.DB.Select("*").Create(&repo).Error; err != nil {
t.Fatalf("failed to seed repo: %v", err)
}

reqBody, _ := json.Marshal(map[string]any{
"url": "https://github.com/owner/repo",
"authMethod": "none",
"syncType": "manual",
})

c, w := makeAuthContext(t, "user-1")
c.Request = httptest.NewRequest(http.MethodPut, "/api/v1/repositories/"+repo.Id, bytes.NewReader(reqBody))
c.Request.Header.Set("Content-Type", "application/json")
c.Params = gin.Params{{Key: "id", Value: repo.Id}}

UpdateRepositoryHandler(c)

if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}

select {
case event := <-ch:
if event.Type != sse.EventTypeUpdate {
t.Errorf("expected event type %q, got %q", sse.EventTypeUpdate, event.Type)
}
if event.URL != repositoriesAPIPath {
t.Errorf("expected URL %q, got %q", repositoriesAPIPath, event.URL)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for SSE event")
}
}

func TestDeleteRepositoryHandler_PublishesSSEEvent(t *testing.T) {
setupTestDBWithRepos(t)
broker := newTestSSEBroker(t)
connID, ch := broker.Subscribe()
defer broker.Unsubscribe(connID)

repo := models.Repository{
Name: "To Delete SSE",
Url: "https://github.com/owner/sse-delete",
Provider: models.GitHub,
AuthMethod: models.AuthMethodNone,
SyncType: models.SyncTypeManual,
SyncStatus: models.SyncStatusUnknown,
CreatedBy: "user-1",
}
if err := db.DB.Select("*").Create(&repo).Error; err != nil {
t.Fatalf("failed to seed repo: %v", err)
}

c, w := makeAuthContext(t, "user-1")
c.Request = httptest.NewRequest(http.MethodDelete, "/api/v1/repositories/"+repo.Id, nil)
c.Params = gin.Params{{Key: "id", Value: repo.Id}}

DeleteRepositoryHandler(c)

if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}

select {
case event := <-ch:
if event.Type != sse.EventTypeUpdate {
t.Errorf("expected event type %q, got %q", sse.EventTypeUpdate, event.Type)
}
if event.URL != repositoriesAPIPath {
t.Errorf("expected URL %q, got %q", repositoriesAPIPath, event.URL)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for SSE event")
}
}

func TestTestConnectionHandler_Success(t *testing.T) {
setupTestDBWithRepos(t)
restore := mockHTTPClient(http.StatusOK)
Expand Down
69 changes: 69 additions & 0 deletions backend/internal/hub/routes/sse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package routes

import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/OrcaCD/orca-cd/internal/hub/auth"
"github.com/OrcaCD/orca-cd/internal/hub/sse"
"github.com/gin-gonic/gin"
)

func SSEHandler(c *gin.Context) {
claims, ok := auth.GetClaims(c)
if !ok {
c.JSON(http.StatusUnauthorized, gin.H{"error": "missing authentication"})
return
}

// Clear the write deadline so the server's WriteTimeout doesn't kill long-lived SSE connections.
rc := http.NewResponseController(c.Writer)
if err := rc.SetWriteDeadline(time.Time{}); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"})
return
}

c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache, no-transform")
c.Header("Connection", "keep-alive")
c.Header("X-Accel-Buffering", "no")

connID, ch := sse.DefaultBroker.Subscribe()
defer sse.DefaultBroker.Unsubscribe(connID)

Comment thread
timokoessler marked this conversation as resolved.
// Flush initial comment to establish the stream
if _, err := fmt.Fprint(c.Writer, ": connected\n\n"); err != nil {
return
}
c.Writer.Flush()

timer := time.NewTimer(time.Until(claims.ExpiresAt.Time))
defer timer.Stop()

for {
select {
case <-timer.C:
if _, err := fmt.Fprint(c.Writer, "event: unauthorized\ndata: {}\n\n"); err != nil {
return
}
c.Writer.Flush()
return
case event, open := <-ch:
if !open {
return
}
data, err := json.Marshal(event)
if err != nil {
continue
}
if _, err := fmt.Fprintf(c.Writer, "event: %s\ndata: %s\n\n", event.Type, data); err != nil {
return
}
c.Writer.Flush()
case <-c.Request.Context().Done():
return
}
Comment thread
timokoessler marked this conversation as resolved.
}
}
Loading
Loading