9
9
import BaseIssueDetector from './BaseIssueDetector' ;
10
10
11
11
interface MissingStreamDetectorParams {
12
+ timeoutMs ?: number ; // delay to report the issue no more often then once specified value
12
13
steps ?: number ; // number of last stats to check
13
14
}
14
15
@@ -21,7 +22,7 @@ export default class MissingStreamDataDetector extends BaseIssueDetector {
21
22
22
23
constructor ( params : MissingStreamDetectorParams = { } ) {
23
24
super ( ) ;
24
- this . #timeoutMs = 5_000 ;
25
+ this . #timeoutMs = params . timeoutMs ?? 15_000 ;
25
26
this . #steps = params . steps ?? 3 ;
26
27
}
27
28
@@ -40,19 +41,19 @@ export default class MissingStreamDataDetector extends BaseIssueDetector {
40
41
return issues ;
41
42
}
42
43
43
- const lastThreeProcessedStats = allLastProcessedStats . slice ( - this . #steps) ;
44
+ const lastNProcessedStats = allLastProcessedStats . slice ( - this . #steps) ;
44
45
45
- const lastThreeVideoInbound = lastThreeProcessedStats . map ( ( stats ) => stats . video . inbound ) ;
46
- const lastThreeAudioInbound = lastThreeProcessedStats . map ( ( stats ) => stats . audio . inbound ) ;
46
+ const lastNVideoInbound = lastNProcessedStats . map ( ( stats ) => stats . video . inbound ) ;
47
+ const lastNAudioInbound = lastNProcessedStats . map ( ( stats ) => stats . audio . inbound ) ;
47
48
48
49
issues . push ( ...this . detectMissingData (
49
- lastThreeAudioInbound as unknown as CommonParsedInboundStreamStats [ ] [ ] ,
50
+ lastNAudioInbound as unknown as CommonParsedInboundStreamStats [ ] [ ] ,
50
51
IssueType . Stream ,
51
52
IssueReason . MissingAudioStreamData ,
52
53
) ) ;
53
54
54
55
issues . push ( ...this . detectMissingData (
55
- lastThreeVideoInbound ,
56
+ lastNVideoInbound ,
56
57
IssueType . Stream ,
57
58
IssueReason . MissingVideoStreamData ,
58
59
) ) ;
@@ -70,36 +71,29 @@ export default class MissingStreamDataDetector extends BaseIssueDetector {
70
71
}
71
72
72
73
private detectMissingData (
73
- lastThreeInboundStats : CommonParsedInboundStreamStats [ ] [ ] ,
74
+ lastNInboundStats : CommonParsedInboundStreamStats [ ] [ ] ,
74
75
type : IssueType ,
75
76
reason : IssueReason ,
76
77
) : IssueDetectorResult {
77
78
const issues : IssuePayload [ ] = [ ] ;
78
79
79
- const firstInboundStats = lastThreeInboundStats [ 0 ] ;
80
- const secondInboundStats = lastThreeInboundStats [ 1 ] ;
81
- const currentInboundStats = lastThreeInboundStats [ 2 ] ;
82
-
83
- const firstInboundItemsByTrackId = MissingStreamDataDetector . mapStatsByTrackId ( firstInboundStats ) ;
84
- const secondInboundItemsByTrackId = MissingStreamDataDetector . mapStatsByTrackId ( secondInboundStats ) ;
80
+ const currentInboundStats = lastNInboundStats . pop ( ) ! ;
81
+ const prevInboundItemsByTrackId = MissingStreamDataDetector . mapStatsByTrackId ( lastNInboundStats ) ;
85
82
86
83
currentInboundStats . forEach ( ( inboundItem ) => {
87
84
const trackId = inboundItem . track . trackIdentifier ;
88
85
89
- const firstInboundItem = firstInboundItemsByTrackId . get ( trackId ) ;
90
- const secondInboundItem = secondInboundItemsByTrackId . get ( trackId ) ;
91
- if ( ! firstInboundItem || ! secondInboundItem ) {
86
+ const prevInboundItems = prevInboundItemsByTrackId . get ( trackId ) ;
87
+
88
+ if ( ! Array . isArray ( prevInboundItems ) || prevInboundItems . length === 0 ) {
92
89
return ;
93
90
}
94
91
95
92
if ( inboundItem . track . detached || inboundItem . track . ended ) {
96
93
return ;
97
94
}
98
95
99
- if (
100
- firstInboundItem . bytesReceived === secondInboundItem . bytesReceived
101
- && secondInboundItem . bytesReceived === inboundItem . bytesReceived
102
- ) {
96
+ if ( MissingStreamDataDetector . isAllBytesReceivedDidntChange ( inboundItem . bytesReceived , prevInboundItems ) ) {
103
97
const hasIssue = this . markIssue ( trackId ) ;
104
98
105
99
if ( ! hasIssue ) {
@@ -127,9 +121,27 @@ export default class MissingStreamDataDetector extends BaseIssueDetector {
127
121
return issues ;
128
122
}
129
123
130
- private static mapStatsByTrackId ( items : CommonParsedInboundStreamStats [ ] ) {
131
- return new Map < string , CommonParsedInboundStreamStats > ( items
132
- . map ( ( item ) => [ item . track . trackIdentifier , item ] as const ) ) ;
124
+ private static mapStatsByTrackId ( items : CommonParsedInboundStreamStats [ ] [ ] ) : Map < string , CommonParsedInboundStreamStats [ ] > {
125
+ const statsById = new Map < string , CommonParsedInboundStreamStats [ ] > ( ) ;
126
+ items . forEach ( ( inboundItems ) => {
127
+ inboundItems . forEach ( ( inbountItem ) => {
128
+ const accumulatedItems = statsById . get ( inbountItem . track . trackIdentifier ) || [ ] ;
129
+ accumulatedItems . push ( inbountItem ) ;
130
+ statsById . set ( inbountItem . track . trackIdentifier , accumulatedItems ) ;
131
+ } ) ;
132
+ } )
133
+
134
+ return statsById ;
135
+ }
136
+
137
+ private static isAllBytesReceivedDidntChange ( bytesReceived : number , inboundItems : CommonParsedInboundStreamStats [ ] ) : boolean {
138
+ for ( const inboundItem of inboundItems ) {
139
+ if ( inboundItem . bytesReceived !== bytesReceived ) {
140
+ return false ;
141
+ }
142
+ }
143
+
144
+ return true ;
133
145
}
134
146
135
147
private markIssue ( trackId : string ) : boolean {
0 commit comments