diff --git a/Benchmarks/Benchmarks/ConnectionPoolBenchmarks/ConnectionPoolBenchmarks.swift b/Benchmarks/Benchmarks/ConnectionPoolBenchmarks/ConnectionPoolBenchmarks.swift index 9cc535d4..c988c4b3 100644 --- a/Benchmarks/Benchmarks/ConnectionPoolBenchmarks/ConnectionPoolBenchmarks.swift +++ b/Benchmarks/Benchmarks/ConnectionPoolBenchmarks/ConnectionPoolBenchmarks.swift @@ -1,23 +1,27 @@ import _ConnectionPoolModule import _ConnectionPoolTestUtils import Benchmark +import NIOCore +import NIOPosix let benchmarks: @Sendable () -> Void = { - Benchmark("Lease/Release 1k requests: 50 parallel", configuration: .init(scalingFactor: .kilo)) { benchmark in + Benchmark("Pool: Lease/Release 1k requests: 50 parallel", configuration: .init(scalingFactor: .kilo)) { benchmark in let clock = MockClock() - let factory = MockConnectionFactory(autoMaxStreams: 1) + let factory = MockConnectionFactory(autoMaxStreams: 1) var configuration = ConnectionPoolConfiguration() configuration.maximumConnectionSoftLimit = 50 configuration.maximumConnectionHardLimit = 50 let pool = ConnectionPool( configuration: configuration, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } await withTaskGroup { taskGroup in @@ -54,21 +58,23 @@ let benchmarks: @Sendable () -> Void = { } } - Benchmark("Lease/Release 1k requests: sequential", configuration: .init(scalingFactor: .kilo)) { benchmark in + Benchmark("Pool: Lease/Release 1k requests: sequential", configuration: .init(scalingFactor: .kilo)) { benchmark in let clock = MockClock() - let factory = MockConnectionFactory(autoMaxStreams: 1) + let factory = MockConnectionFactory(autoMaxStreams: 1) var configuration = ConnectionPoolConfiguration() configuration.maximumConnectionSoftLimit = 50 configuration.maximumConnectionHardLimit = 50 let pool = ConnectionPool( configuration: configuration, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } await withTaskGroup { taskGroup in @@ -96,4 +102,123 @@ let benchmarks: @Sendable () -> Void = { taskGroup.cancelAll() } } + + Benchmark("PoolManager/TaskExecutor: Lease/Release 1k requests: 50 parallel – 10 MockExecutors", configuration: .init(scalingFactor: .kilo)) { benchmark in + let clock = MockClock() + let factory = MockConnectionFactory(autoMaxStreams: 1) + var configuration = ConnectionPoolManagerConfiguration() + let executorCount = 10 + let executors = (0...self, + keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), + executors: executors, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, configuration: $1, for: $2) + } + + await withTaskGroup { taskGroup in + taskGroup.addTask { + await pool.run() + } + + let sequential = benchmark.scaledIterations.upperBound / concurrency + + benchmark.startMeasurement() + + for parallel in 0..(autoMaxStreams: 1) + var configuration = ConnectionPoolManagerConfiguration() + try await NIOTaskExecutor.withExecutors(eventLoops) { executors in + let concurrency = 50 + + configuration.maximumConnectionPerExecutorSoftLimit = concurrency / executors.count + configuration.maximumConnectionPerExecutorHardLimit = concurrency / executors.count + + let pool = ConnectionPoolManager( + configuration: configuration, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionRequest.self, + keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), + executors: executors, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, configuration: $1, for: $2) + } + + await withTaskGroup { taskGroup in + taskGroup.addTask { + await pool.run() + } + + let sequential = benchmark.scaledIterations.upperBound / executors.count + + benchmark.startMeasurement() + + for executor in executors { + taskGroup.addTask(executorPreference: executor) { + for _ in 0..() + + let eventLoop: any EventLoop + + private init(eventLoop: any EventLoop) { + self.eventLoop = eventLoop + } + + static func withExecutors(_ eventLoops: MultiThreadedEventLoopGroup, _ body: ([NIOTaskExecutor]) async throws -> ()) async throws { + var executors = [NIOTaskExecutor]() + for eventLoop in eventLoops.makeIterator() { + let executor = NIOTaskExecutor(eventLoop: eventLoop) + try await eventLoop.submit { + NIOTaskExecutor.threadSpecificEventLoop.currentValue = executor + }.get() + executors.append(executor) + } + do { + try await body(executors) + } catch { + + } + for eventLoop in eventLoops.makeIterator() { + try await eventLoop.submit { + NIOTaskExecutor.threadSpecificEventLoop.currentValue = nil + }.get() + } + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) +extension NIOTaskExecutor: TaskExecutor { + + func enqueue(_ job: consuming ExecutorJob) { + // By default we are just going to use execute to run the job + // this is quite heavy since it allocates the closure for + // every single job. + let unownedJob = UnownedJob(job) + self.eventLoop.execute { + unownedJob.runSynchronously(on: self.asUnownedTaskExecutor()) + } + } + + func asUnownedTaskExecutor() -> UnownedTaskExecutor { + UnownedTaskExecutor(ordinary: self) + } +} + +@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) +extension NIOTaskExecutor: ConnectionPoolExecutor { + typealias ID = ObjectIdentifier + + var id: ObjectIdentifier { + ObjectIdentifier(self) + } + + static func getExecutorID() -> ObjectIdentifier? { + self.threadSpecificEventLoop.currentValue?.id + } +} diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift index 11407176..68166b3c 100644 --- a/Benchmarks/Package.swift +++ b/Benchmarks/Package.swift @@ -10,6 +10,8 @@ let package = Package( dependencies: [ .package(path: "../"), .package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.29.0"), + .package(url: "https://github.com/vapor/postgres-kit.git", from: "2.14.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.82.0"), ], targets: [ .executableTarget( @@ -18,11 +20,22 @@ let package = Package( .product(name: "_ConnectionPoolModule", package: "postgres-nio"), .product(name: "_ConnectionPoolTestUtils", package: "postgres-nio"), .product(name: "Benchmark", package: "package-benchmark"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOPosix", package: "swift-nio"), ], path: "Benchmarks/ConnectionPoolBenchmarks", plugins: [ .plugin(name: "BenchmarkPlugin", package: "package-benchmark") ] ), + .executableTarget( + name: "PostgresPerf", + dependencies: [ + .product(name: "PostgresNIO", package: "postgres-nio"), + .product(name: "PostgresKit", package: "postgres-kit"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOPosix", package: "swift-nio"), + ], + ) ] ) diff --git a/Benchmarks/Sources/PostgresPerf/PostgresPerf.swift b/Benchmarks/Sources/PostgresPerf/PostgresPerf.swift new file mode 100644 index 00000000..19b9353c --- /dev/null +++ b/Benchmarks/Sources/PostgresPerf/PostgresPerf.swift @@ -0,0 +1,131 @@ +// +// PostgresPerf.swift +// benchmarks +// +// Created by Fabian Fett on 12.05.25. +// + +import Synchronization +import PostgresNIO +@preconcurrency import PostgresKit +@preconcurrency import AsyncKit + +@main +@available(macOS 15.0, *) +enum PostgresPerf { + + static let maxConnections: Int = 50 + static let tasks: Int = 400 + static let iterationsPerTask: Int = 1000 + static let logger = Logger(label: "TestLogger") + static let clock = ContinuousClock() + + static let eventLoopCount = { + NIOSingletons.posixEventLoopGroup.makeIterator().reduce(0, { (res, _) in res + 1 }) + }() + + static func main() async throws { +// if CommandLine.arguments.first == "kit" { + try await Self.runPostgresKit() +// } else { + try await self.runPostgresNIO() +// } + } + + static func runPostgresKit() async throws { + let configuration = SQLPostgresConfiguration( + hostname: "localhost", port: 5432, + username: "test_username", + password: "test_password", + database: "test_database", + tls: .disable + ) + + let pools = EventLoopGroupConnectionPool( + source: PostgresConnectionSource(sqlConfiguration: configuration), + maxConnectionsPerEventLoop: Self.maxConnections / Self.eventLoopCount, + on: NIOSingletons.posixEventLoopGroup + ) + + let start = self.clock.now + await withThrowingTaskGroup(of: Void.self) { taskGroup in + for _ in 0.. String? { + getenv(name).flatMap { String(cString: $0) } +} diff --git a/Sources/ConnectionPoolModule/ConnectionIDGenerator.swift b/Sources/ConnectionPoolModule/ConnectionIDGenerator.swift index b428d805..ea9ff216 100644 --- a/Sources/ConnectionPoolModule/ConnectionIDGenerator.swift +++ b/Sources/ConnectionPoolModule/ConnectionIDGenerator.swift @@ -1,7 +1,7 @@ import Atomics public struct ConnectionIDGenerator: ConnectionIDGeneratorProtocol { - static let globalGenerator = ConnectionIDGenerator() + public static let globalGenerator = ConnectionIDGenerator() private let atomic: ManagedAtomic diff --git a/Sources/ConnectionPoolModule/ConnectionLease.swift b/Sources/ConnectionPoolModule/ConnectionLease.swift new file mode 100644 index 00000000..ace63bc8 --- /dev/null +++ b/Sources/ConnectionPoolModule/ConnectionLease.swift @@ -0,0 +1,25 @@ +// +// ConnectionLease.swift +// postgres-nio +// +// Created by Fabian Fett on 05.05.25. +// + +public struct ConnectionLease: Sendable { + + public var connection: Connection + + @usableFromInline + let _release: @Sendable () -> () + + @inlinable + public init(connection: Connection, release: @escaping @Sendable () -> Void) { + self.connection = connection + self._release = release + } + + @inlinable + public func release() { + self._release() + } +} diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index b460b263..fb1f25ef 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -88,7 +88,7 @@ public protocol ConnectionRequestProtocol: Sendable { /// A function that is called with a connection or a /// `PoolError`. - func complete(with: Result) + func complete(with: Result, ConnectionPoolError>) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -134,9 +134,11 @@ public final class ConnectionPool< Connection: PooledConnection, ConnectionID: Hashable & Sendable, ConnectionIDGenerator: ConnectionIDGeneratorProtocol, + ConnectionConfiguration: Equatable & Sendable, Request: ConnectionRequestProtocol, RequestID: Hashable & Sendable, KeepAliveBehavior: ConnectionKeepAliveBehavior, + Executor: ConnectionPoolExecutor, ObservabilityDelegate: ConnectionPoolObservabilityDelegate, Clock: _Concurrency.Clock >: Sendable where @@ -148,7 +150,7 @@ public final class ConnectionPool< ObservabilityDelegate.ConnectionID == ConnectionID, Clock.Duration == Duration { - public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionPool) async throws -> ConnectionAndMetadata + public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionConfiguration, ConnectionPool) async throws -> ConnectionAndMetadata @usableFromInline typealias StateMachine = PoolStateMachine> @@ -156,6 +158,11 @@ public final class ConnectionPool< @usableFromInline let factory: ConnectionFactory + public let executor: Executor + + @usableFromInline + let connectionConfiguration: ConnectionConfiguration + @usableFromInline let keepAliveBehavior: KeepAliveBehavior @@ -188,18 +195,22 @@ public final class ConnectionPool< public init( configuration: ConnectionPoolConfiguration, + connectionConfiguration: ConnectionConfiguration, idGenerator: ConnectionIDGenerator, requestType: Request.Type, keepAliveBehavior: KeepAliveBehavior, + executor: Executor, observabilityDelegate: ObservabilityDelegate, clock: Clock, connectionFactory: @escaping ConnectionFactory ) { + self.executor = executor self.clock = clock self.factory = connectionFactory self.keepAliveBehavior = keepAliveBehavior self.observabilityDelegate = observabilityDelegate self.configuration = configuration + self.connectionConfiguration = connectionConfiguration var stateMachine = StateMachine( configuration: .init(configuration, keepAliveBehavior: keepAliveBehavior), generator: idGenerator, @@ -253,10 +264,19 @@ public final class ConnectionPool< } } - public func cancelLeaseConnection(_ requestID: RequestID) { + @discardableResult + public func cancelLeaseConnection(_ requestID: RequestID) -> Bool { + var found = false self.modifyStateAndRunActions { state in - state.stateMachine.cancelRequest(id: requestID) + let action = state.stateMachine.cancelRequest(id: requestID) + if case .failRequest = action.request { + found = true + } else { + found = false + } + return action } + return found } /// Mark a connection as going away. Connection implementors have to call this method if the connection @@ -271,6 +291,13 @@ public final class ConnectionPool< } } + @inlinable + public func updateConfiguration(_ configuration: ConnectionConfiguration, forceReconnection: Bool) { + // TODO: Implement connection will close correctly + // If the forceReconnection flag is set, we should gracefully close the connection once they + // are returned the next time. + } + @inlinable public func run() async { await withTaskCancellationHandler { @@ -402,8 +429,11 @@ public final class ConnectionPool< /*private*/ func runRequestAction(_ action: StateMachine.RequestAction) { switch action { case .leaseConnection(let requests, let connection): + let lease = ConnectionLease(connection: connection) { + self.releaseConnection(connection) + } for request in requests { - request.complete(with: .success(connection)) + request.complete(with: .success(lease)) } case .failRequest(let request, let error): @@ -419,11 +449,11 @@ public final class ConnectionPool< @inlinable /*private*/ func makeConnection(for request: StateMachine.ConnectionRequest, in taskGroup: inout some TaskGroupProtocol) { - taskGroup.addTask_ { + self.addTask(into: &taskGroup) { self.observabilityDelegate.startedConnecting(id: request.connectionID) do { - let bundle = try await self.factory(request.connectionID, self) + let bundle = try await self.factory(request.connectionID, self.connectionConfiguration, self) self.connectionEstablished(bundle) // after the connection has been established, we keep the task open. This ensures @@ -468,7 +498,7 @@ public final class ConnectionPool< /*private*/ func runKeepAlive(_ connection: Connection, in taskGroup: inout some TaskGroupProtocol) { self.observabilityDelegate.keepAliveTriggered(id: connection.id) - taskGroup.addTask_ { + self.addTask(into: &taskGroup) { do { try await self.keepAliveBehavior.runKeepAlive(for: connection) @@ -502,8 +532,8 @@ public final class ConnectionPool< } @inlinable - /*private*/ func runTimer(_ timer: StateMachine.Timer, in poolGroup: inout some TaskGroupProtocol) { - poolGroup.addTask_ { () async -> () in + /*private*/ func runTimer(_ timer: StateMachine.Timer, in taskGroup: inout some TaskGroupProtocol) { + self.addTask(into: &taskGroup) { () async -> () in await withTaskGroup(of: TimerRunResult.self, returning: Void.self) { taskGroup in taskGroup.addTask { do { @@ -554,6 +584,17 @@ public final class ConnectionPool< token.resume() } } + + @inlinable + func addTask(into taskGroup: inout some TaskGroupProtocol, operation: @escaping @Sendable () async -> Void) { + #if compiler(>=6.0) + if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *), let executor = self.executor as? TaskExecutor { + taskGroup.addTask_(executorPreference: executor, operation: operation) + return + } + #endif + taskGroup.addTask_(operation: operation) + } } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -573,6 +614,11 @@ protocol TaskGroupProtocol { // under exactly this name and others have different attributes. So let's pick // a name that doesn't clash anywhere and implement it using the standard `addTask`. mutating func addTask_(operation: @escaping @Sendable () async -> Void) + + #if compiler(>=6.0) + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) + mutating func addTask_(executorPreference: ((any TaskExecutor)?), operation: @escaping @Sendable () async -> Void) + #endif } @available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) @@ -581,6 +627,14 @@ extension DiscardingTaskGroup: TaskGroupProtocol { mutating func addTask_(operation: @escaping @Sendable () async -> Void) { self.addTask(priority: nil, operation: operation) } + + #if compiler(>=6.0) + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) + @inlinable + mutating func addTask_(executorPreference: (any TaskExecutor)?, operation: @escaping @Sendable () async -> Void) { + self.addTask(executorPreference: executorPreference, operation: operation) + } + #endif } extension TaskGroup: TaskGroupProtocol { @@ -588,4 +642,12 @@ extension TaskGroup: TaskGroupProtocol { mutating func addTask_(operation: @escaping @Sendable () async -> Void) { self.addTask(priority: nil, operation: operation) } + + #if compiler(>=6.0) + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) + @inlinable + mutating func addTask_(executorPreference: (any TaskExecutor)?, operation: @escaping @Sendable () async -> Void) { + self.addTask(executorPreference: executorPreference, operation: operation) + } + #endif } diff --git a/Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift b/Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift new file mode 100644 index 00000000..b887752a --- /dev/null +++ b/Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift @@ -0,0 +1,17 @@ +public protocol ConnectionPoolExecutor: AnyObject, Sendable { + associatedtype ID: Hashable, Sendable + + var id: ID { get } + + static func getExecutorID() -> Self.ID? +} + +public final class NothingConnectionPoolExecutor: ConnectionPoolExecutor { + public typealias ID = ObjectIdentifier + + public init() {} + + public var id: ObjectIdentifier { ObjectIdentifier(self) } + + public static func getExecutorID() -> ObjectIdentifier? { nil } +} diff --git a/Sources/ConnectionPoolModule/ConnectionPoolManager.swift b/Sources/ConnectionPoolModule/ConnectionPoolManager.swift new file mode 100644 index 00000000..81f0aaa2 --- /dev/null +++ b/Sources/ConnectionPoolModule/ConnectionPoolManager.swift @@ -0,0 +1,228 @@ +import Atomics + +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +public struct ConnectionPoolManagerConfiguration: Sendable { + /// The minimum number of connections to preserve in the pool. + /// + /// If the pool is mostly idle and the remote servers closes + /// idle connections, + /// the `ConnectionPool` will initiate new outbound + /// connections proactively to avoid the number of available + /// connections dropping below this number. + public var minimumConnectionPerExecutorCount: Int + + /// Between the `minimumConnectionCount` and + /// `maximumConnectionSoftLimit` the connection pool creates + /// _preserved_ connections. Preserved connections are closed + /// if they have been idle for ``idleTimeout``. + public var maximumConnectionPerExecutorSoftLimit: Int + + /// The maximum number of connections for this pool, that can + /// exist at any point in time. The pool can create _overflow_ + /// connections, if all connections are leased, and the + /// `maximumConnectionHardLimit` > `maximumConnectionSoftLimit ` + /// Overflow connections are closed immediately as soon as they + /// become idle. + public var maximumConnectionPerExecutorHardLimit: Int + + /// The time that a _preserved_ idle connection stays in the + /// pool before it is closed. + public var idleTimeout: Duration + + /// initializer + public init() { + self.minimumConnectionPerExecutorCount = 0 + self.maximumConnectionPerExecutorSoftLimit = 16 + self.maximumConnectionPerExecutorHardLimit = 16 + self.idleTimeout = .seconds(60) + } +} + +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +public final class ConnectionPoolManager< + Connection: PooledConnection, + ConnectionID: Hashable & Sendable, + ConnectionIDGenerator: ConnectionIDGeneratorProtocol, + ConnectionConfiguration: Equatable & Sendable, + Request: ConnectionRequestProtocol, + RequestID: Hashable & Sendable, + KeepAliveBehavior: ConnectionKeepAliveBehavior, + Executor: ConnectionPoolExecutor, + ObservabilityDelegate: ConnectionPoolObservabilityDelegate, + Clock: _Concurrency.Clock +>: Sendable where + Connection.ID == ConnectionID, + ConnectionIDGenerator.ID == ConnectionID, + Request.Connection == Connection, + Request.ID == RequestID, + KeepAliveBehavior.Connection == Connection, + ObservabilityDelegate.ConnectionID == ConnectionID, + Clock.Duration == Duration +{ + public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionConfiguration, ConnectionPool) async throws -> ConnectionAndMetadata + + public typealias ConnectionPool = _ConnectionPoolModule.ConnectionPool< + Connection, + ConnectionID, + ConnectionIDGenerator, + ConnectionConfiguration, + Request, + RequestID, + KeepAliveBehavior, + Executor, + ObservabilityDelegate, + Clock + > + + @usableFromInline + let pools: [Executor.ID: ConnectionPool] + + @usableFromInline + let roundRobinCounter = ManagedAtomic(0) + + @usableFromInline + let roundRobinPools: [ConnectionPool] + + @usableFromInline + let actionsStream: AsyncStream + + @usableFromInline + let eventContinuation: AsyncStream.Continuation + + @inlinable + public init( + configuration: ConnectionPoolManagerConfiguration, + connectionConfiguration: ConnectionConfiguration, + idGenerator: ConnectionIDGenerator, + requestType: Request.Type, + keepAliveBehavior: KeepAliveBehavior, + executors: [Executor], + observabilityDelegate: ObservabilityDelegate, + clock: Clock, + connectionFactory: @escaping ConnectionFactory + ) { + let (stream, continuation) = AsyncStream.makeStream(of: Actions.self) + self.actionsStream = stream + self.eventContinuation = continuation + + var pools = [Executor.ID: ConnectionPool]() + pools.reserveCapacity(executors.count) + + var singlePoolConfig = ConnectionPoolConfiguration() + singlePoolConfig.minimumConnectionCount = configuration.minimumConnectionPerExecutorCount + singlePoolConfig.maximumConnectionSoftLimit = configuration.maximumConnectionPerExecutorSoftLimit + singlePoolConfig.maximumConnectionHardLimit = configuration.maximumConnectionPerExecutorHardLimit + + for executor in executors { + pools[executor.id] = ConnectionPool( + configuration: singlePoolConfig, + connectionConfiguration: connectionConfiguration, + idGenerator: idGenerator, + requestType: requestType, + keepAliveBehavior: keepAliveBehavior, + executor: executor, + observabilityDelegate: observabilityDelegate, + clock: clock, + connectionFactory: connectionFactory + ) + } + + self.pools = pools + self.roundRobinPools = Array(pools.values) + + for pool in pools.values { + self.eventContinuation.yield(.runPool(pool)) + } + } + + @inlinable + public func leaseConnection(_ request: Request) { + if let executorID = Executor.getExecutorID(), let pool = self.pools[executorID] { + pool.leaseConnection(request) + return + } + + let index = self.roundRobinCounter.loadThenWrappingIncrement(ordering: .relaxed) % self.roundRobinPools.count + self.roundRobinPools[index].leaseConnection(request) + } + + @inlinable + public func cancelLeaseConnection(_ requestID: RequestID) { + // TODO: This is expensive! + for pool in self.roundRobinPools { + if pool.cancelLeaseConnection(requestID) { + break + } + } + } + + @usableFromInline + enum Actions: Sendable { + case runPool(ConnectionPool) + } + + @inlinable + public func run() async { + await withTaskCancellationHandler { + if #available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) { + return await withDiscardingTaskGroup() { taskGroup in + await self.run(in: &taskGroup) + } + } + return await withTaskGroup(of: Void.self) { taskGroup in + await self.run(in: &taskGroup) + } + } onCancel: { + + } + } + + @inlinable + public func updateConfiguration(_ configuration: ConnectionConfiguration, forceReconnection: Bool) { + for pool in self.pools.values { + pool.updateConfiguration(configuration, forceReconnection: forceReconnection) + } + } + + // MARK: - Private Methods - + + @available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) + @inlinable + /* private */ func run(in taskGroup: inout DiscardingTaskGroup) async { + for await event in self.actionsStream { + self.runEvent(event, in: &taskGroup) + } + } + + @inlinable + /* private */ func run(in taskGroup: inout TaskGroup) async { + var running = 0 + for await event in self.actionsStream { + running += 1 + self.runEvent(event, in: &taskGroup) + + if running == 100 { + _ = await taskGroup.next() + running -= 1 + } + } + } + + @inlinable + /* private */ func runEvent(_ event: Actions, in taskGroup: inout some TaskGroupProtocol) { + switch event { + case .runPool(let pool): + #if compiler(>=6.0) + if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *), let executor = pool.executor as? TaskExecutor { + taskGroup.addTask_(executorPreference: executor) { + await pool.run() + } + return + } + #endif + taskGroup.addTask_ { + await pool.run() + } + } + } +} diff --git a/Sources/ConnectionPoolModule/ConnectionRequest.swift b/Sources/ConnectionPoolModule/ConnectionRequest.swift index 1d1c55da..e58bac95 100644 --- a/Sources/ConnectionPoolModule/ConnectionRequest.swift +++ b/Sources/ConnectionPoolModule/ConnectionRequest.swift @@ -1,3 +1,4 @@ +import Atomics public struct ConnectionRequest: ConnectionRequestProtocol { public typealias ID = Int @@ -5,18 +6,18 @@ public struct ConnectionRequest: ConnectionRequest public var id: ID @usableFromInline - private(set) var continuation: CheckedContinuation + private(set) var continuation: CheckedContinuation, any Error> @inlinable init( id: Int, - continuation: CheckedContinuation + continuation: CheckedContinuation, any Error> ) { self.id = id self.continuation = continuation } - public func complete(with result: Result) { + public func complete(with result: Result, ConnectionPoolError>) { self.continuation.resume(with: result) } } @@ -28,17 +29,21 @@ let requestIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator() extension ConnectionPool where Request == ConnectionRequest { public convenience init( configuration: ConnectionPoolConfiguration, + connectionConfiguration: ConnectionConfiguration, idGenerator: ConnectionIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator(), keepAliveBehavior: KeepAliveBehavior, + executor: Executor, observabilityDelegate: ObservabilityDelegate, clock: Clock = ContinuousClock(), connectionFactory: @escaping ConnectionFactory ) { self.init( configuration: configuration, + connectionConfiguration: connectionConfiguration, idGenerator: idGenerator, requestType: ConnectionRequest.self, keepAliveBehavior: keepAliveBehavior, + executor: executor, observabilityDelegate: observabilityDelegate, clock: clock, connectionFactory: connectionFactory @@ -46,7 +51,7 @@ extension ConnectionPool where Request == ConnectionRequest { } @inlinable - public func leaseConnection() async throws -> Connection { + public func leaseConnection() async throws -> ConnectionLease { let requestID = requestIDGenerator.next() let connection = try await withTaskCancellationHandler { @@ -54,7 +59,7 @@ extension ConnectionPool where Request == ConnectionRequest { throw CancellationError() } - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation, Error>) in let request = Request( id: requestID, continuation: continuation @@ -71,8 +76,26 @@ extension ConnectionPool where Request == ConnectionRequest { @inlinable public func withConnection(_ closure: (Connection) async throws -> Result) async throws -> Result { - let connection = try await self.leaseConnection() - defer { self.releaseConnection(connection) } - return try await closure(connection) + let lease = try await self.leaseConnection() + defer { lease.release() } + return try await closure(lease.connection) + } +} + +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +extension ConnectionPoolManager where Request == ConnectionRequest { + @inlinable + public func leaseConnection() async throws -> ConnectionLease { + + let index = self.roundRobinCounter.loadThenWrappingIncrement(ordering: .relaxed) % self.roundRobinPools.count + + return try await self.roundRobinPools[index].leaseConnection() + } + + @inlinable + public func withConnection(_ closure: (Connection) async throws -> Result) async throws -> Result { + let lease = try await self.leaseConnection() + defer { lease.release() } + return try await closure(lease.connection) } } diff --git a/Sources/ConnectionPoolTestUtils/MockConnection.swift b/Sources/ConnectionPoolTestUtils/MockConnection.swift index db5c3ef7..5691a28e 100644 --- a/Sources/ConnectionPoolTestUtils/MockConnection.swift +++ b/Sources/ConnectionPoolTestUtils/MockConnection.swift @@ -2,7 +2,17 @@ import _ConnectionPoolModule import DequeModule import NIOConcurrencyHelpers -public final class MockConnection: PooledConnection, Sendable { +public struct MockConnectionConfiguration: Sendable, Hashable { + public var username: String + public var password: String + + public init(username: String, password: String) { + self.username = username + self.password = password + } +} + +public final class MockConnection: PooledConnection, Sendable { public typealias ID = Int public let id: ID @@ -15,7 +25,7 @@ public final class MockConnection: PooledConnection, Sendable { private let lock: NIOLockedValueBox = NIOLockedValueBox(.running([], [])) - public init(id: Int) { + public init(id: Int, executor: Executor) { self.id = id } diff --git a/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift b/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift index 936b47cc..86ff7f65 100644 --- a/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift +++ b/Sources/ConnectionPoolTestUtils/MockConnectionFactory.swift @@ -3,24 +3,30 @@ import DequeModule import NIOConcurrencyHelpers @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) -public final class MockConnectionFactory: Sendable where Clock.Duration == Duration { +public final class MockConnectionFactory: Sendable where Clock.Duration == Duration { public typealias ConnectionIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator - public typealias Request = ConnectionRequest + public typealias Request = ConnectionRequest> public typealias KeepAliveBehavior = MockPingPongBehavior public typealias MetricsDelegate = NoOpConnectionPoolMetrics public typealias ConnectionID = Int public typealias Connection = MockConnection + @usableFromInline let stateBox = NIOLockedValueBox(State()) + @usableFromInline struct State { - var attempts = Deque<(ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>)>() + @usableFromInline + var attempts = Deque<(ConnectionID, Executor, CheckedContinuation<(MockConnection, UInt16), any Error>)>() - var waiter = Deque), Never>>() + @usableFromInline + var waiter = Deque, UInt16), any Error>), Never>>() - var runningConnections = [ConnectionID: Connection]() + @usableFromInline + var runningConnections = [ConnectionID: Connection]() } + @usableFromInline let autoMaxStreams: UInt16? public init(autoMaxStreams: UInt16? = nil) { @@ -31,16 +37,18 @@ public final class MockConnectionFactory: Sendable wh self.stateBox.withLockedValue { $0.attempts.count } } - public var runningConnections: [Connection] { + public var runningConnections: [Connection] { self.stateBox.withLockedValue { Array($0.runningConnections.values) } } + @inlinable public func makeConnection( id: Int, - for pool: ConnectionPool, NoOpConnectionPoolMetrics, Clock> - ) async throws -> ConnectionAndMetadata { + configuration: MockConnectionConfiguration, + for pool: ConnectionPool, Int, ConnectionIDGenerator, MockConnectionConfiguration, some ConnectionRequestProtocol, Int, MockPingPongBehavior>, Executor, NoOpConnectionPoolMetrics, Clock> + ) async throws -> ConnectionAndMetadata> { if let autoMaxStreams = self.autoMaxStreams { - let connection = MockConnection(id: id) + let connection = MockConnection(id: id, executor: pool.executor) Task { try? await connection.signalToClose connection.closeIfClosing() @@ -49,18 +57,18 @@ public final class MockConnectionFactory: Sendable wh } // we currently don't support cancellation when creating a connection - let result = try await withCheckedThrowingContinuation { (checkedContinuation: CheckedContinuation<(MockConnection, UInt16), any Error>) in - let waiter = self.stateBox.withLockedValue { state -> (CheckedContinuation<(ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>), Never>)? in + let result = try await withCheckedThrowingContinuation { (checkedContinuation: CheckedContinuation<(MockConnection, UInt16), any Error>) in + let waiter = self.stateBox.withLockedValue { state -> (CheckedContinuation<(ConnectionID, Executor, CheckedContinuation<(MockConnection, UInt16), any Error>), Never>)? in if let waiter = state.waiter.popFirst() { return waiter } else { - state.attempts.append((id, checkedContinuation)) + state.attempts.append((id, pool.executor, checkedContinuation)) return nil } } if let waiter { - waiter.resume(returning: (id, checkedContinuation)) + waiter.resume(returning: (id, pool.executor, checkedContinuation)) } } @@ -68,9 +76,10 @@ public final class MockConnectionFactory: Sendable wh } @discardableResult - public func nextConnectAttempt(_ closure: (ConnectionID) async throws -> UInt16) async rethrows -> Connection { - let (connectionID, continuation) = await withCheckedContinuation { (continuation: CheckedContinuation<(ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>), Never>) in - let attempt = self.stateBox.withLockedValue { state -> (ConnectionID, CheckedContinuation<(MockConnection, UInt16), any Error>)? in + @inlinable + public func nextConnectAttempt(_ closure: (ConnectionID) async throws -> UInt16) async rethrows -> Connection { + let (connectionID, executor, continuation) = await withCheckedContinuation { (continuation: CheckedContinuation<(ConnectionID, Executor, CheckedContinuation<(MockConnection, UInt16), any Error>), Never>) in + let attempt = self.stateBox.withLockedValue { state -> (ConnectionID, Executor, CheckedContinuation<(MockConnection, UInt16), any Error>)? in if let attempt = state.attempts.popFirst() { return attempt } else { @@ -86,7 +95,7 @@ public final class MockConnectionFactory: Sendable wh do { let streamCount = try await closure(connectionID) - let connection = MockConnection(id: connectionID) + let connection = MockConnection(id: connectionID, executor: executor) connection.onClose { _ in self.stateBox.withLockedValue { state in diff --git a/Sources/ConnectionPoolTestUtils/MockExecutor.swift b/Sources/ConnectionPoolTestUtils/MockExecutor.swift new file mode 100644 index 00000000..f1b93956 --- /dev/null +++ b/Sources/ConnectionPoolTestUtils/MockExecutor.swift @@ -0,0 +1,23 @@ +// +// MockExecutor.swift +// postgres-nio +// +// Created by Fabian Fett on 07.05.25. +// + +import _ConnectionPoolModule + +public final class MockExecutor: ConnectionPoolExecutor, Sendable { + public typealias ID = ObjectIdentifier + + public var id: ID { ObjectIdentifier(self) } + + static public func getExecutorID() -> ObjectIdentifier? { + MockExecutor.executorID + } + + public init() {} + + @TaskLocal + static var executorID: MockExecutor.ID? +} diff --git a/Sources/ConnectionPoolTestUtils/MockRequest.swift b/Sources/ConnectionPoolTestUtils/MockRequest.swift index 5e4e2fc0..dc89dd62 100644 --- a/Sources/ConnectionPoolTestUtils/MockRequest.swift +++ b/Sources/ConnectionPoolTestUtils/MockRequest.swift @@ -1,8 +1,7 @@ import _ConnectionPoolModule -public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable { - public typealias Connection = MockConnection - +public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable { + public struct ID: Hashable, Sendable { var objectID: ObjectIdentifier @@ -11,7 +10,7 @@ public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable { } } - public init() {} + public init(connectionType: Connection.Type = Connection.self) {} public var id: ID { ID(self) } @@ -23,7 +22,8 @@ public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable { hasher.combine(self.id) } - public func complete(with: Result) { + @inlinable + public func complete(with: Result, ConnectionPoolError>) { } } diff --git a/Sources/PostgresNIO/Pool/ConnectionFactory.swift b/Sources/PostgresNIO/Pool/ConnectionFactory.swift index 319b86c4..4eb82d6c 100644 --- a/Sources/PostgresNIO/Pool/ConnectionFactory.swift +++ b/Sources/PostgresNIO/Pool/ConnectionFactory.swift @@ -35,7 +35,7 @@ final class ConnectionFactory: Sendable { self.logger = logger } - func makeConnection(_ connectionID: PostgresConnection.ID, pool: PostgresClient.Pool) async throws -> PostgresConnection { + func makeConnection(_ connectionID: PostgresConnection.ID, pool: PostgresClient.ConnectionPool) async throws -> PostgresConnection { let config = try await self.makeConnectionConfig() var connectionLogger = self.logger diff --git a/Sources/PostgresNIO/Pool/PostgresClient.swift b/Sources/PostgresNIO/Pool/PostgresClient.swift index d54e34eb..6f180237 100644 --- a/Sources/PostgresNIO/Pool/PostgresClient.swift +++ b/Sources/PostgresNIO/Pool/PostgresClient.swift @@ -233,17 +233,130 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { } } - typealias Pool = ConnectionPool< + typealias ConnectionPoolManager = _ConnectionPoolModule.ConnectionPoolManager< PostgresConnection, PostgresConnection.ID, ConnectionIDGenerator, - ConnectionRequest, - ConnectionRequest.ID, + Foo, + PostgresConnectionRequest, + PostgresConnectionRequest.ID, PostgresKeepAliveBehavor, + NothingConnectionPoolExecutor, PostgresClientMetrics, ContinuousClock > + typealias ConnectionPool = _ConnectionPoolModule.ConnectionPool< + PostgresConnection, + PostgresConnection.ID, + ConnectionIDGenerator, + Foo, + PostgresConnectionRequest, + PostgresConnectionRequest.ID, + PostgresKeepAliveBehavor, + NothingConnectionPoolExecutor, + PostgresClientMetrics, + ContinuousClock + > + + enum Pool { + case manager(ConnectionPoolManager) + case pool(ConnectionPool) + + init( + configuration: Configuration, + factory: ConnectionFactory, + eventLoopGroup: any EventLoopGroup, + backgroundLogger: Logger + ) { + let idGenerator = ConnectionIDGenerator.globalGenerator + + if configuration.options.maximumConnections > 50 { + // make as many executors as we have NIO else + let executorCount = Int(ceil(Double(configuration.options.maximumConnections) / 50.0)) + let executors = (0.. ConnectionLease { + let requestID = PostgresConnectionRequest.idGenerator.next() + + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation, Error>) in + let request = PostgresConnectionRequest(id: requestID, continuation: continuation) + + self.leaseConnection(request) + } + } onCancel: { + self.cancelRequest(id: requestID) + } + } + } + let pool: Pool let factory: ConnectionFactory let runningAtomic = ManagedAtomic(false) @@ -280,18 +393,12 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { self.factory = factory self.backgroundLogger = backgroundLogger - self.pool = ConnectionPool( - configuration: .init(configuration), - idGenerator: ConnectionIDGenerator(), - requestType: ConnectionRequest.self, - keepAliveBehavior: .init(configuration.options.keepAliveBehavior, logger: backgroundLogger), - observabilityDelegate: .init(logger: backgroundLogger), - clock: ContinuousClock() - ) { (connectionID, pool) in - let connection = try await factory.makeConnection(connectionID, pool: pool) - - return ConnectionAndMetadata(connection: connection, maximalStreamsOnConnection: 1) - } + self.pool = .init( + configuration: configuration, + factory: factory, + eventLoopGroup: eventLoopGroup, + backgroundLogger: backgroundLogger + ) } /// Lease a connection for the provided `closure`'s lifetime. @@ -301,11 +408,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { /// - Returns: The closure's return value. @_disfavoredOverload public func withConnection(_ closure: (PostgresConnection) async throws -> Result) async throws -> Result { - let connection = try await self.leaseConnection() + let lease = try await self.leaseConnection() - defer { self.pool.releaseConnection(connection) } + defer { lease.release() } - return try await closure(connection) + return try await closure(lease.connection) } #if compiler(>=6.0) @@ -319,11 +426,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { // DO NOT FIX THE WHITESPACE IN THE NEXT LINE UNTIL 5.10 IS UNSUPPORTED // https://github.com/swiftlang/swift/issues/79285 _ closure: (PostgresConnection) async throws -> sending Result) async throws -> sending Result { - let connection = try await self.leaseConnection() + let lease = try await self.leaseConnection() - defer { self.pool.releaseConnection(connection) } + defer { lease.release() } - return try await closure(connection) + return try await closure(lease.connection) } /// Lease a connection, which is in an open transaction state, for the provided `closure`'s lifetime. @@ -404,29 +511,22 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { throw PSQLError(code: .tooManyParameters, query: query, file: file, line: line) } - let connection = try await self.leaseConnection() + let requestID = PostgresConnectionRequest.idGenerator.next() - var logger = logger - logger[postgresMetadataKey: .connectionID] = "\(connection.id)" + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let request = PostgresConnectionRequest( + id: requestID, + query: query, + continuation: continuation, + logger: logger + ) - let promise = connection.channel.eventLoop.makePromise(of: PSQLRowStream.self) - let context = ExtendedQueryContext( - query: query, - logger: logger, - promise: promise - ) - - connection.channel.write(HandlerTask.extendedQuery(context), promise: nil) - - promise.futureResult.whenFailure { _ in - self.pool.releaseConnection(connection) + self.pool.leaseConnection(request) + } + } onCancel: { + self.pool.cancelRequest(id: requestID) } - - return try await promise.futureResult.map { - $0.asyncSequence(onFinish: { - self.pool.releaseConnection(connection) - }) - }.get() } catch var error as PSQLError { error.file = file error.line = line @@ -446,7 +546,8 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { let logger = logger ?? Self.loggingDisabled do { - let connection = try await self.leaseConnection() + let lease = try await self.leaseConnection() + let connection = lease.connection let promise = connection.channel.eventLoop.makePromise(of: PSQLRowStream.self) let task = HandlerTask.executePreparedStatement(.init( @@ -460,11 +561,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { connection.channel.write(task, promise: nil) promise.futureResult.whenFailure { _ in - self.pool.releaseConnection(connection) + lease.release() } return try await promise.futureResult - .map { $0.asyncSequence(onFinish: { self.pool.releaseConnection(connection) }) } + .map { $0.asyncSequence(onFinish: { lease.release() }) } .get() .map { try preparedStatement.decodeRow($0) } } catch var error as PSQLError { @@ -504,7 +605,7 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { // MARK: - Private Methods - - private func leaseConnection() async throws -> PostgresConnection { + private func leaseConnection() async throws -> ConnectionLease { if !self.runningAtomic.load(ordering: .relaxed) { self.backgroundLogger.warning("Trying to lease connection from `PostgresClient`, but `PostgresClient.run()` hasn't been called yet.") } @@ -518,7 +619,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service { PostgresConnection.defaultEventLoopGroup } - static let loggingDisabled = Logger(label: "Postgres-do-not-log", factory: { _ in SwiftLogNoOpLogHandler() }) + static let loggingDisabled = { + var logger = Logger(label: "Postgres-do-not-log", factory: { _ in SwiftLogNoOpLogHandler() }) + logger.logLevel = .critical + return logger + }() } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -579,3 +684,5 @@ extension ConnectionPoolError { return psqlError } } + +struct Foo: Equatable {} diff --git a/Sources/PostgresNIO/Pool/PostgresConnectionRequest.swift b/Sources/PostgresNIO/Pool/PostgresConnectionRequest.swift new file mode 100644 index 00000000..48a6c140 --- /dev/null +++ b/Sources/PostgresNIO/Pool/PostgresConnectionRequest.swift @@ -0,0 +1,78 @@ +// +// PostgresConnectionRequest.swift +// postgres-nio +// +// Created by Fabian Fett on 15.05.25. +// + +import _ConnectionPoolModule + +struct PostgresConnectionRequest: ConnectionRequestProtocol { + + static let idGenerator = ConnectionIDGenerator() + + private enum `Type` { + case connection(CheckedContinuation, any Error>) + case query(PostgresQuery, Logger, CheckedContinuation) + } + + typealias ID = Int + + var id: ID + private var type: `Type` + + init( + id: Int, + continuation: CheckedContinuation, any Error> + ) { + self.id = id + self.type = .connection(continuation) + } + + init( + id: Int, + query: PostgresQuery, + continuation: CheckedContinuation, + logger: Logger + ) { + self.id = id + self.type = .query(query, logger, continuation) + } + + public func complete(with result: Result, ConnectionPoolError>) { + switch self.type { + case .connection(let checkedContinuation): + checkedContinuation.resume(with: result) + + case .query(let query, var logger, let checkedContinuation): + switch result { + case .success(let lease): + logger[postgresMetadataKey: .connectionID] = "\(lease.connection.id)" + + let promise = lease.connection.channel.eventLoop.makePromise(of: PSQLRowStream.self) + let context = ExtendedQueryContext( + query: query, + logger: logger, + promise: promise + ) + + lease.connection.channel.write(HandlerTask.extendedQuery(context), promise: nil) + promise.futureResult.whenFailure { error in + lease.release() + checkedContinuation.resume(throwing: error) + } + + promise.futureResult.whenSuccess { rowSequence in + let asyncSequence = rowSequence.asyncSequence { + lease.release() + } + checkedContinuation.resume(returning: asyncSequence) + } + + case .failure(let error): + checkedContinuation.resume(throwing: error) + } + } + + } +} diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index c745b4a0..217aa898 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -7,21 +7,25 @@ import XCTest @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) final class ConnectionPoolTests: XCTestCase { + let executor = MockExecutor() + func test1000ConsecutiveRequestsOnSingleConnection() async { - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() var config = ConnectionPoolConfiguration() config.minimumConnectionCount = 1 let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: self.executor, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: ContinuousClock() ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } // the same connection is reused 1000 times @@ -39,15 +43,13 @@ final class ConnectionPoolTests: XCTestCase { do { for _ in 0..<1000 { async let connectionFuture = try await pool.leaseConnection() - var leasedConnection: MockConnection? + var connectionLease: ConnectionLease>? XCTAssertEqual(factory.pendingConnectionAttemptsCount, 0) - leasedConnection = try await connectionFuture - XCTAssertNotNil(leasedConnection) - XCTAssert(createdConnection === leasedConnection) + connectionLease = try await connectionFuture + XCTAssertNotNil(connectionLease) + XCTAssert(createdConnection === connectionLease?.connection) - if let leasedConnection { - pool.releaseConnection(leasedConnection) - } + connectionLease?.release() } } catch { XCTFail("Unexpected error: \(error)") @@ -66,20 +68,22 @@ final class ConnectionPoolTests: XCTestCase { func testShutdownPoolWhileConnectionIsBeingCreated() async { let clock = MockClock() - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() var config = ConnectionPoolConfiguration() config.minimumConnectionCount = 1 let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } await withTaskGroup(of: Void.self) { taskGroup in @@ -111,20 +115,22 @@ final class ConnectionPoolTests: XCTestCase { func testShutdownPoolWhileConnectionIsBackingOff() async { let clock = MockClock() - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() var config = ConnectionPoolConfiguration() config.minimumConnectionCount = 1 let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } await withTaskGroup(of: Void.self) { taskGroup in @@ -145,7 +151,7 @@ final class ConnectionPoolTests: XCTestCase { } func testConnectionHardLimitIsRespected() async { - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() var mutableConfig = ConnectionPoolConfiguration() mutableConfig.minimumConnectionCount = 0 @@ -155,13 +161,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self), - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: ContinuousClock() ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } let hasFinished = ManagedAtomic(false) @@ -195,8 +203,8 @@ final class ConnectionPoolTests: XCTestCase { for _ in 0..() + let factory = MockConnectionFactory() let keepAliveDuration = Duration.seconds(30) - let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) var mutableConfig = ConnectionPoolConfiguration() mutableConfig.minimumConnectionCount = 0 @@ -236,13 +244,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: keepAlive, - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -250,16 +260,16 @@ final class ConnectionPoolTests: XCTestCase { await pool.run() } - async let lease1ConnectionAsync = pool.leaseConnection() + async let connectionLeaseFuture = pool.leaseConnection() let connection = await factory.nextConnectAttempt { connectionID in return 1 } - let lease1Connection = try await lease1ConnectionAsync - XCTAssert(connection === lease1Connection) + let connectionLease = try await connectionLeaseFuture + XCTAssert(connection === connectionLease.connection) - pool.releaseConnection(lease1Connection) + connectionLease.release() // keep alive 1 @@ -280,7 +290,7 @@ final class ConnectionPoolTests: XCTestCase { await keepAlive.nextKeepAlive { keepAliveConnection in defer { print("keep alive 1 has run") } - XCTAssertTrue(keepAliveConnection === lease1Connection) + XCTAssertTrue(keepAliveConnection === connectionLease.connection) return true } @@ -303,9 +313,9 @@ final class ConnectionPoolTests: XCTestCase { func testKeepAliveOnClose() async throws { let clock = MockClock() - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() let keepAliveDuration = Duration.seconds(20) - let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) var mutableConfig = ConnectionPoolConfiguration() mutableConfig.minimumConnectionCount = 0 @@ -315,13 +325,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: keepAlive, - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -329,16 +341,16 @@ final class ConnectionPoolTests: XCTestCase { await pool.run() } - async let lease1ConnectionAsync = pool.leaseConnection() + async let connectionLeaseFuture = pool.leaseConnection() let connection = await factory.nextConnectAttempt { connectionID in return 1 } - let lease1Connection = try await lease1ConnectionAsync - XCTAssert(connection === lease1Connection) + let connectionLease = try await connectionLeaseFuture + XCTAssert(connection === connectionLease.connection) - pool.releaseConnection(lease1Connection) + connectionLease.release() // keep alive 1 @@ -357,7 +369,7 @@ final class ConnectionPoolTests: XCTestCase { clock.advance(to: newTime) await keepAlive.nextKeepAlive { keepAliveConnection in - XCTAssertTrue(keepAliveConnection === lease1Connection) + XCTAssertTrue(keepAliveConnection === connectionLease.connection) return true } @@ -373,7 +385,7 @@ final class ConnectionPoolTests: XCTestCase { XCTAssertFalse(failingKeepAliveDidRun .compareExchange(expected: false, desired: true, ordering: .relaxed).original) } - XCTAssertTrue(keepAliveConnection === lease1Connection) + XCTAssertTrue(keepAliveConnection === connectionLease.connection) keepAliveConnection.close() throw CancellationError() // any error } // will fail and it's expected @@ -389,9 +401,9 @@ final class ConnectionPoolTests: XCTestCase { func testKeepAliveWorksRacesAgainstShutdown() async throws { let clock = MockClock() - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() let keepAliveDuration = Duration.seconds(30) - let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) var mutableConfig = ConnectionPoolConfiguration() mutableConfig.minimumConnectionCount = 0 @@ -401,13 +413,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: keepAlive, - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -415,16 +429,16 @@ final class ConnectionPoolTests: XCTestCase { await pool.run() } - async let lease1ConnectionAsync = pool.leaseConnection() + async let connectionLeaseFuture = pool.leaseConnection() let connection = await factory.nextConnectAttempt { connectionID in return 1 } - let lease1Connection = try await lease1ConnectionAsync - XCTAssert(connection === lease1Connection) + let connectionLease = try await connectionLeaseFuture + XCTAssert(connection === connectionLease.connection) - pool.releaseConnection(lease1Connection) + connectionLease.release() // keep alive 1 @@ -442,7 +456,7 @@ final class ConnectionPoolTests: XCTestCase { await keepAlive.nextKeepAlive { keepAliveConnection in defer { print("keep alive 1 has run") } - XCTAssertTrue(keepAliveConnection === lease1Connection) + XCTAssertTrue(keepAliveConnection === connectionLease.connection) return true } @@ -457,9 +471,9 @@ final class ConnectionPoolTests: XCTestCase { func testCancelConnectionRequestWorks() async throws { let clock = MockClock() - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() let keepAliveDuration = Duration.seconds(30) - let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) var mutableConfig = ConnectionPoolConfiguration() mutableConfig.minimumConnectionCount = 0 @@ -470,13 +484,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: keepAlive, - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -517,9 +533,9 @@ final class ConnectionPoolTests: XCTestCase { func testLeasingMultipleConnectionsAtOnceWorks() async throws { let clock = MockClock() - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() let keepAliveDuration = Duration.seconds(30) - let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) var mutableConfig = ConnectionPoolConfiguration() mutableConfig.minimumConnectionCount = 4 @@ -530,13 +546,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionFuture.self, keepAliveBehavior: keepAlive, - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -556,19 +574,19 @@ final class ConnectionPoolTests: XCTestCase { // lease 4 connections at once pool.leaseConnections(requests) - var connections = [MockConnection]() + var connectionLeases = [ConnectionLease>]() for request in requests { let connection = try await request.future.success - connections.append(connection) + connectionLeases.append(connection) } // Ensure that we got 4 distinct connections - XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 4) + XCTAssertEqual(Set(connectionLeases.lazy.map(\.connection.id)).count, 4) // release all 4 leased connections - for connection in connections { - pool.releaseConnection(connection) + for lease in connectionLeases { + lease.release() } // shutdown @@ -581,9 +599,9 @@ final class ConnectionPoolTests: XCTestCase { func testLeasingConnectionAfterShutdownIsInvokedFails() async throws { let clock = MockClock() - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() let keepAliveDuration = Duration.seconds(30) - let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) var mutableConfig = ConnectionPoolConfiguration() mutableConfig.minimumConnectionCount = 4 @@ -594,13 +612,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionRequest.self, keepAliveBehavior: keepAlive, - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -636,9 +656,9 @@ final class ConnectionPoolTests: XCTestCase { func testLeasingConnectionsAfterShutdownIsInvokedFails() async throws { let clock = MockClock() - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() let keepAliveDuration = Duration.seconds(30) - let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) var mutableConfig = ConnectionPoolConfiguration() mutableConfig.minimumConnectionCount = 4 @@ -649,13 +669,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionFuture.self, keepAliveBehavior: keepAlive, - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -697,9 +719,9 @@ final class ConnectionPoolTests: XCTestCase { func testLeasingMultipleStreamsFromOneConnectionWorks() async throws { let clock = MockClock() - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() let keepAliveDuration = Duration.seconds(30) - let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) var mutableConfig = ConnectionPoolConfiguration() mutableConfig.minimumConnectionCount = 0 @@ -710,13 +732,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionFuture.self, keepAliveBehavior: keepAlive, - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -727,7 +751,7 @@ final class ConnectionPoolTests: XCTestCase { // create 4 connection requests let requests = (0..<10).map { ConnectionFuture(id: $0) } pool.leaseConnections(requests) - var connections = [MockConnection]() + var connectionLeases = [ConnectionLease>]() await factory.nextConnectAttempt { connectionID in return 10 @@ -735,15 +759,15 @@ final class ConnectionPoolTests: XCTestCase { for request in requests { let connection = try await request.future.success - connections.append(connection) + connectionLeases.append(connection) } // Ensure that all requests got the same connection - XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1) + XCTAssertEqual(Set(connectionLeases.lazy.map(\.connection.id)).count, 1) // release all 10 leased streams - for connection in connections { - pool.releaseConnection(connection) + for lease in connectionLeases { + lease.release() } for _ in 0..<9 { @@ -762,9 +786,9 @@ final class ConnectionPoolTests: XCTestCase { func testIncreasingAvailableStreamsWorks() async throws { let clock = MockClock() - let factory = MockConnectionFactory() + let factory = MockConnectionFactory() let keepAliveDuration = Duration.seconds(30) - let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) var mutableConfig = ConnectionPoolConfiguration() mutableConfig.minimumConnectionCount = 0 @@ -775,13 +799,15 @@ final class ConnectionPoolTests: XCTestCase { let pool = ConnectionPool( configuration: config, + connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"), idGenerator: ConnectionIDGenerator(), requestType: ConnectionFuture.self, keepAliveBehavior: keepAlive, - observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + executor: MockExecutor(), + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), clock: clock ) { - try await factory.makeConnection(id: $0, for: $1) + try await factory.makeConnection(id: $0, configuration: $1, for: $2) } try await withThrowingTaskGroup(of: Void.self) { taskGroup in @@ -792,41 +818,41 @@ final class ConnectionPoolTests: XCTestCase { // create 4 connection requests var requests = (0..<21).map { ConnectionFuture(id: $0) } pool.leaseConnections(requests) - var connections = [MockConnection]() + var connectionLease = [ConnectionLease>]() await factory.nextConnectAttempt { connectionID in return 1 } - let connection = try await requests.first!.future.success - connections.append(connection) + let lease = try await requests.first!.future.success + connectionLease.append(lease) requests.removeFirst() - pool.connectionReceivedNewMaxStreamSetting(connection, newMaxStreamSetting: 21) + pool.connectionReceivedNewMaxStreamSetting(lease.connection, newMaxStreamSetting: 21) for (_, request) in requests.enumerated() { let connection = try await request.future.success - connections.append(connection) + connectionLease.append(connection) } // Ensure that all requests got the same connection - XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1) + XCTAssertEqual(Set(connectionLease.lazy.map(\.connection.id)).count, 1) requests = (22..<42).map { ConnectionFuture(id: $0) } pool.leaseConnections(requests) // release all 21 leased streams in a single call - pool.releaseConnection(connection, streams: 21) + pool.releaseConnection(lease.connection, streams: 21) // ensure all 20 new requests got fulfilled for request in requests { let connection = try await request.future.success - connections.append(connection) + connectionLease.append(connection) } // release all 20 leased streams one by one for _ in requests { - pool.releaseConnection(connection, streams: 1) + pool.releaseConnection(lease.connection, streams: 1) } // shutdown @@ -840,14 +866,14 @@ final class ConnectionPoolTests: XCTestCase { struct ConnectionFuture: ConnectionRequestProtocol { let id: Int - let future: Future + let future: Future>> init(id: Int) { self.id = id - self.future = Future(of: MockConnection.self) + self.future = Future(of: ConnectionLease.self) } - func complete(with result: Result) { + func complete(with result: Result>, ConnectionPoolError>) { switch result { case .success(let success): self.future.yield(value: success) diff --git a/Tests/ConnectionPoolModuleTests/ConnectionRequestTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionRequestTests.swift index 537efbd9..25c15cfa 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionRequestTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionRequestTests.swift @@ -4,20 +4,27 @@ import XCTest final class ConnectionRequestTests: XCTestCase { + let executor = NothingConnectionPoolExecutor() + func testHappyPath() async throws { - let mockConnection = MockConnection(id: 1) - let connection = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let mockConnection = MockConnection(id: 1, executor: self.executor) + let lease = try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation>, any Error>) in let request = ConnectionRequest(id: 42, continuation: continuation) XCTAssertEqual(request.id, 42) - continuation.resume(with: .success(mockConnection)) + let lease = ConnectionLease(connection: mockConnection) { + + } + continuation.resume(with: .success(lease)) } - XCTAssert(connection === mockConnection) + XCTAssert(lease.connection === mockConnection) } func testSadPath() async throws { do { - _ = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + _ = try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation, any Error>) in continuation.resume(with: .failure(ConnectionPoolError.requestCancelled)) } XCTFail("This point should not be reached") diff --git a/Tests/ConnectionPoolModuleTests/NoKeepAliveBehaviorTests.swift b/Tests/ConnectionPoolModuleTests/NoKeepAliveBehaviorTests.swift index b1b54023..00edf309 100644 --- a/Tests/ConnectionPoolModuleTests/NoKeepAliveBehaviorTests.swift +++ b/Tests/ConnectionPoolModuleTests/NoKeepAliveBehaviorTests.swift @@ -5,7 +5,7 @@ import XCTest @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) final class NoKeepAliveBehaviorTests: XCTestCase { func testNoKeepAlive() { - let keepAliveBehavior = NoOpKeepAliveBehavior(connectionType: MockConnection.self) + let keepAliveBehavior = NoOpKeepAliveBehavior(connectionType: MockConnection.self) XCTAssertNil(keepAliveBehavior.keepAliveFrequency) } } diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift index b09bfcb4..fe8f51f6 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift @@ -6,6 +6,8 @@ import XCTest final class PoolStateMachine_ConnectionGroupTests: XCTestCase { var idGenerator: ConnectionIDGenerator! + let executor = NothingConnectionPoolExecutor() + override func setUp() { self.idGenerator = ConnectionIDGenerator() super.setUp() @@ -41,7 +43,7 @@ final class PoolStateMachine_ConnectionGroupTests: XCTestCase { var connected: UInt16 = 0 for request in requests { - let newConnection = MockConnection(id: request.connectionID) + let newConnection = MockConnection(id: request.connectionID, executor: self.executor) let (_, context) = connections.newConnectionEstablished(newConnection, maxStreams: 1) XCTAssertEqual(context.info, .idle(availableStreams: 1, newIdle: true)) XCTAssertEqual(context.use, .persisted) @@ -76,7 +78,7 @@ final class PoolStateMachine_ConnectionGroupTests: XCTestCase { XCTAssertEqual(connections.soonAvailableConnections, 1) XCTAssertEqual(connections.stats, .init(connecting: 1)) - let newConnection = MockConnection(id: request.connectionID) + let newConnection = MockConnection(id: request.connectionID, executor: self.executor) let (_, establishedContext) = connections.newConnectionEstablished(newConnection, maxStreams: 1) XCTAssertEqual(establishedContext.info, .idle(availableStreams: 1, newIdle: true)) XCTAssertEqual(establishedContext.use, .demand) @@ -184,14 +186,14 @@ final class PoolStateMachine_ConnectionGroupTests: XCTestCase { } XCTAssertEqual(connections.stats, .init(connecting: 3)) - let newSecondConnection = MockConnection(id: secondRequest.connectionID) + let newSecondConnection = MockConnection(id: secondRequest.connectionID, executor: self.executor) let (_, establishedSecondConnectionContext) = connections.newConnectionEstablished(newSecondConnection, maxStreams: 1) XCTAssertEqual(establishedSecondConnectionContext.info, .idle(availableStreams: 1, newIdle: true)) XCTAssertEqual(establishedSecondConnectionContext.use, .persisted) XCTAssertEqual(connections.stats, .init(connecting: 2, idle: 1, availableStreams: 1)) XCTAssertEqual(connections.soonAvailableConnections, 2) - let newThirdConnection = MockConnection(id: thirdRequest.connectionID) + let newThirdConnection = MockConnection(id: thirdRequest.connectionID, executor: self.executor) let (thirdConnectionIndex, establishedThirdConnectionContext) = connections.newConnectionEstablished(newThirdConnection, maxStreams: 1) XCTAssertEqual(establishedThirdConnectionContext.info, .idle(availableStreams: 1, newIdle: true)) XCTAssertEqual(establishedThirdConnectionContext.use, .demand) @@ -238,7 +240,7 @@ final class PoolStateMachine_ConnectionGroupTests: XCTestCase { } XCTAssertEqual(connections.stats, .init(connecting: 2)) - let newFirstConnection = MockConnection(id: firstRequest.connectionID) + let newFirstConnection = MockConnection(id: firstRequest.connectionID, executor: self.executor) let (_, establishedFirstConnectionContext) = connections.newConnectionEstablished(newFirstConnection, maxStreams: 1) XCTAssertEqual(establishedFirstConnectionContext.info, .idle(availableStreams: 1, newIdle: true)) XCTAssertEqual(establishedFirstConnectionContext.use, .demand) @@ -273,7 +275,7 @@ final class PoolStateMachine_ConnectionGroupTests: XCTestCase { XCTAssertEqual(requests.count, 1) guard let firstRequest = requests.first else { return XCTFail("Expected to have a request here") } - let newConnection = MockConnection(id: firstRequest.connectionID) + let newConnection = MockConnection(id: firstRequest.connectionID, executor: self.executor) let (connectionIndex, establishedConnectionContext) = connections.newConnectionEstablished(newConnection, maxStreams: 1) XCTAssertEqual(establishedConnectionContext.info, .idle(availableStreams: 1, newIdle: true)) XCTAssertEqual(establishedConnectionContext.use, .persisted) @@ -307,7 +309,7 @@ final class PoolStateMachine_ConnectionGroupTests: XCTestCase { guard let firstRequest = connections.createNewDemandConnectionIfPossible() else { return XCTFail("Expected to have a request here") } - let newConnection = MockConnection(id: firstRequest.connectionID) + let newConnection = MockConnection(id: firstRequest.connectionID, executor: self.executor) let (connectionIndex, establishedConnectionContext) = connections.newConnectionEstablished(newConnection, maxStreams: 1) XCTAssertEqual(establishedConnectionContext.info, .idle(availableStreams: 1, newIdle: true)) XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1)) diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionStateTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionStateTests.swift index 7dd2b726..965019f4 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionStateTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionStateTests.swift @@ -7,6 +7,8 @@ final class PoolStateMachine_ConnectionStateTests: XCTestCase { typealias TestConnectionState = TestPoolStateMachine.ConnectionState + let executor = NothingConnectionPoolExecutor() + func testStartupLeaseReleaseParkLease() { let connectionID = 1 var state = TestConnectionState(id: connectionID) @@ -15,7 +17,7 @@ final class PoolStateMachine_ConnectionStateTests: XCTestCase { XCTAssertEqual(state.isAvailable, false) XCTAssertEqual(state.isConnected, false) XCTAssertEqual(state.isLeased, false) - let connection = MockConnection(id: connectionID) + let connection = MockConnection(id: connectionID, executor: self.executor) XCTAssertEqual(state.connected(connection, maxStreams: 1), .idle(availableStreams: 1, newIdle: true)) XCTAssertEqual(state.isIdle, true) XCTAssertEqual(state.isAvailable, true) @@ -58,7 +60,7 @@ final class PoolStateMachine_ConnectionStateTests: XCTestCase { func testStartupParkLeaseBeforeTimersRegistered() { let connectionID = 1 var state = TestConnectionState(id: connectionID) - let connection = MockConnection(id: connectionID) + let connection = MockConnection(id: connectionID, executor: self.executor) XCTAssertEqual(state.connected(connection, maxStreams: 1), .idle(availableStreams: 1, newIdle: true)) let parkResult = state.parkConnection(scheduleKeepAliveTimer: true, scheduleIdleTimeoutTimer: true) XCTAssertEqual( @@ -84,7 +86,7 @@ final class PoolStateMachine_ConnectionStateTests: XCTestCase { func testStartupParkLeasePark() { let connectionID = 1 var state = TestConnectionState(id: connectionID) - let connection = MockConnection(id: connectionID) + let connection = MockConnection(id: connectionID, executor: self.executor) XCTAssertEqual(state.connected(connection, maxStreams: 1), .idle(availableStreams: 1, newIdle: true)) let parkResult = state.parkConnection(scheduleKeepAliveTimer: true, scheduleIdleTimeoutTimer: true) XCTAssert( @@ -145,14 +147,14 @@ final class PoolStateMachine_ConnectionStateTests: XCTestCase { ) XCTAssertEqual(state.retryConnect(), forthBackoffTimerCancellationToken) - let connection = MockConnection(id: connectionID) + let connection = MockConnection(id: connectionID, executor: self.executor) XCTAssertEqual(state.connected(connection, maxStreams: 1), .idle(availableStreams: 1, newIdle: true)) } func testLeaseMultipleStreams() { let connectionID = 1 var state = TestConnectionState(id: connectionID) - let connection = MockConnection(id: connectionID) + let connection = MockConnection(id: connectionID, executor: self.executor) XCTAssertEqual(state.connected(connection, maxStreams: 100), .idle(availableStreams: 100, newIdle: true)) let timers = state.parkConnection(scheduleKeepAliveTimer: true, scheduleIdleTimeoutTimer: false) guard let keepAliveTimer = timers.first else { return XCTFail("Expected to get a keepAliveTimer") } @@ -185,7 +187,7 @@ final class PoolStateMachine_ConnectionStateTests: XCTestCase { func testRunningKeepAliveReducesAvailableStreams() { let connectionID = 1 var state = TestConnectionState(id: connectionID) - let connection = MockConnection(id: connectionID) + let connection = MockConnection(id: connectionID, executor: self.executor) XCTAssertEqual(state.connected(connection, maxStreams: 100), .idle(availableStreams: 100, newIdle: true)) let timers = state.parkConnection(scheduleKeepAliveTimer: true, scheduleIdleTimeoutTimer: false) guard let keepAliveTimer = timers.first else { return XCTFail("Expected to get a keepAliveTimer") } @@ -217,7 +219,7 @@ final class PoolStateMachine_ConnectionStateTests: XCTestCase { func testRunningKeepAliveDoesNotReduceAvailableStreams() { let connectionID = 1 var state = TestConnectionState(id: connectionID) - let connection = MockConnection(id: connectionID) + let connection = MockConnection(id: connectionID, executor: self.executor) XCTAssertEqual(state.connected(connection, maxStreams: 100), .idle(availableStreams: 100, newIdle: true)) let timers = state.parkConnection(scheduleKeepAliveTimer: true, scheduleIdleTimeoutTimer: false) guard let keepAliveTimer = timers.first else { return XCTFail("Expected to get a keepAliveTimer") } @@ -242,7 +244,7 @@ final class PoolStateMachine_ConnectionStateTests: XCTestCase { func testRunKeepAliveRacesAgainstIdleClose() { let connectionID = 1 var state = TestConnectionState(id: connectionID) - let connection = MockConnection(id: connectionID) + let connection = MockConnection(id: connectionID, executor: self.executor) XCTAssertEqual(state.connected(connection, maxStreams: 1), .idle(availableStreams: 1, newIdle: true)) let parkResult = state.parkConnection(scheduleKeepAliveTimer: true, scheduleIdleTimeoutTimer: true) guard let keepAliveTimer = parkResult.first, let idleTimer = parkResult.second else { diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachine+RequestQueueTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachine+RequestQueueTests.swift index b74b86cc..89285d3e 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachine+RequestQueueTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachine+RequestQueueTests.swift @@ -11,7 +11,7 @@ final class PoolStateMachine_RequestQueueTests: XCTestCase { var queue = TestQueue() XCTAssert(queue.isEmpty) - let request1 = MockRequest() + let request1 = MockRequest(connectionType: MockConnection.self) queue.queue(request1) XCTAssertEqual(queue.count, 1) XCTAssertFalse(queue.isEmpty) @@ -25,11 +25,11 @@ final class PoolStateMachine_RequestQueueTests: XCTestCase { var queue = TestQueue() XCTAssert(queue.isEmpty) - var request1 = MockRequest() + var request1 = MockRequest(connectionType: MockConnection.self) queue.queue(request1) - var request2 = MockRequest() + var request2 = MockRequest(connectionType: MockConnection.self) queue.queue(request2) - var request3 = MockRequest() + var request3 = MockRequest(connectionType: MockConnection.self) queue.queue(request3) do { @@ -49,11 +49,11 @@ final class PoolStateMachine_RequestQueueTests: XCTestCase { var queue = TestQueue() XCTAssert(queue.isEmpty) - var request1 = MockRequest() + var request1 = MockRequest(connectionType: MockConnection.self) queue.queue(request1) - var request2 = MockRequest() + var request2 = MockRequest(connectionType: MockConnection.self) queue.queue(request2) - var request3 = MockRequest() + var request3 = MockRequest(connectionType: MockConnection.self) queue.queue(request3) do { @@ -76,11 +76,11 @@ final class PoolStateMachine_RequestQueueTests: XCTestCase { var queue = TestQueue() XCTAssert(queue.isEmpty) - var request1 = MockRequest() + var request1 = MockRequest(connectionType: MockConnection.self) queue.queue(request1) - var request2 = MockRequest() + var request2 = MockRequest(connectionType: MockConnection.self) queue.queue(request2) - var request3 = MockRequest() + var request3 = MockRequest(connectionType: MockConnection.self) queue.queue(request3) do { @@ -113,11 +113,11 @@ final class PoolStateMachine_RequestQueueTests: XCTestCase { var queue = TestQueue() XCTAssert(queue.isEmpty) - var request1 = MockRequest() + var request1 = MockRequest(connectionType: MockConnection.self) queue.queue(request1) - var request2 = MockRequest() + var request2 = MockRequest(connectionType: MockConnection.self) queue.queue(request2) - var request3 = MockRequest() + var request3 = MockRequest(connectionType: MockConnection.self) queue.queue(request3) do { diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift index c0b6ddcd..12c98f40 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift @@ -4,17 +4,19 @@ import XCTest @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) typealias TestPoolStateMachine = PoolStateMachine< - MockConnection, + MockConnection, ConnectionIDGenerator, MockConnection.ID, - MockRequest, - MockRequest.ID, + MockRequest>, + MockRequest>.ID, MockTimerCancellationToken > @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) final class PoolStateMachineTests: XCTestCase { + let executor = NothingConnectionPoolExecutor() + func testConnectionsAreCreatedAndParkedOnStartup() { var configuration = PoolConfiguration() configuration.minimumConnectionCount = 2 @@ -28,8 +30,8 @@ final class PoolStateMachineTests: XCTestCase { timerCancellationTokenType: MockTimerCancellationToken.self ) - let connection1 = MockConnection(id: 0) - let connection2 = MockConnection(id: 1) + let connection1 = MockConnection(id: 0, executor: self.executor) + let connection2 = MockConnection(id: 1, executor: self.executor) do { let requests = stateMachine.refillConnections() @@ -65,7 +67,7 @@ final class PoolStateMachineTests: XCTestCase { timerCancellationTokenType: MockTimerCancellationToken.self ) - let connection1 = MockConnection(id: 0) + let connection1 = MockConnection(id: 0, executor: self.executor) // refill pool to at least one connection let requests = stateMachine.refillConnections() @@ -75,7 +77,7 @@ final class PoolStateMachineTests: XCTestCase { XCTAssertEqual(createdAction1.connection, .scheduleTimers([])) // lease connection 1 - let request1 = MockRequest() + let request1 = MockRequest(connectionType: MockConnection.self) let leaseRequest1 = stateMachine.leaseConnection(request1) XCTAssertEqual(leaseRequest1.connection, .cancelTimers([])) XCTAssertEqual(leaseRequest1.request, .leaseConnection(.init(element: request1), connection1)) @@ -84,19 +86,19 @@ final class PoolStateMachineTests: XCTestCase { XCTAssertEqual(stateMachine.releaseConnection(connection1, streams: 1), .none()) // lease connection 1 - let request2 = MockRequest() + let request2 = MockRequest(connectionType: MockConnection.self) let leaseRequest2 = stateMachine.leaseConnection(request2) XCTAssertEqual(leaseRequest2.connection, .cancelTimers([])) XCTAssertEqual(leaseRequest2.request, .leaseConnection(.init(element: request2), connection1)) // request connection while none is available - let request3 = MockRequest() + let request3 = MockRequest(connectionType: MockConnection.self) let leaseRequest3 = stateMachine.leaseConnection(request3) XCTAssertEqual(leaseRequest3.connection, .makeConnection(.init(connectionID: 1), [])) XCTAssertEqual(leaseRequest3.request, .none) // make connection 2 and lease immediately - let connection2 = MockConnection(id: 1) + let connection2 = MockConnection(id: 1, executor: NothingConnectionPoolExecutor()) let createdAction2 = stateMachine.connectionEstablished(connection2, maxStreams: 1) XCTAssertEqual(createdAction2.request, .leaseConnection(.init(element: request3), connection2)) XCTAssertEqual(createdAction2.connection, .none) @@ -132,19 +134,19 @@ final class PoolStateMachineTests: XCTestCase { XCTAssertEqual(requests.count, 0) // request connection while none exists - let request1 = MockRequest() + let request1 = MockRequest(connectionType: MockConnection.self) let leaseRequest1 = stateMachine.leaseConnection(request1) XCTAssertEqual(leaseRequest1.connection, .makeConnection(.init(connectionID: 0), [])) XCTAssertEqual(leaseRequest1.request, .none) // make connection 1 and lease immediately - let connection1 = MockConnection(id: 0) + let connection1 = MockConnection(id: 0, executor: self.executor) let createdAction1 = stateMachine.connectionEstablished(connection1, maxStreams: 1) XCTAssertEqual(createdAction1.request, .leaseConnection(.init(element: request1), connection1)) XCTAssertEqual(createdAction1.connection, .none) // request connection while none is available - let request2 = MockRequest() + let request2 = MockRequest(connectionType: MockConnection.self) let leaseRequest2 = stateMachine.leaseConnection(request2) XCTAssertEqual(leaseRequest2.connection, .makeConnection(.init(connectionID: 1), [])) XCTAssertEqual(leaseRequest2.request, .none) @@ -155,7 +157,7 @@ final class PoolStateMachineTests: XCTestCase { XCTAssertEqual(releaseRequest1.connection, .none) // connection 2 comes up and should be closed right away - let connection2 = MockConnection(id: 1) + let connection2 = MockConnection(id: 1, executor: self.executor) let createdAction2 = stateMachine.connectionEstablished(connection2, maxStreams: 1) XCTAssertEqual(createdAction2.request, .none) XCTAssertEqual(createdAction2.connection, .closeConnection(connection2, [])) @@ -185,7 +187,7 @@ final class PoolStateMachineTests: XCTestCase { timerCancellationTokenType: MockTimerCancellationToken.self ) - let connection1 = MockConnection(id: 0) + let connection1 = MockConnection(id: 0, executor: self.executor) // refill pool to at least one connection let requests = stateMachine.refillConnections() @@ -195,19 +197,19 @@ final class PoolStateMachineTests: XCTestCase { XCTAssertEqual(createdAction1.connection, .scheduleTimers([])) // lease connection 1 - let request1 = MockRequest() + let request1 = MockRequest(connectionType: MockConnection.self) let leaseRequest1 = stateMachine.leaseConnection(request1) XCTAssertEqual(leaseRequest1.connection, .cancelTimers([])) XCTAssertEqual(leaseRequest1.request, .leaseConnection(.init(element: request1), connection1)) // request connection while none is available - let request2 = MockRequest() + let request2 = MockRequest(connectionType: MockConnection.self) let leaseRequest2 = stateMachine.leaseConnection(request2) XCTAssertEqual(leaseRequest2.connection, .makeConnection(.init(connectionID: 1), [])) XCTAssertEqual(leaseRequest2.request, .none) // make connection 2 and lease immediately - let connection2 = MockConnection(id: 1) + let connection2 = MockConnection(id: 1, executor: self.executor) let createdAction2 = stateMachine.connectionEstablished(connection2, maxStreams: 1) XCTAssertEqual(createdAction2.request, .leaseConnection(.init(element: request2), connection2)) XCTAssertEqual(createdAction2.connection, .none) @@ -245,13 +247,13 @@ final class PoolStateMachineTests: XCTestCase { XCTAssertEqual(requests.count, 0) // request connection while none exists - let request1 = MockRequest() + let request1 = MockRequest(connectionType: MockConnection.self) let leaseRequest1 = stateMachine.leaseConnection(request1) XCTAssertEqual(leaseRequest1.connection, .makeConnection(.init(connectionID: 0), [])) XCTAssertEqual(leaseRequest1.request, .none) // make connection 1 and lease immediately - let connection1 = MockConnection(id: 0) + let connection1 = MockConnection(id: 0, executor: self.executor) let createdAction1 = stateMachine.connectionEstablished(connection1, maxStreams: 1) XCTAssertEqual(createdAction1.request, .leaseConnection(.init(element: request1), connection1)) XCTAssertEqual(createdAction1.connection, .none) @@ -287,13 +289,13 @@ final class PoolStateMachineTests: XCTestCase { XCTAssertEqual(requests.count, 0) // request connection while none exists - let request1 = MockRequest() + let request1 = MockRequest(connectionType: MockConnection.self) let leaseRequest1 = stateMachine.leaseConnection(request1) XCTAssertEqual(leaseRequest1.connection, .makeConnection(.init(connectionID: 0), [])) XCTAssertEqual(leaseRequest1.request, .none) // make connection 1 - let connection1 = MockConnection(id: 0) + let connection1 = MockConnection(id: 0, executor: self.executor) let createdAction1 = stateMachine.connectionEstablished(connection1, maxStreams: 1) XCTAssertEqual(createdAction1.request, .leaseConnection(.init(element: request1), connection1)) XCTAssertEqual(createdAction1.connection, .none) @@ -309,13 +311,13 @@ final class PoolStateMachineTests: XCTestCase { connection1.closeIfClosing() // request connection while none exists anymore - let request2 = MockRequest() + let request2 = MockRequest(connectionType: MockConnection.self) let leaseRequest2 = stateMachine.leaseConnection(request2) XCTAssertEqual(leaseRequest2.connection, .makeConnection(.init(connectionID: 1), [])) XCTAssertEqual(leaseRequest2.request, .none) // make connection 2 - let connection2 = MockConnection(id: 1) + let connection2 = MockConnection(id: 1, executor: self.executor) let createdAction2 = stateMachine.connectionEstablished(connection2, maxStreams: 1) XCTAssertEqual(createdAction2.request, .leaseConnection(.init(element: request2), connection2)) XCTAssertEqual(createdAction2.connection, .none) @@ -354,13 +356,13 @@ final class PoolStateMachineTests: XCTestCase { XCTAssertEqual(requests.count, 1) // one connection should exist - let request = MockRequest() + let request = MockRequest(connectionType: MockConnection.self) let leaseRequest = stateMachine.leaseConnection(request) XCTAssertEqual(leaseRequest.connection, .none) XCTAssertEqual(leaseRequest.request, .none) // make connection 1 - let connection = MockConnection(id: 0) + let connection = MockConnection(id: 0, executor: self.executor) let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1) XCTAssertEqual(createdAction.request, .leaseConnection(.init(element: request), connection)) XCTAssertEqual(createdAction.connection, .none) @@ -376,7 +378,7 @@ final class PoolStateMachineTests: XCTestCase { let connectionClosed = stateMachine.connectionClosed(connection) XCTAssertEqual(connectionClosed.connection, .makeConnection(.init(connectionID: 1), [])) connection.closeIfClosing() - let establishAction = stateMachine.connectionEstablished(.init(id: 1), maxStreams: 1) + let establishAction = stateMachine.connectionEstablished(.init(id: 1, executor: self.executor), maxStreams: 1) XCTAssertEqual(establishAction.request, .none) guard case .scheduleTimers(let timers) = establishAction.connection else { return XCTFail("Unexpected connection action") } XCTAssertEqual(timers, [.init(.init(timerID: 0, connectionID: 1, usecase: .keepAlive), duration: configuration.keepAliveDuration!)]) diff --git a/docker-compose.yml b/docker-compose.yml index 3eff4249..ee66a12d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,7 @@ version: '3.7' x-shared-config: &shared_config + command: -c 'max_connections=500' environment: POSTGRES_HOST_AUTH_METHOD: "${POSTGRES_HOST_AUTH_METHOD:-scram-sha-256}" POSTGRES_USER: test_username