Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion RXPromise Libraries/Source/RXPromise.h
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,15 @@ typedef RXPromise* (^progress_on_main_block_t)(promise_progressHandler_t onProgr
*/
- (void) resolveWithResult:(id)result;


/*!
@brief Updates the promise with the specified progress value and fires the notification to subscribers.

This won't call any subsequent subscribers and won't trigger progress reporting chain, which could be expected, however isn't way yet implemented

@param progress The relative value of progress set by the asynchronous result provider which should
represent a value in range from 0.0 to 1.0
*/
- (void) updateWithProgress:(float)progress;

@end

Expand Down
77 changes: 70 additions & 7 deletions RXPromise Libraries/Source/RXPromise.mm
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
#import "RXPromise+Private.h"
#import <CoreData/CoreData.h>
#import <objc/runtime.h>
#import <libkern/OSAtomic.h>
#include <dispatch/dispatch.h>
#include <cassert>
#include <cstdio>
#include <list>

// Set default logger serverity to "Error" (logs only errors)
#if !defined (DEBUG_LOG)
Expand Down Expand Up @@ -143,6 +145,10 @@ @implementation RXPromise {
dispatch_queue_t _handler_queue; // a serial queue, uses target queue: s_sync_queue
id _result;
RXPromise_State _state;

std::list<std::pair<id,promise_progressHandler_t>> *_progressHandlers;
OSSpinLock __volatile _progressSpinLock;
std::list<RXPromise* __weak> *_children;
}
@synthesize result = _result;
@synthesize parent = _parent;
Expand All @@ -158,6 +164,8 @@ - (void) dealloc {
}
}
void const* key = (__bridge void const*)(self);
delete _progressHandlers;
delete _children;
if (dispatch_get_specific(rxpromise::shared::QueueID) == rxpromise::shared::sync_queue_id) {
Shared.assocs.erase(key);
} else {
Expand Down Expand Up @@ -344,6 +352,42 @@ - (void) synced_cancelWithReason:(id)reason {
}
}

- (void)synced_updateWithProgress:(float)progress {
assert(dispatch_get_specific(rxpromise::shared::QueueID) == rxpromise::shared::sync_queue_id);
if (_state != Pending) {
return;
}
OSSpinLockLock(&_progressSpinLock);
_progressValue = progress;
if (_progressHandlers) {
for (const std::pair<id, promise_progressHandler_t> handlerPair : *_progressHandlers) {
id executionContext = handlerPair.first;
promise_progressHandler_t handlerBlock = handlerPair.second;
if (executionContext == Shared.default_concurrent_queue) {
// If the continuation has been registered with `progress`, we run
// the handler is parallel:
dispatch_async(executionContext, ^(){handlerBlock(progress);});
}
else if ([executionContext conformsToProtocol:@protocol(OS_dispatch_queue)]) {
// If the continuation has been registered with `progressOn:` and when the
// execution context is a dispatch queue, we run the handler serially:
dispatch_barrier_async(executionContext, ^(){handlerBlock(progress);});
}
else {
// Otherwise, the execution context is not a dispatch_queue. Dispatch
// to the corresponding execution context:
[executionContext rxp_dispatchBlock:^(){handlerBlock(progress);}];
}
}
}
if (_children) {
for(const RXPromise * __weak child : *_children) {
[child updateWithProgress:progress];
}
}
OSSpinLockUnlock(&_progressSpinLock);
}


// Registers success and failure handlers.
// The receiver will be retained and only released when the receiver will be
Expand All @@ -368,6 +412,21 @@ - (instancetype) registerWithExecutionContext:(id)executionContext
if (_handler_queue == nil) {
_handler_queue = createHandlerQueue(_state == Pending, (__bridge void*)self);
}
OSSpinLockLock(&_progressSpinLock);
if (onProgress) {
if (!_progressHandlers) {
_progressHandlers = new std::list<std::pair<id, promise_progressHandler_t>>();
}
_progressHandlers->push_back({executionContext, onProgress});
}
if (!_children) {
_children = new std::list<RXPromise* __weak>();
}
if (weakReturnedPromise) {
_children->push_back(weakReturnedPromise);
}
OSSpinLockUnlock(&_progressSpinLock);

