diff --git a/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/NewPushChannelAPI.swift b/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/NewPushChannelAPI.swift new file mode 100644 index 00000000000..c2d4e98364a --- /dev/null +++ b/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/NewPushChannelAPI.swift @@ -0,0 +1,33 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// + +// sourcery: AutoMockable +/// An API access object for endpoints concerning the push channel. +public protocol NewPushChannelAPI { + + /// Create a new push channel. + /// + /// - Parameter clientID: The id of the self client. + /// - Returns: A push channel. + + func createPushChannel(clientID: String) async throws -> AnyNewPushChannel + +} + +// Workaround for automockable compiler error. +public typealias AnyNewPushChannel = any NewPushChannelProtocol diff --git a/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/NewPushChannelAPIBuilder.swift b/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/NewPushChannelAPIBuilder.swift new file mode 100644 index 00000000000..dc919aa7927 --- /dev/null +++ b/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/NewPushChannelAPIBuilder.swift @@ -0,0 +1,40 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// +import Foundation + +public struct NewPushChannelAPIBuilder { + + private let pushChannelService: PushChannelService + + /// Create a new builder. + /// + /// - Parameter pushChannelService: A push channel service to execute requests. + /// + public init(pushChannelService: PushChannelService) { + self.pushChannelService = pushChannelService + } + + /// Make a `PushChannelAPI`. + /// + /// - Returns: A `PushChannelAPI`. + + public func makeAPI(for apiVersion: APIVersion) -> any NewPushChannelAPI { + NewPushChannelAPIImpl(pushChannelService: pushChannelService, apiVersion: apiVersion) + } + +} diff --git a/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/NewPushChannelAPIImpl.swift b/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/NewPushChannelAPIImpl.swift new file mode 100644 index 00000000000..0d826a8697b --- /dev/null +++ b/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/NewPushChannelAPIImpl.swift @@ -0,0 +1,41 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// + + +final class NewPushChannelAPIImpl: NewPushChannelAPI, VersionedAPI { + + let pushChannelService: any PushChannelServiceProtocol + let apiVersion: APIVersion + + init(pushChannelService: any PushChannelServiceProtocol, apiVersion: APIVersion) { + self.pushChannelService = pushChannelService + self.apiVersion = apiVersion + } + + func createPushChannel(clientID: String) async throws -> any NewPushChannelProtocol { + let path = "\(pathPrefix)/events" + + let request = try URLRequestBuilder(path: path) + .withMethod(.get) + .withQueryItem(name: "client", value: clientID) + .build() + + return try await pushChannelService.createNewPushChannel(request) + } + +} diff --git a/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/PushChannelAPIBuilder.swift b/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/PushChannelAPIBuilder.swift index b625a0917ee..8c712a837f4 100644 --- a/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/PushChannelAPIBuilder.swift +++ b/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/PushChannelAPIBuilder.swift @@ -27,7 +27,7 @@ public struct PushChannelAPIBuilder { /// Create a new builder. /// /// - Parameter pushChannelService: A push channel service to execute requests. - + /// public init(pushChannelService: PushChannelService) { self.pushChannelService = pushChannelService } @@ -36,7 +36,7 @@ public struct PushChannelAPIBuilder { /// /// - Returns: A `PushChannelAPI`. - public func makeAPI() -> any PushChannelAPI { + public func makeAPI(for apiVersion: APIVersion) -> any PushChannelAPI { PushChannelAPIImpl(pushChannelService: pushChannelService) } diff --git a/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/PushChannelAPIImpl.swift b/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/PushChannelAPIImpl.swift index 07df3ca7a6a..6dd6bb7355c 100644 --- a/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/PushChannelAPIImpl.swift +++ b/WireAPI/Sources/WireAPI/APIs/PushChannelAPI/PushChannelAPIImpl.swift @@ -18,7 +18,7 @@ import Foundation -class PushChannelAPIImpl: PushChannelAPI { +final class PushChannelAPIImpl: PushChannelAPI { let pushChannelService: any PushChannelServiceProtocol diff --git a/WireAPI/Sources/WireAPI/APIs/UpdateEventsAPI/Responses/UpdateEventEnvelopeV0.swift b/WireAPI/Sources/WireAPI/APIs/UpdateEventsAPI/Responses/UpdateEventEnvelopeV0.swift index 3fbd2f24f74..05b3f55ff7f 100644 --- a/WireAPI/Sources/WireAPI/APIs/UpdateEventsAPI/Responses/UpdateEventEnvelopeV0.swift +++ b/WireAPI/Sources/WireAPI/APIs/UpdateEventsAPI/Responses/UpdateEventEnvelopeV0.swift @@ -30,7 +30,8 @@ public struct UpdateEventEnvelopeV0: Decodable, ToAPIModelConvertible { UpdateEventEnvelope( id: id, events: (payload ?? []).map(\.updateEvent), - isTransient: transient ?? false + isTransient: transient ?? false, + deliveryTag: nil ) } diff --git a/WireAPI/Sources/WireAPI/APIs/UpdateEventsAPI/Responses/UpdateEventEnvelopeV8.swift b/WireAPI/Sources/WireAPI/APIs/UpdateEventsAPI/Responses/UpdateEventEnvelopeV8.swift new file mode 100644 index 00000000000..f485c211e08 --- /dev/null +++ b/WireAPI/Sources/WireAPI/APIs/UpdateEventsAPI/Responses/UpdateEventEnvelopeV8.swift @@ -0,0 +1,29 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// + +import Foundation + +struct UpdateEventEnvelopeV8: Decodable { + enum CodingKeys: String, CodingKey { + case id + case payload + } + + let id: UUID + let payload: [UpdateEventDecodingProxy] +} diff --git a/WireAPI/Sources/WireAPI/Assembly.swift b/WireAPI/Sources/WireAPI/Assembly.swift index 6fd0025ca01..5dd8a29173e 100644 --- a/WireAPI/Sources/WireAPI/Assembly.swift +++ b/WireAPI/Sources/WireAPI/Assembly.swift @@ -60,22 +60,6 @@ public final class Assembly { return service }() - private lazy var pushChannelService: some PushChannelServiceProtocol = PushChannelService( - networkService: pushChannelNetworkService, - authenticationManager: authenticationManager - ) - - private lazy var pushChannelNetworkService: NetworkService = { - let service = NetworkService( - baseURL: backendEnvironment.webSocketURL, - serverTrustValidator: serverTrustValidator - ) - let config = urlSessionConfigurationFactory.makeWebSocketSessionConfiguration() - let session = URLSession(configuration: config, delegate: service, delegateQueue: nil) - service.configure(with: session) - return service - }() - public lazy var authenticationManager: some AuthenticationManagerProtocol = AuthenticationManager( clientID: clientID, cookieStorage: cookieStorage, diff --git a/WireAPI/Sources/WireAPI/Models/UpdateEvent/UpdateEventEnvelope.swift b/WireAPI/Sources/WireAPI/Models/UpdateEvent/UpdateEventEnvelope.swift index 9b6165c689a..440cf8b3c88 100644 --- a/WireAPI/Sources/WireAPI/Models/UpdateEvent/UpdateEventEnvelope.swift +++ b/WireAPI/Sources/WireAPI/Models/UpdateEvent/UpdateEventEnvelope.swift @@ -37,6 +37,8 @@ public struct UpdateEventEnvelope: Equatable, Sendable { public let isTransient: Bool + public let deliveryTag: UInt64? + /// Create a new `UpdateEventEnvelope`. /// /// - Parameters: @@ -47,11 +49,13 @@ public struct UpdateEventEnvelope: Equatable, Sendable { public init( id: UUID, events: [UpdateEvent], - isTransient: Bool + isTransient: Bool, + deliveryTag: UInt64? = nil ) { self.id = id self.events = events self.isTransient = isTransient + self.deliveryTag = deliveryTag } } diff --git a/WireAPI/Sources/WireAPI/Network/NetworkService/NetworkService.swift b/WireAPI/Sources/WireAPI/Network/NetworkService/NetworkService.swift index e7b497f67d0..2c53e2d9c6f 100644 --- a/WireAPI/Sources/WireAPI/Network/NetworkService/NetworkService.swift +++ b/WireAPI/Sources/WireAPI/Network/NetworkService/NetworkService.swift @@ -17,6 +17,7 @@ // import Foundation +import WireLogging // sourcery: AutoMockable public protocol NetworkServiceProtocol { @@ -102,7 +103,7 @@ extension NetworkService: URLSessionWebSocketDelegate { webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String? ) { - print("web socket task did open") + WireLogger.network.debug("web socket task did open") } public func urlSession( @@ -128,9 +129,9 @@ extension NetworkService: URLSessionTaskDelegate { ) { // NOTE: This method is not called when when using async/await APIs. if let error { - print("task did complete with error: \(error)") + WireLogger.network.error("task did complete with error: \(error)") } else { - print("task did complete") + WireLogger.network.debug("task did complete") } } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/AcknowledgmentType.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/AcknowledgmentType.swift new file mode 100644 index 00000000000..b651578f3c2 --- /dev/null +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/AcknowledgmentType.swift @@ -0,0 +1,23 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// +import Foundation + +enum AcknowledgmentType: String, Encodable { + case fullSync = "ack_full_sync" + case ack +} diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/EventAcknowledgmentNotification.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/EventAcknowledgmentNotification.swift new file mode 100644 index 00000000000..4c9f22f24c9 --- /dev/null +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/EventAcknowledgmentNotification.swift @@ -0,0 +1,38 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// +import Foundation + +struct EventAcknowledgmentNotification: Encodable { + + struct AcknowledgmentData: Encodable { + enum CodingKeys: String, CodingKey { + case deliveryTag = "delivery_tag" + case multiple + } + + var deliveryTag: UInt64 + var multiple: Bool + } + + let type: AcknowledgmentType = .ack + var data: AcknowledgmentData + + init(deliveryTag: UInt64, multiple: Bool) { + self.data = .init(deliveryTag: deliveryTag, multiple: multiple) + } +} diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/FullSyncAcknowledgmentNotification.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/FullSyncAcknowledgmentNotification.swift new file mode 100644 index 00000000000..a1751ef88b1 --- /dev/null +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/FullSyncAcknowledgmentNotification.swift @@ -0,0 +1,22 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// +import Foundation + +struct FullSyncAcknowledgmentNotification: Encodable { + let type: AcknowledgmentType = .fullSync +} diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/NewPushChannel.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/NewPushChannel.swift new file mode 100644 index 00000000000..f20f7661be0 --- /dev/null +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/NewPushChannel.swift @@ -0,0 +1,247 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// + +import Foundation +import WireFoundation +import WireLogging + +public final class NewPushChannel: NewPushChannelProtocol { + + public enum Element: Equatable { + case upToDate + case event(UpdateEventEnvelope) + case missedEvents + } + + public typealias Stream = AsyncThrowingStream + + private let webSocket: any WebSocketProtocol + private let decoder = JSONDecoder() + + private let timeout: TimeInterval + private var timeoutTask: Task? + + private let channelState = ChannelState() + + private var keepAliveTask: Task? + private let keepAliveInterval: TimeInterval + + private var (stream, continuation) = AsyncThrowingStream.makeStream() + + /// Initial PushChannel with Async Stream capabitilites + /// - Parameters: + /// - webSocket: webSocket to use + /// - keepAliveInterval: interval for sending ping and keep webSocket open + /// - timeout: interval after we consider we're up to date with events + public init(webSocket: any WebSocketProtocol, + keepAliveInterval: TimeInterval = 5, + timeout: TimeInterval = 1) { + self.webSocket = webSocket + self.keepAliveInterval = keepAliveInterval + self.timeout = timeout + } + + deinit { + timeoutTask?.cancel() + } + + public func open() async throws -> AsyncThrowingStream { + WireLogger.pushChannel.debug("opening new push channel", attributes: .pushChannelV3) + + let sourceStream = try await webSocket.open() + await channelState.websocketOpened() + setupTimeoutTask() + + Task { [weak self] in + guard let self else { return } + do { + for try await message in sourceStream { + + let result = try await receiveMessage(message) + continuation.yield(result) + + await channelState.stopProcessing() + setupTimeoutTask() + } + } catch { + WireLogger.pushChannel.error("got error: \(error)", attributes: .pushChannelV3) + continuation.finish(throwing: error) + await close() + return + } + continuation.finish() + } + + // The server will drop the connection (possibly silently) + // if the client doesn’t send a ping message every so often. + setUpKeepAliveTask() + + return stream + } + + public func close() async { + WireLogger.pushChannel.debug("closing push channel", attributes: .pushChannelV3) + + await webSocket.close() + tearDownTimeoutTask() + tearDownKeepAliveTask() + } + + func receiveMessage(_ message: URLSessionWebSocketTask.Message) async throws -> Element { + timeoutTask?.cancel() + await channelState.receivedMessage() + await channelState.startProcessing() + + switch message { + case let .data(data): + WireLogger.pushChannel.debug("received web socket data, decoding..., \(String(data: data, encoding: .utf8))", attributes: .pushChannelV3) + let envelope = try decoder.decode(WebSocketNotification.self, from: data) + if envelope.type == .event { + return Element.event(envelope.toAPIModel()) + } else { + return Element.missedEvents + } + case .string: + WireLogger.pushChannel.debug("received web socket string, ignoring...", attributes: .pushChannelV3) + throw PushChannelError.receivedInvalidMessage + + @unknown default: + WireLogger.pushChannel.debug("received web socket message, ignoring...", attributes: .pushChannelV3) + throw PushChannelError.receivedInvalidMessage + } + + + } + + // MARK: - Keep alive + + private func setUpKeepAliveTask() { + tearDownKeepAliveTask() + keepAliveTask = Task { [keepAliveInterval] in + do { + while true { + try await Task.sleep(for: .seconds(keepAliveInterval)) + WireLogger.pushChannel.debug("sending keep alive ping") + await webSocket.sendPing() + } + } catch { + WireLogger.pushChannel.warn("keep alive task was cancelled") + tearDownKeepAliveTask() + } + } + } + + private func tearDownKeepAliveTask() { + guard let keepAliveTask else { return } + WireLogger.pushChannel.debug("tearing down keep alive task") + keepAliveTask.cancel() + self.keepAliveTask = nil + } + + // MARK: - Timeout + + private func setupTimeoutTask() { + tearDownTimeoutTask() + timeoutTask = Task { [timeout] in + do { + while true { + try Task.checkCancellation() + try await Task.sleep(nanoseconds: 100_000_000) + if await channelState.wait(timeout: timeout) { + if await channelState.catchingUp { + WireLogger.pushChannel.debug("caught up") + await channelState.caughtUp() + continuation.yield(.upToDate) + break + } + } + } + } catch { + WireLogger.pushChannel.warn("timeoutTask was cancelled") + tearDownTimeoutTask() + } + } + } + + private func tearDownTimeoutTask() { + guard let timeoutTask else { return } + WireLogger.pushChannel.debug("tearing down timeoutTask") + timeoutTask.cancel() + self.timeoutTask = nil + } + + // MARK: - Acknowledgement + + public func ackEvent(deliveryTag: UInt64, multiple: Bool = false) async throws { + WireLogger.pushChannel.debug("ackEvent \(deliveryTag)", attributes: .pushChannelV3) + let acknowledgement = EventAcknowledgmentNotification( + deliveryTag: deliveryTag, + multiple: multiple + ) + let data = try JSONEncoder().encode(acknowledgement) + try await write(data: data) + } + + public func ackFullSync() async throws { + WireLogger.pushChannel.debug("ackFullSync", attributes: .pushChannelV3) + let acknowledgement = FullSyncAcknowledgmentNotification() + let data = try JSONEncoder().encode(acknowledgement) + try await write(data: data) + } + + // MARK: - Helpers + + private func write(data: Data) async throws { + WireLogger.pushChannel.debug("write data to push channel", attributes: .pushChannelV3) + try await webSocket.write(data: data) + } +} + +private actor ChannelState { + private var lastMessageUpdate = Date() + var isProcessing = false + var catchingUp = false + + func receivedMessage() { + lastMessageUpdate = Date() + } + + func websocketOpened() { + catchingUp = true + } + + func startProcessing() { + isProcessing = true + } + + func stopProcessing() { + isProcessing = false + } + + func timeSinceLastMessage() -> TimeInterval { + Date().timeIntervalSince(lastMessageUpdate) + } + + func caughtUp() { + catchingUp = false + } + + func wait(timeout: TimeInterval) -> Bool { + return timeSinceLastMessage() > timeout && !isProcessing + } +} diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/NewPushChannelProtocol.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/NewPushChannelProtocol.swift new file mode 100644 index 00000000000..6d2f58368f3 --- /dev/null +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/NewPushChannelProtocol.swift @@ -0,0 +1,37 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// +import Foundation + +// sourcery: AutoMockable +/// Make a direct connection to a server to receive update events and acknowledge them. +public protocol NewPushChannelProtocol: Sendable { + + /// Open the push channel and start receiving update events. + /// + /// - Returns: An async stream of live update event envelopes. + + func open() async throws -> NewPushChannel.Stream + + /// Close the push channel and stop receiving update events. + + func close() async + + func ackFullSync() async throws + + func ackEvent(deliveryTag: UInt64, multiple: Bool) async throws +} diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannel.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannel.swift index 50c428b838c..56ff20880ce 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannel.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannel.swift @@ -107,4 +107,7 @@ public actor PushChannel: PushChannelProtocol { self.keepAliveTask = nil } + public func write(data: Data) async throws { + // do nothing + } } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelError.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelError.swift index 30128fea60f..3a80c18e8cc 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelError.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelError.swift @@ -24,4 +24,8 @@ public enum PushChannelError: Error { case receivedInvalidMessage + /// The client was offline for a very long time and it has missed notifications + + case missingEvents + } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelService.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelService.swift index a52a16c34da..08a1d6bdba8 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelService.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelService.swift @@ -21,13 +21,18 @@ import Foundation /// A service for creating push channel connections to a specific backend. public protocol PushChannelServiceProtocol { - /// Create a new push channel. + /// Create a new push channel (v1). /// /// - Parameter request: A request for a web socket connection. /// - Returns: A push channel. func createPushChannel(_ request: URLRequest) async throws -> any PushChannelProtocol + /// Create a new push channel (v2). + /// + /// - Parameter request: A request for a web socket connection. + /// - Returns: A push channel. + func createNewPushChannel(_ request: URLRequest) async throws -> any NewPushChannelProtocol } /// A service for creating push channel connections to a specific backend. @@ -55,5 +60,13 @@ public final class PushChannelService: PushChannelServiceProtocol { keepAliveInterval: 30 ) } + + public func createNewPushChannel(_ request: URLRequest) async throws -> any NewPushChannelProtocol { + var request = request + let accessToken = try await authenticationManager.getValidAccessToken() + request.setAccessToken(accessToken) + let webSocket = try networkService.executeWebSocketRequest(request) + return NewPushChannel(webSocket: webSocket) + } } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/URLSessionWebSocketTaskProtocol.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/URLSessionWebSocketTaskProtocol.swift index e91e7c05b67..846e828ba07 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/URLSessionWebSocketTaskProtocol.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/URLSessionWebSocketTaskProtocol.swift @@ -22,7 +22,12 @@ import Foundation public protocol URLSessionWebSocketTaskProtocol: Sendable { var isOpen: Bool { get } + var networkInformation: String { get } + + var closeCode: URLSessionWebSocketTask.CloseCode { get } + var closeReason: Data? { get } + func resume() func cancel( @@ -34,6 +39,8 @@ public protocol URLSessionWebSocketTaskProtocol: Sendable { func receive() async throws -> URLSessionWebSocketTask.Message + func send(_ message: URLSessionWebSocketTask.Message) async throws + typealias AnyError = any Error func sendPing(pongReceiveHandler: @escaping @Sendable (AnyError?) -> Void) @@ -44,5 +51,9 @@ extension URLSessionWebSocketTask: URLSessionWebSocketTaskProtocol { public var isOpen: Bool { closeCode == .invalid } - + + public var networkInformation: String { + "request: \(String(describing: currentRequest)), body: \(currentRequest?.httpBodyStream), response: \(String(describing: response)), payload: \((response as? HTTPURLResponse)?.description))" + } + } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocket.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocket.swift index 5926e6f8357..7b1edf59e45 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocket.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocket.swift @@ -25,12 +25,13 @@ public actor WebSocket: WebSocketProtocol { private let connection: any URLSessionWebSocketTaskProtocol private var continuation: Stream.Continuation? - + public init(connection: any URLSessionWebSocketTaskProtocol) { self.connection = connection } public func open() async throws -> Stream { + WireLogger.webSocket.debug("open") connection.resume() if #available(iOS 17, *) { @@ -43,8 +44,10 @@ public actor WebSocket: WebSocketProtocol { while isAlive, connection.isOpen { do { let message = try await connection.receive() + WireLogger.webSocket.debug("received message") continuation.yield(message) } catch { + // WireLogger.webSocket.error("error throwing \(String(describing: error)) - closeCode: \(connection.closeCode) - closeReason: \(String(describing: connection.closeReason)), debug: \(connection.networkInformation)") continuation.finish(throwing: error) isAlive = false } @@ -73,6 +76,7 @@ public actor WebSocket: WebSocketProtocol { yieldNextMessage() case let .failure(error): + WireLogger.webSocket.error(String(String(describing: error))) continuation.finish(throwing: error) } } @@ -87,7 +91,6 @@ public actor WebSocket: WebSocketProtocol { connection.cancel(with: .goingAway, reason: nil) continuation?.finish() continuation = nil - } public func sendPing() async { @@ -98,4 +101,20 @@ public actor WebSocket: WebSocketProtocol { } } + public func write(data: Data) async throws { + WireLogger.webSocket.debug("write data") +// if !connection.isOpen { + connection.resume() +// } + try await connection.send(.data(data)) + WireLogger.webSocket.debug("wrote data") + } + + public func write(string: String) async throws { +// if !connection.isOpen { + connection.resume() +// } + try await connection.send(.string(string)) + } + } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketNotification.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketNotification.swift new file mode 100644 index 00000000000..73f235f201f --- /dev/null +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketNotification.swift @@ -0,0 +1,59 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// +import Foundation + +struct WebSocketNotification: Decodable { + + enum NotificationType: String, Decodable { + case event + case notificationsMissed = "notifications.missed" + } + + struct NotificationData: Decodable { + enum CodingKeys: String, CodingKey { + case deliveryTag = "delivery_tag" + case event + } + + var deliveryTag: UInt64 + var event: UpdateEventEnvelopeV8 + } + + var type: NotificationType + var data: NotificationData? +} + +extension WebSocketNotification: ToAPIModelConvertible { + + func toAPIModel() -> UpdateEventEnvelope { + guard let event = data?.event else { + assertionFailure("don't call toAPIModel() when type is `notificationsMissed`") + return UpdateEventEnvelope( + id: UUID(), + events: [], + isTransient: false + ) + } + return UpdateEventEnvelope( + id: event.id, + events: event.payload.map(\.updateEvent), + isTransient: false, + deliveryTag: data?.deliveryTag + ) + } +} diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketProtocol.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketProtocol.swift index e59cd68a55c..15e12577268 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketProtocol.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketProtocol.swift @@ -25,6 +25,10 @@ public protocol WebSocketProtocol: Sendable { func close() async + func write(data: Data) async throws + + func write(string: String) async throws + func sendPing() async } diff --git a/WireAPI/Tests/WireAPITests/Network/PushChannel/NewPushChannelTests.swift b/WireAPI/Tests/WireAPITests/Network/PushChannel/NewPushChannelTests.swift new file mode 100644 index 00000000000..6106ea465a1 --- /dev/null +++ b/WireAPI/Tests/WireAPITests/Network/PushChannel/NewPushChannelTests.swift @@ -0,0 +1,262 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// + +import XCTest + +@testable import WireAPI +@testable import WireAPISupport + +final class NewPushChannelTests: XCTestCase { + + var sut: NewPushChannel! + var request: URLRequest! + var webSocket: MockWebSocketProtocol! + + override func setUp() async throws { + try await super.setUp() + let url = try XCTUnwrap(URL(string: "www.example.com")) + request = URLRequest(url: url) + webSocket = MockWebSocketProtocol() + webSocket.close_MockMethod = {} + webSocket.sendPing_MockMethod = {} + sut = NewPushChannel( + webSocket: webSocket, + keepAliveInterval: 0.5, + timeout: 0.1 + ) + } + + override func tearDown() async throws { + request = nil + webSocket = nil + sut = nil + try await super.tearDown() + } + + func testOpenPushChannel() async throws { + // Given some envelopes that will be delivered through the push channel + let mockEnvelope1 = try MockJSONPayloadResource(name: "LiveUpdateEventEnvelope1") + let mockEnvelope2 = try MockJSONPayloadResource(name: "LiveUpdateEventEnvelope2") + let mockEnvelope3 = try MockJSONPayloadResource(name: "LiveUpdateEventEnvelope3") + + webSocket.open_MockValue = AsyncThrowingStream { continuation in + continuation.yield(.data(mockEnvelope1.jsonData)) + continuation.yield(.data(mockEnvelope2.jsonData)) + continuation.yield(.data(mockEnvelope3.jsonData)) + continuation.finish() + } + + // When the push channel is open and the stream is iterated + let liveEventEnvelopes = try await sut.open() + + var receivedEnvelopes = [NewPushChannel.Element]() + for try await envelope in liveEventEnvelopes { + receivedEnvelopes.append(envelope) + } + + // Then envelopes are received + try XCTAssertCount(receivedEnvelopes, count: 3) + XCTAssertEqual(receivedEnvelopes[0], .event(Scaffolding.envelope1)) + XCTAssertEqual(receivedEnvelopes[1], .event(Scaffolding.envelope2)) + XCTAssertEqual(receivedEnvelopes[2], .event(Scaffolding.envelope3)) + } + + func testClosingPushChannel() async throws { + // Given an open push channel + webSocket.open_MockValue = AsyncThrowingStream { _ in } + _ = try await sut.open() + + // When the push channel is closed + await sut.close() + + // Then the web socket was closed + XCTAssertEqual(webSocket.close_Invocations.count, 1) + } + + func testFailureToDecodeClosesPushChannel() async throws { + // Given an open push channel that is being iterated + webSocket.open_MockValue = AsyncThrowingStream { continuation in + // Send some invalid data + continuation.yield(.data(Data())) + // Don't call finish, so the stream stays open. + } + + let liveEventEnvelopes = try await sut.open() + + do { + for try await _ in liveEventEnvelopes { + // no op + } + } catch is DecodingError { + // Then a decoding error was thrown + } catch { + XCTFail("unexpected error: \(error)") + } + + // Then the web socket was closed + XCTAssertEqual(webSocket.close_Invocations.count, 1) + } + + func testReceivingUnknownMessageClosesPushChannel() async throws { + // Given an open push channel that is being iterated + webSocket.open_MockValue = AsyncThrowingStream { continuation in + // Send some invalid data. + continuation.yield(.string("some string")) + // Don't call finish, so the stream stays open. + } + + let liveEventEnvelopes = try await sut.open() + + do { + for try await _ in liveEventEnvelopes { + // no op + } + } catch PushChannelError.receivedInvalidMessage { + // Then an error is thrown + } catch { + XCTFail("unexpected error: \(error)") + } + + // Then the web socket was closed + XCTAssertEqual(webSocket.close_Invocations.count, 1) + } + + func testSendingKeepAlivePings() async throws { + // Mock. + webSocket.open_MockValue = AsyncThrowingStream { _ in } + + // Given an open push channel. + _ = try await sut.open() + + // When we wait for 1 second. + try await Task.sleep(for: .seconds(1.5)) + + // Then keep alive pings are sent periodically (the timer + // is not exact so we will we generous in our assertion of + // at least 2 in 1.5 seconds). + XCTAssertGreaterThanOrEqual(webSocket.sendPing_Invocations.count, 2) + } + + func testTimeoutTriggerIfNoEvents() async throws { + // Mock. + webSocket.open_MockValue = AsyncThrowingStream { _ in } + + // Given an open push channel. + _ = try await sut.open() + + // When we wait for 1 second. + try await Task.sleep(for: .seconds(1.5)) + + // Then keep alive pings are sent periodically (the timer + // is not exact so we will we generous in our assertion of + // at least 2 in 1.5 seconds). + XCTAssertGreaterThanOrEqual(webSocket.sendPing_Invocations.count, 2) + } +} + +private enum Scaffolding { + + static let envelope1 = UpdateEventEnvelope( + id: UUID(uuidString: "66c7731b-9985-4b5e-90d7-b8f8ce1cadb9")!, + events: [ + .conversation(.proteusMessageAdd(proteusMessageAddEvent)), + .conversation(.protocolUpdate(protocolUpdateEvent)) + ], + isTransient: false + ) + + static let envelope2 = UpdateEventEnvelope( + id: UUID(uuidString: "7b406b6e-df92-4844-b20b-2e673ca2d027")!, + events: [ + .conversation(.receiptModeUpdate(receiptModeUpdateEvent)), + .conversation(.rename(renameEvent)) + ], + isTransient: false + ) + + static let envelope3 = UpdateEventEnvelope( + id: UUID(uuidString: "eb660720-079c-43f3-9a80-1168638c928f")!, + events: [ + .conversation(.typing(typingEvent)), + .conversation(.delete(deleteEvent)) + ], + isTransient: false + ) + + static func fractionalDate(from string: String) -> Date { + ISO8601DateFormatter.fractionalInternetDateTime.date(from: string)! + } + + static func date(from string: String) -> Date { + ISO8601DateFormatter.internetDateTime.date(from: string)! + } + + static let conversationID = ConversationID( + uuid: UUID(uuidString: "a644fa88-2d83-406b-8a85-d4fd8dedad6b")!, + domain: "example.com" + ) + + static let senderID = UserID( + uuid: UUID(uuidString: "f55fe9b0-a0cc-4b11-944b-125c834d9b6a")!, + domain: "example.com" + ) + + static let timestamp = fractionalDate(from: "2024-06-04T15:03:07.598Z") + + static let proteusMessageAddEvent = ConversationProteusMessageAddEvent( + conversationID: conversationID, + senderID: senderID, + timestamp: timestamp, + message: .init(encryptedMessage: "foo"), + externalData: .init(encryptedMessage: "bar"), + messageSenderClientID: "abc123", + messageRecipientClientID: "def456" + ) + + static let protocolUpdateEvent = ConversationProtocolUpdateEvent( + conversationID: conversationID, + senderID: senderID, + newProtocol: .mls + ) + + static let receiptModeUpdateEvent = ConversationReceiptModeUpdateEvent( + conversationID: conversationID, + senderID: senderID, + newReceiptMode: 1 + ) + + static let renameEvent = ConversationRenameEvent( + conversationID: conversationID, + senderID: senderID, + timestamp: timestamp, + newName: "foo" + ) + + static let typingEvent = ConversationTypingEvent( + conversationID: conversationID, + senderID: senderID, + isTyping: true + ) + + static let deleteEvent = ConversationDeleteEvent( + conversationID: conversationID, + senderID: senderID, + timestamp: timestamp + ) + +} diff --git a/WireAPI/Tests/WireAPITests/Network/PushChannel/WebSocketTests.swift b/WireAPI/Tests/WireAPITests/Network/PushChannel/WebSocketTests.swift index 4252417ba91..e3771e800e0 100644 --- a/WireAPI/Tests/WireAPITests/Network/PushChannel/WebSocketTests.swift +++ b/WireAPI/Tests/WireAPITests/Network/PushChannel/WebSocketTests.swift @@ -205,5 +205,46 @@ final class WebSocketTests: XCTestCase { // Then all messages were received in order XCTAssertEqual(receivedMessages, messages) } + + func testWebSocketSendsDataSuccessfully()async throws { + // GIVEN + connection.send_MockMethod = { _ in } + + let sut = WebSocket(connection: connection) + let testData = try XCTUnwrap("test".data(using: .utf8)) + + // WHEN + try await sut.write(data: testData) + + // THEN + try XCTAssertCount(connection.send_Invocations, count: 1) + let result = try XCTUnwrap(connection.send_Invocations.first) + if case URLSessionWebSocketTask.Message.data(testData) = result { + // successful + } else { + XCTFail("unexpected message written: \(result)") + } + } + + + func testWebSocketSendsStringSuccessfully()async throws { + // GIVEN + connection.send_MockMethod = { _ in } + + let sut = WebSocket(connection: connection) + let testString = "test" + + // WHEN + try await sut.write(string: testString) + + // THEN + try XCTAssertCount(connection.send_Invocations, count: 1) + let result = try XCTUnwrap(connection.send_Invocations.first) + if case URLSessionWebSocketTask.Message.string(testString) = result { + // successful + } else { + XCTFail("unexpected message written: \(result)") + } + } } diff --git a/WireDomain/Sources/WireDomain/Components/ClientSessionComponent+IncrementalSyncProvider.swift b/WireDomain/Sources/WireDomain/Components/ClientSessionComponent+IncrementalSyncProvider.swift index 462749ad458..c2afc6cff14 100644 --- a/WireDomain/Sources/WireDomain/Components/ClientSessionComponent+IncrementalSyncProvider.swift +++ b/WireDomain/Sources/WireDomain/Components/ClientSessionComponent+IncrementalSyncProvider.swift @@ -23,5 +23,10 @@ extension ClientSessionComponent: IncrementalSyncProvider { public func provideIncrementalSync() throws -> any IncrementalSyncProtocol { incrementalSync } + + public func provideLiveSync(delegate: any LiveSyncDelegate) throws -> any LiveSyncProtocol { + newIncrementalSync.delegate = delegate + return newIncrementalSync + } } diff --git a/WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift b/WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift index ee064d8e0cf..24ba6def41b 100644 --- a/WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift +++ b/WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift @@ -60,6 +60,7 @@ public final class ClientSessionComponent { private let mlsDecryptionService: any MLSDecryptionServiceInterface private let proteusService: any ProteusServiceInterface + public let asyncStreamEnabled: Bool private let processorHandlers: ProcessorHandlers private let onAuthenticationFailure: @Sendable () -> Void @@ -79,6 +80,7 @@ public final class ClientSessionComponent { mlsService: any MLSServiceInterface, mlsDecryptionService: any MLSDecryptionServiceInterface, proteusService: any ProteusServiceInterface, + asyncStreamEnabled: Bool, processorHandlers: ProcessorHandlers, onAuthenticationFailure: @escaping @Sendable () -> Void ) { @@ -97,6 +99,7 @@ public final class ClientSessionComponent { self.localDomain = localDomain self.isFederationEnabled = isFederationEnabled self.isMLSEnabled = isMLSEnabled + self.asyncStreamEnabled = asyncStreamEnabled self.processorHandlers = processorHandlers self.onAuthenticationFailure = onAuthenticationFailure } @@ -133,8 +136,12 @@ public final class ClientSessionComponent { private lazy var pushChannelAPI = PushChannelAPIBuilder( pushChannelService: pushChannelService - ).makeAPI() + ).makeAPI(for: apiVersion) + private lazy var newPushChannelAPI = NewPushChannelAPIBuilder( + pushChannelService: pushChannelService + ).makeAPI(for: apiVersion) + private lazy var selfUserAPI = SelfUserAPIBuilder( apiService: apiService ).makeAPI(for: apiVersion) @@ -359,6 +366,16 @@ public final class ClientSessionComponent { syncStateSubject: syncStateSubject ) + public lazy var newIncrementalSync = NewIncrementalSync( + selfClientID: selfClientID, + pushChannelAPI: newPushChannelAPI, + decryptor: updateEventDecryptor, + store: updateEventsLocalStore, + processor: updateEventProcessor, + databaseSaver: databaseSaver, + syncStateSubject: syncStateSubject + ) + // MARK: - Repositories private lazy var conversationLabelsRepository = ConversationLabelsRepository( diff --git a/WireDomain/Sources/WireDomain/Components/UserSessionComponent.swift b/WireDomain/Sources/WireDomain/Components/UserSessionComponent.swift index 9686e702d8d..2ab6c2abf97 100644 --- a/WireDomain/Sources/WireDomain/Components/UserSessionComponent.swift +++ b/WireDomain/Sources/WireDomain/Components/UserSessionComponent.swift @@ -123,6 +123,7 @@ public final class UserSessionComponent { public func clientSessionComponent( clientID: String, + asyncStreamEnabled: Bool, processorHandlers: ClientSessionComponent.ProcessorHandlers, onAuthenticationFailure: @escaping @Sendable () -> Void ) -> ClientSessionComponent { @@ -142,6 +143,7 @@ public final class UserSessionComponent { mlsService: mlsService, mlsDecryptionService: mlsDecryptionService, proteusService: proteusService, + asyncStreamEnabled: asyncStreamEnabled, processorHandlers: processorHandlers, onAuthenticationFailure: onAuthenticationFailure ) diff --git a/WireDomain/Sources/WireDomain/Event Decryption/UpdateEventDecryptor.swift b/WireDomain/Sources/WireDomain/Event Decryption/UpdateEventDecryptor.swift index 3f5ecd8d9ce..b985a1df600 100644 --- a/WireDomain/Sources/WireDomain/Event Decryption/UpdateEventDecryptor.swift +++ b/WireDomain/Sources/WireDomain/Event Decryption/UpdateEventDecryptor.swift @@ -68,7 +68,7 @@ struct UpdateEventDecryptor: UpdateEventDecryptorProtocol { func decryptEvents(in eventEnvelope: UpdateEventEnvelope) async throws -> [UpdateEvent] { guard !DeveloperFlag.skipMLSMessagesDecryption.isOn else { return [] } - let logAttributes: LogAttributes = [ + var logAttributes: LogAttributes = [ .eventId: eventEnvelope.id.safeForLoggingDescription, .public: true ] @@ -77,6 +77,7 @@ struct UpdateEventDecryptor: UpdateEventDecryptorProtocol { var shouldCommitPendingProposals = false for event in eventEnvelope.events { + logAttributes[.messageType] = event.name switch event { case let .conversation(.proteusMessageAdd(eventData)): WireLogger.updateEvent.info( diff --git a/WireDomain/Sources/WireDomain/Providers/IncrementalSyncProvider.swift b/WireDomain/Sources/WireDomain/Providers/IncrementalSyncProvider.swift index 11eaa383fe0..c589e78e04f 100644 --- a/WireDomain/Sources/WireDomain/Providers/IncrementalSyncProvider.swift +++ b/WireDomain/Sources/WireDomain/Providers/IncrementalSyncProvider.swift @@ -23,4 +23,5 @@ public protocol IncrementalSyncProvider { func provideIncrementalSync() throws -> any IncrementalSyncProtocol + func provideLiveSync(delegate: any LiveSyncDelegate) throws -> any LiveSyncProtocol } diff --git a/WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift b/WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift index bda5a14703d..ccfee0f612b 100644 --- a/WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift +++ b/WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift @@ -22,6 +22,7 @@ import WireAPI import WireLogging public struct IncrementalSync: IncrementalSyncProtocol { + private let selfClientID: String private let pushChannelAPI: any PushChannelAPI @@ -54,6 +55,10 @@ public struct IncrementalSync: IncrementalSyncProtocol { } public func perform() async throws -> Token { + try await perform(acknowledgeFullSync: false) + } + + public func perform(acknowledgeFullSync: Bool) async throws -> Token { logger.debug("performing incremental sync") syncStateSubject.send(.incrementalSyncing(.createPushChannel)) let pushChannel = try await pushChannelAPI.createPushChannel(clientID: selfClientID) @@ -70,7 +75,7 @@ public struct IncrementalSync: IncrementalSyncProtocol { syncStateSubject.send(.incrementalSyncing(.processPendingEvents)) let processedEnvelopeIDs = try await processStoredEvents() - let task = Task { @Sendable [logger, decryptor, store, processor, databaseSaver, syncStateSubject] in + let task: Task = Task { @Sendable [logger, decryptor, store, processor, databaseSaver, syncStateSubject] in logger.debug("handling live event stream") syncStateSubject.send(.liveSyncing) @@ -236,11 +241,11 @@ public struct IncrementalSync: IncrementalSyncProtocol { public struct Token { - let task: Task + let task: Task let closePushChannel: () async -> Void public init( - task: Task, + task: Task, closePushChannel: @escaping () async -> Void ) { self.task = task diff --git a/WireDomain/Sources/WireDomain/Synchronization/NewIncrementalSync.swift b/WireDomain/Sources/WireDomain/Synchronization/NewIncrementalSync.swift new file mode 100644 index 00000000000..8e4858f81b2 --- /dev/null +++ b/WireDomain/Sources/WireDomain/Synchronization/NewIncrementalSync.swift @@ -0,0 +1,204 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// + +import Foundation +import WireAPI +import WireLogging +import Combine + +public protocol LiveSyncDelegate { + func didFinishSync(sync: NewIncrementalSync) + func didMissedEvents(sync: NewIncrementalSync) + func didFail(sync: NewIncrementalSync, error: any Error) +} + +/// IncrementalSync using new backend API async stream notifications +public struct NewIncrementalSync: LiveSyncProtocol { + + private let selfClientID: String + private let pushChannelAPI: any NewPushChannelAPI + private let decryptor: any UpdateEventDecryptorProtocol + private let store: any UpdateEventsLocalStoreProtocol + private let processor: any UpdateEventProcessorProtocol + private let databaseSaver: any DatabaseSaverProtocol + private let syncStateSubject: CurrentValueSubject + private let logger = WireLogger.sync + var delegate: (any LiveSyncDelegate)? + + public init( + selfClientID: String, + pushChannelAPI: any NewPushChannelAPI, + decryptor: any UpdateEventDecryptorProtocol, + store: any UpdateEventsLocalStoreProtocol, + processor: any UpdateEventProcessorProtocol, + databaseSaver: any DatabaseSaverProtocol, + syncStateSubject: CurrentValueSubject + ) { + self.selfClientID = selfClientID + self.pushChannelAPI = pushChannelAPI + self.decryptor = decryptor + self.store = store + self.processor = processor + self.databaseSaver = databaseSaver + self.syncStateSubject = syncStateSubject + } + + public func perform(acknowledgeFullSync: Bool) async throws -> IncrementalSync.Token { + logger.debug("performing live sync v3") + let pushChannel = try await pushChannelAPI.createPushChannel(clientID: selfClientID) + + if acknowledgeFullSync { + try await pushChannel.ackFullSync() + } + + logger.debug("opening new push channel v3") + syncStateSubject.send(.incrementalSyncing(.openPushChannel)) + let liveEventStream = try await pushChannel.open() + + let task: Task = Task { @Sendable [logger, decryptor, store, processor, databaseSaver, pushChannel, delegate] in + logger.debug("handling live event stream v3") + syncStateSubject.send(.liveSyncing) + + do { + for try await var element in liveEventStream { + logger.debug("received live event envelope v3") + switch element { + case let .event(enveloppe): + var envelope = enveloppe + do { + // Decrypt. + logger.debug( + "decrypting live event envelope v3", + attributes: [.eventEnvelopeID: envelope.id] + ) + envelope.events = try await decryptor.decryptEvents(in: envelope) + } catch { + logger.error( + "failed to decrypt live event envelope v3: \(String(describing: error))", + attributes: [.eventEnvelopeID: envelope.id] + ) + continue + } + + let index: Int64 + do { + // Store. + logger.debug( + "storing live event envelope v3", + attributes: [.eventEnvelopeID: envelope.id] + ) + index = try await store.indexOfLastEventEnvelope() + 1 + try await store.persistEventEnvelope(envelope, index: index) + } catch { + logger.error( + "failed to store live event envelope v3: \(String(describing: error))", + attributes: [.eventEnvelopeID: envelope.id] + ) + continue + } + + // Bump the last event id so we don't refetch it. + // there's no events marked as transcient anymore + store.storeLastEventID(id: envelope.id) + + // ACK + do { + if let deliveryTag = envelope.deliveryTag { + logger.debug( + "ack event envelope v3", + attributes: [.eventEnvelopeID: envelope.id] + ) + try await pushChannel.ackEvent(deliveryTag: deliveryTag, multiple: false) + } + } catch { + logger.error( + "failed to ack live event envelope v3: \(String(describing: error))", + attributes: [.eventEnvelopeID: envelope.id] + ) + + } + + // Process. + for event in envelope.events { + do { + logger.debug( + "processing live event: \(event.name)", + attributes: [.eventEnvelopeID: envelope.id] + ) + try await processor.processEvent(event) + } catch { + logger.error( + "failed to process live event: \(String(describing: error))", + attributes: [.eventEnvelopeID: envelope.id] + ) + } + } + // finish + do { + // Delete. + logger.debug( + "deleting live event envelope", + attributes: [.eventEnvelopeID: envelope.id] + ) + try await store.deleteEventEnvelope(atIndex: index) + } catch { + logger.error( + "failed to delete live event envelope v3: \(String(describing: error))", + attributes: [.eventEnvelopeID: envelope.id] + ) + } + + await store.calculateLastUnreadMessages() + + do { + // Save. + try await databaseSaver.save() + } catch { + logger.error("failed to save database v3: \(String(describing: error))") + } + + case .upToDate: + logger.debug("upToDate event v3") + syncStateSubject.send(.idle) + delegate?.didFinishSync(sync: self) + + case .missedEvents: + logger.debug("missedEvents event v3") + delegate?.didMissedEvents(sync: self) + } + + } + } catch PushChannelError.missingEvents { + delegate?.didMissedEvents(sync: self) + } catch { + // if we end up here, the pushChannel is closed + logger.warn("v3 live event stream encountered error: \(String(describing: error))") + syncStateSubject.send(.idle) + delegate?.didFail(sync: self, error: error) + return + } + + logger.debug("live event stream did finish v3") + syncStateSubject.send(.idle) + } + + return IncrementalSync.Token(task: task, closePushChannel: { + await pushChannel.close() + }) + } +} diff --git a/WireDomain/Sources/WireDomain/Synchronization/Protocols/IncrementalSyncProtocol.swift b/WireDomain/Sources/WireDomain/Synchronization/Protocols/IncrementalSyncProtocol.swift index 6d55dae217f..644b43ebb75 100644 --- a/WireDomain/Sources/WireDomain/Synchronization/Protocols/IncrementalSyncProtocol.swift +++ b/WireDomain/Sources/WireDomain/Synchronization/Protocols/IncrementalSyncProtocol.swift @@ -24,9 +24,14 @@ import Foundation public protocol IncrementalSyncProtocol { /// Perform the incremental sync. - /// + /// - Parameter acknowledgeFullSync: In case of New incremental sync, this will inform backend a full sync has been done /// - Returns: A token to retain to keep the push channel open. - func perform() async throws -> IncrementalSync.Token + func perform(acknowledgeFullSync: Bool) async throws -> IncrementalSync.Token } + +public protocol LiveSyncProtocol { + + func perform(acknowledgeFullSync: Bool) async throws -> IncrementalSync.Token +} diff --git a/WireDomain/Sources/WireDomain/Utilities/Journal/JournalKey.swift b/WireDomain/Sources/WireDomain/Utilities/Journal/JournalKey.swift index 8372e9447da..587e69c2a95 100644 --- a/WireDomain/Sources/WireDomain/Utilities/Journal/JournalKey.swift +++ b/WireDomain/Sources/WireDomain/Utilities/Journal/JournalKey.swift @@ -53,4 +53,9 @@ public extension JournalKey where Value == Bool { defaultValue: false ) + static let skipPullingLastNotificationID = Self( + "skipPullingLastNotificationID", + defaultValue: false + ) + } diff --git a/WireDomain/Sources/WireDomainSupport/Sourcery/generated/AutoMockable.generated.swift b/WireDomain/Sources/WireDomainSupport/Sourcery/generated/AutoMockable.generated.swift index 5109c8a1882..8682d0be09a 100644 --- a/WireDomain/Sources/WireDomainSupport/Sourcery/generated/AutoMockable.generated.swift +++ b/WireDomain/Sources/WireDomainSupport/Sourcery/generated/AutoMockable.generated.swift @@ -2046,24 +2046,24 @@ public class MockIncrementalSyncProtocol: IncrementalSyncProtocol { // MARK: - perform - public var perform_Invocations: [Void] = [] - public var perform_MockError: Error? - public var perform_MockMethod: (() async throws -> IncrementalSync.Token)? - public var perform_MockValue: IncrementalSync.Token? + public var performAcknowledgeFullSync_Invocations: [Bool] = [] + public var performAcknowledgeFullSync_MockError: Error? + public var performAcknowledgeFullSync_MockMethod: ((Bool) async throws -> IncrementalSync.Token)? + public var performAcknowledgeFullSync_MockValue: IncrementalSync.Token? - public func perform() async throws -> IncrementalSync.Token { - perform_Invocations.append(()) + public func perform(acknowledgeFullSync: Bool) async throws -> IncrementalSync.Token { + performAcknowledgeFullSync_Invocations.append(acknowledgeFullSync) - if let error = perform_MockError { + if let error = performAcknowledgeFullSync_MockError { throw error } - if let mock = perform_MockMethod { - return try await mock() - } else if let mock = perform_MockValue { + if let mock = performAcknowledgeFullSync_MockMethod { + return try await mock(acknowledgeFullSync) + } else if let mock = performAcknowledgeFullSync_MockValue { return mock } else { - fatalError("no mock for `perform`") + fatalError("no mock for `performAcknowledgeFullSync`") } } diff --git a/WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift b/WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift index 6da19d2dccb..4deb1eabc6e 100644 --- a/WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift +++ b/WireDomain/Tests/WireDomainTests/Synchronization/IncrementalSyncTests.swift @@ -77,7 +77,7 @@ final class IncrementalSyncTests: XCTestCase { Scaffolding.event3 ] - // Pendeng events are stored in batches. + // Pending events are stored in batches. store.fetchStoredEventEnvelopesLimit_MockMethod = { _ in let envelopes = storedEnvelopes storedEnvelopes = [] @@ -126,7 +126,7 @@ final class IncrementalSyncTests: XCTestCase { // When let token = try await sut.perform() - await token.task.value + try await token.task.value // Then push channel was created. XCTAssertEqual( diff --git a/WireDomain/Tests/WireDomainTests/Synchronization/NewIncrementalSyncTests.swift b/WireDomain/Tests/WireDomainTests/Synchronization/NewIncrementalSyncTests.swift new file mode 100644 index 00000000000..3933b961236 --- /dev/null +++ b/WireDomain/Tests/WireDomainTests/Synchronization/NewIncrementalSyncTests.swift @@ -0,0 +1,318 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// + +import XCTest +@testable import WireAPI +@testable import WireAPISupport +@testable import WireDomain +@testable import WireDomainSupport +import Combine + +final class NewIncrementalSyncTests: XCTestCase { + + var sut: NewIncrementalSync! + var pushChannelAPI: MockNewPushChannelAPI! + var decryptor: MockUpdateEventDecryptorProtocol! + var store: MockUpdateEventsLocalStoreProtocol! + var processor: MockUpdateEventProcessorProtocol! + var databaseSaver: MockDatabaseSaverProtocol! + var syncStateSubject: CurrentValueSubject! + + + override func setUp() { + pushChannelAPI = MockNewPushChannelAPI() + decryptor = MockUpdateEventDecryptorProtocol() + store = MockUpdateEventsLocalStoreProtocol() + processor = MockUpdateEventProcessorProtocol() + databaseSaver = MockDatabaseSaverProtocol() + syncStateSubject = .init(.idle) + + sut = NewIncrementalSync( + selfClientID: Scaffolding.selfClientID, + pushChannelAPI: pushChannelAPI, + decryptor: decryptor, + store: store, + processor: processor, + databaseSaver: databaseSaver, + syncStateSubject: syncStateSubject + ) + } + + override func tearDown() { + sut = nil + pushChannelAPI = nil + decryptor = nil + store = nil + processor = nil + databaseSaver = nil + syncStateSubject = nil + } + + func test_perform_pendingEventsExist() async throws { + // Mock + + + // Some live events, some of which were already pulled. + let pushChannel = MockNewPushChannelProtocol() + pushChannel.open_MockValue = AsyncThrowingStream { continuation in + Task { + continuation.yield(.event(Scaffolding.event2)) + continuation.finish(throwing: PushChannelError.missingEvents) + continuation.finish() + } + } + pushChannel.ackEventDeliveryTagMultiple_MockMethod = { (_, _) in } + pushChannelAPI.createPushChannelClientID_MockMethod = { _ in pushChannel } + + // Some indices at which live events will be stored. + var indices = [Int64(10)] + store.indexOfLastEventEnvelope_MockMethod = { indices.remove(at: 0) } + + // Live envelopes are peristed and deleted one by one. + store.persistEventEnvelopeIndex_MockMethod = { _, _ async throws in } + store.deleteEventEnvelopeAtIndex_MockMethod = { _ in } + + // Live events are decrypted. + decryptor.decryptEventsIn_MockMethod = { $0.events } + + // Last event is being updated. + store.storeLastEventIDId_MockMethod = { _ in } + + // Events are processed. + processor.processEvent_MockMethod = { _ in } + + // Unread messages are set + store.calculateLastUnreadMessages_MockMethod = {} + + // Database is saved. + databaseSaver.save_MockMethod = {} + + // When + let token = try await sut.perform(acknowledgeFullSync: false) + try await token.task.value + + // Then push channel was created. + XCTAssertEqual( + pushChannelAPI.createPushChannelClientID_Invocations, + [Scaffolding.selfClientID] + ) + + // Then push channel was opened. + XCTAssertEqual(pushChannel.open_Invocations.count, 1) + + + // Then live events were decrypted (duplicates skipped). + XCTAssertEqual( + decryptor.decryptEventsIn_Invocations, + [Scaffolding.event2] + ) + + // Then live events were stored. + XCTAssertEqual(store.indexOfLastEventEnvelope_Invocations.count, 1) + + // Then live events were stored. + let storeInvocations = store.persistEventEnvelopeIndex_Invocations + try XCTAssertCount(storeInvocations, count: 1) + XCTAssertEqual(storeInvocations[0].eventEnvelope, Scaffolding.event2) + XCTAssertEqual(storeInvocations[0].index, 11) + + // Then ack of events done adter storing + XCTAssertEqual(pushChannel.ackEventDeliveryTagMultiple_Invocations.count, 1) + + + // Then all events were processed once. + XCTAssertEqual( + processor.processEvent_Invocations, + [ + Scaffolding.event2, + ].flatMap(\.events) + ) + + // Then live events were deleted. + XCTAssertEqual(store.deleteEventEnvelopeAtIndex_Invocations, [11]) + + // Then unread messages are calculated once after processing pending events + // and once after processing each live event. + XCTAssertEqual(store.calculateLastUnreadMessages_Invocations.count, 1) + + // Then the database was saved once after processing pending events + // and once after processing each live event. + XCTAssertEqual(databaseSaver.save_Invocations.count, 1) + } + + + func test_perform_AcknowledgementFullSync() async throws { + // Mock + + + // Some live events, some of which were already pulled. + let pushChannel = MockNewPushChannelProtocol() + pushChannel.open_MockValue = AsyncThrowingStream { continuation in + Task { + continuation.yield(.event(Scaffolding.event2)) + continuation.yield(.event(Scaffolding.event3)) + continuation.yield(.missedEvents) + continuation.finish() + } + } + pushChannel.ackEventDeliveryTagMultiple_MockMethod = { (_, _) in } + pushChannelAPI.createPushChannelClientID_MockMethod = { _ in pushChannel } + + // Some indices at which live events will be stored. + var indices = [Int64(10), 11, 12, 13, 14, 15] + store.indexOfLastEventEnvelope_MockMethod = { indices.remove(at: 0) } + + // Live envelopes are peristed and deleted one by one. + store.persistEventEnvelopeIndex_MockMethod = { _, _ async throws in } + store.deleteEventEnvelopeAtIndex_MockMethod = { _ in } + + // Live events are decrypted. + decryptor.decryptEventsIn_MockMethod = { $0.events } + + // Last event is being updated. + store.storeLastEventIDId_MockMethod = { _ in } + + // Events are processed. + processor.processEvent_MockMethod = { _ in } + + // Unread messages are set + store.calculateLastUnreadMessages_MockMethod = {} + + // Database is saved. + databaseSaver.save_MockMethod = {} + + // When + let token = try await sut.perform(acknowledgeFullSync: false) + try await token.task.value + + // Then push channel was created. + XCTAssertEqual( + pushChannelAPI.createPushChannelClientID_Invocations, + [Scaffolding.selfClientID] + ) + + // Then push channel was opened. + XCTAssertEqual(pushChannel.open_Invocations.count, 1) + + + // Then live events were decrypted (duplicates skipped). + XCTAssertEqual( + decryptor.decryptEventsIn_Invocations, + [Scaffolding.event2, Scaffolding.event3, Scaffolding.event4, Scaffolding.event5] + ) + + // Then live events were stored. + XCTAssertEqual(store.indexOfLastEventEnvelope_Invocations.count, 4) + + // Then live events were stored. + let storeInvocations = store.persistEventEnvelopeIndex_Invocations + try XCTAssertCount(storeInvocations, count: 4) + XCTAssertEqual(storeInvocations[2].eventEnvelope, Scaffolding.event4) + XCTAssertEqual(storeInvocations[2].index, 13) + XCTAssertEqual(storeInvocations[3].eventEnvelope, Scaffolding.event5) + XCTAssertEqual(storeInvocations[3].index, 14) + + // Then ack of events done adter storing + XCTAssertEqual(pushChannel.ackEventDeliveryTagMultiple_Invocations.count, 4) + + + // Then all events were processed once. + XCTAssertEqual( + processor.processEvent_Invocations, + [ + Scaffolding.event2, + Scaffolding.event3, + Scaffolding.event4, + Scaffolding.event5 + ].flatMap(\.events) + ) + + // Then live events were deleted. + XCTAssertEqual(store.deleteEventEnvelopeAtIndex_Invocations, [11, 12, 13, 14]) + + // Then unread messages are calculated once after processing pending events + // and once after processing each live event. + XCTAssertEqual(store.calculateLastUnreadMessages_Invocations.count, 4) + + // Then the database was saved once after processing pending events + // and once after processing each live event. + XCTAssertEqual(databaseSaver.save_Invocations.count, 4) + } + +} + +private enum Scaffolding { + + static let selfClientID = "selfClientID" + + static let event2 = createEvent( + message: "ciao", + timeIntervalSinceNow: -9, + deliveryTag: 2 + ) + + static let event3 = createEvent( + message: "hola", + timeIntervalSinceNow: -8, + deliveryTag: 3 + ) + + static let event4 = createEvent( + message: "hallo", + timeIntervalSinceNow: -7, + deliveryTag: 4 + ) + + static let event5 = createEvent( + message: "bonjour", + timeIntervalSinceNow: -6, + deliveryTag: 5 + ) + + static func createEvent( + message: String, + timeIntervalSinceNow: TimeInterval, + deliveryTag: UInt64? = nil + ) -> UpdateEventEnvelope { + let event = ConversationProteusMessageAddEvent( + conversationID: ConversationID( + uuid: UUID(), + domain: "example.com" + ), + senderID: UserID( + uuid: UUID(), + domain: "example.com" + ), + timestamp: Date(timeIntervalSinceNow: timeIntervalSinceNow), + message: MessageContent( + encryptedMessage: message, + decryptedMessage: nil + ), + externalData: nil, + messageSenderClientID: "senderClientID", + messageRecipientClientID: selfClientID + ) + return UpdateEventEnvelope( + id: UUID(), + events: [.conversation(.proteusMessageAdd(event))], + isTransient: false, + deliveryTag: deliveryTag + ) + } + +} diff --git a/WireLogging/Sources/WireLogging/LogAttributes.swift b/WireLogging/Sources/WireLogging/LogAttributes.swift index 1eac0e23437..bf987b695a8 100644 --- a/WireLogging/Sources/WireLogging/LogAttributes.swift +++ b/WireLogging/Sources/WireLogging/LogAttributes.swift @@ -40,6 +40,7 @@ public enum LogAttributesKey: String, Comparable, Sendable { case nse = "NSE" case accountID = "account_id" case mlsGroupID = "mls_group_id" + case pushChannelVersion = "push_channel" public static func < (lhs: LogAttributesKey, rhs: LogAttributesKey) -> Bool { lhs.rawValue < rhs.rawValue @@ -50,4 +51,6 @@ public extension LogAttributes { static let safePublic = [LogAttributesKey.public: true] static let newNSE = [LogAttributesKey.nse: "new"] static let legacyNSE = [LogAttributesKey.nse: "legacy"] + static let pushChannelV3 = [LogAttributesKey.pushChannelVersion: "v3"] + static let pushChannelV1 = [LogAttributesKey.pushChannelVersion: "v1"] } diff --git a/WireLogging/Sources/WireLogging/SystemLogger.swift b/WireLogging/Sources/WireLogging/SystemLogger.swift index e548f273c1a..f564efe36ae 100644 --- a/WireLogging/Sources/WireLogging/SystemLogger.swift +++ b/WireLogging/Sources/WireLogging/SystemLogger.swift @@ -90,12 +90,12 @@ public class SystemLogger: LoggerProtocol { } var finalMessage = "\(message.logDescription)\(attributesDescription(from: mergedAttributes))" - - if !tags.isEmpty { - let extraInfo = tags.map { key, value in "[\(key.rawValue):\(value)]" }.joined() - finalMessage += extraInfo - } - + #if !DEBUG + if !tags.isEmpty { + let extraInfo = tags.map { key, value in "[\(key.rawValue):\(value)]" }.joined() + finalMessage += extraInfo + } + #endif if mergedAttributes[.public] as? Bool == true { os_log(osLogType, log: logger, "%{public}@", finalMessage) } else { diff --git a/WireLogging/Sources/WireLogging/WireLogger+Instances.swift b/WireLogging/Sources/WireLogging/WireLogger+Instances.swift index e20dfe4f05d..9e8ebacda9e 100644 --- a/WireLogging/Sources/WireLogging/WireLogger+Instances.swift +++ b/WireLogging/Sources/WireLogging/WireLogger+Instances.swift @@ -46,6 +46,7 @@ public extension WireLogger { static let performance = WireLogger(tag: "performance") static let push = WireLogger(tag: "push") static let pushChannel = WireLogger(tag: "push-channel") + static let webSocket = WireLogger(tag: "websocket") static let proteus = WireLogger(tag: "proteus") static let session = WireLogger(tag: "session") static let sessionManager = WireLogger(tag: "SessionManager") diff --git a/wire-ios-data-model/Source/Core Crypto/CoreCryptoProvider.swift b/wire-ios-data-model/Source/Core Crypto/CoreCryptoProvider.swift index cd77fd63d50..6df8efdf45b 100644 --- a/wire-ios-data-model/Source/Core Crypto/CoreCryptoProvider.swift +++ b/wire-ios-data-model/Source/Core Crypto/CoreCryptoProvider.swift @@ -137,7 +137,7 @@ public actor CoreCryptoProvider: CoreCryptoProviderProtocol { do { try await registerEpochObserverIfNecessary(with: coreCrypto()) } catch { - WireLogger.mls.warn("Failed to register epoch observer, will try again later") + WireLogger.mls.warn("Failed to register epoch observer, will try again later: \(error)") } } diff --git a/wire-ios-data-model/Source/Proteus/ProteusService.swift b/wire-ios-data-model/Source/Proteus/ProteusService.swift index 3136ea4b7e5..61de2bbdb84 100644 --- a/wire-ios-data-model/Source/Proteus/ProteusService.swift +++ b/wire-ios-data-model/Source/Proteus/ProteusService.swift @@ -188,7 +188,7 @@ public final class ProteusService: ProteusServiceInterface { // MARK: - proteusDecrypt - public enum DecryptionError: Error { + public enum DecryptionError: Error, LocalizedError { case failedToDecryptData(ProteusError) case failedToEstablishSessionFromMessage(ProteusError) @@ -203,6 +203,15 @@ public final class ProteusService: ProteusServiceInterface { } } + public var errorDescription: String? { + switch self { + case let .failedToDecryptData(proteusError): + "failedToDecryptData: \(String(describing: proteusError))" + + case let .failedToEstablishSessionFromMessage(proteusError): + "failedToEstablishSessionFromMessage: \(String(describing: proteusError))" + } + } } public func decrypt( diff --git a/wire-ios-share-engine/Sources/SharingSession.swift b/wire-ios-share-engine/Sources/SharingSession.swift index 44883dfae9e..8bd45014aac 100644 --- a/wire-ios-share-engine/Sources/SharingSession.swift +++ b/wire-ios-share-engine/Sources/SharingSession.swift @@ -552,8 +552,10 @@ public final class SharingSession { onProcessedTypingUsers: { _ in } ) + let selfClient = ZMUser.selfUser(in: coreDataStack.viewContext).selfClient() let clientUserSessionComponent = userSessionComponent.clientSessionComponent( clientID: selfClientID, + asyncStreamEnabled: selfClient?.asyncStreamCapable == true, processorHandlers: processHandlers, onAuthenticationFailure: {} ) diff --git a/wire-ios-sync-engine/Source/Synchronization/IncrementalSyncObserver.swift b/wire-ios-sync-engine/Source/Synchronization/IncrementalSyncObserver.swift index 6f3892be234..ec6879701be 100644 --- a/wire-ios-sync-engine/Source/Synchronization/IncrementalSyncObserver.swift +++ b/wire-ios-sync-engine/Source/Synchronization/IncrementalSyncObserver.swift @@ -56,10 +56,13 @@ final class IncrementalSyncObserver: IncrementalSyncObserverProtocol { .sink { [weak self] syncState in switch syncState { case .incrementalSyncing(.pullPendingEvents): + WireLogger.sync.debug("syncObserver: inProgress") self?.decryptionState = .inProgress case .incrementalSyncing(.processPendingEvents), .liveSyncing: + WireLogger.sync.debug("syncObserver: done") self?.decryptionState = .done default: + WireLogger.sync.debug("syncObserver: notStarted") self?.decryptionState = .notStarted } } diff --git a/wire-ios-sync-engine/Source/Synchronization/SyncAgent.swift b/wire-ios-sync-engine/Source/Synchronization/SyncAgent.swift index 2035f26a5f8..9b3bbe71dc0 100644 --- a/wire-ios-sync-engine/Source/Synchronization/SyncAgent.swift +++ b/wire-ios-sync-engine/Source/Synchronization/SyncAgent.swift @@ -151,25 +151,15 @@ final class SyncAgent: NSObject, SyncAgentProtocol { func performInitialSync() async throws { if isSyncV2Enabled { - do { - delegate?.syncAgentDidStartInitialSync(self) - WireLogger.sync.debug("did start new initial sync") - try await initialSyncProvider.provideInitialSync().perform(skipPullingLastUpdateEventID: false) - WireLogger.sync.debug("did finish new initial sync") - journal[.isInitialSyncRequired] = false - delegate?.syncAgentDidFinishInitialSync(self) - } catch { - WireLogger.sync.error("failed to perform new initial sync: \(String(describing: error))") - throw error - } - + + try await performInitialSyncV2() try await performIncrementalSync() } else { // Incremental sync automatically follows the slow sync. legacySyncStatus.forceSlowSync() } } - + /// Perform a resource sync. func performResourceSync() async throws { @@ -194,7 +184,9 @@ final class SyncAgent: NSObject, SyncAgentProtocol { /// Perform an incremental sync. - func performIncrementalSync() async throws { + func performIncrementalSync(shouldAcknowledgeFullSync: Bool = false) async throws { + let liveSync = journal[.skipPullingLastNotificationID] + if isSyncV2Enabled { guard incrementalSyncToken == nil else { WireLogger.sync.info("incremental sync already running...") @@ -205,8 +197,13 @@ final class SyncAgent: NSObject, SyncAgentProtocol { try await incrementalSyncTaskManager.performIfNeeded { [weak self] in guard let self else { return } delegate?.syncAgentDidStartIncrementalSync(self) - incrementalSyncToken = try await incrementalSyncProvider.provideIncrementalSync().perform() - delegate?.syncAgentDidFinishIncrementalSync(self, isRecovering: false) + + if liveSync { + incrementalSyncToken = try await incrementalSyncProvider.provideLiveSync(delegate: self).perform(acknowledgeFullSync: shouldAcknowledgeFullSync) + } else { + incrementalSyncToken = try await incrementalSyncProvider.provideIncrementalSync().perform(acknowledgeFullSync: shouldAcknowledgeFullSync) + delegate?.syncAgentDidFinishIncrementalSync(self, isRecovering: false) + } } } catch { WireLogger.sync.error("failed to perform new incremental sync: \(String(describing: error))") @@ -216,7 +213,52 @@ final class SyncAgent: NSObject, SyncAgentProtocol { await legacySyncStatus.performQuickSync() } } + + private func performInitialSyncV2() async throws { + do { + delegate?.syncAgentDidStartInitialSync(self) + WireLogger.sync.debug("did start new initial sync") + try await initialSyncProvider.provideInitialSync().perform(skipPullingLastUpdateEventID: journal[.skipPullingLastNotificationID]) + WireLogger.sync.debug("did finish new initial sync") + journal[.isInitialSyncRequired] = false + delegate?.syncAgentDidFinishInitialSync(self) + } catch { + WireLogger.sync.error("failed to perform new initial sync: \(String(describing: error))") + throw error + } + } +} +extension SyncAgent: LiveSyncDelegate { + + func didFinishSync(sync: NewIncrementalSync) { + delegate?.syncAgentDidFinishIncrementalSync(self, isRecovering: false) + } + + func didMissedEvents(sync: NewIncrementalSync) { + // as this will close this websocket and sync, + // we don't want the initialSync to be cancelled too + Task.detached { [self] in + await incrementalSyncToken?.suspend() + incrementalSyncToken = nil + WireLogger.sync.debug("slow sync requested by sync v3") + do { + try await performInitialSyncV2() + WireLogger.sync.debug("slow sync done, restarting live sync") + try await performIncrementalSync(shouldAcknowledgeFullSync: true) + } catch { + WireLogger.sync.error("error while requesing slow sync: \(error.localizedDescription)") + } + } + } + + func didFail(sync: NewIncrementalSync, error: any Error) { + delegate?.syncAgentDidFailSyncing( + self, + error: error + ) + } + } // MARK: - MLS sync delegate @@ -234,7 +276,7 @@ extension SyncAgent: MLSSyncDelegate { try await incrementalSyncTaskManager.performIfNeeded { [weak self] in guard let self else { return } delegate?.syncAgentDidStartIncrementalSync(self) - incrementalSyncToken = try await incrementalSyncProvider.provideIncrementalSync().perform() + incrementalSyncToken = try await incrementalSyncProvider.provideIncrementalSync().perform(acknowledgeFullSync: false) delegate?.syncAgentDidFinishIncrementalSync(self, isRecovering: true) } } catch { diff --git a/wire-ios-sync-engine/Source/Synchronization/ZMOperationLoop.m b/wire-ios-sync-engine/Source/Synchronization/ZMOperationLoop.m index 6466094fb8d..7f2993fce10 100644 --- a/wire-ios-sync-engine/Source/Synchronization/ZMOperationLoop.m +++ b/wire-ios-sync-engine/Source/Synchronization/ZMOperationLoop.m @@ -89,7 +89,11 @@ - (instancetype)initWithTransportSession:(id)transportSess // this is needed to avoid loading from syncMOC on the main queue [moc performGroupedBlock:^{ [self.transportSession configurePushChannelWithConsumer:self groupQueue:moc]; - [self.transportSession.pushChannel setKeepOpen:operationStatus.operationState == SyncEngineOperationStateForeground]; + if (isSyncV2Enabled) { + [self.transportSession.pushChannel setKeepOpen:false]; + } else { + [self.transportSession.pushChannel setKeepOpen:operationStatus.operationState == SyncEngineOperationStateForeground]; + } }]; } diff --git a/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession.swift b/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession.swift index 768b6c197ad..fc8fcaf596a 100644 --- a/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession.swift +++ b/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession.swift @@ -559,6 +559,7 @@ public final class ZMUserSession: NSObject { private func setUpSyncAgent(clientID: String, asyncStreamEnabled: Bool) { let clientSessionComponent = userSessionComponent.clientSessionComponent( clientID: clientID, + asyncStreamEnabled: asyncStreamEnabled, processorHandlers: .init( onProcessedCallEvent: onProcessedCallEvent(callEventInfo:), onSelfClientInvalidated: onSelfClientInvalidated, @@ -567,21 +568,18 @@ public final class ZMUserSession: NSObject { onAuthenticationFailure: onAuthenticationFailure ) - coreCryptoProvider.registerMlsTransport(clientSessionComponent.mlsTransport) - - let incrementalSyncProvider: IncrementalSyncProvider = if !asyncStreamEnabled { - clientSessionComponent - } else { - // TODO: [WPB-17225] replace syncProvider here - clientSessionComponent + if asyncStreamEnabled { + journal[.skipPullingLastNotificationID] = true } + coreCryptoProvider.registerMlsTransport(clientSessionComponent.mlsTransport) + let syncAgent = SyncAgent( journal: journal, lastUpdateEventIDRepository: lastEventIDRepository, coreCryptoProvider: coreCryptoProvider, initialSyncProvider: clientSessionComponent, - incrementalSyncProvider: incrementalSyncProvider, + incrementalSyncProvider: clientSessionComponent, legacySyncStatus: applicationStatusDirectory.syncStatus, syncStateSubject: clientSessionComponent.syncStateSubject ) diff --git a/wire-ios-transport/Source/PushChannel/StarscreamPushChannel.swift b/wire-ios-transport/Source/PushChannel/StarscreamPushChannel.swift index 6e734b55ae2..5aa15551891 100644 --- a/wire-ios-transport/Source/PushChannel/StarscreamPushChannel.swift +++ b/wire-ios-transport/Source/PushChannel/StarscreamPushChannel.swift @@ -35,14 +35,14 @@ final class StarscreamPushChannel: NSObject, PushChannelType { var clientID: String? { didSet { - WireLogger.pushChannel.debug("Setting client ID") + WireLogger.pushChannel.debug("Setting client ID", attributes: .pushChannelV1) scheduleOpen() } } var accessToken: AccessToken? { didSet { - WireLogger.pushChannel.debug("Setting access token") + WireLogger.pushChannel.debug("Setting access token", attributes: .pushChannelV1) } } @@ -116,7 +116,7 @@ final class StarscreamPushChannel: NSObject, PushChannelType { } func close() { - WireLogger.pushChannel.info("Push channel was closed") + WireLogger.pushChannel.info("Push channel was closed", attributes: .pushChannelV1) scheduler.performGroupedBlock { self.webSocket?.disconnect() @@ -130,7 +130,7 @@ final class StarscreamPushChannel: NSObject, PushChannelType { let accessToken, let websocketURL else { - WireLogger.pushChannel.warn("Can't connect websocket") + WireLogger.pushChannel.warn("Can't connect websocket", attributes: .pushChannelV1) return } @@ -168,7 +168,8 @@ final class StarscreamPushChannel: NSObject, PushChannelType { let attributes: LogAttributes = [ .selfClientId: clientID?.redactedAndTruncated(maxVisibleCharacters: 3, length: 8) - ] + + ].merging(.pushChannelV1, uniquingKeysWith: { _, new in new }) WireLogger.pushChannel.info( "Connecting websocket with URL: \(websocketURL.endpointRemoteLogDescription)", attributes: attributes, @@ -189,10 +190,10 @@ final class StarscreamPushChannel: NSObject, PushChannelType { private func scheduleOpenInternal() { guard canOpenConnection else { - WireLogger.pushChannel.debug("Conditions for scheduling opening not fulfilled, waiting...") + WireLogger.pushChannel.debug("Conditions for scheduling opening not fulfilled, waiting...", attributes: .pushChannelV1) return } - WireLogger.pushChannel.debug("Schedule opening..") + WireLogger.pushChannel.debug("Schedule opening..", attributes: .pushChannelV1) scheduler.add(ZMOpenPushChannelRequest()) } @@ -238,7 +239,7 @@ final class StarscreamPushChannel: NSObject, PushChannelType { extension StarscreamPushChannel: ZMTimerClient { func timerDidFire(_ timer: ZMTimer!) { - WireLogger.pushChannel.debug("Sending ping") + WireLogger.pushChannel.debug("Sending ping", attributes: .pushChannelV1) webSocket?.write(ping: Data()) schedulePingTimer() } @@ -250,15 +251,15 @@ extension StarscreamPushChannel: WebSocketDelegate { switch event { case .connected: - WireLogger.pushChannel.debug("Sending ping") + WireLogger.pushChannel.debug("Sending ping", attributes: .pushChannelV1) onOpen() case .disconnected: - WireLogger.pushChannel.debug("Websocket disconnected") + WireLogger.pushChannel.debug("Websocket disconnected", attributes: .pushChannelV1) onClose() case .text: break case let .binary(data): - WireLogger.pushChannel.debug("Received data") + WireLogger.pushChannel.debug("Received data", attributes: .pushChannelV1) consumerQueue?.performGroupedBlock { [weak self] in self?.consumer?.pushChannelDidReceive(data) }