From f8633065b4b0830e3935935db94880a4bfe76c27 Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Fri, 13 Aug 2021 12:51:51 +0800 Subject: [PATCH 1/4] Publish media data with WebTransport. --- .../conference/public/scripts/media-worker.js | 75 ++++++ src/samples/conference/public/scripts/quic.js | 77 ++++-- src/sdk/base/publication.js | 20 +- src/sdk/base/transport.js | 4 +- src/sdk/conference/client.js | 15 +- src/sdk/conference/subscription.js | 4 +- .../connection.js} | 219 ++++++++++++++++-- .../webtransport/receive-stream-worker.js | 73 ++++++ 8 files changed, 436 insertions(+), 51 deletions(-) create mode 100644 src/samples/conference/public/scripts/media-worker.js rename src/sdk/conference/{quicconnection.js => webtransport/connection.js} (50%) create mode 100644 src/sdk/conference/webtransport/receive-stream-worker.js diff --git a/src/samples/conference/public/scripts/media-worker.js b/src/samples/conference/public/scripts/media-worker.js new file mode 100644 index 00000000..9bcc1df4 --- /dev/null +++ b/src/samples/conference/public/scripts/media-worker.js @@ -0,0 +1,75 @@ +// Copyright (C) <2021> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +/* eslint-disable require-jsdoc */ +/* global VideoEncoder */ + +let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter; +// 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. 
+const sizePrefix = 4; + +onmessage = (e) => { + if (e.data[0] === 'video-source') { + readVideoData(e.data[1]); + } else if (e.data[0] === 'send-stream') { + bidirectionalStreamWritable = e.data[1]; + sendStreamWriter = bidirectionalStreamWritable.getWriter(); + writeTrackId(); + initVideoEncoder(); + } +}; + +async function videoOutput(chunk, metadata) { + if (bidirectionalStreamWritable) { + if (!frameBuffer || + frameBuffer.byteLength < chunk.byteLength + sizePrefix) { + frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); + } + const bufferView = new Uint8Array(frameBuffer, sizePrefix); + chunk.copyTo(bufferView); + const dataView = + new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); + dataView.setUint32(0, chunk.byteLength); + await sendStreamWriter.ready; + await sendStreamWriter.write(dataView); + console.log('Write a frame.'); + } +} + +function videoError(error) { + console.log('Encode error, ' + error); +} + +async function writeTrackId() { + const id = new Uint8Array(16); + id[16] = 2; + await sendStreamWriter.ready; + sendStreamWriter.write(id); +} + +function initVideoEncoder() { + videoEncoder = new VideoEncoder({output: videoOutput, error: videoError}); + videoEncoder.configure({ + codec: 'avc1.4d002a', + width: 640, + height: 480, + framerate: 30, + latencyMode: 'realtime', + avc: {format: 'annexb'}, + }); +} + +// Read data from video track. 
+async function readVideoData(readable) { + const reader = readable.getReader(); + while (true) { + const {value, done} = await reader.read(); + if (done) { + console.log('MediaStream ends.'); + break; + } + videoEncoder.encode(value); + value.close(); + } +} \ No newline at end of file diff --git a/src/samples/conference/public/scripts/quic.js b/src/samples/conference/public/scripts/quic.js index 1bd437a0..3d0d45a4 100644 --- a/src/samples/conference/public/scripts/quic.js +++ b/src/samples/conference/public/scripts/quic.js @@ -2,16 +2,34 @@ // // SPDX-License-Identifier: Apache-2.0 +/* eslint-disable require-jsdoc */ + 'use strict'; let quicChannel = null; let bidirectionalStream = null; -let writeTask; -const conference=new Owt.Conference.ConferenceClient(); +let writeTask, mediaStream, mediaWorker, conferenceId, myId; + +const conference = new Owt.Conference.ConferenceClient({ + webTransportConfiguration: { + serverCertificateFingerprints: [{ + value: + 'DD:A8:11:FD:A1:08:17:41:36:CD:1A:33:1E:CF:AE:0D:46:3D:15:16:2C:67:C5:A2:06:35:C2:0E:88:A1:9E:C6', + algorithm: 'sha-256', + }] + } +}); conference.addEventListener('streamadded', async (event) => { console.log(event.stream); - if (event.stream.source.data) { - const subscription = await conference.subscribe(event.stream); + if (event.stream.origin == myId) { + mixStream( + conferenceId, event.stream.id, 'common', + 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); + } + if (event.stream.source.data || event.stream.source.video) { + const subscription = await conference.subscribe( + event.stream, + {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); const reader = subscription.stream.readable.getReader(); while (true) { const {value, done} = await reader.read(); @@ -19,25 +37,27 @@ conference.addEventListener('streamadded', async (event) => { console.log('Subscription ends.'); break; } - console.log('Received data: '+value); + console.log('Received data: ' + value); } } }); function 
updateConferenceStatus(message) { document.getElementById('conference-status').innerHTML += - ('

' + message + '

'); + ('

' + message + '

'); } function joinConference() { return new Promise((resolve, reject) => { - createToken(undefined, 'user', 'presenter', resp => { - conference.join(resp).then(() => { + createToken(undefined, 'user', 'presenter', token => { + conference.join(token).then((info) => { + conferenceId = info.id; + myId = info.self.id; updateConferenceStatus('Connected to conference server.'); resolve(); }); - }); + }, 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); }); }; @@ -55,10 +75,31 @@ function createRandomContentSessionId() { return id; } +async function attachReader(stream) { + const reader = stream.readable.getReader(); + while (true) { + const {value, done} = await reader.read(); + if (done) { + console.log('Ends.'); + break; + } + console.log('Received data: ' + value); + } +} + async function createSendChannel() { bidirectionalStream = await conference.createSendStream(); - const localStream=new Owt.Base.LocalStream(bidirectionalStream, new Owt.Base.StreamSourceInfo(undefined, undefined,true)); - const publication = await conference.publish(localStream); + const localStream = new Owt.Base.LocalStream( + bidirectionalStream, + new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined)); + attachReader(bidirectionalStream); + const publication = await conference.publish( + localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}}); + // const localStream = new Owt.Base.LocalStream( + // bidirectionalStream, + // new Owt.Base.StreamSourceInfo(undefined, undefined, true)); + // const publication = await conference.publish( + // localStream, {transport: {type: 'quic'}}); console.log(publication); updateConferenceStatus('Created send channel.'); } @@ -79,6 +120,16 @@ async function writeUuid() { return; } +async function writeVideoData() { + mediaStream = await navigator.mediaDevices.getUserMedia({video: true}); + const track = new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]); + mediaWorker = new Worker('./scripts/media-worker.js'); + 
mediaWorker.postMessage(['video-source', track.readable], [track.readable]); + mediaWorker.postMessage( + ['send-stream', bidirectionalStream.writable], + [bidirectionalStream.writable]); +} + async function writeData() { const encoder = new TextEncoder(); const encoded = encoder.encode('message', {stream: true}); @@ -98,8 +149,8 @@ document.getElementById('start-sending').addEventListener('click', async () => { updateConferenceStatus('Stream is not created.'); return; } - await writeUuid(); - writeTask = setInterval(writeData, 2000); + writeVideoData(); + // writeTask = setInterval(writeData, 2000); updateConferenceStatus('Started sending.'); }); diff --git a/src/sdk/base/publication.js b/src/sdk/base/publication.js index 7ead125e..a1b4e8bc 100644 --- a/src/sdk/base/publication.js +++ b/src/sdk/base/publication.js @@ -195,17 +195,29 @@ export class PublishOptions { // eslint-disable-next-line require-jsdoc constructor(audio, video, transport) { /** - * @member {?Array | ?Array} audio + * @member {?Array | + * ?Array | ?AudioEncoderConfig } audio * @instance * @memberof Owt.Base.PublishOptions - * @desc Parameters for audio RtpSender. Publishing with RTCRtpEncodingParameters is an experimental feature. It is subject to change. + * @desc Parameters for audio RtpSender when transport's type is 'webrtc' or + * configuration of audio encoder when transport's type is 'quic'. + * Publishing with RTCRtpEncodingParameters is an experimental feature. It + * is subject to change. + * @see {@link https://www.w3.org/TR/webrtc/#rtcrtpencodingparameters|RTCRtpEncodingParameters} + * @see {@link https://w3c.github.io/webcodecs/#dictdef-audioencoderconfig|AudioEncoderConfig} */ this.audio = audio; /** - * @member {?Array | ?Array} video + * @member {?Array | + * ?Array | ?VideoEncoderConfig } video * @instance * @memberof Owt.Base.PublishOptions - * @desc Parameters for video RtpSender. Publishing with RTCRtpEncodingParameters is an experimental feature. It is subject to change. 
+ * @desc Parameters for video RtpSender when transport's type is 'webrtc' or + * configuration of video encoder when transport's type is 'quic'. + * Publishing with RTCRtpEncodingParameters is an experimental feature. It + * is subject to change. + * @see {@link https://www.w3.org/TR/webrtc/#rtcrtpencodingparameters|RTCRtpEncodingParameters} + * @see {@link https://w3c.github.io/webcodecs/#dictdef-videoencoderconfig|VideoEncoderConfig} */ this.video = video; /** diff --git a/src/sdk/base/transport.js b/src/sdk/base/transport.js index 3bb5a79a..863e2c39 100644 --- a/src/sdk/base/transport.js +++ b/src/sdk/base/transport.js @@ -29,7 +29,9 @@ export class TransportConstraints { * @member {Array.} type * @instance * @memberof Owt.Base.TransportConstraints - * @desc Transport type for publication and subscription. + * @desc Transport type for publication and subscription. 'quic' is only + * supported in conference mode when WebTransport is supported by client and + * enabled at server side. */ this.type = type; /** diff --git a/src/sdk/conference/client.js b/src/sdk/conference/client.js index 35ee3581..91dbd3a8 100644 --- a/src/sdk/conference/client.js +++ b/src/sdk/conference/client.js @@ -16,7 +16,7 @@ import * as StreamModule from '../base/stream.js'; import {Participant} from './participant.js'; import {ConferenceInfo} from './info.js'; import {ConferencePeerConnectionChannel} from './channel.js'; -import {QuicConnection} from './quicconnection.js'; +import {QuicConnection} from './webtransport/connection.js'; import {RemoteMixedStream, ActiveAudioInputChangeEvent, LayoutChangeEvent} from './mixedstream.js'; import * as StreamUtilsModule from './streamutils.js'; @@ -464,7 +464,7 @@ export const ConferenceClient = function(config, signalingImpl) { * @instance * @desc Publish a LocalStream to conference server. Other participants will be able to subscribe this stream when it is successfully published. * @param {Owt.Base.LocalStream} stream The stream to be published. 
- * @param {(Owt.Base.PublishOptions|RTCRtpTransceiver[])} options If options is a PublishOptions, the stream will be published as options specified. If options is a list of RTCRtpTransceivers, each track in the first argument must have a corresponding RTCRtpTransceiver here, and the track will be published with the RTCRtpTransceiver associated with it. + * @param {(Owt.Base.PublishOptions|RTCRtpTransceiver[])} options If options is a PublishOptions, the stream will be published as options specified. If options is a list of RTCRtpTransceivers, each track in the first argument must have a corresponding RTCRtpTransceiver here, and the track will be published with the RTCRtpTransceiver associated with it. If the type of transport is quic, PublishOptions.audio should be AudioEncoderConfig, and PublishOptions.video should be VideoEncoderConfig. * @param {string[]} videoCodecs Video codec names for publishing. Valid values are 'VP8', 'VP9' and 'H264'. This parameter only valid when the second argument is PublishOptions and options.video is RTCRtpEncodingParameters. Publishing with RTCRtpEncodingParameters is an experimental feature. This parameter is subject to change. * @return {Promise} Returned promise will be resolved with a newly created Publication once specific stream is successfully published, or rejected with a newly created Error if stream is invalid or options cannot be satisfied. Successfully published means PeerConnection is established and server is able to process media data. 
*/ @@ -472,8 +472,8 @@ export const ConferenceClient = function(config, signalingImpl) { if (!(stream instanceof StreamModule.LocalStream)) { return Promise.reject(new ConferenceError('Invalid stream.')); } - if (stream.source.data) { - return quicTransportChannel.publish(stream); + if (options?.transport?.type === 'quic') { + return quicTransportChannel.publish(stream, options, videoCodecs); } if (publishChannels.has(stream.mediaStream.id)) { return Promise.reject(new ConferenceError( @@ -501,13 +501,16 @@ export const ConferenceClient = function(config, signalingImpl) { 'Invalid source info. A remote stream is either a data stream or ' + 'a media stream.')); } + } + if (options?.transport?.type === 'quic') { if (quicTransportChannel) { - return quicTransportChannel.subscribe(stream); + return quicTransportChannel.subscribe(stream, options); } else { return Promise.reject(new TypeError('WebTransport is not supported.')); } + } else { + return peerConnectionChannel.subscribe(stream, options); } - return peerConnectionChannel.subscribe(stream, options); }; /** diff --git a/src/sdk/conference/subscription.js b/src/sdk/conference/subscription.js index 16676f24..84ff99f7 100644 --- a/src/sdk/conference/subscription.js +++ b/src/sdk/conference/subscription.js @@ -104,7 +104,7 @@ export class AudioSubscriptionConstraints { * @member {?Array.} codecs * @instance * @memberof Owt.Conference.AudioSubscriptionConstraints - * @desc Codecs accepted. If none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. + * @desc Codecs accepted. Please only include 1 item if transport is "quic". For "webrtc" transport, if none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. */ this.codecs = codecs; } @@ -124,7 +124,7 @@ export class VideoSubscriptionConstraints { * @member {?Array.} codecs * @instance * @memberof Owt.Conference.VideoSubscriptionConstraints - * @desc Codecs accepted. 
If none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. + * @desc Codecs accepted. Please only include 1 item if transport is "quic". For "webrtc" transport, if none of `codecs` supported by both sides, connection fails. Leave it undefined will use all possible codecs. */ this.codecs = codecs; /** diff --git a/src/sdk/conference/quicconnection.js b/src/sdk/conference/webtransport/connection.js similarity index 50% rename from src/sdk/conference/quicconnection.js rename to src/sdk/conference/webtransport/connection.js index 638c4963..61e2b5d2 100644 --- a/src/sdk/conference/quicconnection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -3,15 +3,18 @@ // SPDX-License-Identifier: Apache-2.0 /* eslint-disable require-jsdoc */ -/* global Promise, Map, WebTransport, Uint8Array, Uint32Array, TextEncoder */ +/* global Promise, Map, WebTransport, Uint8Array, Uint32Array, TextEncoder, + * ArrayBuffer */ 'use strict'; -import Logger from '../base/logger.js'; -import {EventDispatcher} from '../base/event.js'; -import {Publication} from '../base/publication.js'; -import {Subscription} from './subscription.js'; -import {Base64} from '../base/base64.js'; +import Logger from '../../base/logger.js'; +import {EventDispatcher} from '../../base/event.js'; +import {Publication} from '../../base/publication.js'; +import {SubscribeOptions, Subscription} from '../subscription.js'; +import {Base64} from '../../base/base64.js'; + +const uuidByteLength = 16; /** * @class QuicConnection @@ -32,6 +35,9 @@ export class QuicConnection extends EventDispatcher { this._quicStreams = new Map(); // Key is publication or subscription ID. this._quicTransport = new WebTransport(url, webTransportOptions); this._subscribePromises = new Map(); // Key is subscription ID. + this._subscribeOptions = new Map(); // Key is subscription ID. + this._subscriptionInfoReady = + new Map(); // Key is subscription ID, value is a promise. 
this._transportId = this._token.transportId; this._initReceiveStreamReader(); } @@ -76,22 +82,84 @@ export class QuicConnection extends EventDispatcher { const {value: receiveStream, done: readingReceiveStreamsDone} = await receiveStreamReader.read(); Logger.info('New stream received'); + const subscriptionIdBytes = new Uint8Array(uuidByteLength); + let subscriptionIdBytesOffset = 0; + const trackIdBytes = new Uint8Array(uuidByteLength); + let trackIdBytesOffset = 0; if (readingReceiveStreamsDone) { receivingDone = true; break; } + // Use BYOB reader when it's supported to avoid copy. See + // https://github.com/w3c/webtransport/issues/131. Issue tracker: + // https://crbug.com/1182905. const chunkReader = receiveStream.readable.getReader(); - const {value: uuid, done: readingChunksDone} = await chunkReader.read(); - if (readingChunksDone) { - Logger.error('Stream closed unexpectedly.'); - return; - } - if (uuid.length != 16) { - Logger.error('Unexpected length for UUID.'); - return; + let readingChunksDone = false; + let readingHeaderDone = false; + let mediaStream = false; + let subscriptionId; + while (!readingChunksDone && !readingHeaderDone) { + const {value, done: readingChunksDone} = await chunkReader.read(); + let valueOffset = 0; + if (subscriptionIdBytesOffset < uuidByteLength) { + const copyLength = Math.min( + uuidByteLength - subscriptionIdBytesOffset, + value.byteLength - valueOffset); + subscriptionIdBytes.set( + value.subarray(valueOffset, valueOffset + copyLength), + subscriptionIdBytesOffset); + subscriptionIdBytesOffset += copyLength; + valueOffset += copyLength; + if (subscriptionIdBytesOffset < uuidByteLength) { + continue; + } + subscriptionId = + this._uint8ArrayToUuid(new Uint8Array(subscriptionIdBytes)); + if (!this._subscribeOptions.has(subscriptionId)) { + Logger.debug('Subscribe options is not ready.'); + const p = new Promise((resolve) => { + this._subscriptionInfoReady.set(subscriptionId, resolve); + }); + await p; + 
this._subscriptionInfoReady.delete(subscriptionId); + } + const subscribeOptions = this._subscribeOptions.get(subscriptionId); + if (subscribeOptions.audio || subscribeOptions.video) { + mediaStream = true; + } + if (!mediaStream) { + readingHeaderDone = true; + if (copyLength < value.byteLength) { + Logger.warning( + 'Potential data lose. Expect to be fixed when BYOB reader ' + + 'is supported.'); + } + continue; + } + } + if (valueOffset < value.byteLength) { + const copyLength = Math.min( + uuidByteLength - trackIdBytesOffset, + value.byteLength - valueOffset); + trackIdBytes.set( + value.subarray(valueOffset, valueOffset + copyLength), + trackIdBytesOffset); + trackIdBytesOffset += copyLength; + valueOffset += copyLength; + if (trackIdBytesOffset < uuidByteLength) { + continue; + } + const trackId = this._uint8ArrayToUuid(trackIdBytes); + Logger.debug(`WebTransport stream for subscription ID ${ + subscriptionId} and track ID ${ + trackId} is ready to receive data.`); + } + if (readingChunksDone) { + Logger.error('Stream closed unexpectedly.'); + return; + } } chunkReader.releaseLock(); - const subscriptionId = this._uint8ArrayToUuid(uuid); this._quicStreams.set(subscriptionId, receiveStream); if (this._subscribePromises.has(subscriptionId)) { const subscription = @@ -150,23 +218,23 @@ export class QuicConnection extends EventDispatcher { return quicStream; } - async publish(stream) { + async publish(stream, options) { // TODO: Avoid a stream to be published twice. The first 16 bit data send to // server must be it's publication ID. // TODO: Potential failure because of publication stream is created faster // than signaling stream(created by the 1st call to initiatePublication). 
- const publicationId = await this._initiatePublication(); + const publicationId = await this._initiatePublication(stream, options); const quicStream = stream.stream; const writer = quicStream.writable.getWriter(); await writer.ready; writer.write(this._uuidToUint8Array(publicationId)); writer.releaseLock(); - Logger.info('publish id'); this._quicStreams.set(publicationId, quicStream); const publication = new Publication(publicationId, () => { this._signaling.sendSignalingMessage('unpublish', {id: publication}) .catch((e) => { - Logger.warning('MCU returns negative ack for unpublishing, ' + e); + Logger.warning( + 'Server returns negative ack for unpublishing, ' + e); }); } /* TODO: getStats, mute, unmute is not implemented */); return publication; @@ -196,15 +264,76 @@ export class QuicConnection extends EventDispatcher { return s; } - subscribe(stream) { + async subscribe(stream, options) { + // TODO: Combine this with channel.js. + if (options === undefined) { + options = { + audio: !!stream.settings.audio, + video: !!stream.settings.video, + }; + } + if (typeof options !== 'object') { + return Promise.reject(new TypeError('Options should be an object.')); + } + if (options.audio === undefined) { + options.audio = !!stream.settings.audio; + } + if (options.video === undefined) { + options.video = !!stream.settings.video; + } + let mediaOptions; + let dataOptions; + if (options.audio || options.video) { + mediaOptions = {tracks: []}; + dataOptions = undefined; + if (options.audio) { + const trackOptions = {type: 'audio', from: stream.id}; + if (typeof options.audio !== 'object' || + !Array.isArray(options.audio.codecs) || + options.audio.codecs.length !== 1) { + return Promise.reject(new TypeError( + 'Audio codec is expect to be a list with one item.')); + } + mediaOptions.tracks.push(trackOptions); + } + if (options.video) { + const trackOptions = {type: 'video', from: stream.id}; + if (typeof options.video !== 'object' || + !Array.isArray(options.video.codecs) 
|| + options.video.codecs.length !== 1) { + return Promise.reject(new TypeError( + 'Video codec is expect to be a list with one item.')); + } + if (options.video.resolution || options.video.frameRate || + (options.video.bitrateMultiplier && + options.video.bitrateMultiplier !== 1) || + options.video.keyFrameInterval) { + trackOptions.parameters = { + resolution: options.video.resolution, + framerate: options.video.frameRate, + bitrate: options.video.bitrateMultiplier ? + 'x' + options.video.bitrateMultiplier.toString() : + undefined, + keyFrameInterval: options.video.keyFrameInterval, + }; + } + mediaOptions.tracks.push(trackOptions); + } + } else { + // Data stream. + mediaOptions = null; + dataOptions = {from: stream.id}; + } const p = new Promise((resolve, reject) => { this._signaling .sendSignalingMessage('subscribe', { - media: null, - data: {from: stream.id}, + media: mediaOptions, + data: dataOptions, transport: {type: 'quic', id: this._transportId}, }) .then((data) => { + this._subscribeOptions.set(data.id, options); + Logger.debug('Subscribe info is set.'); if (this._quicStreams.has(data.id)) { // QUIC stream created before signaling returns. const subscription = this._createSubscription( @@ -217,6 +346,9 @@ export class QuicConnection extends EventDispatcher { this._subscribePromises.set( data.id, {resolve: resolve, reject: reject}); } + if (this._subscriptionInfoReady.has(data.id)) { + this._subscriptionInfoReady.get(data.id)(); + } }); }); return p; @@ -231,10 +363,47 @@ export class QuicConnection extends EventDispatcher { }); } - async _initiatePublication() { + async _initiatePublication(stream, options) { + const media = {tracks: []}; + if (stream.source.audio) { + if (!options.audio) { + throw new TypeError( + 'Options for audio is missing. 
Publish audio track with ' + + 'WebTransport must have AudioEncoderConfig specified.'); + } + const track = { + from: stream.id, + source: stream.source.audio, + type: 'audio', + format: { + codec: options.audio.codec, + sampleRate: options.audio.sampleRate, + channelNum: options.audio.numberOfChannels, + }, + }; + media.tracks.push(track); + } + if (stream.source.video) { + if (!options.video) { + throw new TypeError( + 'Options for audio is missing. Publish video track with ' + + 'WebTransport must have VideoEncoderConfig specified.'); + } + const track = { + from: stream.id, + source: stream.source.video, + type: 'video', + // TODO: convert from MIME type to the format required by server. + format: { + codec: 'h264', + profile: 'B', + }, + }; + media.tracks.push(track); + } const data = await this._signaling.sendSignalingMessage('publish', { - media: null, - data: true, + media: stream.source.data ? null : media, + data: stream.source.data, transport: {type: 'quic', id: this._transportId}, }); if (this._transportId !== data.transportId) { diff --git a/src/sdk/conference/webtransport/receive-stream-worker.js b/src/sdk/conference/webtransport/receive-stream-worker.js new file mode 100644 index 00000000..256f1244 --- /dev/null +++ b/src/sdk/conference/webtransport/receive-stream-worker.js @@ -0,0 +1,73 @@ +// Copyright (C) <2021> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +/* eslint-disable require-jsdoc */ +/* global AudioDecoder, postMessage */ + +'use strict'; + +import Logger from '../../base/logger.js'; + +let audioDecoder; + +onmessage = (e) => { + // ['init', SubscribeOptions, WebTransportStream, TrackKind]. 
+ if (e.data[0] === 'init') { + _initWebCodecs(e.data[1], e.data[3]); + _handleReceiveStream(e.data[2]); + } +}; + +const audioDecoderConfig = { + codec: 'opus', + sampleRate: 48000, + numberOfChannels: 2, +}; + +function audioDecoderOutput(audioFrame) { + const audioBuffer = { + numberOfChannels: audioFrame.buffer.numberOfChannels, + sampleRate: audioFrame.buffer.sampleRate, + length: audioFrame.buffer.length, + duration: audioFrame.buffer.duration, + channelData: [], + }; + for (let i = 0; i < audioFrame.buffer.numberOfChannels; i++) { + audioBuffer.channelData.push(audioFrame.buffer.getChannelData(i)); + } + postMessage(['audio-frame', audioBuffer]); +} + +function audioDecoderError(error) { + Logger.warn('Audio decoder failed to decode. Error: ' + error); +} + +async function _initWebCodecs(options, trackKind) { + if (trackKind !== 'audio') { + Logger.error( + 'Receiving ' + trackKind + ' over WebTransport is not supported.'); + return; + } + if (options.audio) { + Logger.error('No options for audio.'); + return; + } + audioDecoder = + new AudioDecoder({output: audioDecoderOutput, error: audioDecoderError}); + audioDecoder.configure(audioDecoderConfig); +} + +async function _handleReceiveStream(stream) { + const reader = stream.readable.getReader(); + let readingDone = false; + while (!readingDone) { + const {value, done: finished} = await reader.read(); + if (finished) { + readingDone = true; + } + // TODO: Read audio frame header. + // Implement it when BYOB reader is implemented in Chrome to reduce buffer + // copy. + } +} \ No newline at end of file From ddd42fc4a898d49d2e2324ba2da51d755f738eb7 Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Wed, 13 Oct 2021 14:28:31 +0800 Subject: [PATCH 2/4] Receive mixed stream with WebTransport datagram. After RTP packets are received, it uses WebRTC RTP depacketizer for depacketizing, WebCodecs for decoding and MediaStreamGenerator for rendering. 
--- src/samples/conference/public/quic.html | 10 +- .../conference/public/scripts/media-worker.js | 50 +++++- src/samples/conference/public/scripts/quic.js | 144 ++++++++++++++---- src/sdk/base/stream.js | 14 +- src/sdk/conference/client.js | 4 + src/sdk/conference/webtransport/connection.js | 25 ++- 6 files changed, 199 insertions(+), 48 deletions(-) diff --git a/src/samples/conference/public/quic.html b/src/samples/conference/public/quic.html index 077f6637..300ce4d1 100644 --- a/src/samples/conference/public/quic.html +++ b/src/samples/conference/public/quic.html @@ -30,13 +30,19 @@

Sample of QuicTransport

+ + +
+
+
- + + - + diff --git a/src/samples/conference/public/scripts/media-worker.js b/src/samples/conference/public/scripts/media-worker.js index 9bcc1df4..3b8c97ff 100644 --- a/src/samples/conference/public/scripts/media-worker.js +++ b/src/samples/conference/public/scripts/media-worker.js @@ -3,9 +3,10 @@ // SPDX-License-Identifier: Apache-2.0 /* eslint-disable require-jsdoc */ -/* global VideoEncoder */ +/* global VideoEncoder, VideoDecoder, EncodedVideoChunk */ -let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter; +let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter, + mediaSession, datagramReceiver, videoDecoder; // 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. const sizePrefix = 4; @@ -16,7 +17,15 @@ onmessage = (e) => { bidirectionalStreamWritable = e.data[1]; sendStreamWriter = bidirectionalStreamWritable.getWriter(); writeTrackId(); - initVideoEncoder(); + // initVideoEncoder(); + } else if (e.data[0] === 'datagram-receiver') { + datagramReceiver = e.data[1]; + } else if (e.data[0] === 'encoded-video-frame') { + if (videoDecoder.state === 'closed') { + return; + } + videoDecoder.decode(new EncodedVideoChunk( + {timestamp: Date.now(), data: e.data[1], type: 'key'})); } }; @@ -60,6 +69,22 @@ function initVideoEncoder() { }); } +function initVideoDecoder() { + videoDecoder = new VideoDecoder({ + output: videoFrameOutputCallback, + error: webCodecsErrorCallback, + }); + videoDecoder.configure({codec: 'avc1.42400a', optimizeForLatency: true}); +} + +function videoFrameOutputCallback(frame) { + postMessage(['video-frame', frame], [frame]); +} + +function webCodecsErrorCallback(error) { + console.log('error: ' + error.message); +} + // Read data from video track. 
async function readVideoData(readable) { const reader = readable.getReader(); @@ -72,4 +97,21 @@ async function readVideoData(readable) { videoEncoder.encode(value); value.close(); } -} \ No newline at end of file +} + +async function fetchWasm() { + const Module={}; + Module['instantiateWasm'] = async (imports, successCallback) => { + const response = await fetch('./owt.wasm'); + const buffer = await response.arrayBuffer(); + const module=new WebAssembly.Module(buffer); + const instance = await WebAssembly.instantiate(module, imports); + successCallback(instance, module); + return {}; + }; + // Module['wasmModule']=new WebAssembly.Module(buffer); + importScripts('./owt.js'); + console.log('Got wasm binary.'); +} + +initVideoDecoder(); diff --git a/src/samples/conference/public/scripts/quic.js b/src/samples/conference/public/scripts/quic.js index 3d0d45a4..ff8de67e 100644 --- a/src/samples/conference/public/scripts/quic.js +++ b/src/samples/conference/public/scripts/quic.js @@ -8,15 +8,25 @@ let quicChannel = null; let bidirectionalStream = null; -let writeTask, mediaStream, mediaWorker, conferenceId, myId; +let writeTask, mediaStream, mediaWorker, conferenceId, myId, mixedStream, generatorWriter; + +window.Module={}; const conference = new Owt.Conference.ConferenceClient({ webTransportConfiguration: { serverCertificateFingerprints: [{ value: - 'DD:A8:11:FD:A1:08:17:41:36:CD:1A:33:1E:CF:AE:0D:46:3D:15:16:2C:67:C5:A2:06:35:C2:0E:88:A1:9E:C6', + '59:74:C6:C5:2C:D8:E8:18:A9:D2:14:77:ED:94:89:87:DF:83:BA:B3:96:4C:4C:0B:B8:D3:22:58:11:55:67:1A', + algorithm: 'sha-256', + }], + serverCertificateHashes: [{ + value: new Uint8Array([ + 0x59, 0x74, 0xC6, 0xC5, 0x2C, 0xD8, 0xE8, 0x18, 0xA9, 0xD2, 0x14, + 0x77, 0xED, 0x94, 0x89, 0x87, 0xDF, 0x83, 0xBA, 0xB3, 0x96, 0x4C, + 0x4C, 0x0B, 0xB8, 0xD3, 0x22, 0x58, 0x11, 0x55, 0x67, 0x1A + ]), algorithm: 'sha-256', - }] + }], } }); conference.addEventListener('streamadded', async (event) => { @@ -26,20 +36,21 @@ 
conference.addEventListener('streamadded', async (event) => { conferenceId, event.stream.id, 'common', 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); } - if (event.stream.source.data || event.stream.source.video) { - const subscription = await conference.subscribe( - event.stream, - {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); - const reader = subscription.stream.readable.getReader(); - while (true) { - const {value, done} = await reader.read(); - if (done) { - console.log('Subscription ends.'); - break; - } - console.log('Received data: ' + value); - } - } + // if (event.stream.source.data) { + // const subscription = await conference.subscribe( + // event.stream, + // // {transport:{type: 'quic'}}); + // {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); + // const reader = subscription.stream.readable.getReader(); + // while (true) { + // const {value, done} = await reader.read(); + // if (done) { + // console.log('Subscription ends.'); + // break; + // } + // //console.log('Received data: ' + value); + // } + // } }); function updateConferenceStatus(message) { @@ -47,6 +58,15 @@ function updateConferenceStatus(message) { ('

' + message + '

'); } +function initWorker() { + mediaWorker = new Worker('./scripts/media-worker.js'); + mediaWorker.onmessage=((e) => { + if (e.data[0] === 'video-frame') { + generatorWriter.write(e.data[1]); + //console.log(e.data[1]); + } + }); +} function joinConference() { return new Promise((resolve, reject) => { @@ -54,7 +74,13 @@ function joinConference() { conference.join(token).then((info) => { conferenceId = info.id; myId = info.self.id; + for (const stream of info.remoteStreams) { + if (stream.source.video === 'mixed') { + mixedStream = stream; + } + } updateConferenceStatus('Connected to conference server.'); + initWorker(); resolve(); }); }, 'http://jianjunz-nuc-ubuntu.sh.intel.com:3001'); @@ -89,17 +115,17 @@ async function attachReader(stream) { async function createSendChannel() { bidirectionalStream = await conference.createSendStream(); - const localStream = new Owt.Base.LocalStream( - bidirectionalStream, - new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined)); - attachReader(bidirectionalStream); - const publication = await conference.publish( - localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}}); // const localStream = new Owt.Base.LocalStream( // bidirectionalStream, - // new Owt.Base.StreamSourceInfo(undefined, undefined, true)); + // new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined)); + // attachReader(bidirectionalStream); // const publication = await conference.publish( - // localStream, {transport: {type: 'quic'}}); + // localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}}); + const localStream = new Owt.Base.LocalStream( + bidirectionalStream, + new Owt.Base.StreamSourceInfo(undefined, undefined, true)); + const publication = await conference.publish( + localStream, {transport: {type: 'quic'}}); console.log(publication); updateConferenceStatus('Created send channel.'); } @@ -123,7 +149,6 @@ async function writeUuid() { async function writeVideoData() { mediaStream = await 
navigator.mediaDevices.getUserMedia({video: true}); const track = new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]); - mediaWorker = new Worker('./scripts/media-worker.js'); mediaWorker.postMessage(['video-source', track.readable], [track.readable]); mediaWorker.postMessage( ['send-stream', bidirectionalStream.writable], @@ -135,13 +160,16 @@ async function writeData() { const encoded = encoder.encode('message', {stream: true}); const writer = bidirectionalStream.writable.getWriter(); await writer.ready; - await writer.write(new ArrayBuffer(2)); + const ab=new Uint8Array(10000); + ab.fill(1, 0); + await writer.write(ab); writer.releaseLock(); return; } window.addEventListener('load', () => { windowOnLoad(); + fetchWasm(); }); document.getElementById('start-sending').addEventListener('click', async () => { @@ -149,8 +177,8 @@ document.getElementById('start-sending').addEventListener('click', async () => { updateConferenceStatus('Stream is not created.'); return; } - writeVideoData(); - // writeTask = setInterval(writeData, 2000); + //writeVideoData(); + writeTask = setInterval(writeData, 200); updateConferenceStatus('Started sending.'); }); @@ -158,3 +186,61 @@ document.getElementById('stop-sending').addEventListener('click', () => { clearInterval(writeTask); updateConferenceStatus('Stopped sending.'); }); + +document.getElementById('start-receiving') + .addEventListener('click', async () => { + const video=document.getElementById('remote-video'); + const generator = new MediaStreamTrackGenerator({kind: 'video'}); + generatorWriter=generator.writable.getWriter(); + video.srcObject = new MediaStream([generator]); + const reader = conference.datagramReader(); + const ms = new Module.MediaSession(); + const receiver = ms.createRtpVideoReceiver(); + receiver.setCompleteFrameCallback((frame) => { + const copiedFrame = frame.slice(0); + mediaWorker.postMessage( + ['encoded-video-frame', copiedFrame], [copiedFrame.buffer]); + }); + subscribeMixedStream(); + 
while (true) { + const received = await reader.read(); + const buffer = Module._malloc(received.value.byteLength); + Module.writeArrayToMemory(received.value, buffer); + receiver.onRtpPacket(buffer, received.value.byteLength); + } + }); + +async function fetchWasm() { + Module['instantiateWasm'] = async (imports, successCallback) => { + const response = await fetch('scripts/owt.wasm'); + const buffer = await response.arrayBuffer(); + const module=await WebAssembly.compile(buffer); + const instance = await WebAssembly.instantiate(module, imports); + successCallback(instance, module); + return {}; + }; + const scriptPromise = new Promise((resolve, reject) => { + const script = document.createElement('script'); + document.body.appendChild(script); + script.onload = resolve; + script.onerror = reject; + script.async = true; + script.src = 'scripts/owt.js'; + }); + await scriptPromise; +} + +async function subscribeMixedStream() { + const subscription = await conference.subscribe( + mixedStream, + {audio: false, video: {codecs: ['h264']}, transport: {type: 'quic'}}); + const reader = subscription.stream.readable.getReader(); + while (true) { + const {value, done} = await reader.read(); + if (done) { + console.log('Subscription ends.'); + break; + } + // console.log('Received data: ' + value); + } +} diff --git a/src/sdk/base/stream.js b/src/sdk/base/stream.js index 279d645b..e036b65c 100644 --- a/src/sdk/base/stream.js +++ b/src/sdk/base/stream.js @@ -55,13 +55,13 @@ export class Stream extends EventDispatcher { // eslint-disable-next-line require-jsdoc constructor(stream, sourceInfo, attributes) { super(); - if ((stream && !(stream instanceof MediaStream) && - !(typeof SendStream === 'function' && stream instanceof SendStream) && - !(typeof BidirectionalStream === 'function' && - stream instanceof BidirectionalStream)) || - (typeof sourceInfo !== 'object')) { - throw new TypeError('Invalid stream or sourceInfo.'); - } + // if ((stream && !(stream instanceof 
MediaStream) && + // !(typeof SendStream === 'function' && stream instanceof SendStream) && + // !(typeof BidirectionalStream === 'function' && + // stream instanceof BidirectionalStream)) || + // (typeof sourceInfo !== 'object')) { + // throw new TypeError('Invalid stream or sourceInfo.'); + // } if (stream && (stream instanceof MediaStream) && ((stream.getAudioTracks().length > 0 && !sourceInfo.audio) || stream.getVideoTracks().length > 0 && !sourceInfo.video)) { diff --git a/src/sdk/conference/client.js b/src/sdk/conference/client.js index 91dbd3a8..a5f1729f 100644 --- a/src/sdk/conference/client.js +++ b/src/sdk/conference/client.js @@ -558,5 +558,9 @@ export const ConferenceClient = function(config, signalingImpl) { } return quicTransportChannel.createSendStream(); }; + + this.datagramReader = function() { + return quicTransportChannel.datagramReader(); + }; } }; diff --git a/src/sdk/conference/webtransport/connection.js b/src/sdk/conference/webtransport/connection.js index 61e2b5d2..51520f07 100644 --- a/src/sdk/conference/webtransport/connection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -40,6 +40,7 @@ export class QuicConnection extends EventDispatcher { new Map(); // Key is subscription ID, value is a promise. 
this._transportId = this._token.transportId; this._initReceiveStreamReader(); + //this._initDatagrams(); } /** @@ -73,6 +74,14 @@ export class QuicConnection extends EventDispatcher { await this._authenticate(this._tokenString); } + async _initDatagrams() { + const datagramReader = this._quicTransport.datagrams.readable.getReader(); + while (true) { + const value = await datagramReader.read(); + console.log(value); + } + } + async _initReceiveStreamReader() { const receiveStreamReader = this._quicTransport.incomingBidirectionalStreams.getReader(); @@ -275,12 +284,12 @@ export class QuicConnection extends EventDispatcher { if (typeof options !== 'object') { return Promise.reject(new TypeError('Options should be an object.')); } - if (options.audio === undefined) { - options.audio = !!stream.settings.audio; - } - if (options.video === undefined) { - options.video = !!stream.settings.video; - } + // if (options.audio === undefined) { + // options.audio = !!stream.settings.audio; + // } + // if (options.video === undefined) { + // options.video = !!stream.settings.video; + // } let mediaOptions; let dataOptions; if (options.audio || options.video) { @@ -416,4 +425,8 @@ export class QuicConnection extends EventDispatcher { // Ready message from server is useless for QuicStream since QuicStream has // its own status. Do nothing here. } + + datagramReader() { + return this._quicTransport.datagrams.readable.getReader(); + } } From db7445b0174a863f0dae3a3bd7f5f37dedaa86cf Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Tue, 19 Oct 2021 16:40:40 +0800 Subject: [PATCH 3/4] Send audio over WebTransport streams. 
--- .../conference/public/scripts/media-worker.js | 85 ++++++++++++++----- src/samples/conference/public/scripts/quic.js | 51 +++++++---- 2 files changed, 96 insertions(+), 40 deletions(-) diff --git a/src/samples/conference/public/scripts/media-worker.js b/src/samples/conference/public/scripts/media-worker.js index 3b8c97ff..ab0c48b4 100644 --- a/src/samples/conference/public/scripts/media-worker.js +++ b/src/samples/conference/public/scripts/media-worker.js @@ -3,21 +3,30 @@ // SPDX-License-Identifier: Apache-2.0 /* eslint-disable require-jsdoc */ -/* global VideoEncoder, VideoDecoder, EncodedVideoChunk */ +/* global AudioEncoder, EncodedAudioChunk, VideoEncoder, VideoDecoder, + * EncodedVideoChunk */ -let bidirectionalStreamWritable, videoEncoder, frameBuffer, sendStreamWriter, - mediaSession, datagramReceiver, videoDecoder; +let videoBidiStreamWritable, audioEncoder, videoEncoder, frameBuffer, + audioSendStreamWriter, videoSendStreamWriter, mediaSession, + datagramReceiver, videoDecoder; // 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. 
const sizePrefix = 4; onmessage = (e) => { - if (e.data[0] === 'video-source') { - readVideoData(e.data[1]); - } else if (e.data[0] === 'send-stream') { - bidirectionalStreamWritable = e.data[1]; - sendStreamWriter = bidirectionalStreamWritable.getWriter(); - writeTrackId(); - // initVideoEncoder(); + if (e.data[0] === 'audio-source') { + readMediaData(e.data[1], 'audio'); + } else if (e.data[0] === 'video-source') { + readMediaData(e.data[1], 'video'); + } else if (e.data[0] === 'send-stream-audio') { + const audioBidiStreamWritable = e.data[1]; + audioSendStreamWriter = audioBidiStreamWritable.getWriter(); + writeTrackId('audio', audioSendStreamWriter); + initAudioEncoder(); + } else if (e.data[0] === 'send-stream-video') { + videoBidiStreamWritable = e.data[1]; + videoSendStreamWriter = videoBidiStreamWritable.getWriter(); + writeTrackId('video', videoSendStreamWriter); + initVideoEncoder(); } else if (e.data[0] === 'datagram-receiver') { datagramReceiver = e.data[1]; } else if (e.data[0] === 'encoded-video-frame') { @@ -30,7 +39,8 @@ onmessage = (e) => { }; async function videoOutput(chunk, metadata) { - if (bidirectionalStreamWritable) { + return; + if (videoBidiStreamWritable) { if (!frameBuffer || frameBuffer.byteLength < chunk.byteLength + sizePrefix) { frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); @@ -40,21 +50,48 @@ async function videoOutput(chunk, metadata) { const dataView = new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); dataView.setUint32(0, chunk.byteLength); - await sendStreamWriter.ready; - await sendStreamWriter.write(dataView); - console.log('Write a frame.'); + await videoSendStreamWriter.ready; + await videoSendStreamWriter.write(dataView); } } function videoError(error) { - console.log('Encode error, ' + error); + console.log('Video encode error, ' + error.message); } -async function writeTrackId() { +async function audioOutput(chunk, metadata) { + if (audioSendStreamWriter) { + if (!frameBuffer || + 
frameBuffer.byteLength < chunk.byteLength + sizePrefix) { + frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); + } + const bufferView = new Uint8Array(frameBuffer, sizePrefix); + chunk.copyTo(bufferView); + const dataView = + new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); + dataView.setUint32(0, chunk.byteLength); + await audioSendStreamWriter.ready; + await audioSendStreamWriter.write(dataView); + console.log('Wrote an audio frame. '+chunk.byteLength); + } +} + +function audioError(error) { + console.log(`Audio encode error: ${error.message}`); +} + +async function writeTrackId(kind, writer) { const id = new Uint8Array(16); - id[16] = 2; - await sendStreamWriter.ready; - sendStreamWriter.write(id); + id[15] = (kind === 'audio' ? 1 : 2); + await writer.ready; + writer.write(id); + console.log('Wrote track ID for '+kind); +} + +function initAudioEncoder() { + audioEncoder = new AudioEncoder({output: audioOutput, error: audioError}); + audioEncoder.configure( + {codec: 'opus', numberOfChannels: 1, sampleRate: 48000}); } function initVideoEncoder() { @@ -85,8 +122,8 @@ function webCodecsErrorCallback(error) { console.log('error: ' + error.message); } -// Read data from video track. -async function readVideoData(readable) { +// Read data from media track. 
+async function readMediaData(readable, kind) { const reader = readable.getReader(); while (true) { const {value, done} = await reader.read(); @@ -94,7 +131,11 @@ async function readVideoData(readable) { console.log('MediaStream ends.'); break; } - videoEncoder.encode(value); + if (kind === 'audio') { + audioEncoder.encode(value); + } else if (kind === 'video') { + videoEncoder.encode(value); + } value.close(); } } diff --git a/src/samples/conference/public/scripts/quic.js b/src/samples/conference/public/scripts/quic.js index ff8de67e..cb4cf8ee 100644 --- a/src/samples/conference/public/scripts/quic.js +++ b/src/samples/conference/public/scripts/quic.js @@ -8,6 +8,7 @@ let quicChannel = null; let bidirectionalStream = null; +let bidiAudioStream = null; let writeTask, mediaStream, mediaWorker, conferenceId, myId, mixedStream, generatorWriter; window.Module={}; @@ -114,18 +115,22 @@ async function attachReader(stream) { } async function createSendChannel() { - bidirectionalStream = await conference.createSendStream(); + //bidirectionalStream = await conference.createSendStream(); + bidiAudioStream = await conference.createSendStream(); + const localStream = new Owt.Base.LocalStream( + bidiAudioStream, + new Owt.Base.StreamSourceInfo('mic', undefined, undefined)); + attachReader(bidiAudioStream); + const publication = await conference.publish(localStream, { + audio: {codec: 'opus', numberOfChannels: 2, sampleRate: 48000}, + video: false, + transport: {type: 'quic'}, + }); // const localStream = new Owt.Base.LocalStream( // bidirectionalStream, - // new Owt.Base.StreamSourceInfo(undefined, 'camera', undefined)); - // attachReader(bidirectionalStream); + // new Owt.Base.StreamSourceInfo(undefined, undefined, true)); // const publication = await conference.publish( - // localStream, {video: {codec: 'h264'}, transport: {type: 'quic'}}); - const localStream = new Owt.Base.LocalStream( - bidirectionalStream, - new Owt.Base.StreamSourceInfo(undefined, undefined, true)); - 
const publication = await conference.publish( - localStream, {transport: {type: 'quic'}}); + // localStream, {transport: {type: 'quic'}}); console.log(publication); updateConferenceStatus('Created send channel.'); } @@ -146,13 +151,23 @@ async function writeUuid() { return; } -async function writeVideoData() { - mediaStream = await navigator.mediaDevices.getUserMedia({video: true}); - const track = new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]); - mediaWorker.postMessage(['video-source', track.readable], [track.readable]); +async function writeMediaData() { + mediaStream = + await navigator.mediaDevices.getUserMedia({audio: true, video: true}); + const audioTrack = + new MediaStreamTrackProcessor(mediaStream.getAudioTracks()[0]); + mediaWorker.postMessage( + ['audio-source', audioTrack.readable], [audioTrack.readable]); mediaWorker.postMessage( - ['send-stream', bidirectionalStream.writable], - [bidirectionalStream.writable]); + ['send-stream-audio', bidiAudioStream.writable], + [bidiAudioStream.writable]); + // const videoTrack = + // new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]); + // mediaWorker.postMessage( + // ['video-source', videoTrack.readable], [videoTrack.readable]); + // mediaWorker.postMessage( + // ['send-stream-video', bidirectionalStream.writable], + // [bidirectionalStream.writable]); } async function writeData() { @@ -173,12 +188,12 @@ window.addEventListener('load', () => { }); document.getElementById('start-sending').addEventListener('click', async () => { - if (!bidirectionalStream) { + if (!bidiAudioStream) { updateConferenceStatus('Stream is not created.'); return; } - //writeVideoData(); - writeTask = setInterval(writeData, 200); + writeMediaData(); + //writeTask = setInterval(writeData, 200); updateConferenceStatus('Started sending.'); }); From 7ee222a2580606a7e16bf93c658d19a8977e487e Mon Sep 17 00:00:00 2001 From: Jianjun Zhu Date: Wed, 27 Oct 2021 14:37:35 +0800 Subject: [PATCH 4/4] Support sending 
MediaStream over WebTransport streams. --- docs/mdfiles/changelog.md | 2 + docs/mdfiles/index.md | 2 + scripts/Gruntfile.js | 21 ++- src/samples/conference/public/quic.html | 4 +- .../conference/public/scripts/media-worker.js | 158 ------------------ src/samples/conference/public/scripts/quic.js | 116 ++++++------- src/sdk/base/logger.js | 8 +- src/sdk/base/stream.js | 2 - src/sdk/conference/client.js | 6 +- src/sdk/conference/webtransport/connection.js | 133 +++++++++------ .../conference/webtransport/media-worker.js | 140 ++++++++++++++++ .../webtransport/receive-stream-worker.js | 3 + 12 files changed, 304 insertions(+), 291 deletions(-) delete mode 100644 src/samples/conference/public/scripts/media-worker.js create mode 100644 src/sdk/conference/webtransport/media-worker.js diff --git a/docs/mdfiles/changelog.md b/docs/mdfiles/changelog.md index 7ddd9a61..2a80d1d6 100644 --- a/docs/mdfiles/changelog.md +++ b/docs/mdfiles/changelog.md @@ -6,6 +6,8 @@ Change Log * Add a new property `rtpTransceivers` to `TransportSettings` and `TransportConstraints`. * Add a new property `peerConnection` to `ConferenceClient`. * The second argument of `ConferenceClient.publish` could be a list of `RTCRtpTransceiver`s. +* Add support to publish a MediaStream over WebTransport. + # 5.0 * Add WebTransport support for conference mode, see [this design doc](../../design/webtransport.md) for detailed information. * All publications and subscriptions for the same conference use the same `PeerConnection`. diff --git a/docs/mdfiles/index.md b/docs/mdfiles/index.md index 1dedfeff..b88d7595 100644 --- a/docs/mdfiles/index.md +++ b/docs/mdfiles/index.md @@ -142,6 +142,8 @@ WebTransport is supported in conference mode as an experimental feature. 
QUIC ag - [JavaScript SDK design doc for WebTransport support](https://github.com/open-webrtc-toolkit/owt-client-javascript/blob/master/docs/design/webtransport.md) - [QUIC programming guide for OWT server](https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/design/quic-programming-guide.md) +Publishing a MediaStream over WebTransport requires an additional worker for I/O. The worker is a standalone ES module, not included in owt.js. As we're moving the SDK from traditional JavaScript script to ES module, there is no plan to support this worker in old browsers. + # 7 Events The JavaScript objects fires events using `Owt.Base.EventDispatchers`. For more detailed events, please refer to the specific class description page. diff --git a/scripts/Gruntfile.js b/scripts/Gruntfile.js index 5672e0c1..c03daeaf 100644 --- a/scripts/Gruntfile.js +++ b/scripts/Gruntfile.js @@ -76,6 +76,10 @@ window.L = L;\n\ watch: true }, }, + worker:{ + src: ['dist/sdk/conference/webtransport/media-worker.js'], + dest: 'dist/sdk/media-worker.js', + }, sinon: { src: ['node_modules/sinon/lib/sinon.js'], dest: 'test/unit/resources/scripts/gen/sinon-browserified.js', @@ -102,7 +106,15 @@ window.L = L;\n\ options: { base: '.', port: 7080, - keepalive: true + keepalive: true, + middleware: function(connect, options, middlewares) { + middlewares.unshift((req, res, next) => { + res.setHeader('Cross-Origin-Embedder-Policy', 'require-corp'); + res.setHeader('Cross-Origin-Opener-Policy', 'same-origin'); + next(); + }); + return middlewares; + } }, }, }, @@ -189,7 +201,8 @@ window.L = L;\n\ {expand: true,cwd:'src/extension/',src:['**'],dest:'dist/',flatten:false}, {expand: true,cwd:'dist/sdk/',src:['owt.js'],dest:'dist/samples/conference/public/scripts/',flatten:false}, {expand: true,cwd:'dist/samples/conference/public/scripts',src:['rest.js'],dest:'dist/samples/conference/',flatten:false}, - {expand: true,cwd:'dist/sdk/',src:['owt.js'],dest:'dist/samples/p2p/js/',flatten:false} + {expand: 
true,cwd:'dist/sdk/',src:['owt.js'],dest:'dist/samples/p2p/js/',flatten:false}, + {expand: true,cwd: 'dist/sdk/',src: ['media-worker.js'],dest: 'dist/samples/conference/public/scripts/',flatten: false}, ] } }, @@ -267,8 +280,8 @@ window.L = L;\n\ grunt.registerTask('check', ['eslint:src']); grunt.registerTask('prepare', ['browserify:sinon', 'browserify:chai_as_promised']); - grunt.registerTask('pack', ['browserify:dist', 'concat:rest', 'uglify:dist', 'copy:dist', 'string-replace', 'compress:dist', 'jsdoc:dist']); - grunt.registerTask('dev', ['browserify:dev', 'connect:server']); + grunt.registerTask('pack', ['browserify:dist', 'browserify:worker', 'concat:rest', 'uglify:dist', 'copy:dist', 'string-replace', 'compress:dist', 'jsdoc:dist']); + grunt.registerTask('dev', ['browserify:dev', 'browserify:worker', 'connect:server']); grunt.registerTask('debug', ['browserify:dev']); grunt.registerTask('default', ['check', 'pack']); }; diff --git a/src/samples/conference/public/quic.html b/src/samples/conference/public/quic.html index 300ce4d1..8fdb15e8 100644 --- a/src/samples/conference/public/quic.html +++ b/src/samples/conference/public/quic.html @@ -23,9 +23,9 @@

Open WebRTC Toolkit

-

Sample of QuicTransport

+

Sample of WebTransport

-

This sample works with the latest Chrome.

+

This sample works with the Chrome >= 97.

diff --git a/src/samples/conference/public/scripts/media-worker.js b/src/samples/conference/public/scripts/media-worker.js deleted file mode 100644 index ab0c48b4..00000000 --- a/src/samples/conference/public/scripts/media-worker.js +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright (C) <2021> Intel Corporation -// -// SPDX-License-Identifier: Apache-2.0 - -/* eslint-disable require-jsdoc */ -/* global AudioEncoder, EncodedAudioChunk, VideoEncoder, VideoDecoder, - * EncodedVideoChunk */ - -let videoBidiStreamWritable, audioEncoder, videoEncoder, frameBuffer, - audioSendStreamWriter, videoSendStreamWriter, mediaSession, - datagramReceiver, videoDecoder; -// 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. -const sizePrefix = 4; - -onmessage = (e) => { - if (e.data[0] === 'audio-source') { - readMediaData(e.data[1], 'audio'); - } else if (e.data[0] === 'video-source') { - readMediaData(e.data[1], 'video'); - } else if (e.data[0] === 'send-stream-audio') { - const audioBidiStreamWritable = e.data[1]; - audioSendStreamWriter = audioBidiStreamWritable.getWriter(); - writeTrackId('audio', audioSendStreamWriter); - initAudioEncoder(); - } else if (e.data[0] === 'send-stream-video') { - videoBidiStreamWritable = e.data[1]; - videoSendStreamWriter = videoBidiStreamWritable.getWriter(); - writeTrackId('video', videoSendStreamWriter); - initVideoEncoder(); - } else if (e.data[0] === 'datagram-receiver') { - datagramReceiver = e.data[1]; - } else if (e.data[0] === 'encoded-video-frame') { - if (videoDecoder.state === 'closed') { - return; - } - videoDecoder.decode(new EncodedVideoChunk( - {timestamp: Date.now(), data: e.data[1], type: 'key'})); - } -}; - -async function videoOutput(chunk, metadata) { - return; - if (videoBidiStreamWritable) { - if (!frameBuffer || - frameBuffer.byteLength < chunk.byteLength + sizePrefix) { - frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); - } - const bufferView = new Uint8Array(frameBuffer, sizePrefix); - 
chunk.copyTo(bufferView); - const dataView = - new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); - dataView.setUint32(0, chunk.byteLength); - await videoSendStreamWriter.ready; - await videoSendStreamWriter.write(dataView); - } -} - -function videoError(error) { - console.log('Video encode error, ' + error.message); -} - -async function audioOutput(chunk, metadata) { - if (audioSendStreamWriter) { - if (!frameBuffer || - frameBuffer.byteLength < chunk.byteLength + sizePrefix) { - frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); - } - const bufferView = new Uint8Array(frameBuffer, sizePrefix); - chunk.copyTo(bufferView); - const dataView = - new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); - dataView.setUint32(0, chunk.byteLength); - await audioSendStreamWriter.ready; - await audioSendStreamWriter.write(dataView); - console.log('Wrote an audio frame. '+chunk.byteLength); - } -} - -function audioError(error) { - console.log(`Audio encode error: ${error.message}`); -} - -async function writeTrackId(kind, writer) { - const id = new Uint8Array(16); - id[15] = (kind === 'audio' ? 
1 : 2); - await writer.ready; - writer.write(id); - console.log('Wrote track ID for '+kind); -} - -function initAudioEncoder() { - audioEncoder = new AudioEncoder({output: audioOutput, error: audioError}); - audioEncoder.configure( - {codec: 'opus', numberOfChannels: 1, sampleRate: 48000}); -} - -function initVideoEncoder() { - videoEncoder = new VideoEncoder({output: videoOutput, error: videoError}); - videoEncoder.configure({ - codec: 'avc1.4d002a', - width: 640, - height: 480, - framerate: 30, - latencyMode: 'realtime', - avc: {format: 'annexb'}, - }); -} - -function initVideoDecoder() { - videoDecoder = new VideoDecoder({ - output: videoFrameOutputCallback, - error: webCodecsErrorCallback, - }); - videoDecoder.configure({codec: 'avc1.42400a', optimizeForLatency: true}); -} - -function videoFrameOutputCallback(frame) { - postMessage(['video-frame', frame], [frame]); -} - -function webCodecsErrorCallback(error) { - console.log('error: ' + error.message); -} - -// Read data from media track. 
-async function readMediaData(readable, kind) { - const reader = readable.getReader(); - while (true) { - const {value, done} = await reader.read(); - if (done) { - console.log('MediaStream ends.'); - break; - } - if (kind === 'audio') { - audioEncoder.encode(value); - } else if (kind === 'video') { - videoEncoder.encode(value); - } - value.close(); - } -} - -async function fetchWasm() { - const Module={}; - Module['instantiateWasm'] = async (imports, successCallback) => { - const response = await fetch('./owt.wasm'); - const buffer = await response.arrayBuffer(); - const module=new WebAssembly.Module(buffer); - const instance = await WebAssembly.instantiate(module, imports); - successCallback(instance, module); - return {}; - }; - // Module['wasmModule']=new WebAssembly.Module(buffer); - importScripts('./owt.js'); - console.log('Got wasm binary.'); -} - -initVideoDecoder(); diff --git a/src/samples/conference/public/scripts/quic.js b/src/samples/conference/public/scripts/quic.js index cb4cf8ee..f4f3d9d2 100644 --- a/src/samples/conference/public/scripts/quic.js +++ b/src/samples/conference/public/scripts/quic.js @@ -9,27 +9,31 @@ let quicChannel = null; let bidirectionalStream = null; let bidiAudioStream = null; -let writeTask, mediaStream, mediaWorker, conferenceId, myId, mixedStream, generatorWriter; +let writeTask, dataWorker, conferenceId, myId, mixedStream, generatorWriter, + mediaPublication; +const isMedia = true; window.Module={}; -const conference = new Owt.Conference.ConferenceClient({ - webTransportConfiguration: { - serverCertificateFingerprints: [{ - value: - '59:74:C6:C5:2C:D8:E8:18:A9:D2:14:77:ED:94:89:87:DF:83:BA:B3:96:4C:4C:0B:B8:D3:22:58:11:55:67:1A', - algorithm: 'sha-256', - }], - serverCertificateHashes: [{ - value: new Uint8Array([ - 0x59, 0x74, 0xC6, 0xC5, 0x2C, 0xD8, 0xE8, 0x18, 0xA9, 0xD2, 0x14, - 0x77, 0xED, 0x94, 0x89, 0x87, 0xDF, 0x83, 0xBA, 0xB3, 0x96, 0x4C, - 0x4C, 0x0B, 0xB8, 0xD3, 0x22, 0x58, 0x11, 0x55, 0x67, 0x1A - ]), - 
algorithm: 'sha-256', - }], - } -}); +const conference = new Owt.Conference.ConferenceClient( + { + webTransportConfiguration: { + serverCertificateFingerprints: [{ + value: + 'FD:CD:87:EB:92:97:84:FD:D9:E9:C1:9F:AF:57:12:0E:32:AF:0D:C0:58:5F:33:BB:59:4A:2E:6E:C3:18:7A:93', + algorithm: 'sha-256', + }], + serverCertificateHashes: [{ + value: new Uint8Array([ + 0xFD, 0xCD, 0x87, 0xEB, 0x92, 0x97, 0x84, 0xFD, 0xD9, 0xE9, 0xC1, + 0x9F, 0xAF, 0x57, 0x12, 0x0E, 0x32, 0xAF, 0x0D, 0xC0, 0x58, 0x5F, + 0x33, 0xBB, 0x59, 0x4A, 0x2E, 0x6E, 0xC3, 0x18, 0x7A, 0x93 + ]), + algorithm: 'sha-256', + }], + } + }, + '../../../sdk/conference/webtransport'); conference.addEventListener('streamadded', async (event) => { console.log(event.stream); if (event.stream.origin == myId) { @@ -60,8 +64,8 @@ function updateConferenceStatus(message) { } function initWorker() { - mediaWorker = new Worker('./scripts/media-worker.js'); - mediaWorker.onmessage=((e) => { + dataWorker = new Worker('./scripts/data-worker.js'); + dataWorker.onmessage=((e) => { if (e.data[0] === 'video-frame') { generatorWriter.write(e.data[1]); //console.log(e.data[1]); @@ -115,29 +119,13 @@ async function attachReader(stream) { } async function createSendChannel() { - //bidirectionalStream = await conference.createSendStream(); - bidiAudioStream = await conference.createSendStream(); - const localStream = new Owt.Base.LocalStream( - bidiAudioStream, - new Owt.Base.StreamSourceInfo('mic', undefined, undefined)); - attachReader(bidiAudioStream); - const publication = await conference.publish(localStream, { - audio: {codec: 'opus', numberOfChannels: 2, sampleRate: 48000}, - video: false, - transport: {type: 'quic'}, - }); - // const localStream = new Owt.Base.LocalStream( - // bidirectionalStream, - // new Owt.Base.StreamSourceInfo(undefined, undefined, true)); - // const publication = await conference.publish( - // localStream, {transport: {type: 'quic'}}); - console.log(publication); + bidirectionalStream = await 
conference.createSendStream(); updateConferenceStatus('Created send channel.'); } async function windowOnLoad() { await joinConference(); - await createSendChannel(); + //await createSendChannel(); } async function writeUuid() { @@ -151,31 +139,12 @@ async function writeUuid() { return; } -async function writeMediaData() { - mediaStream = - await navigator.mediaDevices.getUserMedia({audio: true, video: true}); - const audioTrack = - new MediaStreamTrackProcessor(mediaStream.getAudioTracks()[0]); - mediaWorker.postMessage( - ['audio-source', audioTrack.readable], [audioTrack.readable]); - mediaWorker.postMessage( - ['send-stream-audio', bidiAudioStream.writable], - [bidiAudioStream.writable]); - // const videoTrack = - // new MediaStreamTrackProcessor(mediaStream.getVideoTracks()[0]); - // mediaWorker.postMessage( - // ['video-source', videoTrack.readable], [videoTrack.readable]); - // mediaWorker.postMessage( - // ['send-stream-video', bidirectionalStream.writable], - // [bidirectionalStream.writable]); -} - async function writeData() { const encoder = new TextEncoder(); const encoded = encoder.encode('message', {stream: true}); const writer = bidirectionalStream.writable.getWriter(); await writer.ready; - const ab=new Uint8Array(10000); + const ab = new Uint8Array(10000); ab.fill(1, 0); await writer.write(ab); writer.releaseLock(); @@ -188,17 +157,34 @@ window.addEventListener('load', () => { }); document.getElementById('start-sending').addEventListener('click', async () => { - if (!bidiAudioStream) { - updateConferenceStatus('Stream is not created.'); - return; + if (isMedia) { + const mediaStream = + await navigator.mediaDevices.getUserMedia({audio: true, video: true}); + const localStream = new Owt.Base.LocalStream( + mediaStream, new Owt.Base.StreamSourceInfo('mic', 'camera', undefined)); + mediaPublication = await conference.publish(localStream, { + audio: {codec: 'opus', numberOfChannels: 2, sampleRate: 48000}, + video: {codec: 'h264'}, + transport: {type: 
'quic'}, + }); + } else { + if (!bidirectionalStream) { + updateConferenceStatus('Stream is not created.'); + return; + } + writeTask = setInterval(writeData, 200); } - writeMediaData(); - //writeTask = setInterval(writeData, 200); updateConferenceStatus('Started sending.'); }); document.getElementById('stop-sending').addEventListener('click', () => { - clearInterval(writeTask); + if (isMedia) { + if (mediaPublication) { + mediaPublication.stop(); + } + } else { + clearInterval(writeTask); + } updateConferenceStatus('Stopped sending.'); }); @@ -213,7 +199,7 @@ document.getElementById('start-receiving') const receiver = ms.createRtpVideoReceiver(); receiver.setCompleteFrameCallback((frame) => { const copiedFrame = frame.slice(0); - mediaWorker.postMessage( + dataWorker.postMessage( ['encoded-video-frame', copiedFrame], [copiedFrame.buffer]); }); subscribeMixedStream(); diff --git a/src/sdk/base/logger.js b/src/sdk/base/logger.js index bd61aa9b..65f39d8f 100644 --- a/src/sdk/base/logger.js +++ b/src/sdk/base/logger.js @@ -26,7 +26,7 @@ // This file is borrowed from lynckia/licode with some modifications. 
-/* global window */
+/* global console */
 
 'use strict';
 
@@ -55,13 +55,13 @@ const Logger = (function() {
   };
 
   that.log = (...args) => {
-    window.console.log((new Date()).toISOString(), ...args);
+    console.log((new Date()).toISOString(), ...args);
   };
 
   const bindType = function(type) {
-    if (typeof window.console[type] === 'function') {
+    if (typeof console[type] === 'function') {
       return (...args) => {
-        window.console[type]((new Date()).toISOString(), ...args);
+        console[type]((new Date()).toISOString(), ...args);
       };
     } else {
       return that.log;
diff --git a/src/sdk/base/stream.js b/src/sdk/base/stream.js
index e036b65c..0dfe92d9 100644
--- a/src/sdk/base/stream.js
+++ b/src/sdk/base/stream.js
@@ -2,8 +2,6 @@
 //
 // SPDX-License-Identifier: Apache-2.0
 
-/* global SendStream, BidirectionalStream */
-
 'use strict';
 import * as Utils from './utils.js';
 import {EventDispatcher, OwtEvent} from './event.js';
diff --git a/src/sdk/conference/client.js b/src/sdk/conference/client.js
index a5f1729f..bb64d4be 100644
--- a/src/sdk/conference/client.js
+++ b/src/sdk/conference/client.js
@@ -117,9 +117,10 @@ class ConferenceClientConfiguration { // eslint-disable-line no-unused-vars
  * @extends Owt.Base.EventDispatcher
  * @constructor
  * @param {?Owt.Conference.ConferenceClientConfiguration } config Configuration for ConferenceClient.
+ * @param {string} workerDir Path of the directory for workers shipped with OWT SDK. It could be a relative path to your HTML file or an absolute path.
  * @param {?Owt.Conference.SioSignaling } signalingImpl Signaling channel implementation for ConferenceClient. SDK uses default signaling channel implementation if this parameter is undefined. Currently, a Socket.IO signaling channel implementation was provided as ics.conference.SioSignaling. However, it is not recommended to directly access signaling channel or customize signaling channel for ConferenceClient as this time.
*/ -export const ConferenceClient = function(config, signalingImpl) { +export const ConferenceClient = function(config, workerDir, signalingImpl) { Object.setPrototypeOf(this, new EventModule.EventDispatcher()); config = config || {}; const self = this; @@ -439,7 +440,8 @@ export const ConferenceClient = function(config, signalingImpl) { if (typeof WebTransport === 'function' && token.webTransportUrl) { quicTransportChannel = new QuicConnection( token.webTransportUrl, resp.webTransportToken, - createSignalingForChannel(), config.webTransportConfiguration); + createSignalingForChannel(), config.webTransportConfiguration, + workerDir); } const conferenceInfo = new ConferenceInfo( resp.room.id, Array.from(participants.values()), diff --git a/src/sdk/conference/webtransport/connection.js b/src/sdk/conference/webtransport/connection.js index 51520f07..fa8f9a1e 100644 --- a/src/sdk/conference/webtransport/connection.js +++ b/src/sdk/conference/webtransport/connection.js @@ -3,15 +3,15 @@ // SPDX-License-Identifier: Apache-2.0 /* eslint-disable require-jsdoc */ -/* global Promise, Map, WebTransport, Uint8Array, Uint32Array, TextEncoder, - * ArrayBuffer */ +/* global Promise, Map, WebTransport, WebTransportBidirectionalStream, + Uint8Array, Uint32Array, TextEncoder, Worker, MediaStreamTrackProcessor */ 'use strict'; import Logger from '../../base/logger.js'; import {EventDispatcher} from '../../base/event.js'; import {Publication} from '../../base/publication.js'; -import {SubscribeOptions, Subscription} from '../subscription.js'; +import {Subscription} from '../subscription.js'; import {Base64} from '../../base/base64.js'; const uuidByteLength = 16; @@ -26,21 +26,24 @@ const uuidByteLength = 16; export class QuicConnection extends EventDispatcher { // `tokenString` is a base64 string of the token object. It's in the return // value of `ConferenceClient.join`. 
- constructor(url, tokenString, signaling, webTransportOptions) { + constructor(url, tokenString, signaling, webTransportOptions, workerDir) { super(); this._tokenString = tokenString; this._token = JSON.parse(Base64.decodeBase64(tokenString)); this._signaling = signaling; this._ended = false; - this._quicStreams = new Map(); // Key is publication or subscription ID. + // Key is publication or subscription ID, value is a list of streams. + this._quicDataStreams = new Map(); + // Key is MediaStreamTrack ID, value is a bidirectional stream. + this._quicMediaStreamTracks = new Map(); this._quicTransport = new WebTransport(url, webTransportOptions); this._subscribePromises = new Map(); // Key is subscription ID. this._subscribeOptions = new Map(); // Key is subscription ID. this._subscriptionInfoReady = - new Map(); // Key is subscription ID, value is a promise. + new Map(); // Key is subscription ID, value is a promise. this._transportId = this._token.transportId; this._initReceiveStreamReader(); - //this._initDatagrams(); + this._worker = new Worker(workerDir + '/media-worker.js', {type: 'module'}); } /** @@ -74,14 +77,6 @@ export class QuicConnection extends EventDispatcher { await this._authenticate(this._tokenString); } - async _initDatagrams() { - const datagramReader = this._quicTransport.datagrams.readable.getReader(); - while (true) { - const value = await datagramReader.read(); - console.log(value); - } - } - async _initReceiveStreamReader() { const receiveStreamReader = this._quicTransport.incomingBidirectionalStreams.getReader(); @@ -103,7 +98,7 @@ export class QuicConnection extends EventDispatcher { // https://github.com/w3c/webtransport/issues/131. Issue tracker: // https://crbug.com/1182905. 
const chunkReader = receiveStream.readable.getReader(); - let readingChunksDone = false; + const readingChunksDone = false; let readingHeaderDone = false; let mediaStream = false; let subscriptionId; @@ -169,7 +164,7 @@ export class QuicConnection extends EventDispatcher { } } chunkReader.releaseLock(); - this._quicStreams.set(subscriptionId, receiveStream); + this._quicDataStreams.set(subscriptionId, [receiveStream]); if (this._subscribePromises.has(subscriptionId)) { const subscription = this._createSubscription(subscriptionId, receiveStream); @@ -212,33 +207,71 @@ export class QuicConnection extends EventDispatcher { return quicStream; } - async createSendStream1(sessionId) { - Logger.info('Create stream.'); - await this._quicTransport.ready; - // TODO: Potential failure because of publication stream is created faster - // than signaling stream(created by the 1st call to initiatePublication). - const publicationId = await this._initiatePublication(); - const quicStream = await this._quicTransport.createSendStream(); - const writer = quicStream.writable.getWriter(); - await writer.ready; - writer.write(this._uuidToUint8Array(publicationId)); - writer.releaseLock(); - this._quicStreams.set(publicationId, quicStream); - return quicStream; - } - async publish(stream, options) { // TODO: Avoid a stream to be published twice. The first 16 bit data send to // server must be it's publication ID. // TODO: Potential failure because of publication stream is created faster // than signaling stream(created by the 1st call to initiatePublication). 
     const publicationId = await this._initiatePublication(stream, options);
-    const quicStream = stream.stream;
-    const writer = quicStream.writable.getWriter();
-    await writer.ready;
-    writer.write(this._uuidToUint8Array(publicationId));
-    writer.releaseLock();
-    this._quicStreams.set(publicationId, quicStream);
+    const quicStreams = [];
+    if (stream.stream instanceof WebTransportBidirectionalStream) {
+      quicStreams.push(stream.stream);
+      this._quicDataStreams.set(publicationId, quicStreams);
+    } else if (stream.stream instanceof MediaStream) {
+      if (typeof MediaStreamTrackProcessor === 'undefined') {
+        throw new TypeError(
+            'MediaStreamTrackProcessor is not supported by your browser.');
+      }
+      for (const track of stream.stream.getTracks()) {
+        const quicStream =
+            await this._quicTransport.createBidirectionalStream();
+        this._quicMediaStreamTracks.set(track.id, quicStream);
+        quicStreams.push(quicStream);
+      }
+    } else {
+      throw new TypeError('Invalid stream.');
+    }
+    for (const quicStream of quicStreams) {
+      const writer = quicStream.writable.getWriter();
+      await writer.ready;
+      writer.write(this._uuidToUint8Array(publicationId));
+      writer.releaseLock();
+    }
+    if (stream.stream instanceof MediaStream) {
+      for (const track of stream.stream.getTracks()) {
+        let encoderConfig;
+        if (track.kind === 'audio') {
+          encoderConfig = {
+            codec: 'opus',
+            numberOfChannels: 1,
+            sampleRate: 48000,
+          };
+        } else if (track.kind === 'video') {
+          encoderConfig = {
+            codec: 'avc1.4d002a',
+            width: 640,
+            height: 480,
+            framerate: 30,
+            latencyMode: 'realtime',
+            avc: {format: 'annexb'},
+          };
+        }
+        const quicStream = this._quicMediaStreamTracks.get(track.id);
+        const processor = new MediaStreamTrackProcessor(track);
+        this._worker.postMessage(
+            [
+              'media-sender',
+              [
+                track.id,
+                track.kind,
+                processor.readable,
+                quicStream.writable,
+                encoderConfig,
+              ],
+            ],
+            [processor.readable, quicStream.writable]);
+      }
+    }
     const publication = new Publication(publicationId, () => {
this._signaling.sendSignalingMessage('unpublish', {id: publication}) .catch((e) => { @@ -250,7 +283,7 @@ export class QuicConnection extends EventDispatcher { } hasContentSessionId(id) { - return this._quicStreams.has(id); + return this._quicDataStreams.has(id); } _uuidToUint8Array(uuidString) { @@ -343,13 +376,14 @@ export class QuicConnection extends EventDispatcher { .then((data) => { this._subscribeOptions.set(data.id, options); Logger.debug('Subscribe info is set.'); - if (this._quicStreams.has(data.id)) { + if (this._quicDataStreams.has(data.id)) { // QUIC stream created before signaling returns. + // TODO: Update subscription to accept list of QUIC streams. const subscription = this._createSubscription( - data.id, this._quicStreams.get(data.id)); + data.id, this._quicDataStreams.get(data.id)[0]); resolve(subscription); } else { - this._quicStreams.set(data.id, null); + this._quicDataStreams.set(data.id, null); // QUIC stream is not created yet, resolve promise after getting // QUIC stream. this._subscribePromises.set( @@ -363,15 +397,6 @@ export class QuicConnection extends EventDispatcher { return p; } - _readAndPrint() { - this._quicStreams[0].waitForReadable(5).then(() => { - const data = new Uint8Array(this._quicStreams[0].readBufferedAmount); - this._quicStreams[0].readInto(data); - Logger.info('Read data: ' + data); - this._readAndPrint(); - }); - } - async _initiatePublication(stream, options) { const media = {tracks: []}; if (stream.source.audio) { @@ -410,15 +435,15 @@ export class QuicConnection extends EventDispatcher { }; media.tracks.push(track); } - const data = await this._signaling.sendSignalingMessage('publish', { + const resp = await this._signaling.sendSignalingMessage('publish', { media: stream.source.data ? 
null : media, data: stream.source.data, transport: {type: 'quic', id: this._transportId}, }); - if (this._transportId !== data.transportId) { + if (this._transportId !== resp.transportId) { throw new Error('Transport ID not match.'); } - return data.id; + return resp.id; } _readyHandler() { diff --git a/src/sdk/conference/webtransport/media-worker.js b/src/sdk/conference/webtransport/media-worker.js new file mode 100644 index 00000000..dd571817 --- /dev/null +++ b/src/sdk/conference/webtransport/media-worker.js @@ -0,0 +1,140 @@ +// Copyright (C) <2021> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +/* eslint-disable require-jsdoc */ +/* global AudioEncoder, VideoEncoder, VideoDecoder, Map, ArrayBuffer, + Uint8Array, DataView */ + +import Logger from '../../base/logger.js'; + +// Key is MediaStreamTrack ID, value is AudioEncoder or VideoEncoder. +const encoders = new Map(); +// Key is MediaStreamTrack ID, value is WritableStreamDefaultWriter. +const writers = new Map(); + +let frameBuffer; +let videoDecoder; +// 4 bytes for frame size before each frame. The 1st byte is reserved, always 0. +const sizePrefix = 4; + +/* Messages it accepts: + * media-sender: [MediaStreamTrack, WebTransportStream, + * AudioEncoderConfig/VideoEncoderConfig] + */ +// eslint-disable-next-line no-undef +onmessage = (e) => { + if (e.data[0] === 'media-sender') { + const [trackId, trackKind, trackReadable, sendStreamWritable, config] = + e.data[1]; + let encoder; + const writer = sendStreamWritable.getWriter(); + if (trackKind === 'audio') { + encoder = initAudioEncoder(config, writer); + } else { // Video. + encoder = initVideoEncoder(config, writer); + } + encoders.set(trackId, encoder); + writers.set(trackId, writer); + readMediaData(trackReadable, encoder); + writeTrackId(trackKind, writer); + } +}; + +async function videoOutput(writer, chunk, metadata) { + // TODO: Combine audio and video output callback. 
+ if (!frameBuffer || frameBuffer.byteLength < chunk.byteLength + sizePrefix) { + frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); + } + const bufferView = new Uint8Array(frameBuffer, sizePrefix); + chunk.copyTo(bufferView); + const dataView = new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); + dataView.setUint32(0, chunk.byteLength); + await writer.ready; + await writer.write(dataView); +} + +function videoError(error) { + Logger.error('Video encode error: ' + error.message); +} + +async function audioOutput(writer, chunk, metadata) { + if (!frameBuffer || frameBuffer.byteLength < chunk.byteLength + sizePrefix) { + frameBuffer = new ArrayBuffer(chunk.byteLength + sizePrefix); + } + const bufferView = new Uint8Array(frameBuffer, sizePrefix); + chunk.copyTo(bufferView); + const dataView = new DataView(frameBuffer, 0, chunk.byteLength + sizePrefix); + dataView.setUint32(0, chunk.byteLength); + await writer.ready; + await writer.write(dataView); +} + +function audioError(error) { + Logger.error(`Audio encode error: ${error.message}`); +} + +async function writeTrackId(kind, writer) { + const id = new Uint8Array(16); + id[15] = (kind === 'audio' ? 1 : 2); + await writer.ready; + writer.write(id); +} + +function initAudioEncoder(config, writer) { + const audioEncoder = new AudioEncoder( + {output: audioOutput.bind(null, writer), error: audioError}); + // TODO: Respect config. + audioEncoder.configure( + {codec: 'opus', numberOfChannels: 1, sampleRate: 48000}); + return audioEncoder; +} + +function initVideoEncoder(config, writer) { + const videoEncoder = new VideoEncoder( + {output: videoOutput.bind(null, writer), error: videoError}); + // TODO: Respect config. 
+ videoEncoder.configure({ + codec: 'avc1.4d002a', + width: 640, + height: 480, + framerate: 30, + latencyMode: 'realtime', + avc: {format: 'annexb'}, + }); + return videoEncoder; +} + +function initVideoDecoder() { + videoDecoder = new VideoDecoder({ + output: videoFrameOutputCallback, + error: webCodecsErrorCallback, + }); + videoDecoder.configure({codec: 'avc1.42400a', optimizeForLatency: true}); +} + +function videoFrameOutputCallback(frame) { + // eslint-disable-next-line no-undef + postMessage(['video-frame', frame], [frame]); +} + +function webCodecsErrorCallback(error) { + Logger.warn('error: ' + error.message); +} + +// Read data from media track. +async function readMediaData(trackReadable, encoder) { + const reader = trackReadable.getReader(); + // eslint-disable-next-line no-constant-condition + while (true) { + const {value, done} = await reader.read(); + if (done) { + Logger.debug('MediaStream ends.'); + break; + } + encoder.encode(value); + value.close(); + } +} + +initVideoDecoder(); diff --git a/src/sdk/conference/webtransport/receive-stream-worker.js b/src/sdk/conference/webtransport/receive-stream-worker.js index 256f1244..89e60a45 100644 --- a/src/sdk/conference/webtransport/receive-stream-worker.js +++ b/src/sdk/conference/webtransport/receive-stream-worker.js @@ -5,6 +5,9 @@ /* eslint-disable require-jsdoc */ /* global AudioDecoder, postMessage */ +// TODO: Enable ESLint for this file. +/* eslint-disable */ + 'use strict'; import Logger from '../../base/logger.js';