Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit e7ab0c6

Browse files
committed
Receive mixed stream with WebTransport datagram.
After RTP packets are received, it uses WebRTC RTP depacketizer for depacketizing, WebCodecs for decoding and MediaStreamGenerator for rendering.
1 parent 3717273 commit e7ab0c6

File tree

5 files changed

+192
-41
lines changed

5 files changed

+192
-41
lines changed

src/samples/conference/public/quic.html

+8-2
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,19 @@ <h2>Sample of QuicTransport</h2>
3030
<div class="operations">
3131
<button id="start-sending">Start sending</button>
3232
<button id="stop-sending">Stop sending</button>
33+
<button id="start-receiving">Start receiving</button>
34+
<button id="stop-receiving">Stop receiving</button>
35+
</div>
36+
<div id="videocontainer">
37+
<video id="remote-video" playsinline muted autoplay controls></video>
3338
</div>
3439
<div id="conference-status"></div>
3540
<div class="stats">
3641
</div>
37-
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/3.1.2/socket.io.js" type="text/javascript"></script>
42+
43+
<script src="scripts/socket.io.js" type="text/javascript"></script>
3844
<script src="scripts/rest-sample.js" type="text/javascript"></script>
39-
<script src="scripts/owt.js" type="text/javascript"></script>
45+
<script src="../../../../dist/sdk-debug/owt.js" type="text/javascript"></script>
4046
<script src="scripts/quic.js" type="text/javascript"></script>
4147
</body>
4248

src/samples/conference/public/scripts/media-worker.js

+46-4
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
// SPDX-License-Identifier: Apache-2.0
44

55
/* eslint-disable require-jsdoc */
6-
/* global VideoEncoder */
6+
/* global VideoEncoder, VideoDecoder, EncodedVideoChunk */
77

8-
let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter;
8+
let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter,
9+
mediaSession, datagramReceiver, videoDecoder;
910
// 4 bytes for frame size before each frame. The 1st byte is reserved, always 0.
1011
const sizePrefix = 4;
1112

@@ -16,7 +17,15 @@ onmessage = (e) => {
1617
bidirectionalStreamWritable = e.data[1];
1718
sendStreamWriter = bidirectionalStreamWritable.getWriter();
1819
writeTrackId();
19-
initVideoEncoder();
20+
// initVideoEncoder();
21+
} else if (e.data[0] === 'datagram-receiver') {
22+
datagramReceiver = e.data[1];
23+
} else if (e.data[0] === 'encoded-video-frame') {
24+
if (videoDecoder.state === 'closed') {
25+
return;
26+
}
27+
videoDecoder.decode(new EncodedVideoChunk(
28+
{timestamp: Date.now(), data: e.data[1], type: 'key'}));
2029
}
2130
};
2231

@@ -60,6 +69,22 @@ function initVideoEncoder() {
6069
});
6170
}
6271

72+
function initVideoDecoder() {
73+
videoDecoder = new VideoDecoder({
74+
output: videoFrameOutputCallback,
75+
error: webCodecsErrorCallback,
76+
});
77+
videoDecoder.configure({codec: 'avc1.42400a', optimizeForLatency: true});
78+
}
79+
80+
function videoFrameOutputCallback(frame) {
81+
postMessage(['video-frame', frame], [frame]);
82+
}
83+
84+
function webCodecsErrorCallback(error) {
85+
console.log('error: ' + error.message);
86+
}
87+
6388
// Read data from video track.
6489
async function readVideoData(readable) {
6590
const reader = readable.getReader();
@@ -72,4 +97,21 @@ async function readVideoData(readable) {
7297
videoEncoder.encode(value);
7398
value.close();
7499
}
75-
}
100+
}
101+
102+
async function fetchWasm() {
103+
const Module={};
104+
Module['instantiateWasm'] = async (imports, successCallback) => {
105+
const response = await fetch('./owt.wasm');
106+
const buffer = await response.arrayBuffer();
107+
const module=new WebAssembly.Module(buffer);
108+
const instance = await WebAssembly.instantiate(module, imports);
109+
successCallback(instance, module);
110+
return {};
111+
};
112+
// Module['wasmModule']=new WebAssembly.Module(buffer);
113+
importScripts('./owt.js');
114+
console.log('Got wasm binary.');
115+
}
116+
117+
initVideoDecoder();

