diff --git a/README.md b/README.md index 6cba1f4..ebd8847 100644 --- a/README.md +++ b/README.md @@ -235,6 +235,23 @@ const exampleIssue = { } ``` + +### MissingStreamDataDetector +Detects issues with missing data in active inbound streams +```ts +const exampleIssue = { + type: 'stream', + reason: 'missing-video-stream-data' | 'missing-audio-stream-data', + trackIdentifier: 'some-track-id', + statsSample: { + bytesReceivedDelta: 0, // always zero if issue detected + bytesReceived: 2392384, + trackDetached: false, + trackEnded: false, + }, +} +``` + ## Roadmap - [ ] Adaptive getStats() call interval based on last getStats() execution time diff --git a/src/WebRTCIssueDetector.ts b/src/WebRTCIssueDetector.ts index 3ba3cd3..709db76 100644 --- a/src/WebRTCIssueDetector.ts +++ b/src/WebRTCIssueDetector.ts @@ -27,6 +27,7 @@ import { } from './detectors'; import { CompositeRTCStatsParser, RTCStatsParser } from './parser'; import createLogger from './utils/logger'; +import MissingStreamDataDetector from './detectors/MissingStreamDataDetector'; class WebRTCIssueDetector { readonly eventEmitter: WebRTCIssueEmitter; @@ -67,6 +68,7 @@ class WebRTCIssueDetector { new AvailableOutgoingBitrateIssueDetector(), new UnknownVideoDecoderImplementationDetector(), new FrozenVideoTrackDetector(), + new MissingStreamDataDetector(), ]; this.networkScoresCalculator = params.networkScoresCalculator ?? new DefaultNetworkScoresCalculator(); diff --git a/src/detectors/MissingStreamDataDetector.ts b/src/detectors/MissingStreamDataDetector.ts new file mode 100644 index 0000000..a0b5ef3 --- /dev/null +++ b/src/detectors/MissingStreamDataDetector.ts @@ -0,0 +1,165 @@ +import { + CommonParsedInboundStreamStats, + IssueDetectorResult, + IssuePayload, + IssueReason, + IssueType, + WebRTCStatsParsed, +} from '../types'; +import BaseIssueDetector from './BaseIssueDetector'; + +interface MissingStreamDetectorParams { + timeoutMs?: number; // delay to report the issue no more often then once per specified timeout + steps?: number; // number of last stats to check +} + +export default class MissingStreamDataDetector extends BaseIssueDetector { + readonly #lastMarkedAt = new Map(); + + readonly #timeoutMs: number; + + readonly #steps: number; + + constructor(params: MissingStreamDetectorParams = {}) { + super(); + this.#timeoutMs = params.timeoutMs ?? 15_000; + this.#steps = params.steps ?? 3; + } + + performDetection(data: WebRTCStatsParsed): IssueDetectorResult { + const { connection: { id: connectionId } } = data; + const issues = this.processData(data); + this.setLastProcessedStats(connectionId, data); + return issues; + } + + private processData(data: WebRTCStatsParsed): IssueDetectorResult { + const issues: IssueDetectorResult = []; + + const allLastProcessedStats = [...this.getAllLastProcessedStats(data.connection.id), data]; + if (allLastProcessedStats.length < this.#steps) { + return issues; + } + + const lastNProcessedStats = allLastProcessedStats.slice(-this.#steps); + + const lastNVideoInbound = lastNProcessedStats.map((stats) => stats.video.inbound); + const lastNAudioInbound = lastNProcessedStats.map((stats) => stats.audio.inbound); + + issues.push(...this.detectMissingData( + lastNAudioInbound as unknown as CommonParsedInboundStreamStats[][], + IssueType.Stream, + IssueReason.MissingAudioStreamData, + )); + + issues.push(...this.detectMissingData( + lastNVideoInbound, + IssueType.Stream, + IssueReason.MissingVideoStreamData, + )); + + const unvisitedTrackIds = new Set(this.#lastMarkedAt.keys()); + + unvisitedTrackIds.forEach((trackId) => { + const lastMarkedAt = this.#lastMarkedAt.get(trackId); + if (lastMarkedAt && Date.now() - lastMarkedAt > this.#timeoutMs) { + this.removeMarkedIssue(trackId); + } + }); + + return issues; + } + + private detectMissingData( + lastNInboundStats: CommonParsedInboundStreamStats[][], + type: IssueType, + reason: IssueReason, + ): IssueDetectorResult { + const issues: IssuePayload[] = []; + + const currentInboundStats = lastNInboundStats.pop()!; + const prevInboundItemsByTrackId = MissingStreamDataDetector.mapStatsByTrackId(lastNInboundStats); + + currentInboundStats.forEach((inboundItem) => { + const trackId = inboundItem.track.trackIdentifier; + + const prevInboundItems = prevInboundItemsByTrackId.get(trackId); + + if (!Array.isArray(prevInboundItems) || prevInboundItems.length === 0) { + return; + } + + if (inboundItem.track.detached || inboundItem.track.ended) { + return; + } + + if (!MissingStreamDataDetector.isAllBytesReceivedDidntChange(inboundItem.bytesReceived, prevInboundItems)) { + this.removeMarkedIssue(trackId); + return; + } + + const issueMarked = this.markIssue(trackId); + + if (!issueMarked) { + return; + } + + const statsSample = { + bytesReceived: inboundItem.bytesReceived, + }; + + issues.push({ + type, + reason, + statsSample, + trackIdentifier: trackId, + }); + }); + + return issues; + } + + private static mapStatsByTrackId( + items: CommonParsedInboundStreamStats[][], + ): Map { + const statsById = new Map(); + items.forEach((inboundItems) => { + inboundItems.forEach((inbountItem) => { + const accumulatedItems = statsById.get(inbountItem.track.trackIdentifier) || []; + accumulatedItems.push(inbountItem); + statsById.set(inbountItem.track.trackIdentifier, accumulatedItems); + }); + }); + + return statsById; + } + + private static isAllBytesReceivedDidntChange( + bytesReceived: number, inboundItems: CommonParsedInboundStreamStats[], + ): boolean { + for (let i = 0; i < inboundItems.length; i += 1) { + const inboundItem = inboundItems[i]; + if (inboundItem.bytesReceived !== bytesReceived) { + return false; + } + } + + return true; + } + + private markIssue(trackId: string): boolean { + const now = Date.now(); + const lastMarkedAt = this.#lastMarkedAt.get(trackId); + + if (!lastMarkedAt || now - lastMarkedAt > this.#timeoutMs) { + this.#lastMarkedAt.set(trackId, now); + return true; + } + + return false; + } + + private removeMarkedIssue(trackId: string): void { + this.#lastMarkedAt.delete(trackId); + } +} diff --git a/src/types.ts b/src/types.ts index 7f6e7d6..d56b8d0 100644 --- a/src/types.ts +++ b/src/types.ts @@ -83,6 +83,8 @@ export enum IssueReason { LowInboundMOS = 'low-inbound-mean-opinion-score', LowOutboundMOS = 'low-outbound-mean-opinion-score', FrozenVideoTrack = 'frozen-video-track', + MissingVideoStreamData = 'missing-video-stream-data', + MissingAudioStreamData = 'missing-audio-stream-data', } export type IssuePayload = { @@ -433,3 +435,15 @@ export interface Logger { warn: (msg: any, ...meta: any[]) => void; error: (msg: any, ...meta: any[]) => void; } + +type CommonKeys = Extract; + +type CommonFields = { + [K in CommonKeys]: T[K] extends object + ? U[K] extends object + ? CommonFields // Recursively check nested objects + : never + : T[K]; +}; + +export type CommonParsedInboundStreamStats = CommonFields;