// Finally, *enqueue* a wrapper block which eventually gets invoked when the
// promise will be resolved:
dispatch_async(_handler_queue, ^{
Expand All @@ -384,16 +443,12 @@ - (instancetype) registerWithExecutionContext:(id)executionContext
// If the execution context equals the Shared.sync_queue, the block
// must be enqueued with a barrier! (implementation details)
@autoreleasepool {
assert(promise_state != Pending || abs(progress) < FLT_EPSILON);
assert(promise_state != Pending);
RXPromise_StateT state = promise_state;
__strong id result = promise_result;
if (state == Fulfilled && onSuccess) {
result = onSuccess(blockSelf->_result);
}
else if (state == Pending && onProgress) {
onProgress(blockSelf->_progressValue);
return;
}
else if (state != Fulfilled && onFailure) {
result = onFailure(blockSelf->_result);
}
Expand Down Expand Up @@ -908,8 +963,16 @@ - (void) rejectWithReason:(id)reason {
}
}



- (void) updateWithProgress:(float)progress {
if (dispatch_get_specific(rxpromise::shared::QueueID) == rxpromise::shared::sync_queue_id) {
[self synced_updateWithProgress:progress];
}
else {
dispatch_barrier_async(Shared.sync_queue, ^{
[self synced_updateWithProgress:progress];
});
}
}


@end
215 changes: 215 additions & 0 deletions Test/Tests/RXPromiseTest.mm
Original file line number Diff line number Diff line change
Expand Up @@ -3132,6 +3132,221 @@ -(void) testTreeCancel2WithBoundPromise
}
}

#pragma mark - progress

- (void) testSimpleProgressReporting {
XCTestExpectation *expectation = [self expectationWithDescription:@"Progress should be equal to sum of progresses"];
NSArray *progressValues = @[@0.0, @0.25, @0.5, @0.75, @1.0];
RXPromise *promise = [[RXPromise alloc] init];
__block float progress = 0;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, progress will be accessed from different threads. We need to ensure the access is thread-safe, for example ensuring the execution context is set to the main thread, or use a dedicated serial dispatch queue. Note, the unit test itself will execute on the main thread, too.

This data race exists basically in all progress test.

It was interesting to experience the effect when running the tests: they intermittently failed one my iMac.

Note: I've made a new topic branch which fixes these issues in all tests. You may now apply these changes, or I merge the new fixed branch into master.

promise.progress(^(float reportedProgress) {
progress += reportedProgress;
if (fabs(reportedProgress - 1.0) < 0.01) {
[expectation fulfill];
}
});
for (NSNumber *progressValue in progressValues) {
[promise updateWithProgress:progressValue.floatValue];
}
[self waitForExpectationsWithTimeout:0.1 handler:nil];
XCTAssertEqualWithAccuracy(progress,
[[progressValues valueForKeyPath:@"@sum.self"] floatValue],
0.01);
}

- (void) testTwoSimultaneousProgressReporting {
XCTestExpectation *expectation = [self expectationWithDescription:@"Progress should be equal to sum of progresses"];
NSArray *progressValues = @[@0.0, @0.25, @0.5, @0.75, @1.0];
RXPromise *promise = [[RXPromise alloc] init];
__block float progress = 0;
promise.progress(^(float reportedProgress) {
progress += reportedProgress;
if (fabs(reportedProgress - 1.0) < 0.01) {
[expectation fulfill];
}
});
promise.progress(^(float reportedProgress) {
progress += reportedProgress;
});
for (NSNumber *progressValue in progressValues) {
[promise updateWithProgress:progressValue.floatValue];
}
[self waitForExpectationsWithTimeout:0.1 handler:nil];
XCTAssertEqualWithAccuracy(progress,
[[progressValues valueForKeyPath:@"@sum.self"] floatValue] * 2,
0.01);
}

- (void) testSimultaneousProgressReportingAndFullfill {
XCTestExpectation *expectation = [self expectationWithDescription:@"Progress should be equal to sum of progresses"];
NSArray *progressValues = @[@0.0, @0.25, @0.5, @0.75, @1.0];
RXPromise *promise = [[RXPromise alloc] init];
__block float progress = 0;
promise.progress(^(float reportedProgress) {
progress += reportedProgress;
if (fabs(reportedProgress - 1.0) < 0.01) {
[promise fulfillWithValue:nil];
}
});
promise.then(^id(id result) {
[expectation fulfill];
return nil;
}, nil);
for (NSNumber *progressValue in progressValues) {
[promise updateWithProgress:progressValue.floatValue];
}
[self waitForExpectationsWithTimeout:0.1 handler:nil];
XCTAssertEqualWithAccuracy(progress,
[[progressValues valueForKeyPath:@"@sum.self"] floatValue],
0.01);
}

- (void) testSimultaneousProgressReportingAndReject {
XCTestExpectation *expectation = [self expectationWithDescription:@"Progress should be equal to sum of progresses"];
NSArray *progressValues = @[@0.0, @0.25, @0.5, @0.75, @1.0];
RXPromise *promise = [[RXPromise alloc] init];
__block float progress = 0;
promise.progress(^(float reportedProgress) {
progress += reportedProgress;
if (fabs(reportedProgress - 1.0) < 0.01) {
[promise rejectWithReason:nil];
}
});
promise.then(nil, ^id(id result) {
[expectation fulfill];
return nil;
});
for (NSNumber *progressValue in progressValues) {
[promise updateWithProgress:progressValue.floatValue];
}
[self waitForExpectationsWithTimeout:0.1 handler:nil];
XCTAssertEqualWithAccuracy(progress,
[[progressValues valueForKeyPath:@"@sum.self"] floatValue],
0.01);
}

