diff --git a/Flow/Callbacker.swift b/Flow/Callbacker.swift index 01966e2..99bfccf 100644 --- a/Flow/Callbacker.swift +++ b/Flow/Callbacker.swift @@ -20,8 +20,7 @@ public final class Callbacker { } private var callbacks = Callbacks.none - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() public init() { mutex.initialize() diff --git a/Flow/Disposable.swift b/Flow/Disposable.swift index 9b3bcc3..5949b22 100644 --- a/Flow/Disposable.swift +++ b/Flow/Disposable.swift @@ -28,8 +28,7 @@ public struct NilDisposer: Disposable { /// - Note: Is thread safe and reentrant (dispose callback could call itself) public final class Disposer: Disposable { private var disposer: (() -> ())? - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() /// Pass a closure to be called when being disposed public init(_ disposer: @escaping () -> () = {}) { @@ -58,8 +57,7 @@ public final class Disposer: Disposable { /// - Note: New disposables could be added after a disposal. public final class DisposeBag: Disposable { private var disposables: [Disposable] - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() /// Create an empty instance public init() { @@ -86,7 +84,10 @@ public final class DisposeBag: Disposable { /// Returns true if there is currently no disposables to dispose. public var isEmpty: Bool { - return mutex.protect { disposables.isEmpty } + mutex.lock() + let isEmpty = disposables.isEmpty + mutex.unlock() + return isEmpty } public func dispose() { diff --git a/Flow/Future+Combiners.swift b/Flow/Future+Combiners.swift index 26dcc41..1d39dd5 100644 --- a/Flow/Future+Combiners.swift +++ b/Flow/Future+Combiners.swift @@ -65,9 +65,9 @@ public func join(_ futures: [Future], cancelNonCompleted: Bool = true) -> var results = [T?](repeating: nil, count: futures.count) let mutex = Mutex() func onValue(_ i: Int, _ val: T) { - mutex.protect { - results[i] = val - } + mutex.lock() + results[i] = val + mutex.unlock() } var future = futures.first!.onValue(on: .none) { onValue(0, $0) } @@ -220,7 +220,9 @@ public final class SingleTaskPerformer { mutex.unlock() // unlock while calling out as we might either recurs or always might execute at once. let singleFuture = function().always(on: .none) { - self.mutex.protect { self.future = nil } + self.mutex.lock() + self.future = nil + self.mutex.unlock() } mutex.lock() @@ -233,7 +235,9 @@ public final class SingleTaskPerformer { } public var isPerforming: Bool { - return mutex.protect { self.future != nil } + mutex.lock() + defer { mutex.unlock() } + return self.future != nil } } diff --git a/Flow/Future.swift b/Flow/Future.swift index 19fad38..82bf33f 100644 --- a/Flow/Future.swift +++ b/Flow/Future.swift @@ -57,7 +57,7 @@ public final class Future { private var state: State private let clone: () -> Future - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() /// Helper used to move external futures inside `Future.init`'s `onComplete` closure. Needed for repetition to work properly. public struct Mover { @@ -327,10 +327,10 @@ func memPrint(_ str: String, _ count: Int32) { } private extension Future { - var mutex: PThreadMutex { return PThreadMutex(&_mutex) } - private var protectedState: State { - return mutex.protect { state } + mutex.lock() + defer { mutex.unlock() } + return state } func lock() { diff --git a/Flow/FutureQueue.swift b/Flow/FutureQueue.swift index 931f9e2..145ed53 100644 --- a/Flow/FutureQueue.swift +++ b/Flow/FutureQueue.swift @@ -18,7 +18,7 @@ public final class FutureQueue { private let queueScheduler: Scheduler private var _closedError: Error? private let isEmptyCallbacker = Callbacker() - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() // enqueued items. private var items: [Executable] = [] { @@ -61,9 +61,9 @@ public extension FutureQueue { return Future { completion in let item = QueueItem(operation: operation, completion: completion) - self.mutex.protect { - self.items.append(item) - } + self.mutex.lock() + self.items.append(item) + self.mutex.unlock() self.executeNextItem() @@ -119,7 +119,9 @@ public extension FutureQueue { public extension FutureQueue { /// Do we have any enqueued operations? var isEmpty: Bool { - return mutex.protect { items.isEmpty } + mutex.lock() + defer { mutex.unlock() } + return items.isEmpty } /// Returns a signal that will signal when `isEmpty` is changed. @@ -164,19 +166,20 @@ public extension FutureQueue { /// The error passed to `abortQueuedExecutionWithError()` if called with `shouldCloseQueue` as true. var closedError: Error? { - return mutex.protect { _closedError } + mutex.lock() + defer { mutex.unlock() } + return _closedError } } private extension FutureQueue { - var mutex: PThreadMutex { return PThreadMutex(&_mutex) } func lock() { mutex.lock() } func unlock() { mutex.unlock() } func removeItem(_ item: Executable) { - mutex.protect { - _ = items.firstIndex { $0 === item }.map { items.remove(at: $0) } - } + mutex.lock() + _ = items.firstIndex { $0 === item }.map { items.remove(at: $0) } + mutex.unlock() } func executeNextItem() { @@ -188,9 +191,9 @@ private extension FutureQueue { unlock() item.execute(on: queueScheduler) { - self.mutex.protect { - self.concurrentCount -= 1 - } + self.lock() + self.concurrentCount -= 1 + self.unlock() self.removeItem(item) self.executeNextItem() } @@ -214,7 +217,7 @@ private final class QueueItem: Executable { private let completion: (Result) -> () private weak var future: Future? private var hasBeenCancelled = false - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() init(operation: @escaping () throws -> Future, completion: @escaping (Result) -> ()) { self.completion = completion @@ -231,7 +234,6 @@ private final class QueueItem: Executable { memPrint("Queue Item deinit", queueItemUnitTestAliveCount) } - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } private func lock() { mutex.lock() } private func unlock() { mutex.unlock() } diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 416be1b..4447da2 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -10,8 +10,7 @@ import Foundation /// A reference wrapper around a POSIX thread mutex public final class Mutex { - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() public init() { mutex.initialize() @@ -30,13 +29,27 @@ public final class Mutex { public func unlock() { mutex.unlock() } +} - /// Will lock `self`, call `block`, then unlock `self` - @discardableResult - public func protect(_ block: () throws -> T) rethrows -> T { - mutex.lock() - defer { mutex.unlock() } - return try block() +extension pthread_mutex_t { + mutating func withPointer(_ body: (PThreadMutex) throws -> T) rethrows -> T { + return try withUnsafeMutablePointer(to: &self, body) + } + + mutating func initialize() { + withPointer { $0.initialize() } + } + + mutating func deinitialize() { + withPointer { $0.deinitialize() } + } + + mutating func lock() { + withPointer { $0.lock() } + } + + mutating func unlock() { + withPointer { $0.unlock() } } } @@ -86,8 +99,7 @@ final class StateAndCallback: Disposable { var callback: ((Value) -> ())? var val: State fileprivate var disposables = [Disposable]() - private var _mutex = pthread_mutex_t() - fileprivate var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() init(state: State, callback: @escaping (Value) -> ()) { val = state @@ -192,12 +204,12 @@ extension StateAndCallback where Value == () { func +=(bag: StateAndCallback, disposable: Disposable?) { guard let disposable = disposable else { return } - bag.mutex.lock() + bag.lock() let hasBeenDisposed = bag.callback == nil if !hasBeenDisposed { bag.disposables.append(disposable) } - bag.mutex.unlock() + bag.unlock() if hasBeenDisposed { disposable.dispose() } diff --git a/Flow/OrderedCallbacker.swift b/Flow/OrderedCallbacker.swift index 04e63ac..075db90 100644 --- a/Flow/OrderedCallbacker.swift +++ b/Flow/OrderedCallbacker.swift @@ -14,8 +14,7 @@ import Foundation /// - Note: Is thread safe. public final class OrderedCallbacker { private var callbacks: [Key: (OrderedValue, (CallbackValue) -> Future<()>)] = [:] - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() public init() { mutex.initialize() @@ -27,7 +26,10 @@ public final class OrderedCallbacker { /// - Returns: True if no callbacks has been registered. public var isEmpty: Bool { - return mutex.protect { callbacks.isEmpty } + mutex.lock() + let isEmpty = callbacks.isEmpty + mutex.unlock() + return isEmpty } /// Register a callback and orderedValue to be called when `callAll` is executed. @@ -35,12 +37,14 @@ public final class OrderedCallbacker { /// - Parameter orderedValue: The value used to order this callback /// - Returns: A `Disposable` to be disposed to unregister the callback. public func addCallback(_ callback: @escaping (CallbackValue) -> Future<()>, orderedBy orderedValue: OrderedValue) -> Disposable { - return mutex.protect { - let key = generateKey() - callbacks[key] = (orderedValue, callback) - return Disposer { - self.mutex.protect { self.callbacks[key] = nil } - } + mutex.lock() + defer { mutex.unlock() } + let key = generateKey() + callbacks[key] = (orderedValue, callback) + return Disposer { + self.mutex.lock() + self.callbacks[key] = nil + self.mutex.unlock() } } @@ -48,9 +52,10 @@ public final class OrderedCallbacker { /// - Returns: A `Future` that will complete when all callbacks has been called. @discardableResult public func callAll(with value: CallbackValue, isOrderedBefore: (OrderedValue, OrderedValue) -> Bool) -> Future<()> { - return mutex.protect { - callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 } - }.mapToFuture { $0(value) }.toVoid() + mutex.lock() + let sortedCallbacks = callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 } + mutex.unlock() + return sortedCallbacks.mapToFuture { $0(value) }.toVoid() } } diff --git a/Flow/Signal+Construction.swift b/Flow/Signal+Construction.swift index ee9697e..0bdfc15 100644 --- a/Flow/Signal+Construction.swift +++ b/Flow/Signal+Construction.swift @@ -112,8 +112,7 @@ private final class CallbackState: Disposable { private var shared: SharedState? let sharedKey: Key - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() init(shared: SharedState? = nil, getValue: (() -> Value)?, callback: @escaping (EventType) -> Void) { self.shared = shared @@ -292,8 +291,7 @@ private final class CallbackState: Disposable { /// Helper to implement sharing of a single `onEvent` if more than one listner, see `SignalOption.shared` final class SharedState { private let getValue: (() -> Value)? - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() typealias Callback = (EventType) -> Void var firstCallback: (key: Key, value: Callback)? diff --git a/Flow/Signal+Scheduling.swift b/Flow/Signal+Scheduling.swift index 36bc088..6db5972 100644 --- a/Flow/Signal+Scheduling.swift +++ b/Flow/Signal+Scheduling.swift @@ -119,8 +119,8 @@ internal extension CoreSignal { // Using custom Disposable instead of DisposeBag for efficiency (less allocations) private final class OnEventTypeDisposer: Disposable { private var disposable: Disposable? - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() + private let scheduler: Scheduler private var callback: ((EventType) -> Void)? diff --git a/Flow/Signal+Transforms.swift b/Flow/Signal+Transforms.swift index 8877b10..80359e3 100644 --- a/Flow/Signal+Transforms.swift +++ b/Flow/Signal+Transforms.swift @@ -741,14 +741,20 @@ private extension SignalProvider { let mutex = Mutex() var setter: ((T) -> ())? func setValue(_ value: T) { - let setValue = mutex.protect { setter ?? transform(signal.getter()!).setter! } + mutex.lock() + let setValue = setter ?? transform(signal.getter()!).setter! + mutex.unlock() setValue(value) } return CoreSignal(setValue: setValue, onEventType: { callback in let latestBag = DisposeBag() let bag = DisposeBag(latestBag) - bag += { mutex.protect { setter = nil } } + bag += { + mutex.lock() + setter = nil + mutex.unlock() + } bag += signal.onEventType(on: scheduler) { eventType in switch eventType { @@ -756,13 +762,17 @@ private extension SignalProvider { callback(.initial(nil)) case .initial(let val?): let signal = scheduler.sync { transform(val) } - mutex.protect { setter = signal.setter } + mutex.lock() + setter = signal.setter + mutex.unlock() latestBag += signal.onEventType(callback) case let .event(.value(val)): let isFirstEvent = latestBag.isEmpty latestBag.dispose() let signal = transform(val) - mutex.protect { setter = signal.setter } + mutex.lock() + setter = signal.setter + mutex.unlock() latestBag += signal.onEventType { eventType in switch eventType { case .initial(let val?) where KO.isReadable: diff --git a/FlowTests/FutureSchedulingTests.swift b/FlowTests/FutureSchedulingTests.swift index 17ecc64..16767ce 100644 --- a/FlowTests/FutureSchedulingTests.swift +++ b/FlowTests/FutureSchedulingTests.swift @@ -184,7 +184,9 @@ class FutureNewSchedulingTests: FutureTest { var f = Future(v).delay(by: delay) f = f.map(on: .concurrentBackground) { $0*2 } return f/*assertValue(v*2)*/.assert(on: .main).always(on: .concurrentBackground) { - mutex.protect { completeCount += 1 } + mutex.lock() + completeCount += 1 + mutex.unlock() } }).onCancel { e.fulfill() } diff --git a/FlowTests/SignalProviderTests.swift b/FlowTests/SignalProviderTests.swift index dcc2e31..590f856 100644 --- a/FlowTests/SignalProviderTests.swift +++ b/FlowTests/SignalProviderTests.swift @@ -1718,13 +1718,19 @@ class SignalProviderTests: XCTestCase { _ = Signal(callbacker: callbacker).start(with: 1).take(first: 2).onEvent(on: .concurrentBackground) { event in switch event { case .value(let val): - mutex.protect { result.append(val) } + mutex.lock() + result.append(val) + mutex.unlock() backgroundQueue.async { callbacker.callAll(with: val + 1) } - mutex.protect { result.append(val*10) } + mutex.lock() + result.append(val*10) + mutex.unlock() case .end: - XCTAssertEqual(mutex.protect { result }, [1, 10, 2, 20]) + mutex.lock() + XCTAssertEqual(result, [1, 10, 2, 20]) + mutex.unlock() } } } @@ -2718,13 +2724,15 @@ final class SimulatedTimer { func schedule(at time: TimeInterval, execute work: @escaping () -> ()) -> Disposable { let key = UUID() - mutex.protect { - assert(time >= self.time) - scheduledWork[key] = (time, work) - } + mutex.lock() + assert(time >= self.time) + scheduledWork[key] = (time, work) + mutex.unlock() return Disposer { - self.mutex.protect { self.scheduledWork[key] = nil } + self.mutex.lock() + self.scheduledWork[key] = nil + self.mutex.unlock() } } @@ -2748,7 +2756,9 @@ final class SimulatedTimer { //print("call", next) next.value.work() - mutex.protect { count -= 1 } + mutex.lock() + count -= 1 + mutex.unlock() mainQueue.async { self.release() } } }