Skip to content

Commit 4ff18c5

Browse files
fix: Make Session Replay stable for offline or intermittent network (#67)
- Make BatchWorker aware that any http request can fail, by creating failingItems which gonna be tried until network appears - Network Backoff <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Reworks transport to an actor-based BatchWorker that routes queue items by exporter with concurrency and exponential backoff, adds exporter typing to payloads/queue, exposes UserInteractionManager, and wires touch events into Session Replay. > > - **Transport**: > - **BatchWorker**: Convert to `actor`; add per-exporter concurrency limits, cost budgeting, earliest scheduling, and exponential backoff with jitter; track in-flight exporters/cost; async start/stop. > - **Queue/Items**: Store events by `exporterTypeId`; add `exporterClass` to `EventQueueItemPayload`; new `earliest(...)` and `removeFirst(...)` APIs; derive `exporterTypeId` in `EventQueueItem`. > - **Exporters**: Make `EventExporting` `Sendable`; add `typeId` helper; remove `MultiEventExporter`. > - **TransportService**: Call batch worker start/stop asynchronously. > - **Observability/UI**: > - Add `userInteractionManager` to `ObservabilityContext`; make `UserInteractionManager` public with multiple `TouchInteractionYield`s; update `TouchCaptureCoordinator` to use optional yield and new alias. > - Keep tracing on interactions via `UserInteractionManager` in factory. > - **Session Replay**: > - Register touch interaction yield to enqueue interactions; add `SessionReplayExporter` to `BatchWorker` and start transport. > - Update payloads (`TouchInteraction`, `ScreenImageItem`) to specify `exporterClass`; minor event generator flow tweaks. > - **Logs**: > - `LogItem` now specifies `exporterClass` for routing. > - **Network**: > - Minor `HttpService` response handling tidy-up. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit f509f7d. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 060338b commit 4ff18c5

17 files changed

+234
-118
lines changed

Sources/Common/Network/HttpServicing.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public final class HttpService: HttpServicing {
3333
public func send(_ request: URLRequest) async throws -> Data {
3434
do {
3535
let (data, response) = try await session.data(for: request)
36+
3637
guard let http = response as? HTTPURLResponse else {
3738
throw NetworkError.invalidResponse
3839
}

Sources/Observability/Client/ObservabilityClientFactory.swift

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,6 @@ public struct ObservabilityClientFactory {
104104
}
105105

106106
let userInteractionManager = UserInteractionManager(options: options) { interaction in
107-
Task {
108-
await eventQueue.send(interaction)
109-
}
110-
111107
// TODO: move to LD buffering
112108
if let span = interaction.span() {
113109
tracer.startSpan(name: span.name, attributes: span.attributes)
@@ -173,7 +169,8 @@ public struct ObservabilityClientFactory {
173169
options: options,
174170
appLifecycleManager: appLifecycleManager,
175171
sessionManager: sessionManager,
176-
transportService: transportService
172+
transportService: transportService,
173+
userInteractionManager: userInteractionManager
177174
)
178175

179176
transportService.start()

Sources/Observability/Client/ObservabilityContext.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,20 @@ public class ObservabilityContext {
99
public let sessionManager: SessionManaging
1010
public let transportService: TransportServicing
1111
public let appLifecycleManager: AppLifecycleManaging
12+
public let userInteractionManager: UserInteractionManager
1213

1314
public init(
1415
sdkKey: String,
1516
options: Options,
1617
appLifecycleManager: AppLifecycleManaging,
1718
sessionManager: SessionManaging,
18-
transportService: TransportServicing) {
19+
transportService: TransportServicing,
20+
userInteractionManager: UserInteractionManager) {
1921
self.sdkKey = sdkKey
2022
self.options = options
2123
self.appLifecycleManager = appLifecycleManager
2224
self.sessionManager = sessionManager
2325
self.transportService = transportService
26+
self.userInteractionManager = userInteractionManager
2427
}
2528
}

Sources/Observability/Logs/LogItem.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ import Foundation
22
import OpenTelemetrySdk
33

44
public struct LogItem: EventQueueItemPayload {
5+
public var exporterClass: AnyClass {
6+
OtlpLogExporter.self
7+
}
8+
59
public let log: ReadableLogRecord
610

711
public func cost() -> Int {

Sources/Observability/Transport/BatchWorker.swift

Lines changed: 103 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,60 +2,131 @@ import Foundation
22
import Common
33
import OSLog
44

5-
public final class BatchWorker {
5+
public final actor BatchWorker {
6+
enum Constants {
7+
static let maxConcurrentCost: Int = 30000
8+
static let maxConcurrentItems: Int = 100
9+
static let maxConcurrentExporters: Int = 2
10+
static let baseBackoffSeconds: TimeInterval = 2
11+
static let maxBackoffSeconds: TimeInterval = 60
12+
static let backoffJitter: Double = 0.2
13+
}
14+
15+
private struct BackOffExporterInfo {
16+
var until: DispatchTime
17+
var attempts: Int
18+
}
19+
620
private let eventQueue: EventQueue
7-
private let interval = TimeInterval(2)
8-
private let minInterval = TimeInterval(1)
21+
private let interval = TimeInterval(4)
22+
private let minInterval = TimeInterval(1.5)
923
private var task: Task<Void, Never>?
10-
private let multiExporter: MultiEventExporting
1124
private var log: OSLog
12-
25+
private var exporters = [ObjectIdentifier: any EventExporting]()
26+
private var costInFlight = 0
27+
private var exportersInFlight = Set<ObjectIdentifier>()
28+
private var exporterBackoff = [ObjectIdentifier: BackOffExporterInfo]()
29+
1330
public init(eventQueue: EventQueue,
14-
log: OSLog,
15-
multiExporter: MultiEventExporting = MultiEventExporter(exporters: [])) {
31+
log: OSLog) {
1632
self.eventQueue = eventQueue
17-
self.multiExporter = multiExporter
1833
self.log = log
1934
}
2035

2136
public func addExporter(_ exporter: EventExporting) async {
22-
await multiExporter.addExporter(exporter)
37+
let exporterId = exporter.typeId
38+
exporters[exporterId] = exporter
2339
}
2440

25-
func start() {
41+
public func start() {
2642
guard task == nil else { return }
2743

2844
task = Task.detached(priority: .background) { [weak self] in
29-
guard let self else { return }
30-
3145
while !Task.isCancelled {
32-
let items = await eventQueue.dequeue(cost: 30000, limit: 20)
33-
34-
guard items.isNotEmpty else {
35-
try? await Task.sleep(seconds: interval)
36-
continue
46+
guard let self else { return }
47+
let scheduledCost = await sendQueueItems()
48+
try? await Task.sleep(seconds: scheduledCost > 0 ? minInterval : interval)
49+
}
50+
}
51+
}
52+
53+
func sendQueueItems() async -> Int {
54+
var scheduledCost = 0
55+
56+
while true {
57+
let remainingExporterSlots = Constants.maxConcurrentExporters - exportersInFlight.count
58+
guard remainingExporterSlots > 0 else { break }
59+
60+
let budget = Constants.maxConcurrentCost - costInFlight
61+
guard budget > 0 else { break }
62+
63+
let now = DispatchTime.now()
64+
var except = exportersInFlight
65+
for (id, info) in exporterBackoff where info.until > now {
66+
except.insert(id)
67+
}
68+
69+
guard let earliest = await eventQueue.earliest(cost: budget,
70+
limit: Constants.maxConcurrentItems,
71+
except: except) else {
72+
break
73+
}
74+
let exporterId = earliest.id
75+
let items = earliest.items
76+
let cost = earliest.cost
77+
78+
guard let exporter = exporters[exporterId] else {
79+
os_log("%{public}@", log: log, type: .error, "Dropping \(items.count) items: exporter not found for id \(exporterId)")
80+
await eventQueue.removeFirst(id: exporterId, count: items.count)
81+
continue
82+
}
83+
84+
if tryReserve(exporterId: exporterId, cost: cost) {
85+
Task.detached(priority: .background) { [weak self] in
86+
do {
87+
try await exporter.export(items: items)
88+
await self?.finishExport(exporterId: exporterId, itemsCount: items.count, cost: cost, error: nil)
89+
} catch {
90+
await self?.finishExport(exporterId: exporterId, itemsCount: items.count, cost: cost, error: error)
91+
}
3792
}
38-
39-
let sendStart = DispatchTime.now()
40-
await self.send(items: items)
41-
42-
let elapsed = Double(DispatchTime.now().uptimeNanoseconds - sendStart.uptimeNanoseconds) / Double(NSEC_PER_SEC)
43-
let seconds = max(min(interval - elapsed, interval), minInterval)
44-
try? await Task.sleep(seconds: seconds)
93+
scheduledCost += cost
4594
}
4695
}
96+
97+
return scheduledCost
4798
}
4899

49-
func stop() {
50-
task?.cancel()
51-
task = nil
100+
private func tryReserve(exporterId: ObjectIdentifier, cost: Int) -> Bool {
101+
guard exportersInFlight.contains(exporterId) == false else {
102+
return false
103+
}
104+
105+
exportersInFlight.insert(exporterId)
106+
costInFlight += cost
107+
return true
52108
}
53109

54-
func send(items: [EventQueueItem]) async {
55-
do {
56-
try await multiExporter.export(items: items)
57-
} catch {
58-
os_log("%{public}@", log: log, type: .error, "BatchWorked has failed to send items: \(error)")
110+
private func finishExport(exporterId: ObjectIdentifier, itemsCount: Int, cost: Int, error: Error?) async {
111+
if let error {
112+
os_log("%{public}@", log: log, type: .error, "Exporter \(exporterId) failed with error \(error)")
113+
let attempts = (exporterBackoff[exporterId]?.attempts ?? 0) + 1
114+
let backoff = min(Constants.baseBackoffSeconds * pow(2, Double(max(0, attempts - 1))), Constants.maxBackoffSeconds)
115+
let jitter = backoff * Constants.backoffJitter
116+
let jittered = max(0, backoff + Double.random(in: -jitter...jitter))
117+
let until = DispatchTime.now() + .milliseconds(Int(jittered * 1000))
118+
exporterBackoff[exporterId] = BackOffExporterInfo(until: until, attempts: attempts)
119+
} else {
120+
await eventQueue.removeFirst(id: exporterId, count: itemsCount)
121+
exporterBackoff[exporterId] = nil
59122
}
123+
124+
exportersInFlight.remove(exporterId)
125+
costInFlight -= cost
126+
}
127+
128+
public func stop() {
129+
task?.cancel()
130+
task = nil
60131
}
61132
}
Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
import Foundation
22
import Common
33

4-
public protocol EventExporting {
4+
public protocol EventExporting: Sendable {
55
func export(items: [EventQueueItem]) async throws
66
}
77

88
public final class NoOpExporter: EventExporting {
9-
public func export(items: [EventQueueItem]) async throws {}
9+
public func export(items: [EventQueueItem]) async throws { return }
1010

1111
public init() {}
1212
}
13+
14+
extension EventExporting {
15+
var typeId: ObjectIdentifier {
16+
let type = type(of: Self.self)
17+
return ObjectIdentifier(type)
18+
}
19+
}

Sources/Observability/Transport/EventQueue.swift

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,17 @@ import Common
44
public struct EventQueueItem {
55
public var payload: EventQueueItemPayload
66
public var cost: Int
7+
public var exporterTypeId: ObjectIdentifier
78

89
public init(payload: EventQueueItemPayload) {
10+
let type = type(of: payload.exporterClass)
11+
self.init(payload: payload, exporterTypeId: ObjectIdentifier(type))
12+
}
13+
14+
public init(payload: EventQueueItemPayload, exporterTypeId: ObjectIdentifier) {
915
self.payload = payload
1016
self.cost = payload.cost()
17+
self.exporterTypeId = exporterTypeId
1118
}
1219

1320
public var timestamp: TimeInterval {
@@ -21,7 +28,13 @@ public protocol EventQueuing: Actor {
2128

2229
// TODO: make it optimal
2330
public actor EventQueue: EventQueuing {
24-
private var storage = [EventQueueItem]()
31+
public struct EarliestItemsResult {
32+
let id: ObjectIdentifier
33+
let items: [EventQueueItem]
34+
let cost: Int
35+
}
36+
37+
private var storage = [ObjectIdentifier: [EventQueueItem]]()
2538
private var lastEventTime: TimeInterval = 0
2639
private let limitSize: Int
2740
private var currentSize = 0
@@ -43,32 +56,61 @@ public actor EventQueue: EventQueuing {
4356
return
4457
}
4558

46-
storage.append(item)
59+
storage[item.exporterTypeId, default: []].append(item)
4760
lastEventTime = item.timestamp
4861
currentSize += item.cost
4962
}
63+
64+
func earliest(cost: Int, limit: Int, except: Set<ObjectIdentifier>) -> EarliestItemsResult? {
65+
var earlistEvent: (id: ObjectIdentifier, items: [EventQueueItem], firstTimestamp: TimeInterval)?
66+
for (id, items) in storage where except.contains(id) == false {
67+
guard let firstItem = items.first else {
68+
continue
69+
}
70+
if let earlistEventUnwrapped = earlistEvent, firstItem.timestamp >= earlistEventUnwrapped.firstTimestamp {
71+
continue
72+
}
73+
74+
earlistEvent = (id, items, firstItem.timestamp)
75+
}
76+
77+
guard let earlistEvent else { return nil }
78+
79+
guard let (items, cost) = first(cost: cost, limit: limit, items: earlistEvent.items) else {
80+
return nil
81+
}
82+
83+
return EarliestItemsResult(id: earlistEvent.id, items: items, cost: cost)
84+
}
85+
86+
private func first(cost: Int, limit: Int, items: [EventQueueItem]) -> (items: [EventQueueItem], cost: Int)? {
87+
var sumCost = 0
88+
var resultItems = [EventQueueItem]()
89+
for (i, item) in items.enumerated() {
90+
resultItems.append(item)
91+
sumCost += item.cost
92+
93+
if i > limit || sumCost > cost {
94+
break
95+
}
96+
}
5097

51-
func dequeue(cost: Int, limit: Int) -> [EventQueueItem] {
52-
guard !storage.isEmpty else {
53-
return []
98+
return (items: resultItems, sumCost)
99+
}
100+
101+
func removeFirst(id: ObjectIdentifier, count: Int) {
102+
guard var items = storage[id], count > 0 else {
103+
return
54104
}
55105

56-
var result = [EventQueueItem]()
57-
var sumCost = 0
58-
for (i, item) in storage.enumerated() {
59-
result.append(item)
60-
61-
sumCost += item.cost
62-
63-
if i >= limit || sumCost > cost {
64-
currentSize -= item.cost
65-
storage.removeFirst(i + 1)
66-
return result
67-
}
106+
let removeCount = min(count, items.count)
107+
var removedCost = 0
108+
for i in 0..<removeCount {
109+
removedCost += items[i].cost
68110
}
111+
currentSize -= removedCost
69112

70-
storage.removeAll()
71-
currentSize = 0
72-
return result
113+
items.removeFirst(removeCount)
114+
storage[id] = items.isEmpty ? nil : items
73115
}
74116
}

Sources/Observability/Transport/EventQueueItemPayload.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ import Foundation
33
public protocol EventQueueItemPayload {
44
func cost() -> Int
55
var timestamp: TimeInterval { get }
6+
var exporterClass: AnyClass { get }
67
}

0 commit comments

Comments
 (0)