src/samples/conference/public/scripts/quic.js

+115-29
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,25 @@
88

99
let quicChannel = null;
1010
let bidirectionalStream = null;
11-
let writeTask, mediaStream, mediaWorker, conferenceId, myId;
11+
let writeTask, mediaStream, mediaWorker, conferenceId, myId, mixedStream, generatorWriter;
12+
13+
window.Module={};
1214

1315
const conference = new Owt.Conference.ConferenceClient({
1416
webTransportConfiguration: {
1517
serverCertificateFingerprints: [{
1618
value:
17-
'DD:A8:11:FD:A1:08:17:41:36:CD:1A:33:1E:CF:AE:0D:46:3D:15:16:2C:67:C5:A2:06:35:C2:0E:88:A1:9E:C6',
19+
'59:74:C6:C5:2C:D8:E8:18:A9:D2:14:77:ED:94:89:87:DF:83:BA:B3:96:4C:4C:0B:B8:D3:22:58:11:55:67:1A',
20+
algorithm: 'sha-256',
21+
}],
22+
serverCertificateHashes: [{
23+
value: new Uint8Array([
24+
0x59, 0x74, 0xC6, 0xC5, 0x2C, 0xD8, 0xE8, 0x18, 0xA9, 0xD2, 0x14,
25+
0x77, 0xED, 0x94, 0x89, 0x87, 0xDF, 0x83, 0xBA, 0xB3, 0x96, 0x4C,
26+
0x4C, 0x0B, 0xB8, 0xD3, 0x22, 0x58, 0x11, 0x55, 0x67, 0x1A
27+
]),
1828
algorithm: 'sha-256',
19-
}]
29+
}],
2030
}
2131
});
2232
conference.addEventListener('streamadded', async (event) => {
@@ -26,35 +36,51 @@ conference.addEventListener('streamadded', async (event) => {
2636
conferenceId, event.stream.id, 'common',
2737
'http://jianjunz-nuc-ubuntu.sh.intel.com:3001');
2838
}
29-
if (event.stream.source.data || event.stream.source.video) {
30-
const subscription = await conference.subscribe(
31-
event.stream,
32-
{audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}});
33-
const reader = subscription.stream.readable.getReader();
34-
while (true) {
35-
const {value, done} = await reader.read();
36-
if (done) {
37-
console.log('Subscription ends.');
38-
break;
39-
}
40-
console.log('Received data: ' + value);
41-
}
42-
}
39+
// if (event.stream.source.data) {
40+
// const subscription = await conference.subscribe(
41+
// event.stream,
42+
// // {transport:{type: 'quic'}});
43+
// {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}});
44+
// const reader = subscription.stream.readable.getReader();
45+
// while (true) {
46+
// const {value, done} = await reader.read();
47+
// if (done) {
48+
// console.log('Subscription ends.');
49+
// break;
50+
// }
51+
// //console.log('Received data: ' + value);
52+
// }
53+
// }
4354
});
4455

4556
function updateConferenceStatus(message) {
4657
document.getElementById('conference-status').innerHTML +=
4758
('<p>' + message + '</p>');
4859
}
4960

61+
function initWorker() {
62+
mediaWorker = new Worker('./scripts/media-worker.js');
63+
mediaWorker.onmessage=((e) => {
64+
if (e.data[0] === 'video-frame') {
65+
generatorWriter.write(e.data[1]);
66+
//console.log(e.data[1]);
67+
}
68+
});
69+
}
5070

