@@ -66,23 +66,23 @@ export class DatabaseChanges implements IDatabaseChanges {
6666 return new WebSocket ( url , options ) ;
6767 }
6868
69- private _onConnectionStatusChanged ( ) {
69+ private async _onConnectionStatusChanged ( ) {
7070 const acquiredSemContext = acquireSemaphore ( this . _semaphore ) ;
7171
72- BluebirdPromise . resolve ( acquiredSemContext . promise )
73- . then ( ( ) => {
74- if ( this . connected ) {
75- this . _tcs . resolve ( this ) ;
76- return ;
77- }
72+ try {
73+ await acquiredSemContext . promise ;
74+
75+ if ( this . connected ) {
76+ this . _tcs . resolve ( this ) ;
77+ return ;
78+ }
7879
79- if ( this . _tcs . promise . isFulfilled ( ) ) {
80- this . _tcs = PromiseUtil . defer < IDatabaseChanges > ( ) ;
81- }
82- } )
83- . finally ( ( ) => {
84- acquiredSemContext . dispose ( ) ;
85- } ) ;
80+ if ( this . _tcs . promise . isFulfilled ( ) ) {
81+ this . _tcs = PromiseUtil . defer < IDatabaseChanges > ( ) ;
82+ }
83+ } finally {
84+ acquiredSemContext . dispose ( ) ;
85+ }
8686 }
8787
8888 public get connected ( ) {
@@ -201,7 +201,9 @@ export class DatabaseChanges implements IDatabaseChanges {
201201 }
202202
203203 public dispose ( ) : void {
204- Array . from ( this . _confirmations . values ( ) ) . forEach ( confirmation => confirmation . reject ( ) ) ;
204+ for ( const confirmation of this . _confirmations . values ( ) ) {
205+ confirmation . reject ( ) ;
206+ }
205207
206208 this . _isCancelled = true ;
207209 if ( this . _client ) {
@@ -258,37 +260,42 @@ export class DatabaseChanges implements IDatabaseChanges {
258260 }
259261
260262 private _send ( command : string , value : string , values : string [ ] ) : Promise < void > {
261- return new Promise < void > ( ( ( resolve , reject ) => {
263+ return new Promise < void > ( ( async ( resolve , reject ) => {
262264 let currentCommandId : number ;
263265
264- const acquiredSemContext = acquireSemaphore ( this . _semaphore ) ;
266+ const acquiredSemContext = acquireSemaphore ( this . _semaphore , {
267+ timeout : 15000 ,
268+ contextName : "DatabaseChanges._send()"
269+ } ) ;
265270
266- BluebirdPromise . resolve ( acquiredSemContext . promise )
267- . then ( ( ) => {
268- currentCommandId = ++ this . _commandId ;
271+ try {
272+ await acquiredSemContext . promise ;
269273
270- const payload = {
271- CommandId : currentCommandId ,
272- Command : command ,
273- Param : value
274- } ;
274+ currentCommandId = ++ this . _commandId ;
275275
276- if ( values && values . length ) {
277- payload [ "Params" ] = values ;
278- }
276+ const payload = {
277+ CommandId : currentCommandId ,
278+ Command : command ,
279+ Param : value
280+ } ;
279281
280- this . _confirmations . set ( currentCommandId , { resolve, reject } ) ;
281- const payloadAsString = JSON . stringify ( payload , null , 0 ) ;
282+ if ( values && values . length ) {
283+ payload [ "Params" ] = values ;
284+ }
282285
283- this . _client . send ( payloadAsString ) ;
284- } )
285- . catch ( ( err ) => {
286- if ( ! this . _isCancelled ) {
287- throw err ;
288- }
289- } )
290- . timeout ( 15000 )
291- . finally ( ( ) => acquiredSemContext . dispose ( ) ) ;
286+ this . _confirmations . set ( currentCommandId , { resolve, reject } ) ;
287+ const payloadAsString = JSON . stringify ( payload , null , 0 ) ;
288+
289+ this . _client . send ( payloadAsString ) ;
290+ } catch ( err ) {
291+ if ( ! this . _isCancelled ) {
292+ throw err ;
293+ }
294+ } finally {
295+ if ( acquiredSemContext ) {
296+ acquiredSemContext . dispose ( ) ;
297+ }
298+ }
292299 } ) ) ;
293300 }
294301
@@ -336,7 +343,10 @@ export class DatabaseChanges implements IDatabaseChanges {
336343 setTimeout ( ( ) => this . _doWorkInternal ( url ) , 1000 ) ;
337344 }
338345
339- Array . from ( this . _confirmations . values ( ) ) . forEach ( v => v . reject ( ) ) ;
346+ for ( const confirm of this . _confirmations . values ( ) ) {
347+ confirm . reject ( ) ;
348+ }
349+
340350 this . _confirmations . clear ( ) ;
341351 } ) ;
342352
@@ -365,8 +375,13 @@ export class DatabaseChanges implements IDatabaseChanges {
365375 const payloadParsed = JSON . parse ( data ) as any [ ] ;
366376
367377 try {
368- for ( const message of ( Array . isArray ( payloadParsed ) ? payloadParsed : [ payloadParsed ] ) ) {
378+ const messages = Array . isArray ( payloadParsed ) ? payloadParsed : [ payloadParsed ] ;
379+ for ( const message of messages ) {
369380 const type = message . Type ;
381+ if ( ! type ) {
382+ continue ;
383+ }
384+
370385 switch ( type ) {
371386 case "Error" :
372387 const exceptionAsString = message . Exception ;
@@ -419,9 +434,9 @@ export class DatabaseChanges implements IDatabaseChanges {
419434
420435 this . _emitter . emit ( "error" , e ) ;
421436
422- Array . from ( this . _counters . values ( ) ) . forEach ( state => {
437+ for ( const state of this . _counters . values ( ) ) {
423438 state . error ( e ) ;
424- } ) ;
439+ }
425440 }
426441
427442 public forAllCounters ( ) : IChangesObservable < CounterChange > {
0 commit comments