Skip to content

Commit d0fa4ce

Browse files
authored
fix: manual retry blocked by idempotency after first success (#670)
* chore: deliverymq log skipped task * test: duplicate manual retry (failing tests/tdd) * fix: include DeliveryTask.Nonce for manual retry * chore: remove unnecessary backward compat handling * chore: comment explain idempotency key context * test: redis testcontainer & improve test stability
1 parent c1d2b27 commit d0fa4ce

File tree

11 files changed

+214
-270
lines changed

11 files changed

+214
-270
lines changed

.env.test

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
# DBs
22
TEST_POSTGRES_URL="localhost:35432"
33
TEST_CLICKHOUSE_URL="localhost:39000"
4-
TEST_REDIS_URL="localhost:36379"
5-
TEST_DRAGONFLY_URL="localhost:37379"
64
# MQs
75
TEST_RABBITMQ_URL="localhost:35672"
86
TEST_LOCALSTACK_URL="localhost:34566"

build/test/compose.yml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,6 @@ services:
88
dockerfile: ./build/test/Dockerfile.mock
99
ports:
1010
- 35555:5555
11-
redis:
12-
image: redis/redis-stack-server:latest
13-
ports:
14-
- 36379:6379
15-
dragonfly:
16-
image: docker.dragonflydb.io/dragonflydb/dragonfly
17-
command: ["--proactor_threads=1", "--maxmemory=256mb"]
18-
ports:
19-
- 37379:6379
2011
clickhouse:
2112
image: clickhouse/clickhouse-server:24-alpine
2213
ports:

cmd/e2e/retry_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,45 @@ func (s *basicSuite) TestRetry_ManualRetryCreatesNewAttempt() {
9393
s.Equal(1, manualCount, "should have exactly one manual retry attempt")
9494
}
9595

96+
func (s *basicSuite) TestRetry_DuplicateManualRetryExecutesBoth() {
97+
tenant := s.createTenant()
98+
dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))
99+
100+
eventID := idgen.Event()
101+
s.publish(tenant.ID, "user.created", map[string]any{
102+
"test": "duplicate_manual_retry",
103+
}, withEventID(eventID))
104+
105+
// Wait for initial delivery
106+
s.waitForNewAttempts(tenant.ID, 1)
107+
108+
// First manual retry
109+
status := s.retryEvent(eventID, dest.ID)
110+
s.Equal(http.StatusAccepted, status)
111+
s.waitForNewAttempts(tenant.ID, 2)
112+
113+
// Second manual retry for same event+destination
114+
status = s.retryEvent(eventID, dest.ID)
115+
s.Equal(http.StatusAccepted, status)
116+
s.waitForNewAttempts(tenant.ID, 3)
117+
118+
// Verify: 3 attempts total, 2 manual
119+
var resp struct {
120+
Models []map[string]any `json:"models"`
121+
}
122+
status = s.doJSON(http.MethodGet, s.apiURL("/attempts?tenant_id="+tenant.ID+"&event_id="+eventID+"&dir=asc"), nil, &resp)
123+
s.Require().Equal(http.StatusOK, status)
124+
s.Require().Len(resp.Models, 3, "should have 3 attempts: initial + 2 manual retries")
125+
126+
manualCount := 0
127+
for _, atm := range resp.Models {
128+
if manual, ok := atm["manual"].(bool); ok && manual {
129+
manualCount++
130+
}
131+
}
132+
s.Equal(2, manualCount, "should have exactly 2 manual retry attempts")
133+
}
134+
96135
func (s *basicSuite) TestRetry_ManualRetryOnDisabledDestinationRejected() {
97136
tenant := s.createTenant()
98137
dest := s.createWebhookDestination(tenant.ID, "*")

contributing/test.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ Integration and e2e tests require external services like ClickHouse, LocalStack,
134134

135135
### Why persistent infrastructure?
136136

137-
Lightweight services like Redis start quickly, but heavier dependencies like LocalStack (AWS) or GCP emulators can take 15-30 seconds to initialize. With persistent infrastructure, you pay this cost once and get fast iteration from then on.
137+
Redis and Dragonfly always use testcontainers (one container per test) since they start quickly. Heavier dependencies like LocalStack (AWS) or GCP emulators can take 15-30 seconds to initialize. With persistent infrastructure, you pay this cost once and get fast iteration from then on.
138138

139139
To run the test infrastructure:
140140

internal/deliverymq/messagehandler.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,21 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error {
147147
return h.handleError(msg, &PreDeliveryError{err: err})
148148
}
149149

150-
err = h.idempotence.Exec(ctx, idempotencyKeyFromDeliveryTask(task), func(ctx context.Context) error {
150+
executed := false
151+
idempotencyKey := idempotencyKeyFromDeliveryTask(task)
152+
err = h.idempotence.Exec(ctx, idempotencyKey, func(ctx context.Context) error {
153+
executed = true
151154
return h.doHandle(ctx, task, destination)
152155
})
156+
if err == nil && !executed {
157+
h.logger.Ctx(ctx).Info("delivery task skipped (idempotent)",
158+
zap.String("event_id", task.Event.ID),
159+
zap.String("tenant_id", task.Event.TenantID),
160+
zap.String("destination_id", task.DestinationID),
161+
zap.Int("attempt", task.Attempt),
162+
zap.Bool("manual", task.Manual),
163+
zap.String("idempotency_key", idempotencyKey))
164+
}
153165
return h.handleError(msg, err)
154166
}
155167

internal/deliverymq/messagehandler_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,6 +1118,66 @@ func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destina
11181118
}
11191119
}
11201120

