Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit 12422ef

Browse files
committed
Move RTP processing from sample to SDK.
1 parent 01b9d63 commit 12422ef

File tree

4 files changed

+55
-78
lines changed

4 files changed

+55
-78
lines changed

src/samples/conference/public/scripts/quic.js

+1-59
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,6 @@ function updateConferenceStatus(message) {
6363
('<p>' + message + '</p>');
6464
}
6565

66-
function initWorker() {
67-
dataWorker = new Worker('./scripts/data-worker.js');
68-
dataWorker.onmessage=((e) => {
69-
if (e.data[0] === 'video-frame') {
70-
generatorWriter.write(e.data[1]);
71-
//console.log(e.data[1]);
72-
}
73-
});
74-
}
75-
7666
function joinConference() {
7767
return new Promise((resolve, reject) => {
7868
createToken(undefined, 'user', 'presenter', token => {
@@ -85,7 +75,6 @@ function joinConference() {
8575
}
8676
}
8777
updateConferenceStatus('Connected to conference server.');
88-
initWorker();
8978
resolve();
9079
});
9180
}, 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001');
@@ -153,7 +142,6 @@ async function writeData() {
153142

154143
window.addEventListener('load', () => {
155144
windowOnLoad();
156-
fetchWasm();
157145
});
158146

159147
document.getElementById('start-sending').addEventListener('click', async () => {
@@ -190,58 +178,12 @@ document.getElementById('stop-sending').addEventListener('click', () => {
190178

191179
document.getElementById('start-receiving')
192180
.addEventListener('click', async () => {
193-
const video=document.getElementById('remote-video');
194-
const generator = new MediaStreamTrackGenerator({kind: 'video'});
195-
generatorWriter=generator.writable.getWriter();
196-
video.srcObject = new MediaStream([generator]);
197-
const reader = conference.datagramReader();
198-
const ms = new Module.MediaSession();
199-
const receiver = ms.createRtpVideoReceiver();
200-
receiver.setCompleteFrameCallback((frame) => {
201-
const copiedFrame = frame.slice(0);
202-
dataWorker.postMessage(
203-
['encoded-video-frame', copiedFrame], [copiedFrame.buffer]);
204-
});
205181
subscribeMixedStream();
206-
while (true) {
207-
const received = await reader.read();
208-
const buffer = Module._malloc(received.value.byteLength);
209-
Module.writeArrayToMemory(received.value, buffer);
210-
receiver.onRtpPacket(buffer, received.value.byteLength);
211-
}
212182
});
213183

214-
async function fetchWasm() {
215-
Module['instantiateWasm'] = async (imports, successCallback) => {
216-
const response = await fetch('scripts/owt.wasm');
217-
const buffer = await response.arrayBuffer();
218-
const module=await WebAssembly.compile(buffer);
219-
const instance = await WebAssembly.instantiate(module, imports);
220-
successCallback(instance, module);
221-
return {};
222-
};
223-
const scriptPromise = new Promise((resolve, reject) => {
224-
const script = document.createElement('script');
225-
document.body.appendChild(script);
226-
script.onload = resolve;
227-
script.onerror = reject;
228-
script.async = true;
229-
script.src = 'scripts/owt.js';
230-
});
231-
await scriptPromise;
232-
}
233-
234184
async function subscribeMixedStream() {
235185
const subscription = await conference.subscribe(
236186
mixedStream,
237187
{audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}});
238-
const reader = subscription.stream.readable.getReader();
239-
while (true) {
240-
const {value, done} = await reader.read();
241-
if (done) {
242-
console.log('Subscription ends.');
243-
break;
244-
}
245-
// console.log('Received data: ' + value);
246-
}
188+
document.getElementById('remote-video').srcObject = subscription.stream;
247189
}

src/sdk/conference/client.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,7 @@ export const ConferenceClient = function(config, workerDir, signalingImpl) {
161161
if (notification === 'soac' || notification === 'progress') {
162162
if (channels.has(data.id)) {
163163
channels.get(data.id).onMessage(notification, data);
164-
} else if (quicTransportChannel && quicTransportChannel
165-
.hasContentSessionId(data.id)) {
164+
} else if (quicTransportChannel) {
166165
quicTransportChannel.onMessage(notification, data);
167166
} else {
168167
Logger.warning('Cannot find a channel for incoming data.');

src/sdk/conference/webtransport/connection.js

+22-5
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,13 @@ export class QuicConnection extends EventDispatcher {
4242
this._subscribeOptions = new Map(); // Key is subscription ID.
4343
this._subscriptionInfoReady =
4444
new Map(); // Key is subscription ID, value is a promise.
45+
// Key is subscription ID, value is an object with audio and video RTP
46+
// configs.
47+
this._rtpConfigs = new Map();
4548
this._transportId = this._token.transportId;
4649
this._initReceiveStreamReader();
4750
this._worker = new Worker(workerDir + '/media-worker.js', {type: 'module'});
51+
this._initHandlersForWorker();
4852
// Key is subscription ID, value is a MediaStreamTrackGenerator writer.
4953
this._mstVideoGeneratorWriters = new Map();
5054
this._initRtpModule();
@@ -68,6 +72,8 @@ export class QuicConnection extends EventDispatcher {
6872
this._readyHandler();
6973
} else if (message.status === 'error') {
7074
this._errorHandler(message.data);
75+
} else if (message.status === 'rtp'){
76+
this._rtpHandler(message);
7177
}
7278
break;
7379
case 'stream':
@@ -507,17 +513,24 @@ export class QuicConnection extends EventDispatcher {
507513
// A MediaStream is associated with a subscription for media.
508514
// Media packets are received over WebTransport datagram.
509515
const generators = [];
510-
for (const track of mediaOptions) {
516+
for (const track of mediaOptions.tracks) {
511517
const generator =
512518
new MediaStreamTrackGenerator({kind: track.type});
513519
generators.push(generator);
514520
// TODO: Update key with the correct SSRC.
515521
this._mstVideoGeneratorWriters.set(
516-
'0', generator.writable.getWriter());
522+
data.id, generator.writable.getWriter());
517523
}
518524
const mediaStream = new MediaStream(generators);
519525
const subscription =
520526
this._createSubscription(data.id, mediaStream);
527+
this._worker.postMessage([
528+
'add-subscription',
529+
[
530+
subscription.id, options,
531+
this._rtpConfigs.get(subscription.id)
532+
]
533+
]);
521534
resolve(subscription);
522535
}
523536
if (this._subscriptionInfoReady.has(data.id)) {
@@ -582,17 +595,21 @@ export class QuicConnection extends EventDispatcher {
582595
// its own status. Do nothing here.
583596
}
584597

598+
_rtpHandler(message) {
599+
Logger.debug(`RTP config: ${JSON.stringify(message.data)}.`);
600+
this._rtpConfigs.set(message.id, message.data);
601+
}
602+
585603
datagramReader() {
586604
return this._quicTransport.datagrams.readable.getReader();
587605
}
588606

589-
initHandlersForWorker() {
607+
_initHandlersForWorker() {
590608
this._worker.onmessage = ((e) => {
591609
const [command, args] = e.data;
592610
switch (command) {
593611
case 'video-frame':
594-
// TODO: Use actual subscription ID.
595-
this._mstVideoGeneratorWriters.get('0').getWriter.write(args);
612+
this._mstVideoGeneratorWriters.get(args[0]).write(args[1]);
596613
break;
597614
default:
598615
Logger.warn('Unrecognized command ' + command);

src/sdk/conference/webtransport/media-worker.js

+31-12
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ const keyFrameRequested = new Map();
1919

2020
let wasmModule;
2121
let mediaSession;
22-
let rtpReceiver;
22+
// Key is SSRC, value is an RTP receiver.
23+
let rtpReceivers = new Map();
2324
let frameBuffer;
2425
let videoDecoder;
2526
// 4 bytes for frame size before each frame. The 1st byte is reserved, always 0.
@@ -46,6 +47,9 @@ onmessage = async (e) => {
4647
case 'rtp-packet':
4748
await handleRtpPacket(args);
4849
break;
50+
case 'add-subscription':
51+
addNewSubscription(...args);
52+
break;
4953
default:
5054
console.warn('Unrecognized command ' + command);
5155
}
@@ -69,14 +73,8 @@ async function initMediaSender(
6973
}
7074

7175
async function initRtpModule() {
72-
initVideoDecoder();
7376
wasmModule = await fetchWasm();
7477
mediaSession = new wasmModule.MediaSession();
75-
rtpReceiver = mediaSession.createRtpVideoReceiver();
76-
rtpReceiver.setCompleteFrameCallback((frame) => {
77-
videoDecoder.decode(new EncodedVideoChunk(
78-
{timestamp: Date.now(), data: frame, type: 'key'}));
79-
});
8078
}
8179

8280
async function fetchWasm() {
@@ -157,17 +155,17 @@ function initVideoEncoder(config, writer) {
157155
return videoEncoder;
158156
}
159157

160-
function initVideoDecoder() {
158+
function initVideoDecoder(subscriptionId) {
161159
videoDecoder = new VideoDecoder({
162-
output: videoFrameOutputCallback,
160+
output: videoFrameOutputCallback.bind(null, subscriptionId),
163161
error: webCodecsErrorCallback,
164162
});
165163
videoDecoder.configure({codec: 'avc1.42400a', optimizeForLatency: true});
166164
}
167165

168-
function videoFrameOutputCallback(frame) {
166+
function videoFrameOutputCallback(subscriptionId, frame) {
169167
// eslint-disable-next-line no-undef
170-
postMessage(['video-frame', frame], [frame]);
168+
postMessage(['video-frame', [subscriptionId, frame]], [frame]);
171169
frame.close();
172170
}
173171

@@ -196,8 +194,29 @@ async function readMediaData(trackReadable, encoder, publicationId) {
196194
}
197195
}
198196

197+
function getSsrc(packet) {
198+
// SSRC starts from the 65th bit, in network order.
199+
return new DataView(packet.buffer).getUint32(8, false);
200+
}
201+
199202
async function handleRtpPacket(packet) {
203+
const ssrc = getSsrc(packet);
200204
const buffer = wasmModule._malloc(packet.byteLength);
201205
wasmModule.writeArrayToMemory(packet, buffer);
202-
rtpReceiver.onRtpPacket(buffer, packet.byteLength);
206+
rtpReceivers.get(ssrc).onRtpPacket(buffer, packet.byteLength);
203207
}
208+
209+
function addNewSubscription(subscriptionId, subscribeOptions, rtpConfig) {
210+
// TODO: Audio is not supported yet, ignore the audio part.
211+
initVideoDecoder(subscriptionId);
212+
const videoSsrc = rtpConfig.video.ssrc;
213+
if (rtpReceivers.has(videoSsrc)) {
214+
console.error(`RTP receiver for SSRC ${videoSsrc} exits.`);
215+
}
216+
const rtpReceiver = mediaSession.createRtpVideoReceiver(videoSsrc);
217+
rtpReceivers.set(videoSsrc, rtpReceiver);
218+
rtpReceiver.setCompleteFrameCallback((frame) => {
219+
videoDecoder.decode(new EncodedVideoChunk(
220+
{timestamp: Date.now(), data: frame, type: 'key'}));
221+
});
222+
}

0 commit comments

Comments
 (0)