Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/api/BasicUPnPService.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
NSString *eventUUID;

NSMutableDictionary<NSString *, StateVariable *> *stateVariables;
NSMutableDictionary *currentStateVariableValues;
NSMutableArray<BasicUPnPServiceObserver> *mObservers;

NSRecursiveLock *mMutex;
Expand All @@ -81,6 +82,7 @@
@property (readwrite, retain) NSString *serviceType;
@property (readonly, retain) SSDPDBDevice_ObjC *ssdpdevice;
@property (readonly) NSMutableDictionary *stateVariables;
@property (readonly) NSMutableDictionary *currentStateVariableValues;
@property (readonly) SoapAction *soap;
@property (readwrite, retain) NSString *urn;
@property (readwrite) BOOL isSetUp;
Expand Down
14 changes: 14 additions & 0 deletions src/api/UPnPEvents.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,23 @@ NS_ASSUME_NONNULL_BEGIN

@end

@interface BufferedEvent : NSObject{
double eventTime;
NSString* subscriptionID;
NSDictionary *events;
}

@property (readwrite) double eventTime;
@property (readwrite, strong) NSString* subscriptionID;
@property (readwrite, strong) NSDictionary *events;

@end



@interface UPnPEvents : NSObject <BasicHTTPServer_ObjC_Observer> {
NSMutableDictionary *mEventSubscribers;//uuid, observer
NSMutableArray *mArrivedEventsBuffers; //uuid, list of events
BasicHTTPServer_ObjC *server;
UPnPEventParser *parser;
NSRecursiveLock *mMutex;
Expand Down
10 changes: 10 additions & 0 deletions src/upnp/BasicUPnPService.m
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ @implementation BasicUPnPService
@synthesize controlURL;
@synthesize ssdpdevice;
@synthesize stateVariables;
@synthesize currentStateVariableValues;
@synthesize urn;
@synthesize soap;
@synthesize isSetUp;
Expand Down Expand Up @@ -76,6 +77,8 @@ - (instancetype)initWithSSDPDevice:(SSDPDBDevice_ObjC *)device {
isSubscribedForEvents = NO;

stateVariables = [[NSMutableDictionary alloc] init];
currentStateVariableValues = [[NSMutableDictionary alloc] init];


mObservers = [[NSMutableArray<BasicUPnPServiceObserver> alloc] init];

Expand All @@ -101,6 +104,7 @@ - (void)dealloc {
[baseURLString release];

[stateVariables release];
[currentStateVariableValues release];

[urn release];
[soap release];
Expand All @@ -125,6 +129,11 @@ - (NSUInteger)addObserver:(BasicUPnPServiceObserver *)obs {
[mMutex lock];
[mObservers addObject:obs];
ret = [mObservers count];

// a new observer should get current state (if we've received some state already)
if ([currentStateVariableValues count]>0) {
[obs basicUPnPService:self receivedEvents:currentStateVariableValues];
}
[mMutex unlock];

return ret;
Expand Down Expand Up @@ -233,6 +242,7 @@ - (void)UPnPEvent:(NSDictionary *)events {
BasicUPnPServiceObserver *obs = nil;

[mMutex lock];
[currentStateVariableValues addEntriesFromDictionary:events];
NSEnumerator *listeners = [mObservers objectEnumerator];
while (obs = [listeners nextObject]) {
[obs basicUPnPService:self receivedEvents:events];
Expand Down
94 changes: 93 additions & 1 deletion src/upnp/UPnPEvents.m
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,23 @@ @implementation ObserverEntry
@end


@implementation BufferedEvent

@synthesize eventTime;
@synthesize subscriptionID;
@synthesize events;

@end


@implementation UPnPEvents

- (instancetype)init {
self = [super init];
if (self) {
mMutex = [[NSRecursiveLock alloc] init];
mEventSubscribers = [[NSMutableDictionary alloc] init];
mArrivedEventsBuffers = [[NSMutableArray alloc] init];
parser = [[UPnPEventParser alloc] init];

server = [[BasicHTTPServer_ObjC alloc] init];
Expand All @@ -68,6 +78,7 @@ - (void)dealloc {
[server stop];
[server release];
[mEventSubscribers release];
[mArrivedEventsBuffers release];
[parser release];
[mMutex release];

Expand All @@ -76,7 +87,7 @@ - (void)dealloc {

- (void)start {
//Start the subscription timer
mTimeoutTimer = [NSTimer timerWithTimeInterval:60.0 target:self selector:@selector(manageSubscriptionTimeouts:) userInfo:nil repeats:YES];
mTimeoutTimer = [NSTimer timerWithTimeInterval:60.0 target:self selector:@selector(manageTimeouts:) userInfo:nil repeats:YES];
[[NSRunLoop currentRunLoop] addTimer:mTimeoutTimer forMode:NSDefaultRunLoopMode];
[server start];
}
Expand Down Expand Up @@ -142,6 +153,9 @@ -(void)subscribe:(id<UPnPEvents_Observer>)subscriber completion:(void (^)(NSStri
NSLog(@"[UPnP-GENA] Subscribed successfully < uuid: %@ | timeout: %d >", retUUID, en.timeout);

mEventSubscribers[retUUID] = en;

[self processEventsFromBufferForKey: retUUID];

[en release];
}
else {
Expand Down Expand Up @@ -190,6 +204,61 @@ - (void)unsubscribe:(id<UPnPEvents_Observer>)subscriber withSID:(NSString *)uuid
});
}


/**
* As a race condition, first events can arrive before we have processed the response from an event subscrition,
* thus know and have initialized the subscription id.
* So each event that arrives, without us knowing a responsible observer already will be written into buffer.
* As soon as we know the subscription id, the buffer can be cleared.
* Each event in the buffer not consumed within a certain time frame will be discarded, as the race condition is typically
* just about some milliseconds.
*/

- (void)bufferEarlyEventFromCurrentParser:(NSString *)uuid {
BufferedEvent *bufferedEvent = [[BufferedEvent alloc] init];
[bufferedEvent setEvents:[[[parser events] copy] autorelease]];
[bufferedEvent setEventTime:[[NSDate date]timeIntervalSince1970]];
[bufferedEvent setSubscriptionID:uuid];
[mArrivedEventsBuffers addObject: bufferedEvent];
[bufferedEvent release];
}

-(void) processEventsFromBufferForKey: (NSString *) uuid {

id<UPnPEvents_Observer> thisObserver = nil;
NSMutableArray *toSend = [[NSMutableArray alloc] init];

[mMutex lock];
@try {
ObserverEntry *entry = [mEventSubscribers objectForKey:uuid];
if(entry != nil){
thisObserver = entry.observer;
}

NSMutableArray *eventsToRemove = [NSMutableArray array];

for (BufferedEvent *event in mArrivedEventsBuffers) {
if ([event.subscriptionID isEqualToString:uuid]) {
[toSend addObject:event];
[eventsToRemove addObject:event];
}
}

// Remove events that are to be sent from mArrivedEventsBuffers
[mArrivedEventsBuffers removeObjectsInArray:eventsToRemove];

} @finally {
[mMutex unlock];
}

if (thisObserver) {
for (BufferedEvent *bufferedEvent in toSend) {
[thisObserver UPnPEvent:bufferedEvent.events];
}
}
}


/*
* Incomming HTTP events
* BasicHTTPServer_ObjC_Observer
Expand Down Expand Up @@ -258,6 +327,8 @@ - (BOOL)request:(BasicHTTPServer_ObjC *)sender
ObserverEntry *entry = mEventSubscribers[uuid];
if (entry != nil) {
thisObserver = entry.observer;
} else {
[self bufferEarlyEventFromCurrentParser:uuid];
}
[mMutex unlock];
if (thisObserver != nil) {
Expand All @@ -282,6 +353,11 @@ - (BOOL)response:(BasicHTTPServer_ObjC*)sender
return result;
}

-(void)manageTimeouts:(NSTimer*)timer{
[self manageSubscriptionTimeouts:timer];
[self manageBufferedEventsTimeouts:timer];
}


- (void)manageSubscriptionTimeouts:(NSTimer *)timer {
double tm = [[NSDate date]timeIntervalSince1970];
Expand Down Expand Up @@ -331,4 +407,20 @@ - (void)manageSubscriptionTimeouts:(NSTimer *)timer {
}


-(void)manageBufferedEventsTimeouts:(NSTimer*)timer{
double tm = [[NSDate date]timeIntervalSince1970];
NSMutableArray *remove = [[NSMutableArray alloc] init];
[mMutex lock];

for (BufferedEvent *bufferedEvent in mArrivedEventsBuffers) {
// Race condition should be resolved after 60seconds latest
if (tm - bufferedEvent.eventTime >=60) {
[remove addObject:bufferedEvent];
}
}
[mArrivedEventsBuffers removeObjectsInArray:remove];
[mMutex unlock];

}

@end