11import { createSocket , RemoteInfo } from 'dgram'
22import { Strings , Numbers , IProbeConfig , DEFAULT_PROBE_CONFIG } from './interfaces'
3- import { Observable , Observer , fromEvent , timer } from 'rxjs'
3+ import { Observable , Observer , fromEvent , timer , Subject } from 'rxjs'
44import { shareReplay , map , distinctUntilChanged , mapTo , takeUntil , scan } from 'rxjs/operators'
55
66type IMessage = readonly [ Buffer , RemoteInfo ]
@@ -41,19 +41,19 @@ export const flattenBuffersWithInfo =
4141
4242export const probe =
4343 ( config ?: Partial < IProbeConfig > ) =>
44- ( messages : Strings ) =>
45- ( until : Observable < any > ) : Observable < Strings > =>
44+ ( messages : Strings ) : Observable < Strings > =>
4645 Observable . create ( ( obs : Observer < Strings > ) => {
4746 const cfg = { ...DEFAULT_PROBE_CONFIG , ...( config || { } ) }
4847 const socket = createSocket ( { type : 'udp4' } )
4948 const socketMessages$ = fromEvent < IMessage > ( socket , 'message' ) . pipe ( map ( a => a [ 0 ] ) , shareReplay ( 1 ) )
49+ const internalLimit = new Subject ( )
5050
5151 socket . on ( 'err' , err => obs . error ( err ) )
5252 socket . on ( 'close' , ( ) => obs . complete ( ) )
5353
5454 timer ( 0 , cfg . PROBE_REQUEST_SAMPLE_RATE_MS ) . pipe (
5555 mapTo ( flattenBuffersWithInfo ( cfg . PORTS ) ( cfg . MULTICAST_ADDRESS ) ( messages . map ( mapStringToBuffer ) ) ) ,
56- takeUntil ( until ) )
56+ takeUntil ( internalLimit ) )
5757 . subscribe ( bfrPorts => {
5858 bfrPorts . forEach ( mdl => socket . send ( mdl . buffer , 0 , mdl . buffer . length , mdl . port , mdl . address ) )
5959 } )
@@ -65,10 +65,12 @@ export const probe =
6565 distinctUntilObjectChanged ,
6666 toArrayOfValues ,
6767 flattenDocumentStrings ,
68- takeUntil ( until )
69- ) . subscribe ( msg => obs . next ( msg ) , undefined , ( ) => {
70- setTimeout ( ( ) => {
71- socket . close ( )
72- } , 1000 )
73- } )
68+ takeUntil ( internalLimit )
69+ ) . subscribe ( msg => obs . next ( msg ) )
70+
71+ return function unsubscribe ( ) {
72+ internalLimit . next ( )
73+ internalLimit . complete ( )
74+ socket . close ( )
75+ }
7476 } )
0 commit comments