- (void) testProgressReportingAfterFullfill {
XCTestExpectation *expectation = [self expectationWithDescription:@"Progress should be equal to sum of progresses"];
NSArray *progressValues = @[@0.0, @0.25, @0.5, @0.75, @1.0];
RXPromise *promise = [[RXPromise alloc] init];
__block float progress = 0;
promise.progress(^(float reportedProgress) {
progress += reportedProgress;
});
promise.then(^id(id result) {
[expectation fulfill];
return nil;
}, nil);
[promise fulfillWithValue:nil];
for (NSNumber *progressValue in progressValues) {
[promise updateWithProgress:progressValue.floatValue];
}
[self waitForExpectationsWithTimeout:0.1 handler:nil];
XCTAssertEqualWithAccuracy(progress,
0.0,
0.01);
}

- (void) testProgressReportingAfterReject {
XCTestExpectation *expectation = [self expectationWithDescription:@"Progress should be equal to sum of progresses"];
NSArray *progressValues = @[@0.0, @0.25, @0.5, @0.75, @1.0];
RXPromise *promise = [[RXPromise alloc] init];
__block float progress = 0;
promise.progress(^(float reportedProgress) {
progress += reportedProgress;
});
promise.then(nil, ^id(id result) {
[expectation fulfill];
return nil;
});
[promise rejectWithReason:nil];
for (NSNumber *progressValue in progressValues) {
[promise updateWithProgress:progressValue.floatValue];
}
[self waitForExpectationsWithTimeout:0.1 handler:nil];
XCTAssertEqualWithAccuracy(progress,
0.0,
0.01);
}

- (void)testProgressPropagationByPromiseChain {
XCTestExpectation *expectation = [self expectationWithDescription:@"Progress should be equal to sum of progresses"];
NSArray *progressValues = @[@0.0, @0.25, @0.5, @0.75, @1.0];
RXPromise *promise = [[RXPromise alloc] init];
__block float progress = 0;
RXPromise *promise2 = promise.then(nil, ^id(id result) {
[expectation fulfill];
return nil;
});
promise2.progress(^(float reportedProgress) {
progress += reportedProgress;
if (fabs(reportedProgress - 1.0) < 0.01) {
[expectation fulfill];
}
});
for (NSNumber *progressValue in progressValues) {
[promise updateWithProgress:progressValue.floatValue];
}
[self waitForExpectationsWithTimeout:0.1 handler:nil];
XCTAssertEqualWithAccuracy(progress,
[[progressValues valueForKeyPath:@"@sum.self"] floatValue],
0.01);
}

- (void)testProgressPropagationByPromiseChainOfThreePromises {
XCTestExpectation *expectation = [self expectationWithDescription:@"Progress should be equal to sum of progresses"];
NSArray *progressValues = @[@0.0, @0.25, @0.5, @0.75, @1.0];
RXPromise *promise = [[RXPromise alloc] init];
__block float progress = 0;
RXPromise *promise2 = promise.then(nil, ^id(id result) {
return nil;
});
RXPromise *promise3 = promise2.then(nil, ^id(id result) {
return nil;
});
promise3.progress(^(float reportedProgress) {
progress += reportedProgress;
if (fabs(reportedProgress - 1.0) < 0.01) {
[expectation fulfill];
}
});
for (NSNumber *progressValue in progressValues) {
[promise updateWithProgress:progressValue.floatValue];
}
[self waitForExpectationsWithTimeout:0.1 handler:nil];
XCTAssertEqualWithAccuracy(progress,
[[progressValues valueForKeyPath:@"@sum.self"] floatValue],
0.01);
}

- (void)testProgressPropagationByPromiseChainOfTreeOfPromises {
XCTestExpectation *expectation = [self expectationWithDescription:@"Progress should be equal to sum of progresses"];
NSArray *progressValues = @[@0.0, @0.25, @0.5, @0.75, @1.0];
RXPromise *promise = [[RXPromise alloc] init];
__block float progress = 0;
RXPromise *promise2 = promise.then(nil, ^id(id result) {
return nil;
});
RXPromise *promise3 = promise.then(nil, ^id(id result) {
return nil;
});
promise2.progress(^(float reportedProgress) {
progress += reportedProgress;
if (fabs(reportedProgress - 1.0) < 0.01) {
[expectation fulfill];
}
});
promise3.progress(^(float reportedProgress) {
progress += reportedProgress;
});
for (NSNumber *progressValue in progressValues) {
[promise updateWithProgress:progressValue.floatValue];
}
[self waitForExpectationsWithTimeout:0.1 handler:nil];
XCTAssertEqualWithAccuracy(progress,
[[progressValues valueForKeyPath:@"@sum.self"] floatValue] * 2.0,
0.01);
}

#pragma mark - all

Expand Down