5171
function joinConference() {
5272
return new Promise((resolve, reject) => {
5373
createToken(undefined, 'user', 'presenter', token => {
5474
conference.join(token).then((info) => {
5575
conferenceId = info.id;
5676
myId = info.self.id;
77+
for (const stream of info.remoteStreams) {
78+
if (stream.source.video === 'mixed') {
79+
mixedStream = stream;
80+
}
81+
}
5782
updateConferenceStatus('Connected to conference server.');
83+
initWorker();
5884
resolve();
5985
});
6086
}, 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001');
@@ -89,17 +115,17 @@ async function attachReader(stream) {
89115

90116
async function createSendChannel() {
91117
bidirectionalStream = await conference.createSendStream();
92-
const localStream = new Owt.Base.LocalStream(
93-
bidirectionalStream,
94-
new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined));
95-
attachReader(bidirectionalStream);
96-
const publication = await conference.publish(
97-
localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}});
98118
// const localStream = new Owt.Base.LocalStream(
99119
// bidirectionalStream,
100-
// new Owt.Base.StreamSourceInfo(undefined, undefined, true));
120+
// new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined));
121+
// attachReader(bidirectionalStream);
101122
// const publication = await conference.publish(
102-
// localStream, {transport: {type: 'quic'}});
123+
// localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}});
124+
const localStream = new Owt.Base.LocalStream(
125+
bidirectionalStream,
126+
new Owt.Base.StreamSourceInfo(undefined, undefined, true));
127+
const publication = await conference.publish(
128+
localStream, {transport: {type: 'quic'}});
103129
console.log(publication);
104130
updateConferenceStatus('Created send channel.');
105131
}
@@ -123,7 +149,6 @@ async function writeUuid() {
123149
async function writeVideoData() {
124150
mediaStream = await navigator.mediaDevices.getUserMedia({video: true});
125151
const track = new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]);
126-
mediaWorker = new Worker('./scripts/media-worker.js');
127152
mediaWorker.postMessage(['video-source', track.readable], [track.readable]);
128153
mediaWorker.postMessage(
129154
['send-stream', bidirectionalStream.writable],
@@ -135,26 +160,87 @@ async function writeData() {
135160
const encoded = encoder.encode('message', {stream: true});
136161
const writer = bidirectionalStream.writable.getWriter();
137162
await writer.ready;
138-
await writer.write(new ArrayBuffer(2));
163+
const ab=new Uint8Array(10000);
164+
ab.fill(1, 0);
165+
await writer.write(ab);
139166
writer.releaseLock();
140167
return;
141168
}
142169

143170
window.addEventListener('load', () => {
144171
windowOnLoad();
172+
fetchWasm();
145173
});
146174

147175
document.getElementById('start-sending').addEventListener('click', async () => {
148176
if (!bidirectionalStream) {
149177
updateConferenceStatus('Stream is not created.');
150178
return;
151179
}
152-
writeVideoData();
153-
// writeTask = setInterval(writeData, 2000);
180+
//writeVideoData();
181+
writeTask = setInterval(writeData, 200);
154182
updateConferenceStatus('Started sending.');
155183
});
156184

157185
document.getElementById('stop-sending').addEventListener('click', () => {
158186
clearInterval(writeTask);
159187
updateConferenceStatus('Stopped sending.');
160188
});
189+
190+
document.getElementById('start-receiving')
191+
.addEventListener('click', async () => {
192+
const video=document.getElementById('remote-video');
193+
const generator = new MediaStreamTrackGenerator({kind: 'video'});
194+
generatorWriter=generator.writable.getWriter();
195+
video.srcObject = new MediaStream([generator]);
196+
const reader = conference.datagramReader();
197+
const ms = new Module.MediaSession();
198+
const receiver = ms.createRtpVideoReceiver();
199+
receiver.setCompleteFrameCallback((frame) => {
200+
const copiedFrame = frame.slice(0);
201+
mediaWorker.postMessage(
202+
['encoded-video-frame', copiedFrame], [copiedFrame.buffer]);
203+
});
204+
subscribeMixedStream();
205+
while (true) {
206+
const received = await reader.read();
207+
const buffer = Module._malloc(received.value.byteLength);
208+
Module.writeArrayToMemory(received.value, buffer);
209+
receiver.onRtpPacket(buffer, received.value.byteLength);
210+
}
211+
});
212+
213+
async function fetchWasm() {
214+
Module['instantiateWasm'] = async (imports, successCallback) => {
215+
const response = await fetch('scripts/owt.wasm');
216+
const buffer = await response.arrayBuffer();
217+
const module=await WebAssembly.compile(buffer);
218+
const instance = await WebAssembly.instantiate(module, imports);
219+
successCallback(instance, module);
220+
return {};
221+
};
222+
const scriptPromise = new Promise((resolve, reject) => {
223+
const script = document.createElement('script');
224+
document.body.appendChild(script);
225+
script.onload = resolve;
226+
script.onerror = reject;
227+
script.async = true;
228+
script.src = 'scripts/owt.js';
229+
});
230+
await scriptPromise;
231+
}
232+
233+
async function subscribeMixedStream() {
234+
const subscription = await conference.subscribe(
235+
mixedStream,
236+
{audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}});
237+
const reader = subscription.stream.readable.getReader();
238+
while (true) {
239+
const {value, done} = await reader.read();
240+
if (done) {
241+
console.log('Subscription ends.');
242+
break;
243+
}
244+
// console.log('Received data: ' + value);
245+
}
246+
}

src/sdk/conference/client.js

+4
Original file line numberDiff line numberDiff line change
@@ -558,5 +558,9 @@ export const ConferenceClient = function(config, signalingImpl) {
558558
}
559559
return quicTransportChannel.createSendStream();
560560
};
561+
562+
this.datagramReader = function() {
563+
return quicTransportChannel.datagramReader();
564+
};
561565
}
562566
};

