From ff4391e3b2ee195ba2647789b4dd2267210b74a9 Mon Sep 17 00:00:00 2001 From: bluegaspode Date: Wed, 7 Aug 2024 00:12:49 +0200 Subject: [PATCH 1/2] create a buffer for events where we don't know the subscription ID yet --- src/api/UPnPEvents.h | 14 +++++++ src/upnp/UPnPEvents.m | 94 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 107 insertions(+), 1 deletion(-) diff --git a/src/api/UPnPEvents.h b/src/api/UPnPEvents.h index 7d46b64..824a4da 100644 --- a/src/api/UPnPEvents.h +++ b/src/api/UPnPEvents.h @@ -60,9 +60,23 @@ NS_ASSUME_NONNULL_BEGIN @end +@interface BufferedEvent : NSObject{ + double eventTime; + NSString* subscriptionID; + NSDictionary *events; +} + +@property (readwrite) double eventTime; +@property (readwrite, strong) NSString* subscriptionID; +@property (readwrite, strong) NSDictionary *events; + +@end + + @interface UPnPEvents : NSObject { NSMutableDictionary *mEventSubscribers;//uuid, observer + NSMutableArray *mArrivedEventsBuffers; //uuid, list of events BasicHTTPServer_ObjC *server; UPnPEventParser *parser; NSRecursiveLock *mMutex; diff --git a/src/upnp/UPnPEvents.m b/src/upnp/UPnPEvents.m index 38324de..ca89cbc 100644 --- a/src/upnp/UPnPEvents.m +++ b/src/upnp/UPnPEvents.m @@ -47,6 +47,15 @@ @implementation ObserverEntry @end +@implementation BufferedEvent + +@synthesize eventTime; +@synthesize subscriptionID; +@synthesize events; + +@end + + @implementation UPnPEvents - (instancetype)init { @@ -54,6 +63,7 @@ - (instancetype)init { if (self) { mMutex = [[NSRecursiveLock alloc] init]; mEventSubscribers = [[NSMutableDictionary alloc] init]; + mArrivedEventsBuffers = [[NSMutableArray alloc] init]; parser = [[UPnPEventParser alloc] init]; server = [[BasicHTTPServer_ObjC alloc] init]; @@ -68,6 +78,7 @@ - (void)dealloc { [server stop]; [server release]; [mEventSubscribers release]; + [mArrivedEventsBuffers release]; [parser release]; [mMutex release]; @@ -76,7 +87,7 @@ - (void)dealloc { - (void)start { //Start the subscription timer - mTimeoutTimer = [NSTimer timerWithTimeInterval:60.0 target:self selector:@selector(manageSubscriptionTimeouts:) userInfo:nil repeats:YES]; + mTimeoutTimer = [NSTimer timerWithTimeInterval:60.0 target:self selector:@selector(manageTimeouts:) userInfo:nil repeats:YES]; [[NSRunLoop currentRunLoop] addTimer:mTimeoutTimer forMode:NSDefaultRunLoopMode]; [server start]; } @@ -142,6 +153,9 @@ -(void)subscribe:(id)subscriber completion:(void (^)(NSStri NSLog(@"[UPnP-GENA] Subscribed successfully < uuid: %@ | timeout: %d >", retUUID, en.timeout); mEventSubscribers[retUUID] = en; + + [self processEventsFromBufferForKey: retUUID]; + [en release]; } else { @@ -190,6 +204,61 @@ - (void)unsubscribe:(id)subscriber withSID:(NSString *)uuid }); } + +/** + * As a race condition, first events can arrive before we have processed the response from an event subscrition, + * thus know and have initialized the subscription id. + * So each event that arrives, without us knowing a responsible observer already will be written into buffer. + * As soon as we know the subscription id, the buffer can be cleared. + * Each event in the buffer not consumed within a certain time frame will be discarded, as the race condition is typically + * just about some milliseconds. + */ + +- (void)bufferEarlyEventFromCurrentParser:(NSString *)uuid { + BufferedEvent *bufferedEvent = [[BufferedEvent alloc] init]; + [bufferedEvent setEvents:[[[parser events] copy] autorelease]]; + [bufferedEvent setEventTime:[[NSDate date]timeIntervalSince1970]]; + [bufferedEvent setSubscriptionID:uuid]; + [mArrivedEventsBuffers addObject: bufferedEvent]; + [bufferedEvent release]; +} + +-(void) processEventsFromBufferForKey: (NSString *) uuid { + + id thisObserver = nil; + NSMutableArray *toSend = [[NSMutableArray alloc] init]; + + [mMutex lock]; + @try { + ObserverEntry *entry = [mEventSubscribers objectForKey:uuid]; + if(entry != nil){ + thisObserver = entry.observer; + } + + NSMutableArray *eventsToRemove = [NSMutableArray array]; + + for (BufferedEvent *event in mArrivedEventsBuffers) { + if ([event.subscriptionID isEqualToString:uuid]) { + [toSend addObject:event]; + [eventsToRemove addObject:event]; + } + } + + // Remove events that are to be sent from mArrivedEventsBuffers + [mArrivedEventsBuffers removeObjectsInArray:eventsToRemove]; + + } @finally { + [mMutex unlock]; + } + + if (thisObserver) { + for (BufferedEvent *bufferedEvent in toSend) { + [thisObserver UPnPEvent:bufferedEvent.events]; + } + } +} + + /* * Incomming HTTP events * BasicHTTPServer_ObjC_Observer @@ -258,6 +327,8 @@ - (BOOL)request:(BasicHTTPServer_ObjC *)sender ObserverEntry *entry = mEventSubscribers[uuid]; if (entry != nil) { thisObserver = entry.observer; + } else { + [self bufferEarlyEventFromCurrentParser:uuid]; } [mMutex unlock]; if (thisObserver != nil) { @@ -282,6 +353,11 @@ - (BOOL)response:(BasicHTTPServer_ObjC*)sender return result; } +-(void)manageTimeouts:(NSTimer*)timer{ + [self manageSubscriptionTimeouts:timer]; + [self manageBufferedEventsTimeouts:timer]; +} + - (void)manageSubscriptionTimeouts:(NSTimer *)timer { double tm = [[NSDate date]timeIntervalSince1970]; @@ -331,4 +407,20 @@ - (void)manageSubscriptionTimeouts:(NSTimer *)timer { } +-(void)manageBufferedEventsTimeouts:(NSTimer*)timer{ + double tm = [[NSDate date]timeIntervalSince1970]; + NSMutableArray *remove = [[NSMutableArray alloc] init]; + [mMutex lock]; + + for (BufferedEvent *bufferedEvent in mArrivedEventsBuffers) { + // Race condition should be resolved after 60seconds latest + if (tm - bufferedEvent.eventTime >=60) { + [remove addObject:bufferedEvent]; + } + } + [mArrivedEventsBuffers removeObjectsInArray:remove]; + [mMutex unlock]; + +} + @end From 32b9c71cf6699c38885666b4b741639ed61833f4 Mon Sep 17 00:00:00 2001 From: bluegaspode Date: Wed, 7 Aug 2024 00:25:48 +0200 Subject: [PATCH 2/2] an observer gets current state, if we've received state earlier then the observer was registered --- src/api/BasicUPnPService.h | 2 ++ src/upnp/BasicUPnPService.m | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/src/api/BasicUPnPService.h b/src/api/BasicUPnPService.h index 54e2012..3d14f12 100755 --- a/src/api/BasicUPnPService.h +++ b/src/api/BasicUPnPService.h @@ -68,6 +68,7 @@ NSString *eventUUID; NSMutableDictionary *stateVariables; + NSMutableDictionary *currentStateVariableValues; NSMutableArray *mObservers; NSRecursiveLock *mMutex; @@ -81,6 +82,7 @@ @property (readwrite, retain) NSString *serviceType; @property (readonly, retain) SSDPDBDevice_ObjC *ssdpdevice; @property (readonly) NSMutableDictionary *stateVariables; +@property (readonly) NSMutableDictionary *currentStateVariableValues; @property (readonly) SoapAction *soap; @property (readwrite, retain) NSString *urn; @property (readwrite) BOOL isSetUp; diff --git a/src/upnp/BasicUPnPService.m b/src/upnp/BasicUPnPService.m index 80d22e9..882f69e 100755 --- a/src/upnp/BasicUPnPService.m +++ b/src/upnp/BasicUPnPService.m @@ -47,6 +47,7 @@ @implementation BasicUPnPService @synthesize controlURL; @synthesize ssdpdevice; @synthesize stateVariables; +@synthesize currentStateVariableValues; @synthesize urn; @synthesize soap; @synthesize isSetUp; @@ -76,6 +77,8 @@ - (instancetype)initWithSSDPDevice:(SSDPDBDevice_ObjC *)device { isSubscribedForEvents = NO; stateVariables = [[NSMutableDictionary alloc] init]; + currentStateVariableValues = [[NSMutableDictionary alloc] init]; + mObservers = [[NSMutableArray alloc] init]; @@ -101,6 +104,7 @@ - (void)dealloc { [baseURLString release]; [stateVariables release]; + [currentStateVariableValues release]; [urn release]; [soap release]; @@ -125,6 +129,11 @@ - (NSUInteger)addObserver:(BasicUPnPServiceObserver *)obs { [mMutex lock]; [mObservers addObject:obs]; ret = [mObservers count]; + + // a new observer should get current state (if we've received some state already) + if ([currentStateVariableValues count]>0) { + [obs basicUPnPService:self receivedEvents:currentStateVariableValues]; + } [mMutex unlock]; return ret; @@ -233,6 +242,7 @@ - (void)UPnPEvent:(NSDictionary *)events { BasicUPnPServiceObserver *obs = nil; [mMutex lock]; + [currentStateVariableValues addEntriesFromDictionary:events]; NSEnumerator *listeners = [mObservers objectEnumerator]; while (obs = [listeners nextObject]) { [obs basicUPnPService:self receivedEvents:events];