Skip to content
2 changes: 1 addition & 1 deletion .outpost.yaml.dev
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ idgen:
event_prefix: "evt"
destination_prefix: "des"
delivery_prefix: "dlv"
delivery_event_prefix: "dev"
"

# Concurrency
publish_max_concurrency: 1
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
TEST?=./...
TEST?=./internal/...
RUN?=

# Build targets
Expand Down
163 changes: 163 additions & 0 deletions cmd/e2e/suites_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,3 +544,166 @@ func TestE2E_Regression_AutoDisableWithoutCallbackURL(t *testing.T) {
// Cleanup mock server
_ = mockServerInfra
}

// TestE2E_Regression_RetryRaceCondition verifies that retries are not lost when
// the retry scheduler queries logstore before the event has been persisted.
//
// Test configuration creates a timing window where retry fires before log persistence:
//   - LogBatchThresholdSeconds = 5 (slow persistence)
//   - RetryIntervalSeconds = 1 (fast retry)
//   - RetryVisibilityTimeoutSeconds = 2 (quick reprocessing when event not found)
//
// Expected behavior: retry remains in queue until event is available, then succeeds.
func TestE2E_Regression_RetryRaceCondition(t *testing.T) {
	t.Parallel()
	if testing.Short() {
		t.Skip("skipping e2e test")
	}

	// Setup infrastructure
	testinfraCleanup := testinfra.Start(t)
	defer testinfraCleanup()
	gin.SetMode(gin.TestMode)
	mockServerBaseURL := testinfra.GetMockServer(t)

	// Configure with slow log persistence and fast retry
	cfg := configs.Basic(t, configs.BasicOpts{
		LogStorage: configs.LogStorageTypeClickHouse,
	})

	// SLOW log persistence: batch won't flush for 5 seconds
	cfg.LogBatchThresholdSeconds = 5
	cfg.LogBatchSize = 10000 // High batch size to prevent early flush

	// FAST retry: retry fires after ~1 second
	cfg.RetryIntervalSeconds = 1
	cfg.RetryPollBackoffMs = 50
	cfg.RetryMaxLimit = 5
	cfg.RetryVisibilityTimeoutSeconds = 2 // Short VT so retry happens quickly after event not found

	require.NoError(t, cfg.Validate(config.Flags{}))

	// Start application
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	appDone := make(chan struct{})
	go func() {
		defer close(appDone)
		application := app.New(&cfg)
		if err := application.Run(ctx); err != nil {
			log.Println("Application stopped:", err)
		}
	}()
	defer func() {
		cancel()
		<-appDone
	}()

	// Wait for services to start
	waitForHealthy(t, cfg.APIPort, 5*time.Second)

	// Setup test client
	client := httpclient.New(fmt.Sprintf("http://localhost:%d/api/v1", cfg.APIPort), cfg.APIKey)
	mockServerInfra := testinfra.NewMockServerInfra(mockServerBaseURL)

	// Test data: unique IDs so parallel runs don't collide
	tenantID := fmt.Sprintf("tenant_race_%d", time.Now().UnixNano())
	destinationID := fmt.Sprintf("dest_race_%d", time.Now().UnixNano())
	secret := "testsecret1234567890abcdefghijklmnop"

	// Create tenant
	resp, err := client.Do(httpclient.Request{
		Method:  httpclient.MethodPUT,
		Path:    "/tenants/" + tenantID,
		Headers: map[string]string{"Authorization": "Bearer " + cfg.APIKey},
	})
	require.NoError(t, err)
	require.Equal(t, 201, resp.StatusCode, "failed to create tenant")

	// Configure mock server destination
	resp, err = client.Do(httpclient.Request{
		Method:  httpclient.MethodPUT,
		BaseURL: mockServerBaseURL,
		Path:    "/destinations",
		Body: map[string]interface{}{
			"id":   destinationID,
			"type": "webhook",
			"config": map[string]interface{}{
				"url": fmt.Sprintf("%s/webhook/%s", mockServerBaseURL, destinationID),
			},
			"credentials": map[string]interface{}{
				"secret": secret,
			},
		},
	})
	require.NoError(t, err)
	require.Equal(t, 200, resp.StatusCode, "failed to configure mock server")

	// Create destination
	resp, err = client.Do(httpclient.Request{
		Method:  httpclient.MethodPOST,
		Path:    "/tenants/" + tenantID + "/destinations",
		Headers: map[string]string{"Authorization": "Bearer " + cfg.APIKey},
		Body: map[string]interface{}{
			"id":     destinationID,
			"type":   "webhook",
			"topics": "*",
			"config": map[string]interface{}{
				"url": fmt.Sprintf("%s/webhook/%s", mockServerBaseURL, destinationID),
			},
			"credentials": map[string]interface{}{
				"secret": secret,
			},
		},
	})
	require.NoError(t, err)
	require.Equal(t, 201, resp.StatusCode, "failed to create destination")

	// Publish event that will always fail (should_err: true)
	// We want to verify that retries happen (mock server is hit multiple times)
	resp, err = client.Do(httpclient.Request{
		Method:  httpclient.MethodPOST,
		Path:    "/publish",
		Headers: map[string]string{"Authorization": "Bearer " + cfg.APIKey},
		Body: map[string]interface{}{
			"tenant_id":          tenantID,
			"topic":              "user.created",
			"eligible_for_retry": true,
			"metadata": map[string]interface{}{
				"should_err": "true", // All deliveries fail
			},
			"data": map[string]interface{}{
				"test": "race-condition-test",
			},
		},
	})
	require.NoError(t, err)
	require.Equal(t, 202, resp.StatusCode, "failed to publish event")

	// Wait for retries to complete by polling the mock server instead of a
	// fixed 10s sleep: the sleep made the test both slow (always 10s) and
	// flaky (fails if persistence or retry takes marginally longer).
	// Expected timeline under the configured timings:
	//   - t=0:  event published, first delivery fails
	//   - t=1s: retry fires, event not in logstore yet, message returns to queue
	//   - t=3s: message visible again after 2s VT, retry fires again
	//   - t=5s: log batch flushes, event now in logstore
	//   - t>5s: retry finds event, delivery succeeds
	deadline := time.Now().Add(20 * time.Second)
	var events []interface{}
	for {
		resp, err = client.Do(httpclient.Request{
			Method:  httpclient.MethodGET,
			BaseURL: mockServerBaseURL,
			Path:    "/destinations/" + destinationID + "/events",
		})
		require.NoError(t, err)
		require.Equal(t, 200, resp.StatusCode)

		var ok bool
		events, ok = resp.Body.([]interface{})
		require.True(t, ok, "expected events array")

		// Stop as soon as the retry delivered, or when the deadline passes.
		if len(events) >= 2 || time.Now().After(deadline) {
			break
		}
		time.Sleep(500 * time.Millisecond)
	}

	// Should have at least 2 attempts: initial failure + successful retry
	require.GreaterOrEqual(t, len(events), 2,
		"expected multiple delivery attempts (initial + retry after event persisted)")

	_ = mockServerInfra
}
28 changes: 27 additions & 1 deletion internal/apirouter/retry_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ func TestRetryDelivery(t *testing.T) {

require.NoError(t, result.logStore.InsertMany(context.Background(), []*models.LogEntry{{Event: event, Delivery: delivery}}))

t.Run("should retry delivery successfully", func(t *testing.T) {
t.Run("should retry delivery successfully with full event data", func(t *testing.T) {
// Subscribe to deliveryMQ to capture published task
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

subscription, err := result.deliveryMQ.Subscribe(ctx)
require.NoError(t, err)

// Trigger manual retry
w := httptest.NewRecorder()
req, _ := http.NewRequest("POST", baseAPIPath+"/tenants/"+tenantID+"/deliveries/"+deliveryID+"/retry", nil)
result.router.ServeHTTP(w, req)
Expand All @@ -69,6 +77,24 @@ func TestRetryDelivery(t *testing.T) {
var response map[string]interface{}
require.NoError(t, json.Unmarshal(w.Body.Bytes(), &response))
assert.Equal(t, true, response["success"])

// Verify published task has full event data
msg, err := subscription.Receive(ctx)
require.NoError(t, err)

var task models.DeliveryTask
require.NoError(t, json.Unmarshal(msg.Body, &task))

assert.Equal(t, eventID, task.Event.ID)
assert.Equal(t, tenantID, task.Event.TenantID)
assert.Equal(t, destinationID, task.Event.DestinationID)
assert.Equal(t, "order.created", task.Event.Topic)
assert.False(t, task.Event.Time.IsZero(), "event time should be set")
assert.Equal(t, eventTime.UTC(), task.Event.Time.UTC())
assert.Equal(t, event.Data, task.Event.Data, "event data should match original")
assert.True(t, task.Manual, "should be marked as manual retry")

msg.Ack()
})

t.Run("should return 404 for non-existent delivery", func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions internal/apirouter/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type testRouterResult struct {
redisClient redis.Client
entityStore models.EntityStore
logStore logstore.LogStore
deliveryMQ *deliverymq.DeliveryMQ
}

func setupTestRouter(t *testing.T, apiKey, jwtSecret string, funcs ...func(t *testing.T) clickhouse.DB) (http.Handler, *logging.Logger, redis.Client) {
Expand Down Expand Up @@ -73,6 +74,7 @@ func setupTestRouterFull(t *testing.T, apiKey, jwtSecret string, funcs ...func(t
redisClient: redisClient,
entityStore: entityStore,
logStore: logStore,
deliveryMQ: deliveryMQ,
}
}

Expand Down
4 changes: 3 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type Config struct {
RetrySchedule []int `yaml:"retry_schedule" env:"RETRY_SCHEDULE" envSeparator:"," desc:"Comma-separated list of retry delays in seconds. If provided, overrides retry_interval_seconds and retry_max_limit. Schedule length defines the max number of retries. Example: '5,60,600,3600,7200' for 5 retries at 5s, 1m, 10m, 1h, 2h." required:"N"`
RetryIntervalSeconds int `yaml:"retry_interval_seconds" env:"RETRY_INTERVAL_SECONDS" desc:"Interval in seconds for exponential backoff retry strategy (base 2). Ignored if retry_schedule is provided." required:"N"`
RetryMaxLimit int `yaml:"retry_max_limit" env:"MAX_RETRY_LIMIT" desc:"Maximum number of retry attempts for a single event delivery before giving up. Ignored if retry_schedule is provided." required:"N"`
RetryPollBackoffMs int `yaml:"retry_poll_backoff_ms" env:"RETRY_POLL_BACKOFF_MS" desc:"Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100" required:"N"`
RetryPollBackoffMs int `yaml:"retry_poll_backoff_ms" env:"RETRY_POLL_BACKOFF_MS" desc:"Backoff time in milliseconds when the retry monitor finds no messages to process. When a retry message is found, the monitor immediately polls for the next message without delay. Lower values provide faster retry processing but increase Redis load. For serverless Redis providers (Upstash, ElastiCache Serverless), consider increasing to 5000-10000ms to reduce costs. Default: 100" required:"N"`
RetryVisibilityTimeoutSeconds int `yaml:"retry_visibility_timeout_seconds" env:"RETRY_VISIBILITY_TIMEOUT_SECONDS" desc:"Time in seconds a retry message is hidden after being received before becoming visible again for reprocessing. This applies when event data is temporarily unavailable (e.g., race condition with log persistence). Default: 30" required:"N"`

// Event Delivery
MaxDestinationsPerTenant int `yaml:"max_destinations_per_tenant" env:"MAX_DESTINATIONS_PER_TENANT" desc:"Maximum number of destinations allowed per tenant/organization." required:"N"`
Expand Down Expand Up @@ -165,6 +166,7 @@ func (c *Config) InitDefaults() {
c.RetryIntervalSeconds = 30
c.RetryMaxLimit = 10
c.RetryPollBackoffMs = 100
c.RetryVisibilityTimeoutSeconds = 30
c.MaxDestinationsPerTenant = 20
c.DeliveryTimeoutSeconds = 5
c.PublishIdempotencyKeyTTL = 3600 // 1 hour
Expand Down
36 changes: 0 additions & 36 deletions internal/deliverymq/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/hookdeck/outpost/internal/destregistry"
"github.com/hookdeck/outpost/internal/idempotence"
"github.com/hookdeck/outpost/internal/logging"
"github.com/hookdeck/outpost/internal/logstore"
"github.com/hookdeck/outpost/internal/models"
"github.com/hookdeck/outpost/internal/mqs"
"github.com/hookdeck/outpost/internal/scheduler"
Expand Down Expand Up @@ -70,7 +69,6 @@ type messageHandler struct {
logger *logging.Logger
logMQ LogPublisher
entityStore DestinationGetter
logStore EventGetter
retryScheduler RetryScheduler
retryBackoff backoff.Backoff
retryMaxLimit int
Expand All @@ -96,10 +94,6 @@ type DestinationGetter interface {
RetrieveDestination(ctx context.Context, tenantID, destID string) (*models.Destination, error)
}

type EventGetter interface {
RetrieveEvent(ctx context.Context, request logstore.RetrieveEventRequest) (*models.Event, error)
}

type DeliveryTracer interface {
Deliver(ctx context.Context, task *models.DeliveryTask, destination *models.Destination) (context.Context, trace.Span)
}
Expand All @@ -112,7 +106,6 @@ func NewMessageHandler(
logger *logging.Logger,
logMQ LogPublisher,
entityStore DestinationGetter,
logStore EventGetter,
publisher Publisher,
eventTracer DeliveryTracer,
retryScheduler RetryScheduler,
Expand All @@ -126,7 +119,6 @@ func NewMessageHandler(
logger: logger,
logMQ: logMQ,
entityStore: entityStore,
logStore: logStore,
publisher: publisher,
retryScheduler: retryScheduler,
retryBackoff: retryBackoff,
Expand All @@ -150,11 +142,6 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error {
zap.String("destination_id", task.DestinationID),
zap.Int("attempt", task.Attempt))

// Ensure event data
if err := h.ensureDeliveryTask(ctx, &task); err != nil {
return h.handleError(msg, &PreDeliveryError{err: err})
}

// Get destination
destination, err := h.ensurePublishableDestination(ctx, task)
if err != nil {
Expand Down Expand Up @@ -443,29 +430,6 @@ func (h *messageHandler) scheduleRetry(ctx context.Context, task models.Delivery
return nil
}

// ensureDeliveryTask ensures that the delivery task has full event data.
// In retry scenarios, the task only has event ID and we'll need to query the full data.
func (h *messageHandler) ensureDeliveryTask(ctx context.Context, task *models.DeliveryTask) error {
// TODO: consider a more deliberate way to check for retry scenario?
if !task.Event.Time.IsZero() {
return nil
}

event, err := h.logStore.RetrieveEvent(ctx, logstore.RetrieveEventRequest{
TenantID: task.Event.TenantID,
EventID: task.Event.ID,
})
if err != nil {
return err
}
if event == nil {
return errors.New("event not found")
}
task.Event = *event

return nil
}

// ensurePublishableDestination ensures that the destination exists and is in a publishable state.
// Returns an error if the destination is not found, deleted, disabled, or any other state that
// would prevent publishing.
Expand Down
Loading