-
Notifications
You must be signed in to change notification settings - Fork 117
Propagate Connection Closed Information up to top-level (fix #465) #545
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 24 commits
2020d6b
6e01c6e
166cd46
a69ed54
04d9fc7
092da82
5efb706
1d98a7c
4b23d4f
5822e0a
025a0e5
ce8b567
be4cb20
b37ea0e
cd00948
008c542
9dcb4b3
f2d94a2
852391e
b13bf5c
1aa07b1
9c283c3
a620a2f
f671228
b0234f5
482e09e
156e6c7
a0b1d57
54459e0
89cd259
6b07955
86ccbf4
cbf9c5e
9a70915
c54cb2f
ccb3d14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -97,6 +97,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { | |
private let configuration: Configuration | ||
|
||
private var connectionState: ConnectionState = .disconnected | ||
|
||
private var lambdaState: LambdaState = .idle(previousRequestID: nil) | ||
private var closingState: ClosingState = .notClosing | ||
|
||
|
@@ -118,10 +119,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { | |
} catch { | ||
result = .failure(error) | ||
} | ||
|
||
await runtime.close() | ||
|
||
//try? await runtime.close() | ||
return try result.get() | ||
} | ||
|
||
|
@@ -163,12 +161,14 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { | |
|
||
@usableFromInline | ||
func nextInvocation() async throws -> (Invocation, Writer) { | ||
|
||
try await withTaskCancellationHandler { | ||
switch self.lambdaState { | ||
case .idle: | ||
self.lambdaState = .waitingForNextInvocation | ||
let handler = try await self.makeOrGetConnection() | ||
let invocation = try await handler.nextInvocation() | ||
|
||
guard case .waitingForNextInvocation = self.lambdaState else { | ||
fatalError("Invalid state: \(self.lambdaState)") | ||
} | ||
|
@@ -283,7 +283,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { | |
case (.connecting(let array), .notClosing): | ||
self.connectionState = .disconnected | ||
for continuation in array { | ||
continuation.resume(throwing: LambdaRuntimeError(code: .lostConnectionToControlPlane)) | ||
continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost)) | ||
} | ||
|
||
case (.connecting(let array), .closing(let continuation)): | ||
|
@@ -363,7 +363,19 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { | |
) | ||
channel.closeFuture.whenComplete { result in | ||
self.assumeIsolated { runtimeClient in | ||
|
||
// resume any pending continuation on the handler | ||
if case .connected(_, let handler) = runtimeClient.connectionState { | ||
if case .connected(_, let lambdaState) = handler.state { | ||
|
||
if case .waitingForNextInvocation(let continuation) = lambdaState { | ||
continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost)) | ||
} | ||
} | ||
} | ||
|
||
// close the channel | ||
runtimeClient.channelClosed(channel) | ||
runtimeClient.connectionState = .disconnected | ||
} | ||
} | ||
|
||
|
@@ -382,6 +394,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { | |
return handler | ||
} | ||
} catch { | ||
|
||
switch self.connectionState { | ||
case .disconnected, .connected: | ||
fatalError("Unexpected state: \(self.connectionState)") | ||
|
@@ -430,7 +443,6 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate { | |
} | ||
|
||
isolated.connectionState = .disconnected | ||
|
||
} | ||
} | ||
} | ||
|
@@ -463,7 +475,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate> | |
} | ||
} | ||
|
||
private var state: State = .disconnected | ||
private(set) var state: State = .disconnected | ||
private var lastError: Error? | ||
private var reusableErrorBuffer: ByteBuffer? | ||
private let logger: Logger | ||
|
@@ -885,6 +897,7 @@ extension LambdaChannelHandler: ChannelInboundHandler { | |
// fail any pending responses with last error or assume peer disconnected | ||
switch self.state { | ||
case .connected(_, .waitingForNextInvocation(let continuation)): | ||
self.state = .disconnected | ||
continuation.resume(throwing: self.lastError ?? ChannelError.ioOnClosedChannel) | ||
default: | ||
break | ||
|
Uh oh!
There was an error while loading. Please reload this page.