Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: audio video stream missing data detector #30

Merged
merged 11 commits into from
Jan 23, 2025
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,23 @@ const exampleIssue = {
}
```


### MissingStreamDataDetector
Detects issues with missing data in active inbound streams
```ts
const exampleIssue = {
type: 'stream',
reason: 'missing-video-stream-data' | 'missing-audio-stream-data',
trackIdentifier: 'some-track-id',
statsSample: {
bytesReceivedDelta: 0, // always zero if issue detected
bytesReceived: 2392384,
trackDetached: false,
trackEnded: false,
},
}
```

## Roadmap

- [ ] Adaptive getStats() call interval based on last getStats() execution time
Expand Down
2 changes: 2 additions & 0 deletions src/WebRTCIssueDetector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from './detectors';
import { CompositeRTCStatsParser, RTCStatsParser } from './parser';
import createLogger from './utils/logger';
import MissingStreamDataDetector from './detectors/MissingStreamDataDetector';

class WebRTCIssueDetector {
readonly eventEmitter: WebRTCIssueEmitter;
Expand Down Expand Up @@ -67,6 +68,7 @@ class WebRTCIssueDetector {
new AvailableOutgoingBitrateIssueDetector(),
new UnknownVideoDecoderImplementationDetector(),
new FrozenVideoTrackDetector(),
new MissingStreamDataDetector(),
];

this.networkScoresCalculator = params.networkScoresCalculator ?? new DefaultNetworkScoresCalculator();
Expand Down
165 changes: 165 additions & 0 deletions src/detectors/MissingStreamDataDetector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import {
CommonParsedInboundStreamStats,
IssueDetectorResult,
IssuePayload,
IssueReason,
IssueType,
WebRTCStatsParsed,
} from '../types';
import BaseIssueDetector from './BaseIssueDetector';

interface MissingStreamDetectorParams {
timeoutMs?: number; // delay to report the issue no more often then once per specified timeout
steps?: number; // number of last stats to check
}

export default class MissingStreamDataDetector extends BaseIssueDetector {
readonly #lastMarkedAt = new Map<string, number>();

readonly #timeoutMs: number;

readonly #steps: number;

constructor(params: MissingStreamDetectorParams = {}) {
super();
this.#timeoutMs = params.timeoutMs ?? 15_000;
this.#steps = params.steps ?? 3;
}

performDetection(data: WebRTCStatsParsed): IssueDetectorResult {
const { connection: { id: connectionId } } = data;
const issues = this.processData(data);
this.setLastProcessedStats(connectionId, data);
return issues;
}

private processData(data: WebRTCStatsParsed): IssueDetectorResult {
const issues: IssueDetectorResult = [];

const allLastProcessedStats = [...this.getAllLastProcessedStats(data.connection.id), data];
if (allLastProcessedStats.length < this.#steps) {
return issues;
}

const lastNProcessedStats = allLastProcessedStats.slice(-this.#steps);

const lastNVideoInbound = lastNProcessedStats.map((stats) => stats.video.inbound);
const lastNAudioInbound = lastNProcessedStats.map((stats) => stats.audio.inbound);

issues.push(...this.detectMissingData(
lastNAudioInbound as unknown as CommonParsedInboundStreamStats[][],
IssueType.Stream,
IssueReason.MissingAudioStreamData,
));

issues.push(...this.detectMissingData(
lastNVideoInbound,
IssueType.Stream,
IssueReason.MissingVideoStreamData,
));

const unvisitedTrackIds = new Set(this.#lastMarkedAt.keys());

unvisitedTrackIds.forEach((trackId) => {
const lastMarkedAt = this.#lastMarkedAt.get(trackId);
if (lastMarkedAt && Date.now() - lastMarkedAt > this.#timeoutMs) {
this.removeMarkedIssue(trackId);
}
});

return issues;
}

private detectMissingData(
lastNInboundStats: CommonParsedInboundStreamStats[][],
type: IssueType,
reason: IssueReason,
): IssueDetectorResult {
const issues: IssuePayload[] = [];

const currentInboundStats = lastNInboundStats.pop()!;
const prevInboundItemsByTrackId = MissingStreamDataDetector.mapStatsByTrackId(lastNInboundStats);

currentInboundStats.forEach((inboundItem) => {
const trackId = inboundItem.track.trackIdentifier;

const prevInboundItems = prevInboundItemsByTrackId.get(trackId);

if (!Array.isArray(prevInboundItems) || prevInboundItems.length === 0) {
return;
}

if (inboundItem.track.detached || inboundItem.track.ended) {
return;
}

if (!MissingStreamDataDetector.isAllBytesReceivedDidntChange(inboundItem.bytesReceived, prevInboundItems)) {
this.removeMarkedIssue(trackId);
return;
}

const issueMarked = this.markIssue(trackId);

if (!issueMarked) {
return;
}

const statsSample = {
bytesReceived: inboundItem.bytesReceived,
};

issues.push({
type,
reason,
statsSample,
trackIdentifier: trackId,
});
});

return issues;
}

private static mapStatsByTrackId(
items: CommonParsedInboundStreamStats[][],
): Map<string, CommonParsedInboundStreamStats[]> {
const statsById = new Map<string, CommonParsedInboundStreamStats[]>();
items.forEach((inboundItems) => {
inboundItems.forEach((inbountItem) => {
const accumulatedItems = statsById.get(inbountItem.track.trackIdentifier) || [];
accumulatedItems.push(inbountItem);
statsById.set(inbountItem.track.trackIdentifier, accumulatedItems);
});
});

return statsById;
}

private static isAllBytesReceivedDidntChange(
bytesReceived: number, inboundItems: CommonParsedInboundStreamStats[],
): boolean {
for (let i = 0; i < inboundItems.length; i += 1) {
const inboundItem = inboundItems[i];
if (inboundItem.bytesReceived !== bytesReceived) {
return false;
}
}

return true;
}

private markIssue(trackId: string): boolean {
const now = Date.now();
const lastMarkedAt = this.#lastMarkedAt.get(trackId);

if (!lastMarkedAt || now - lastMarkedAt > this.#timeoutMs) {
this.#lastMarkedAt.set(trackId, now);
return true;
}

return false;
}

private removeMarkedIssue(trackId: string): void {
this.#lastMarkedAt.delete(trackId);
}
}
14 changes: 14 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ export enum IssueReason {
LowInboundMOS = 'low-inbound-mean-opinion-score',
LowOutboundMOS = 'low-outbound-mean-opinion-score',
FrozenVideoTrack = 'frozen-video-track',
MissingVideoStreamData = 'missing-video-stream-data',
MissingAudioStreamData = 'missing-audio-stream-data',
}

export type IssuePayload = {
Expand Down Expand Up @@ -433,3 +435,15 @@ export interface Logger {
warn: (msg: any, ...meta: any[]) => void;
error: (msg: any, ...meta: any[]) => void;
}

type CommonKeys<T, U> = Extract<keyof T, keyof U>;

type CommonFields<T, U> = {
[K in CommonKeys<T, U>]: T[K] extends object
? U[K] extends object
? CommonFields<T[K], U[K]> // Recursively check nested objects
: never
: T[K];
};

export type CommonParsedInboundStreamStats = CommonFields<ParsedInboundVideoStreamStats, ParsedInboundAudioStreamStats>;
Loading