Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Sources/Common/Network/HttpServicing.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class HttpService: HttpServicing {
public func send(_ request: URLRequest) async throws -> Data {
do {
let (data, response) = try await session.data(for: request)

guard let http = response as? HTTPURLResponse else {
throw NetworkError.invalidResponse
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ public struct ObservabilityClientFactory {
}

let userInteractionManager = UserInteractionManager(options: options) { interaction in
Task {
await eventQueue.send(interaction)
}

// TODO: move to LD buffering
if let span = interaction.span() {
tracer.startSpan(name: span.name, attributes: span.attributes)
Expand Down Expand Up @@ -161,7 +157,8 @@ public struct ObservabilityClientFactory {
options: options,
appLifecycleManager: appLifecycleManager,
sessionManager: sessionManager,
transportService: transportService
transportService: transportService,
userInteractionManager: userInteractionManager
)

transportService.start()
Expand Down
5 changes: 4 additions & 1 deletion Sources/Observability/Client/ObservabilityContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ public class ObservabilityContext {
public let sessionManager: SessionManaging
public let transportService: TransportServicing
public let appLifecycleManager: AppLifecycleManaging
public let userInteractionManager: UserInteractionManager

public init(
sdkKey: String,
options: Options,
appLifecycleManager: AppLifecycleManaging,
sessionManager: SessionManaging,
transportService: TransportServicing) {
transportService: TransportServicing,
userInteractionManager: UserInteractionManager) {
self.sdkKey = sdkKey
self.options = options
self.appLifecycleManager = appLifecycleManager
self.sessionManager = sessionManager
self.transportService = transportService
self.userInteractionManager = userInteractionManager
}
}
4 changes: 4 additions & 0 deletions Sources/Observability/Logs/LogItem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import Foundation
import OpenTelemetrySdk

public struct LogItem: EventQueueItemPayload {
public var exporterClass: AnyClass {
OtlpLogExporter.self
}

public let log: ReadableLogRecord

public func cost() -> Int {
Expand Down
58 changes: 42 additions & 16 deletions Sources/Observability/Transport/BatchWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public final class BatchWorker {
private var task: Task<Void, Never>?
private let multiExporter: MultiEventExporting
private var log: OSLog
private var failedItems = [ObjectIdentifier: [EventQueueItem]]()

public init(eventQueue: EventQueue,
log: OSLog,
Expand All @@ -29,15 +30,13 @@ public final class BatchWorker {
guard let self else { return }

while !Task.isCancelled {
let items = await eventQueue.dequeue(cost: 30000, limit: 20)

guard items.isNotEmpty else {
try? await Task.sleep(seconds: interval)
continue
}

let sendStart = DispatchTime.now()
await self.send(items: items)

if failedItems.isNotEmpty {
await sendFailedItems()
} else {
await sendQueueItems()
}

let elapsed = Double(DispatchTime.now().uptimeNanoseconds - sendStart.uptimeNanoseconds) / Double(NSEC_PER_SEC)
let seconds = max(min(interval - elapsed, interval), minInterval)
Expand All @@ -46,16 +45,43 @@ public final class BatchWorker {
}
}

func stop() {
task?.cancel()
task = nil
func sendFailedItems() async {
let result = await multiExporter.export(groupItems: failedItems)
switch result {
case .success:
failedItems.removeAll()
case .partialFailure(let results):
failedItems = results.groupItems
case .failure:
() // no-op
}
}

func send(items: [EventQueueItem]) async {
do {
try await multiExporter.export(items: items)
} catch {
os_log("%{public}@", log: log, type: .error, "BatchWorked has failed to send items: \(error)")
func sendQueueItems() async {
let items = await eventQueue.first(cost: 30000, limit: 20)

guard items.isNotEmpty else {
try? await Task.sleep(seconds: interval)
return
}

let groupItems = [ObjectIdentifier: [EventQueueItem]](grouping: items, by: \.exporterTypeId)
let result = await multiExporter.export(groupItems: groupItems)
switch result {
case .success:
await eventQueue.removeFirst(items.count)
case .partialFailure(let results):
await eventQueue.removeFirst(items.count)
failedItems = results.groupItems
case .failure:
() // no-op
}
}

func stop() {
task?.cancel()
task = nil
}
}


9 changes: 8 additions & 1 deletion Sources/Observability/Transport/EventExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ public protocol EventExporting {
}

public final class NoOpExporter: EventExporting {
public func export(items: [EventQueueItem]) async throws {}
public func export(items: [EventQueueItem]) async throws { return }

public init() {}
}

extension EventExporting {
var typeId: ObjectIdentifier {
let type = type(of: Self.self)
return ObjectIdentifier(type)
}
}
20 changes: 16 additions & 4 deletions Sources/Observability/Transport/EventQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ import Common
public struct EventQueueItem {
public var payload: EventQueueItemPayload
public var cost: Int
public var exporterTypeId: ObjectIdentifier

public init(payload: EventQueueItemPayload) {
let type = type(of: payload.exporterClass)
self.init(payload: payload, exporterTypeId: ObjectIdentifier(type))
}

public init(payload: EventQueueItemPayload, exporterTypeId: ObjectIdentifier) {
self.payload = payload
self.cost = payload.cost()
self.exporterTypeId = exporterTypeId
}

public var timestamp: TimeInterval {
Expand Down Expand Up @@ -47,8 +54,8 @@ public actor EventQueue: EventQueuing {
lastEventTime = item.timestamp
currentSize += item.cost
}
func dequeue(cost: Int, limit: Int) -> [EventQueueItem] {

func first(cost: Int, limit: Int) -> [EventQueueItem] {
guard !storage.isEmpty else {
return []
}
Expand All @@ -62,13 +69,18 @@ public actor EventQueue: EventQueuing {

if i >= limit || sumCost > cost {
currentSize -= item.cost
storage.removeFirst(i + 1)
return result
}
}

storage.removeAll()
currentSize = 0
return result
}

func removeFirst(_ count: Int) {
for i in 0..<count {
currentSize -= storage[i].cost
}
storage.removeFirst(count)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ import Foundation
public protocol EventQueueItemPayload {
func cost() -> Int
var timestamp: TimeInterval { get }
var exporterClass: AnyClass { get }
}
71 changes: 61 additions & 10 deletions Sources/Observability/Transport/MultiEventExporter.swift
Original file line number Diff line number Diff line change
@@ -1,32 +1,83 @@
import Foundation

public protocol MultiEventExporting: EventExporting {
public enum MultiExportResult {
case success
case partialFailure(groupItems: [ObjectIdentifier: [EventQueueItem]], errors: [ObjectIdentifier: Error])
case failure
}

public struct TypedIdResult {
var error: Error?
var typeId: ObjectIdentifier
var filteredItems: [EventQueueItem]?
}

public protocol MultiEventExporting {
func addExporter(_ exporter: EventExporting) async
func export(groupItems: [ObjectIdentifier: [EventQueueItem]]) async -> MultiExportResult
}

public actor MultiEventExporter: MultiEventExporting {
var exporters: [EventExporting]
var exporters: [ObjectIdentifier: any EventExporting]

public init(exporters: [EventExporting]) {
public init(exporters initialExporters: [EventExporting]) {
var exporters = [ObjectIdentifier: any EventExporting]()
for exporter in initialExporters {
let exporterId = exporter.typeId
exporters[exporterId] = exporter
}
self.exporters = exporters
}

public func addExporter(_ exporter: EventExporting) async {
exporters.append(exporter)
let exporterId = exporter.typeId
exporters[exporterId] = exporter
}

public func export(items: [EventQueueItem]) async throws {
public func export(groupItems: [ObjectIdentifier: [EventQueueItem]]) async -> MultiExportResult {
let exporters = self.exporters
try await withThrowingTaskGroup { group in
for exporter in exporters {

return await withTaskGroup<TypedIdResult> { group in
for (typeId, exporter) in exporters {
group.addTask {
try await exporter.export(items: items)
guard let filteredItems = groupItems[typeId] else {
return TypedIdResult(error: nil, typeId: typeId, filteredItems: nil)
}

do {
try await exporter.export(items: filteredItems)
return TypedIdResult(error: nil, typeId: typeId, filteredItems: nil)
} catch {
return TypedIdResult(error: error, typeId: typeId, filteredItems: filteredItems)
}
}
}

for _ in exporters {
try await group.next()
var failures = [TypedIdResult]()
var groupItems = [ObjectIdentifier: [EventQueueItem]]()
var errors = [ObjectIdentifier: Error]()

for _ in 0..<exporters.count {
do {
guard let res = try await group.next() else { break }
if let filteredItems = res.filteredItems {
groupItems[res.typeId] = res.filteredItems
errors[res.typeId] = res.error
}
} catch {

}
}

if groupItems.isEmpty {
return MultiExportResult.success
} else if failures.count < exporters.count {
return MultiExportResult.partialFailure(groupItems: groupItems, errors: errors)
} else {
return .failure
}
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,21 @@ public struct TouchSample: Sendable {
}
}

typealias UIInteractionYield = @Sendable (TouchInteraction) -> Void
public typealias TouchInteractionYield = @Sendable (TouchInteraction) -> Void

final class TouchCaptureCoordinator {
private let source: UIEventSource
private let targetResolver: TargetResolving
private let touchIntepreter: TouchIntepreter
private let yield: UIInteractionYield
private let receiverChecker: UIEventReceiverChecker

var yield: TouchInteractionYield?

init(targetResolver: TargetResolving = TargetResolver(),
receiverChecker: UIEventReceiverChecker = UIEventReceiverChecker(),
yield: @escaping UIInteractionYield) {
receiverChecker: UIEventReceiverChecker = UIEventReceiverChecker()) {
self.targetResolver = targetResolver
self.touchIntepreter = TouchIntepreter()
self.source = UIWindowSwizzleSource()
self.receiverChecker = receiverChecker
self.yield = yield
}

func start() {
Expand All @@ -73,7 +71,7 @@ final class TouchCaptureCoordinator {
}

Task.detached(priority: .background) { [weak self] in
guard let self else { return }
guard let self, let yield else { return }
// Bg thread part of work
for await touchSample in touchSampleStream {
touchIntepreter.process(touchSample: touchSample, yield: yield)
Expand Down
6 changes: 3 additions & 3 deletions Sources/Observability/UIInteractions/TouchIntepreter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ final class TouchIntepreter {
return id
}

func process(touchSample: TouchSample, yield: UIInteractionYield) {
func process(touchSample: TouchSample, yield: TouchInteractionYield) {
// UITouch and UIEvent use time based on systemUptime getting and we needed adjustment for proper time
let uptimeDifference = Date().timeIntervalSince1970 - ProcessInfo.processInfo.systemUptime
switch touchSample.phase {
Expand Down Expand Up @@ -77,7 +77,7 @@ final class TouchIntepreter {
}
}

func flushMovements(touchSample: TouchSample, uptimeDifference: TimeInterval, yield: UIInteractionYield) {
func flushMovements(touchSample: TouchSample, uptimeDifference: TimeInterval, yield: TouchInteractionYield) {
guard var track = tracks[touchSample.id], track.points.isNotEmpty else { return }

let moveInteraction = TouchInteraction(id: incrementingId,
Expand All @@ -89,7 +89,7 @@ final class TouchIntepreter {
yield(moveInteraction)
}

func flushTrack(touchSample: TouchSample, uptimeDifference: TimeInterval, yield: UIInteractionYield) {
func flushTrack(touchSample: TouchSample, uptimeDifference: TimeInterval, yield: TouchInteractionYield) {
guard let track = tracks.removeValue(forKey: touchSample.id), track.points.isNotEmpty else { return }

let moveInteraction = TouchInteraction(id: incrementingId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import Common

final class UserInteractionManager: AutoInstrumentation {
public final class UserInteractionManager: AutoInstrumentation {
private var touchCaptureCoordinator: TouchCaptureCoordinator

init(options: Options, yield: @escaping UIInteractionYield) {
private var yields: [TouchInteractionYield]

init(options: Options, yield: @escaping TouchInteractionYield) {
let targetResolver = TargetResolver()
self.touchCaptureCoordinator = TouchCaptureCoordinator(targetResolver: targetResolver,
yield: yield)
self.yields = [yield]
self.touchCaptureCoordinator = TouchCaptureCoordinator(targetResolver: targetResolver)
self.touchCaptureCoordinator.yield = { [weak self] interaction in
self?.yields.forEach { $0(interaction) }
}
}

public func addYield(_ yield: @escaping TouchInteractionYield) {
yields.append(yield)
}

func start() {
Expand Down
Loading