Skip to content

Commit 34e89b4

Browse files
sebstoSebastien Stormacq
andauthored
Fix Test hangs in Lambda+LocalServer (#630) (#631)
# Fix test hangs caused by Pool cancellation race conditions ## Summary This PR fixes two related race conditions in `Lambda+LocalServer+Pool.swift` that were causing the test suite to hang approximately 10% of the time. ## Problem The test suite exhibited intermittent hangs (~10% frequency) due to two bugs in the Pool implementation: 1. **Individual task cancellation bug**: When one task waiting for a specific `requestId` was cancelled, the cancellation handler would incorrectly cancel ALL waiting tasks instead of just the cancelled one. 2. **Server shutdown hang**: When the server shut down, waiting continuations in the pools were never cancelled, causing handlers to wait indefinitely for responses that would never arrive. ## Root Causes ### Root Cause #1: Cancellation Handler Removes ALL Continuations The `onCancel` handler in `Pool._next()` was removing all continuations from the `waitingForSpecific` dictionary when any single task was cancelled: ```swift onCancel: { // BUG: Removes ALL continuations, not just the cancelled task's for continuation in state.waitingForSpecific.values { toCancel.append(continuation) } state.waitingForSpecific.removeAll() } ``` This caused unrelated concurrent invocations to fail with `CancellationError` when one client cancelled their request. ### Root Cause #2: No Pool Cleanup During Server Shutdown When the server shut down (e.g., test completes), the task group was cancelled but the pools' waiting continuations were never notified. The `/invoke` endpoint handlers would continue waiting for responses that would never arrive because the Lambda function had stopped. ## Solution ### Fix #1: Only Remove Specific Continuation on Cancellation Modified the cancellation handler to only remove the continuation for the specific cancelled task: ```swift onCancel: { // Only remove THIS task's continuation let continuationToCancel = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in if let requestId = requestId { return state.waitingForSpecific.removeValue(forKey: requestId) } else { let cont = state.waitingForAny state.waitingForAny = nil return cont } } continuationToCancel?.resume(throwing: CancellationError()) } ``` ### Fix #2: Add Pool Cleanup During Server Shutdown Added `cancelAll()` method to the Pool class and call it during server shutdown: ```swift func cancelAll() { let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation<T, any Error>] in var toCancel: [CheckedContinuation<T, any Error>] = [] if let continuation = state.waitingForAny { toCancel.append(continuation) state.waitingForAny = nil } for continuation in state.waitingForSpecific.values { toCancel.append(continuation) } state.waitingForSpecific.removeAll() return toCancel } for continuation in continuationsToCancel { continuation.resume(throwing: CancellationError()) } } ``` Called during server shutdown: ```swift let serverOrHandlerResult1 = await group.next()! group.cancelAll() // Cancel all waiting continuations in the pools to prevent hangs server.invocationPool.cancelAll() server.responsePool.cancelAll() ``` ## Changes ### Modified Files - **Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer+Pool.swift** - Fixed cancellation handler in `_next()` to only remove specific continuation - Added `cancelAll()` method for server shutdown cleanup - **Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer.swift** - Call `cancelAll()` on both pools during server shutdown ### New Files - **Tests/AWSLambdaRuntimeTests/LocalServerPoolCancellationTests.swift** - Added comprehensive test suite with 3 tests - `testCancellationOnlyAffectsOwnTask`: Verifies only the cancelled task receives CancellationError - `testConcurrentInvocationsWithCancellation`: Tests real-world scenario with 5 concurrent invocations - `testFIFOModeCancellation`: Ensures FIFO mode cancellation works correctly ## Testing ### Before Fix - Test suite hung ~10% of the time - When 1 task was cancelled, all 5 concurrent tasks received `CancellationError` - Streaming tests would occasionally hang during shutdown ### After Fix - All 91 tests pass consistently without hangs - When 1 task is cancelled, only that specific task receives `CancellationError` - Other tasks continue waiting normally - Server shutdown properly cleans up all waiting continuations - Multiple consecutive test runs confirm stability ### Test Coverage The new test suite reproduces both bugs and verifies the fixes: 1. **testCancellationOnlyAffectsOwnTask**: Creates 3 tasks waiting for different requestIds, cancels only one, and verifies the others are not affected 2. **testConcurrentInvocationsWithCancellation**: Simulates 5 concurrent invocations with one cancellation 3. **testFIFOModeCancellation**: Tests FIFO mode to ensure it still works correctly --------- Co-authored-by: Sebastien Stormacq <stormacq@amazon.lu>
1 parent feb6d2c commit 34e89b4

File tree

3 files changed

+294
-17
lines changed

3 files changed

+294
-17
lines changed

Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer+Pool.swift

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -131,27 +131,20 @@ extension LambdaHTTPServer {
131131
}
132132
}
133133
} onCancel: {
134-
// Ensure we properly handle cancellation by removing stored continuation
135-
let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation<T, any Error>] in
136-
var toCancel: [CheckedContinuation<T, any Error>] = []
137-
138-
if let continuation = state.waitingForAny {
139-
toCancel.append(continuation)
134+
// Only remove THIS task's continuation
135+
let continuationToCancel = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
136+
if let requestId = requestId {
137+
// Remove only the continuation for this specific requestId
138+
return state.waitingForSpecific.removeValue(forKey: requestId)
139+
} else {
140+
// Remove only the FIFO continuation
141+
let cont = state.waitingForAny
140142
state.waitingForAny = nil
143+
return cont
141144
}
142-
143-
for continuation in state.waitingForSpecific.values {
144-
toCancel.append(continuation)
145-
}
146-
state.waitingForSpecific.removeAll()
147-
148-
return toCancel
149145
}
150146

