Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Sources/Common/Network/HttpServicing.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class HttpService: HttpServicing {
public func send(_ request: URLRequest) async throws -> Data {
do {
let (data, response) = try await session.data(for: request)

guard let http = response as? HTTPURLResponse else {
throw NetworkError.invalidResponse
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ public struct ObservabilityClientFactory {
}

let userInteractionManager = UserInteractionManager(options: options) { interaction in
Task {
await eventQueue.send(interaction)
}

// TODO: move to LD buffering
if let span = interaction.span() {
tracer.startSpan(name: span.name, attributes: span.attributes)
Expand Down Expand Up @@ -161,7 +157,8 @@ public struct ObservabilityClientFactory {
options: options,
appLifecycleManager: appLifecycleManager,
sessionManager: sessionManager,
transportService: transportService
transportService: transportService,
userInteractionManager: userInteractionManager
)

transportService.start()
Expand Down
5 changes: 4 additions & 1 deletion Sources/Observability/Client/ObservabilityContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ public class ObservabilityContext {
public let sessionManager: SessionManaging
public let transportService: TransportServicing
public let appLifecycleManager: AppLifecycleManaging
public let userInteractionManager: UserInteractionManager

public init(
sdkKey: String,
options: Options,
appLifecycleManager: AppLifecycleManaging,
sessionManager: SessionManaging,
transportService: TransportServicing) {
transportService: TransportServicing,
userInteractionManager: UserInteractionManager) {
self.sdkKey = sdkKey
self.options = options
self.appLifecycleManager = appLifecycleManager
self.sessionManager = sessionManager
self.transportService = transportService
self.userInteractionManager = userInteractionManager
}
}
4 changes: 4 additions & 0 deletions Sources/Observability/Logs/LogItem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import Foundation
import OpenTelemetrySdk

public struct LogItem: EventQueueItemPayload {
public var exporterClass: AnyClass {
OtlpLogExporter.self
}

public let log: ReadableLogRecord

public func cost() -> Int {
Expand Down
132 changes: 100 additions & 32 deletions Sources/Observability/Transport/BatchWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,128 @@ import Foundation
import Common
import OSLog

public final class BatchWorker {
public final actor BatchWorker {
enum Constants {
static let maxConcurrentCost: Int = 30000
static let maxConcurrentItems: Int = 100
static let maxConcurrentExporters: Int = 2
static let baseBackoffSeconds: TimeInterval = 2
static let maxBackoffSeconds: TimeInterval = 60
static let backoffJitter: Double = 0.2
}

private struct BackOffExporterInfo {
var until: DispatchTime
var attempts: Int
}

private let eventQueue: EventQueue
private let interval = TimeInterval(2)
private let minInterval = TimeInterval(1)
private let interval = TimeInterval(4)
private let minInterval = TimeInterval(1.5)
private var task: Task<Void, Never>?
private let multiExporter: MultiEventExporting
private var log: OSLog

private var exporters = [ObjectIdentifier: any EventExporting]()
private var costInFlight = 0
private var exportersInFlight = Set<ObjectIdentifier>()
private var exporterBackoff = [ObjectIdentifier: BackOffExporterInfo]()

public init(eventQueue: EventQueue,
log: OSLog,
multiExporter: MultiEventExporting = MultiEventExporter(exporters: [])) {
log: OSLog) {
self.eventQueue = eventQueue
self.multiExporter = multiExporter
self.log = log
}

public func addExporter(_ exporter: EventExporting) async {
await multiExporter.addExporter(exporter)
let exporterId = exporter.typeId
exporters[exporterId] = exporter
}

func start() {
public func start() {
guard task == nil else { return }

task = Task.detached(priority: .background) { [weak self] in
guard let self else { return }

while !Task.isCancelled {
let items = await eventQueue.dequeue(cost: 30000, limit: 20)

guard items.isNotEmpty else {
try? await Task.sleep(seconds: interval)
continue
guard let self else { return }
let scheduledCost = await sendQueueItems()
try? await Task.sleep(seconds: scheduledCost > 0 ? minInterval : interval)
}
}
}

func sendQueueItems() async -> Int {
var scheduledCost = 0

while true {
let remainingExporterSlots = Constants.maxConcurrentExporters - exportersInFlight.count
guard remainingExporterSlots > 0 else { break }

let budget = Constants.maxConcurrentCost - costInFlight
guard budget > 0 else { break }

let now = DispatchTime.now()
var except = exportersInFlight
for (id, info) in exporterBackoff where info.until > now {
except.insert(id)
}

guard let (exporterId, items, cost) = await eventQueue.earliest(cost: budget,
limit: Constants.maxConcurrentItems,
except: except) else {
break
}

guard let exporter = exporters[exporterId] else {
os_log("%{public}@", log: log, type: .error, "Dropping \(items.count) items: exporter not found for id \(exporterId)")
await eventQueue.removeFirst(id: exporterId, count: items.count)
continue
}

if tryReserve(exporterId: exporterId, cost: cost) {
Task.detached(priority: .background) { [weak self] in
do {
try await exporter.export(items: items)
await self?.finishExport(exporterId: exporterId, itemsCount: items.count, cost: cost, error: nil)
} catch {
await self?.finishExport(exporterId: exporterId, itemsCount: items.count, cost: cost, error: error)
}
}

let sendStart = DispatchTime.now()
await self.send(items: items)

let elapsed = Double(DispatchTime.now().uptimeNanoseconds - sendStart.uptimeNanoseconds) / Double(NSEC_PER_SEC)
let seconds = max(min(interval - elapsed, interval), minInterval)
try? await Task.sleep(seconds: seconds)
scheduledCost += cost
}
}

return scheduledCost
}

func stop() {
task?.cancel()
task = nil
private func tryReserve(exporterId: ObjectIdentifier, cost: Int) -> Bool {
guard exportersInFlight.contains(exporterId) == false else {
return false
}

exportersInFlight.insert(exporterId)
costInFlight += cost
return true
}

func send(items: [EventQueueItem]) async {
do {
try await multiExporter.export(items: items)
} catch {
os_log("%{public}@", log: log, type: .error, "BatchWorked has failed to send items: \(error)")
private func finishExport(exporterId: ObjectIdentifier, itemsCount: Int, cost: Int, error: Error?) async {
if let error {
os_log("%{public}@", log: log, type: .error, "Exporter \(exporterId) failed with error \(error)")
let attempts = (exporterBackoff[exporterId]?.attempts ?? 0) + 1
let backoff = min(Constants.baseBackoffSeconds * pow(2, Double(max(0, attempts - 1))), Constants.maxBackoffSeconds)
let jitter = backoff * Constants.backoffJitter
let jittered = max(0, backoff + Double.random(in: -jitter...jitter))
let until = DispatchTime.now() + .milliseconds(Int(jittered * 1000))
exporterBackoff[exporterId] = BackOffExporterInfo(until: until, attempts: attempts)
} else {
await eventQueue.removeFirst(id: exporterId, count: itemsCount)
exporterBackoff[exporterId] = nil
}

exportersInFlight.remove(exporterId)
costInFlight -= cost
}

public func stop() {
task?.cancel()
task = nil
}
}
11 changes: 9 additions & 2 deletions Sources/Observability/Transport/EventExporter.swift
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import Foundation
import Common

public protocol EventExporting {
public protocol EventExporting: Sendable {
func export(items: [EventQueueItem]) async throws
}

public final class NoOpExporter: EventExporting {
public func export(items: [EventQueueItem]) async throws {}
public func export(items: [EventQueueItem]) async throws { return }

public init() {}
}

extension EventExporting {
var typeId: ObjectIdentifier {
let type = type(of: Self.self)
return ObjectIdentifier(type)
}
}
72 changes: 52 additions & 20 deletions Sources/Observability/Transport/EventQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ import Common
public struct EventQueueItem {
public var payload: EventQueueItemPayload
public var cost: Int
public var exporterTypeId: ObjectIdentifier

public init(payload: EventQueueItemPayload) {
let type = type(of: payload.exporterClass)
self.init(payload: payload, exporterTypeId: ObjectIdentifier(type))
}

public init(payload: EventQueueItemPayload, exporterTypeId: ObjectIdentifier) {
self.payload = payload
self.cost = payload.cost()
self.exporterTypeId = exporterTypeId
}

public var timestamp: TimeInterval {
Expand All @@ -21,7 +28,7 @@ public protocol EventQueuing: Actor {

// TODO: make it optimal
public actor EventQueue: EventQueuing {
private var storage = [EventQueueItem]()
private var storage = [ObjectIdentifier: [EventQueueItem]]()
private var lastEventTime: TimeInterval = 0
private let limitSize: Int
private var currentSize = 0
Expand All @@ -43,32 +50,57 @@ public actor EventQueue: EventQueuing {
return
}

storage.append(item)
storage[item.exporterTypeId, default: []].append(item)
lastEventTime = item.timestamp
currentSize += item.cost
}

func earliest(cost: Int, limit: Int, except: Set<ObjectIdentifier>) -> (id: ObjectIdentifier, items: [EventQueueItem], cost: Int)? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest to use an object instead a tuple

var earlistEvent: (id: ObjectIdentifier, items: [EventQueueItem], firstTimestamp: TimeInterval)?
for (id, items) in storage where except.contains(id) == false {
guard let firstItem = items.first else {
continue
}
if let earlistEventUnwrapped = earlistEvent, firstItem.timestamp >= earlistEventUnwrapped.firstTimestamp {
continue
}

earlistEvent = (id, items, firstItem.timestamp)
}

guard let earlistEvent else { return nil }

guard let (items, cost) = first(cost: cost, limit: limit, items: earlistEvent.items) else {
return nil
}

return (id: earlistEvent.id, items: items, cost: cost)
}

private func first(cost: Int, limit: Int, items: [EventQueueItem]) -> (items: [EventQueueItem], cost: Int)? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same suggestion, so you can reuse the object here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is private method

var sumCost = 0
var resultItems = [EventQueueItem]()
for (i, item) in items.enumerated() {
resultItems.append(item)
sumCost += item.cost

if i > limit || sumCost > cost {
break
}
}

func dequeue(cost: Int, limit: Int) -> [EventQueueItem] {
guard !storage.isEmpty else {
return []
return (items: resultItems, sumCost)
}

func removeFirst(id: ObjectIdentifier, count: Int) {
guard let items = storage[id] else {
return
}

var result = [EventQueueItem]()
var sumCost = 0
for (i, item) in storage.enumerated() {
result.append(item)

sumCost += item.cost

if i >= limit || sumCost > cost {
currentSize -= item.cost
storage.removeFirst(i + 1)
return result
}
for i in 0..<count {
currentSize -= items[i].cost
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Index OOB from removeFirst when count exceeds items

The removeFirst method can cause an index out of bounds crash. If the count parameter is greater than the number of items in the array, the method accesses elements beyond the array's bounds.

Fix in Cursor Fix in Web

}

storage.removeAll()
currentSize = 0
return result
storage[id]?.removeFirst(count)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ import Foundation
public protocol EventQueueItemPayload {
func cost() -> Int
var timestamp: TimeInterval { get }
var exporterClass: AnyClass { get }
}
32 changes: 0 additions & 32 deletions Sources/Observability/Transport/MultiEventExporter.swift

This file was deleted.

Loading