1121+
func TestManualDelivery_DuplicateRetry(t *testing.T) {
1122+
// Test scenario:
1123+
// - First manual retry for event+destination succeeds
1124+
// - Second manual retry for same event+destination is requested
1125+
// - Second retry should also execute (not be blocked by idempotency)
1126+
//
1127+
// Manual retries are explicit user actions and should always execute,
1128+
// even if a previous manual retry for the same event+destination already succeeded.
1129+
1130+
// Setup test data
1131+
tenant := models.Tenant{ID: idgen.String()}
1132+
destination := testutil.DestinationFactory.Any(
1133+
testutil.DestinationFactory.WithTenantID(tenant.ID),
1134+
)
1135+
event := testutil.EventFactory.Any(
1136+
testutil.EventFactory.WithTenantID(tenant.ID),
1137+
testutil.EventFactory.WithDestinationID(destination.ID),
1138+
)
1139+
1140+
// Setup mocks
1141+
destGetter := &mockDestinationGetter{dest: &destination}
1142+
retryScheduler := newMockRetryScheduler()
1143+
publisher := newMockPublisher([]error{nil, nil}) // Both succeed
1144+
logPublisher := newMockLogPublisher(nil)
1145+
alertMonitor := newMockAlertMonitor()
1146+
1147+
// Setup message handler with Redis for idempotency
1148+
redis := testutil.CreateTestRedisClient(t)
1149+
handler := deliverymq.NewMessageHandler(
1150+
testutil.CreateTestLogger(t),
1151+
logPublisher,
1152+
destGetter,
1153+
publisher,
1154+
testutil.NewMockEventTracer(nil),
1155+
retryScheduler,
1156+
&backoff.ConstantBackoff{Interval: 1 * time.Second},
1157+
10,
1158+
alertMonitor,
1159+
idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)),
1160+
)
1161+
1162+
// Step 1: First manual retry succeeds
1163+
task1 := models.NewManualDeliveryTask(event, destination.ID)
1164+
mockMsg1, msg1 := newDeliveryMockMessage(task1)
1165+
err := handler.Handle(context.Background(), msg1)
1166+
require.NoError(t, err)
1167+
assert.True(t, mockMsg1.acked, "first manual retry should be acked")
1168+
assert.Equal(t, 1, publisher.current, "first manual retry should publish")
1169+
require.Len(t, logPublisher.entries, 1, "first manual retry should log delivery")
1170+
1171+
// Step 2: Second manual retry for same event+destination should also execute
1172+
task2 := models.NewManualDeliveryTask(event, destination.ID)
1173+
mockMsg2, msg2 := newDeliveryMockMessage(task2)
1174+
err = handler.Handle(context.Background(), msg2)
1175+
require.NoError(t, err)
1176+
assert.True(t, mockMsg2.acked, "second manual retry should be acked")
1177+
assert.Equal(t, 2, publisher.current, "second manual retry should also publish")
1178+
require.Len(t, logPublisher.entries, 2, "second manual retry should also log delivery")
1179+
}
1180+
11211181
func TestMessageHandler_RetryID_MultipleDestinations(t *testing.T) {
11221182
// Test scenario:
11231183
// - One event is delivered to TWO different destinations

internal/models/tasks.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package models
33
import (
44
"encoding/json"
55

6+
"github.com/hookdeck/outpost/internal/idgen"
67
"github.com/hookdeck/outpost/internal/mqs"
78
)
89

@@ -38,6 +39,7 @@ type DeliveryTask struct {
3839
DestinationID string `json:"destination_id"`
3940
Attempt int `json:"attempt"`
4041
Manual bool `json:"manual"`
42+
Nonce string `json:"nonce,omitempty"`
4143
Telemetry *DeliveryTelemetry `json:"telemetry,omitempty"`
4244
}
4345

@@ -56,12 +58,16 @@ func (t *DeliveryTask) ToMessage() (*mqs.Message, error) {
5658
}
5759

5860
// IdempotencyKey returns the key used for idempotency checks.
59-
// Uses Event.ID + DestinationID + Manual flag.
60-
// Manual retries get a different key so they can bypass idempotency of failed automatic deliveries.
61+
// Manual retries include a nonce so each /retry request gets its own idempotency key,
62+
// while MQ redeliveries of the same message (same nonce) are still deduplicated.
63+
// Nonce was added to fix a regression from #653 where removing DeliveryEvent.ID
64+
// made the manual retry idempotency key static per event+destination.
6165
func (t *DeliveryTask) IdempotencyKey() string {
6266
if t.Manual {
63-
return t.Event.ID + ":" + t.DestinationID + ":manual"
67+
return t.Event.ID + ":" + t.DestinationID + ":manual:" + t.Nonce
6468
}
69+
// Non-manual deliveries share a key per event+destination. On failure, the
70+
// idempotency key is cleared so the scheduled retry can execute with the same key.
6571
return t.Event.ID + ":" + t.DestinationID
6672
}
6773

@@ -81,9 +87,11 @@ func NewDeliveryTask(event Event, destinationID string) DeliveryTask {
8187
}
8288

8389
// NewManualDeliveryTask creates a new DeliveryTask for a manual retry.
90+
// Each manual retry gets a unique nonce so separate /retry requests are not deduplicated.
8491
func NewManualDeliveryTask(event Event, destinationID string) DeliveryTask {
8592
task := NewDeliveryTask(event, destinationID)
8693
task.Manual = true
94+
task.Nonce = idgen.String()
8795
return task
8896
}
8997

internal/rsmq/rsmq_test.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
// RSMQSuite runs RSMQ tests against different backends.
1616
type RSMQSuite struct {
1717
suite.Suite
18+
cfg *redis.RedisConfig
1819
client RedisClient
1920
rsmq *RedisSMQ
2021
}
@@ -30,10 +31,21 @@ func TestRedisRSMQSuite(t *testing.T) {
3031

3132
func TestDragonflyRSMQSuite(t *testing.T) { suite.Run(t, new(DragonflyRSMQSuite)) }
3233

33-
func (s *RedisRSMQSuite) SetupTest() {
34+
func (s *RedisRSMQSuite) SetupSuite() {
3435
testinfra.Start(s.T())
35-
cfg := testinfra.NewRedisConfig(s.T())
36-
client, err := redis.New(context.Background(), cfg)
36+
s.cfg = testinfra.NewRedisConfig(s.T())
37+
}
38+
39+
func (s *RedisRSMQSuite) SetupTest() {
40+
// Flush the container's DB 0 before each test method for a clean state.
41+
flushClient, err := redis.New(context.Background(), s.cfg)
42+
if err != nil {
43+
s.T().Fatalf("failed to create redis client for flush: %v", err)
44+
}
45+
flushClient.FlushDB(context.Background())
46+
flushClient.Close()
47+
48+
client, err := redis.New(context.Background(), s.cfg)
3749
if err != nil {
3850
s.T().Fatalf("failed to create redis client: %v", err)
3951
}
@@ -42,10 +54,21 @@ func (s *RedisRSMQSuite) SetupTest() {
4254
s.rsmq = NewRedisSMQ(s.client, "test")
4355
}
4456

45-
func (s *DragonflyRSMQSuite) SetupTest() {
57+
func (s *DragonflyRSMQSuite) SetupSuite() {
4658
testinfra.Start(s.T())
47-
cfg := testinfra.NewDragonflyConfig(s.T())
48-
client, err := redis.New(context.Background(), cfg)
59+
s.cfg = testinfra.NewDragonflyConfig(s.T())
60+
}
61+
62+
func (s *DragonflyRSMQSuite) SetupTest() {
63+
// Flush the container's DB 0 before each test method for a clean state.
64+
flushClient, err := redis.New(context.Background(), s.cfg)
65+
if err != nil {
66+
s.T().Fatalf("failed to create redis client for flush: %v", err)
67+
}
68+
flushClient.FlushDB(context.Background())
69+
flushClient.Close()
70+
71+
client, err := redis.New(context.Background(), s.cfg)
4972
if err != nil {
5073
s.T().Fatalf("failed to create redis client: %v", err)
5174
}

0 commit comments

Comments
 (0)