151-
// Resume all continuations outside the lock to avoid potential deadlocks
152-
for continuation in continuationsToCancel {
153-
continuation.resume(throwing: CancellationError())
154-
}
147+
continuationToCancel?.resume(throwing: CancellationError())
155148
}
156149
}
157150

@@ -169,6 +162,30 @@ extension LambdaHTTPServer {
169162
self
170163
}
171164

165+
/// Cancel all waiting continuations - used during server shutdown
166+
func cancelAll() {
167+
let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation<T, any Error>] in
168+
var toCancel: [CheckedContinuation<T, any Error>] = []
169+
170+
if let continuation = state.waitingForAny {
171+
toCancel.append(continuation)
172+
state.waitingForAny = nil
173+
}
174+
175+
for continuation in state.waitingForSpecific.values {
176+
toCancel.append(continuation)
177+
}
178+
state.waitingForSpecific.removeAll()
179+
180+
return toCancel
181+
}
182+
183+
// Resume all continuations outside the lock
184+
for continuation in continuationsToCancel {
185+
continuation.resume(throwing: CancellationError())
186+
}
187+
}
188+
172189
struct PoolError: Error {
173190
let cause: Cause
174191
var message: String {

Sources/AWSLambdaRuntime/HTTPServer/Lambda+LocalServer.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ internal struct LambdaHTTPServer {
223223
let serverOrHandlerResult1 = await group.next()!
224224
group.cancelAll()
225225

226+
// Cancel all waiting continuations in the pools to prevent hangs
227+
server.invocationPool.cancelAll()
228+
server.responsePool.cancelAll()
229+
226230
switch serverOrHandlerResult1 {
227231
case .closureResult(let result):
228232
return result
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright SwiftAWSLambdaRuntime project authors
6+
// Copyright (c) Amazon.com, Inc. or its affiliates.
7+
// Licensed under Apache License v2.0
8+
//
9+
// See LICENSE.txt for license information
10+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
11+
//
12+
// SPDX-License-Identifier: Apache-2.0
13+
//
14+
//===----------------------------------------------------------------------===//
15+
16+
#if LocalServerSupport
17+
import Testing
18+
import NIOCore
19+
import Synchronization
20+
21+
@testable import AWSLambdaRuntime
22+
23+
@Suite("LocalServer Pool Cancellation Tests")
24+
struct LocalServerPoolCancellationTests {
25+
26+
/// Test that reproduces Issue #2: Cancellation handler removes ALL continuations
27+
///
28+
/// This test demonstrates the bug where cancelling one task waiting in `waitingForSpecific`
29+
/// causes ALL other waiting tasks to also receive CancellationError, even though they
30+
/// weren't cancelled.
31+
///
32+
/// Expected behavior: Only the cancelled task should receive CancellationError
33+
/// Actual behavior: ALL waiting tasks receive CancellationError
34+
@Test("Cancelling one task should not affect other waiting tasks")
35+
@available(LambdaSwift 2.0, *)
36+
func testCancellationOnlyAffectsOwnTask() async throws {
37+
#if compiler(>=6.0)
38+
let pool = LambdaHTTPServer.Pool<TestItem>(name: "Test Pool")
39+
40+
let cancelledFlags = Mutex<[Bool]>([false, false, false])
41+
42+
// Create 3 tasks waiting for different requestIds
43+
let task1 = Task { @Sendable in
44+
do {
45+
_ = try await pool.next(for: "request-1")
46+
} catch is CancellationError {
47+
cancelledFlags.withLock { $0[0] = true }
48+
}
49+
}
50+
51+
let task2 = Task { @Sendable in
52+
do {
53+
_ = try await pool.next(for: "request-2")
54+
} catch is CancellationError {
55+
cancelledFlags.withLock { $0[1] = true }
56+
}
57+
}
58+
59+
let task3 = Task { @Sendable in
60+
do {
61+
_ = try await pool.next(for: "request-3")
62+
} catch is CancellationError {
63+
cancelledFlags.withLock { $0[2] = true }
64+
}
65+
}
66+
67+
// Let tasks register their continuations
68+
try await Task.sleep(for: .milliseconds(100))
69+
70+
// Cancel only task 2
71+
task2.cancel()
72+
73+
// Give cancellation time to propagate
74+
try await Task.sleep(for: .milliseconds(100))
75+
76+
// Check cancellation status
77+
let flags = cancelledFlags.withLock { $0 }
78+
79+
#expect(flags[1] == true, "Task 2 should be cancelled")
80+
81+
// With the bug, task1 and task3 will also be cancelled
82+
if flags[0] || flags[2] {
83+
Issue.record("BUG REPRODUCED: Other tasks were cancelled when only task 2 should have been cancelled")
84+
}
85+
86+
#expect(flags[0] == false, "Task 1 should NOT be cancelled")
87+
#expect(flags[2] == false, "Task 3 should NOT be cancelled")
88+
89+
// Clean up - cancel all tasks
90+
task1.cancel()
91+
task2.cancel()
92+
task3.cancel()
93+
94+
_ = await task1.result
95+
_ = await task2.result
96+
_ = await task3.result
97+
98+
#else
99+
throw XCTSkip("This test requires Swift 6.0 or later")
100+
#endif
101+
}
102+
103+
/// Test concurrent invocations with one being cancelled
104+
///
105+
/// This simulates the real-world scenario where multiple clients invoke the Lambda
106+
/// function simultaneously, and one client's connection drops.
107+
@Test("Multiple concurrent invocations with one cancellation")
108+
@available(LambdaSwift 2.0, *)
109+
func testConcurrentInvocationsWithCancellation() async throws {
110+
#if compiler(>=6.0)
111+
112+
try await withThrowingTaskGroup(of: Void.self) { group in
113+
// Timeout task
114+
group.addTask {
115+
try await Task.sleep(for: .seconds(10))
116+
throw TestError.timeout
117+
}
118+
119+
// Main test task
120+
group.addTask {
121+
let pool = LambdaHTTPServer.Pool<TestItem>(name: "Concurrent Test Pool")
122+
123+
let cancelledCount = Mutex<Int>(0)
124+
125+
// Spawn 5 concurrent tasks waiting for different requestIds
126+
var tasks: [Task<Void, any Error>] = []
127+
for i in 1...5 {
128+
let task = Task { @Sendable in
129+
do {
130+
_ = try await pool.next(for: "request-\(i)")
131+
} catch is CancellationError {
132+
cancelledCount.withLock { $0 += 1 }
133+
}
134+
}
135+
tasks.append(task)
136+
}
137+
138+
// Let all tasks register their continuations
139+
try await Task.sleep(for: .milliseconds(200))
140+
141+
// Cancel task 3 (index 2)
142+
tasks[2].cancel()
143+
144+
// Give cancellation time to propagate
145+
try await Task.sleep(for: .milliseconds(200))
146+
147+
// Check how many tasks were cancelled
148+
let count = cancelledCount.withLock { $0 }
149+
150+
// Expected: 1 cancelled
151+
// Actual (with bug): 5 cancelled
152+
if count > 1 {
153+
Issue.record("BUG REPRODUCED: \(count) tasks were cancelled, but only 1 should have been cancelled")
154+
}
155+
156+
#expect(count == 1, "Only 1 task should be cancelled, but \(count) were cancelled")
157+
158+
// Clean up - cancel all remaining tasks
159+
for task in tasks {
160+
task.cancel()
161+
}
162+
163+
for task in tasks {
164+
_ = await task.result
165+
}
166+
}
167+
168+
// Wait for first task to complete (should be main test, not timeout)
169+
try await group.next()
170+
group.cancelAll()
171+
}
172+
173+
#else
174+
throw XCTSkip("This test requires Swift 6.0 or later")
175+
#endif
176+
}
177+
178+
/// Test that FIFO mode doesn't have the same issue
179+
///
180+
/// FIFO mode only allows one waiter at a time, so this bug shouldn't affect it.
181+
@Test("FIFO mode cancellation works correctly")
182+
@available(LambdaSwift 2.0, *)
183+
func testFIFOModeCancellation() async throws {
184+
#if compiler(>=6.0)
185+
let pool = LambdaHTTPServer.Pool<TestItem>(name: "FIFO Test Pool")
186+
187+
try await withThrowingTaskGroup(of: Void.self) { group in
188+
189+
// Timeout
190+
group.addTask {
191+
try await Task.sleep(for: .seconds(10))
192+
throw TestError.timeout
193+
}
194+
195+
// Main test
196+
group.addTask {
197+
let task = Task { @Sendable in
198+
do {
199+
guard let item = try await pool.next() else {
200+
return "error: nil item"
201+
}
202+
return "success: \(item.id)"
203+
} catch is CancellationError {
204+
return "cancelled"
205+
} catch {
206+
return "error: \(error)"
207+
}
208+
}
209+
210+
// Let task register continuation
211+
try await Task.sleep(for: .milliseconds(100))
212+
213+
// Cancel the task
214+
task.cancel()
215+
216+
// Wait for result
217+
let result = await task.value
218+
219+
#expect(result == "cancelled", "Task should be cancelled")
220+
}
221+
222+
try await group.next()
223+
group.cancelAll()
224+
}
225+
#else
226+
throw XCTSkip("This test requires Swift 6.0 or later")
227+
#endif
228+
}
229+
}
230+
231+
// MARK: - Test Helpers
232+
233+
extension LocalServerPoolCancellationTests {
234+
235+
struct TestItem: Sendable {
236+
let id: String
237+
let data: String
238+
}
239+
240+
enum TestResult: Sendable {
241+
case success(String)
242+
case cancelled(String)
243+
case error(String, any Error)
244+
}
245+
246+
enum TestError: Error {
247+
case timeout
248+
}
249+
}
250+
251+
// Make TestItem conform to LocalServerResponse protocol if needed
252+
extension LocalServerPoolCancellationTests.TestItem {
253+
var requestId: String? { id }
254+
}
255+
256+
#endif

0 commit comments

Comments
 (0)