Skip to content

Commit a968f97

Browse files
committed
Updated with retries
1 parent cf84b5a commit a968f97

File tree

1 file changed

+27
-42
lines changed

1 file changed

+27
-42
lines changed

channel_batch_updater_test.go

Lines changed: 27 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,11 @@ func TestChannelBatchUpdater_AddMembers(t *testing.T) {
6262
require.NoError(t, err)
6363
require.NotEmpty(t, resp.TaskID)
6464

65-
// Give the task a moment to be created and allow for rate limit retries
66-
// Rate limit errors typically retry after ~500ms, so wait a bit longer
6765
time.Sleep(2 * time.Second)
6866

69-
// Poll for task completion (up to 2 minutes)
7067
for i := 0; i < 120; i++ {
7168
task, err := c.GetTask(ctx, resp.TaskID)
7269
if err != nil {
73-
// If task doesn't exist yet, wait and retry
7470
if i < 10 {
7571
time.Sleep(time.Second)
7672
continue
@@ -79,20 +75,15 @@ func TestChannelBatchUpdater_AddMembers(t *testing.T) {
7975
}
8076
require.Equal(t, resp.TaskID, task.TaskID)
8177

82-
// If task is waiting/pending/running, it might be rate-limited and will retry
83-
// Rate limit errors cause Asynq to retry automatically after the suggested delay
8478
if task.Status == TaskStatusWaiting || task.Status == TaskStatusPending || task.Status == TaskStatusRunning {
8579
time.Sleep(time.Second)
8680
continue
8781
}
8882

8983
if task.Status == TaskStatusCompleted {
90-
// Wait up to 2 minutes for background workers to finish processing
91-
// Check periodically if changes are visible
9284
for j := 0; j < 120; j++ {
9385
time.Sleep(time.Second)
9486

95-
// Try to verify members were added by refreshing channel state
9687
err = ch1.refresh(ctx)
9788
if err != nil {
9889
continue
@@ -102,7 +93,6 @@ func TestChannelBatchUpdater_AddMembers(t *testing.T) {
10293
continue
10394
}
10495

105-
// Check that added users are now members
10696
ch1MemberIDs := make([]string, len(ch1.Members))
10797
for i, m := range ch1.Members {
10898
ch1MemberIDs[i] = m.UserID
@@ -121,18 +111,15 @@ func TestChannelBatchUpdater_AddMembers(t *testing.T) {
121111
t.Fatal("changes not visible after 2 minutes")
122112
}
123113
if task.Status == TaskStatusFailed {
124-
// Check if this is a rate limit error - Asynq will automatically retry these
125114
if len(task.Result) > 0 {
126115
if desc, ok := task.Result["description"].(string); ok {
127116
if strings.Contains(strings.ToLower(desc), "rate limit") {
128-
time.Sleep(2 * time.Second) // Wait for retry
117+
time.Sleep(2 * time.Second)
129118
continue
130119
}
131120
}
132121
t.Fatalf("task failed with result: %v", task.Result)
133122
}
134-
// If result is empty, the task might have failed during creation
135-
// Check the response for error details
136123
t.Fatalf("task failed (status: %s, result: %v)", task.Status, task.Result)
137124
}
138125

@@ -145,12 +132,10 @@ func TestChannelBatchUpdater_RemoveMembers(t *testing.T) {
145132
c := initClient(t)
146133
ctx := context.Background()
147134

148-
// Create channels with members
149135
membersID := randomUsersID(t, c, 2)
150136
ch1 := initChannel(t, c, membersID...)
151137
ch2 := initChannel(t, c, membersID...)
152138

153-
// Verify channels have 2 members before executing the task
154139
err := ch1.refresh(ctx)
155140
require.NoError(t, err, "failed to refresh channel 1")
156141
require.Len(t, ch1.Members, 2, "channel 1 should have 2 members before removal")
@@ -159,7 +144,6 @@ func TestChannelBatchUpdater_RemoveMembers(t *testing.T) {
159144
require.NoError(t, err, "failed to refresh channel 2")
160145
require.Len(t, ch2.Members, 2, "channel 2 should have 2 members before removal")
161146

162-
// Verify both channels have the same members
163147
ch1MemberIDs := make([]string, len(ch1.Members))
164148
for i, m := range ch1.Members {
165149
ch1MemberIDs[i] = m.UserID
@@ -173,7 +157,6 @@ func TestChannelBatchUpdater_RemoveMembers(t *testing.T) {
173157

174158
updater := c.ChannelBatchUpdater()
175159

176-
// Remove one member from both channels
177160
memberToRemove := membersID[0]
178161
resp, err := updater.RemoveMembers(ctx, ChannelsBatchFilters{
179162
CIDs: map[string]interface{}{
@@ -183,30 +166,29 @@ func TestChannelBatchUpdater_RemoveMembers(t *testing.T) {
183166
require.NoError(t, err)
184167
require.NotEmpty(t, resp.TaskID)
185168

186-
// Give the task a moment to be created
187-
time.Sleep(500 * time.Millisecond)
169+
time.Sleep(2 * time.Second)
188170

189-
// Poll for task completion (up to 2 minutes)
190171
for i := 0; i < 120; i++ {
191172
task, err := c.GetTask(ctx, resp.TaskID)
192173
if err != nil {
193-
// If task doesn't exist yet, wait and retry
194-
if i < 5 {
174+
if i < 10 {
195175
time.Sleep(time.Second)
196176
continue
197177
}
198178
require.NoError(t, err, "failed to get task status")
199179
}
200180
require.Equal(t, resp.TaskID, task.TaskID)
181+
182+
if task.Status == TaskStatusWaiting || task.Status == TaskStatusPending || task.Status == TaskStatusRunning {
183+
time.Sleep(time.Second)
184+
continue
185+
}
201186

202187
if task.Status == TaskStatusCompleted {
203-
// Wait up to 2 minutes for background workers to finish processing
204-
// Check periodically if changes are visible
205188
var ch1MemberIDs []string
206189
for j := 0; j < 120; j++ {
207190
time.Sleep(time.Second)
208191

209-
// Try to verify member was removed
210192
err = ch1.refresh(ctx)
211193
if err != nil {
212194
continue
@@ -223,12 +205,15 @@ func TestChannelBatchUpdater_RemoveMembers(t *testing.T) {
223205
t.Fatalf("changes not visible after 2 minutes. Channel 1 still has members: %v", ch1MemberIDs)
224206
}
225207
if task.Status == TaskStatusFailed {
226-
// Check if there's error information in the result
227208
if len(task.Result) > 0 {
209+
if desc, ok := task.Result["description"].(string); ok {
210+
if strings.Contains(strings.ToLower(desc), "rate limit") {
211+
time.Sleep(2 * time.Second)
212+
continue
213+
}
214+
}
228215
t.Fatalf("task failed with result: %v", task.Result)
229216
}
230-
// If result is empty, the task might have failed during creation
231-
// Check the response for error details
232217
t.Fatalf("task failed (status: %s, result: %v)", task.Status, task.Result)
233218
}
234219

@@ -247,7 +232,6 @@ func TestChannelBatchUpdater_Archive(t *testing.T) {
247232

248233
updater := c.ChannelBatchUpdater()
249234

250-
// Archive channels for the first member
251235
resp, err := updater.Archive(ctx, ChannelsBatchFilters{
252236
CIDs: map[string]interface{}{
253237
"$in": []string{ch1.CID, ch2.CID},
@@ -256,35 +240,33 @@ func TestChannelBatchUpdater_Archive(t *testing.T) {
256240
require.NoError(t, err)
257241
require.NotEmpty(t, resp.TaskID)
258242

259-
// Give the task a moment to be created
260-
time.Sleep(500 * time.Millisecond)
243+
time.Sleep(2 * time.Second)
261244

262-
// Poll for task completion (up to 2 minutes)
263245
for i := 0; i < 120; i++ {
264246
task, err := c.GetTask(ctx, resp.TaskID)
265247
if err != nil {
266-
// If task doesn't exist yet, wait and retry
267-
if i < 5 {
248+
if i < 10 {
268249
time.Sleep(time.Second)
269250
continue
270251
}
271252
require.NoError(t, err, "failed to get task status")
272253
}
273254
require.Equal(t, resp.TaskID, task.TaskID)
255+
256+
if task.Status == TaskStatusWaiting || task.Status == TaskStatusPending || task.Status == TaskStatusRunning {
257+
time.Sleep(time.Second)
258+
continue
259+
}
274260

275261
if task.Status == TaskStatusCompleted {
276-
// Wait up to 2 minutes for background workers to finish processing
277-
// Check periodically if changes are visible
278262
for j := 0; j < 120; j++ {
279263
time.Sleep(time.Second)
280264

281-
// Try to verify channel was archived for the user
282265
err = ch1.refresh(ctx)
283266
if err != nil {
284267
continue
285268
}
286269

287-
// Find the member and check if archived
288270
for _, m := range ch1.Members {
289271
if m.UserID == membersID[0] {
290272
if m.ArchivedAt != nil {
@@ -297,12 +279,15 @@ func TestChannelBatchUpdater_Archive(t *testing.T) {
297279
t.Fatal("changes not visible after 2 minutes")
298280
}
299281
if task.Status == TaskStatusFailed {
300-
// Check if there's error information in the result
301282
if len(task.Result) > 0 {
283+
if desc, ok := task.Result["description"].(string); ok {
284+
if strings.Contains(strings.ToLower(desc), "rate limit") {
285+
time.Sleep(2 * time.Second)
286+
continue
287+
}
288+
}
302289
t.Fatalf("task failed with result: %v", task.Result)
303290
}
304-
// If result is empty, the task might have failed during creation
305-
// Check the response for error details
306291
t.Fatalf("task failed (status: %s, result: %v)", task.Status, task.Result)
307292
}
308293

0 commit comments

Comments
 (0)