src/sdk/conference/webtransport/connection.js

+19-6
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ export class QuicConnection extends EventDispatcher {
4040
new Map(); // Key is subscription ID, value is a promise.
4141
this._transportId = this._token.transportId;
4242
this._initReceiveStreamReader();
43+
//this._initDatagrams();
4344
}
4445

4546
/**
@@ -73,6 +74,14 @@ export class QuicConnection extends EventDispatcher {
7374
await this._authenticate(this._tokenString);
7475
}
7576

77+
async _initDatagrams() {
78+
const datagramReader = this._quicTransport.datagrams.readable.getReader();
79+
while (true) {
80+
const value = await datagramReader.read();
81+
console.log(value);
82+
}
83+
}
84+
7685
async _initReceiveStreamReader() {
7786
const receiveStreamReader =
7887
this._quicTransport.incomingBidirectionalStreams.getReader();
@@ -275,12 +284,12 @@ export class QuicConnection extends EventDispatcher {
275284
if (typeof options !== 'object') {
276285
return Promise.reject(new TypeError('Options should be an object.'));
277286
}
278-
if (options.audio === undefined) {
279-
options.audio = !!stream.settings.audio;
280-
}
281-
if (options.video === undefined) {
282-
options.video = !!stream.settings.video;
283-
}
287+
// if (options.audio === undefined) {
288+
// options.audio = !!stream.settings.audio;
289+
// }
290+
// if (options.video === undefined) {
291+
// options.video = !!stream.settings.video;
292+
// }
284293
let mediaOptions;
285294
let dataOptions;
286295
if (options.audio || options.video) {
@@ -416,4 +425,8 @@ export class QuicConnection extends EventDispatcher {
416425
// Ready message from server is useless for QuicStream since QuicStream has
417426
// its own status. Do nothing here.
418427
}
428+
429+
datagramReader() {
430+
return this._quicTransport.datagrams.readable.getReader();
431+
}
419432
}

0 commit comments

Comments
 (0)