Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 33cca83

Browse files
committed
Allow data flow between QUIC agent and video agent.
1 parent d7dcb33 commit 33cca83

13 files changed

+190
-51
lines changed

doc/Client-Portal Protocol.md

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,14 +362,14 @@ This a format for client reconnects.
362362
```
363363
object(PublicationRequest)::
364364
{
365-
media: object(WebRTCMediaOptions) | null,
365+
media: object(WebRTCMediaOptions) | object(WebCodecsMediaOptions) | null,
366366
data: true | false,
367367
transport: object(TransportOptions),
368368
attributes: object(ClientDefinedAttributes) | null
369369
}
370370
```
371371

372-
A publication can send either media or data, but a QUIC *transport* channel can support multiple stream for both media and data. Setting `media:null` and `data:false` is meaningless, so it should be rejected by server. Protocol itself doesn't forbit to create WebRTC connection for data. However, SCTP data channel is not implemented at server side, so currently `data:true` is only support by QUIC transport channels.
372+
A publication can send either media or data. Setting `media:null` and `data:false` is meaningless, so it should be rejected by server. Protocol itself doesn't forbit to create WebRTC connection for data. However, SCTP data channel is not implemented at server side, so currently `data:true` is only support by WebTransport channels. When transport's type is "webrtc", `media` should be an object of `WebRTCMediaOptions`. When transport's type is "quic", `media` should be an object of `WebCodecsMediaOptions` or `null`.
373373

374374
```
375375
object(WebRTCMediaOptions)::
@@ -385,6 +385,20 @@ A publication can send either media or data, but a QUIC *transport* channel can
385385
}
386386
```
387387

388+
```
389+
object(WebCodecsMediaOptions)::
390+
{
391+
tracks: [
392+
{
393+
type: "audio" | "video",
394+
source: "mic" | "screen-cast" | ... | "encoded-file",
395+
format: object(AudioFormat) | object(VideoFormat)
396+
}
397+
]
398+
}
399+
}
400+
```
401+
388402
**ResponseData**: The PublicationResult object with following definition if **ResponseStatus** is “ok”:
389403

390404
object(PublicationResult)::

doc/design/quic-transport-payload-format.md renamed to doc/design/web-transport-payload-format.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,22 @@ After creating a WebTransport, a stream with session 0 should be created for aut
3939
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
4040
```
4141

42+
## Media Stream
43+
44+
After sending 128 bit length session ID, a 128 bit length track ID is sent to remote side to indicates the track of a stream. Since audio track and video track of a single stream shares the same track ID at this time, track 1 is for audio and track 2 is for video.
45+
46+
When a WebTransport stream is used for transmitting data of a media stream track (e.g.: H.264 bitstream), a 32 (8+24) bit length header is added to indicate frame size.
47+
48+
```
49+
0 1 2 3
50+
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
51+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
52+
| Reserved | Message length |
53+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
54+
| Message ...
55+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
56+
```
57+
4258
## Authentication
4359

4460
If signaling messages are transmitted over WebTransport, authentication follows the regular process defined by [Client-Portal Protocol](https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/Client-Portal%20Protocol.md). Otherwise, client sends a token for WebTransport as a signaling message. WebTransport token is issued during joining a conference. If the token is valid, server sends a 128 bit length zeros to client.

source/agent/addons/internalIO/InternalClientWrapper.cc

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,23 +88,36 @@ NAN_METHOD(InternalClient::close) {
8888
obj->me = nullptr;
8989
}
9090

91-
NAN_METHOD(InternalClient::addDestination) {
92-
InternalClient* obj = ObjectWrap::Unwrap<InternalClient>(info.Holder());
93-
owt_base::InternalClient* me = obj->me;
91+
NAN_METHOD(InternalClient::addDestination)
92+
{
93+
InternalClient* obj = ObjectWrap::Unwrap<InternalClient>(info.Holder());
94+
owt_base::InternalClient* me = obj->me;
9495

95-
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
96-
std::string track = std::string(*param0);
96+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
97+
std::string track = std::string(*param0);
9798

98-
FrameDestination* param =
99-
ObjectWrap::Unwrap<FrameDestination>(
100-
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
101-
owt_base::FrameDestination* dest = param->dest;
99+
bool isNanDestination(false);
100+
if (info.Length() >= 3) {
101+
isNanDestination = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value();
102+
}
102103

103-
if (track == "audio") {
104-
me->addAudioDestination(dest);
105-
} else if (track == "video") {
106-
me->addVideoDestination(dest);
107-
}
104+
owt_base::FrameDestination* dest(nullptr);
105+
if (isNanDestination) {
106+
NanFrameNode* param = Nan::ObjectWrap::Unwrap<NanFrameNode>(info[1]->ToObject());
107+
dest = param->FrameDestination();
108+
} else {
109+
FrameDestination* param = ObjectWrap::Unwrap<FrameDestination>(
110+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
111+
dest = param->dest;
112+
}
113+
114+
if (track == "audio") {
115+
me->addAudioDestination(dest);
116+
} else if (track == "video") {
117+
me->addVideoDestination(dest);
118+
} else if (track == "data") {
119+
me->addDataDestination(dest);
120+
}
108121
}
109122

110123
NAN_METHOD(InternalClient::removeDestination) {

source/agent/addons/internalIO/InternalServerWrapper.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,20 @@ NAN_METHOD(InternalServer::addSource) {
9999
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
100100
std::string streamId = std::string(*param0);
101101

102-
FrameSource* param =
103-
ObjectWrap::Unwrap<FrameSource>(
104-
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
105-
owt_base::FrameSource* src = param->src;
102+
bool isNanSource(false);
103+
if (info.Length() >= 3) {
104+
isNanSource = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value();
105+
}
106+
107+
owt_base::FrameSource* src(nullptr);
108+
if (isNanSource) {
109+
NanFrameNode* param = Nan::ObjectWrap::Unwrap<NanFrameNode>(info[1]->ToObject());
110+
src = param->FrameSource();
111+
} else {
112+
FrameSource* param = ObjectWrap::Unwrap<FrameSource>(
113+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
114+
src = param->src;
115+
}
106116

107117
me->addSource(streamId, src);
108118
}

source/agent/addons/internalIO/InternalServerWrapper.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ class InternalServer : public node::ObjectWrap,
4444
static NAN_METHOD(close);
4545

4646
static NAN_METHOD(getListeningPort);
47-
47+
// Arguments:
48+
// type: string, type of the source, "audio", "video" or "data".
49+
// source: A node addon object or NAN object.
50+
// isNanObject: indicates whether `source` is a NAN object.
4851
static NAN_METHOD(addSource);
4952

5053
static NAN_METHOD(removeSource);
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#ifndef QUIC_ADDON_MEDIA_FRAME_PACKETIZER_H_
2+
#define QUIC_ADDON_MEDIA_FRAME_PACKETIZER_H_
3+
4+
#endif

source/agent/addons/quic/QuicTransportStream.cc

Lines changed: 82 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66

77
#include "QuicTransportStream.h"
8+
#include "../common/MediaFramePipelineWrapper.h"
89

910
using v8::Function;
1011
using v8::FunctionTemplate;
@@ -30,6 +31,9 @@ QuicTransportStream::QuicTransportStream(owt::quic::WebTransportStreamInterface*
3031
, m_isPiped(false)
3132
, m_buffer(nullptr)
3233
, m_bufferSize(0)
34+
, m_isMedia(false)
35+
, m_currentFrameSize(0)
36+
, m_readFrameSize(0)
3337
{
3438
}
3539

@@ -113,15 +117,32 @@ NAN_METHOD(QuicTransportStream::close)
113117
NAN_METHOD(QuicTransportStream::addDestination)
114118
{
115119
QuicTransportStream* obj = Nan::ObjectWrap::Unwrap<QuicTransportStream>(info.Holder());
116-
if (info.Length() != 2) {
120+
if (info.Length() > 3) {
117121
Nan::ThrowTypeError("Invalid argument length for addDestination.");
118122
return;
119123
}
120-
// TODO: Check if info[0] is an Nan wrapped object.
121-
auto framePtr = Nan::ObjectWrap::Unwrap<QuicTransportStream>(info[1]->ToObject());
122-
// void* ptr = info[0]->ToObject()->GetAlignedPointerFromInternalField(0);
123-
// auto framePtr=static_cast<owt_base::FrameDestination*>(ptr);
124-
obj->addDataDestination(framePtr);
124+
Nan::Utf8String param0(Nan::To<v8::String>(info[0]).ToLocalChecked());
125+
std::string track = std::string(*param0);
126+
bool isNanDestination(false);
127+
if (info.Length() == 3) {
128+
isNanDestination = info[2]->ToBoolean(Nan::GetCurrentContext()).ToLocalChecked()->Value();
129+
}
130+
owt_base::FrameDestination* dest(nullptr);
131+
if (isNanDestination) {
132+
NanFrameNode* param = Nan::ObjectWrap::Unwrap<NanFrameNode>(info[1]->ToObject());
133+
dest = param->FrameDestination();
134+
} else {
135+
::FrameDestination* param = node::ObjectWrap::Unwrap<::FrameDestination>(
136+
info[1]->ToObject(Nan::GetCurrentContext()).ToLocalChecked());
137+
dest = param->dest;
138+
}
139+
if (track == "audio") {
140+
obj->addAudioDestination(dest);
141+
} else if (track == "video") {
142+
obj->addVideoDestination(dest);
143+
} else if (track == "data") {
144+
obj->addDataDestination(dest);
145+
}
125146
obj->m_isPiped = true;
126147
}
127148

@@ -153,6 +174,13 @@ void QuicTransportStream::MaybeReadContentSessionId()
153174
m_receivedContentSessionId = true;
154175
m_asyncOnContentSessionId.data = this;
155176
uv_async_send(&m_asyncOnContentSessionId);
177+
for (uint8_t d : m_contentSessionId) {
178+
if (d != 0) {
179+
m_isPiped = true;
180+
m_isMedia = true;
181+
break;
182+
}
183+
}
156184
if (m_stream->ReadableBytes() > 0) {
157185
SignalOnData();
158186
}
@@ -173,7 +201,6 @@ NAUV_WORK_CB(QuicTransportStream::onData)
173201
v8::Local<v8::Function> eventCallback = onEventLocal.As<Function>();
174202
Nan::AsyncResource* resource = new Nan::AsyncResource(Nan::New<v8::String>("ondata").ToLocalChecked());
175203
auto readableBytes = obj->m_stream->ReadableBytes();
176-
ELOG_DEBUG("Readable bytes: %d", readableBytes);
177204
uint8_t* buffer = new uint8_t[readableBytes]; // Use a shared buffer instead to reduce performance cost on new.
178205
obj->m_stream->Read(buffer, readableBytes);
179206
Local<Value> args[] = { Nan::NewBuffer((char*)buffer, readableBytes).ToLocalChecked() };
@@ -211,15 +238,55 @@ void QuicTransportStream::SignalOnData()
211238

212239
while (m_stream->ReadableBytes() > 0) {
213240
auto readableBytes = m_stream->ReadableBytes();
214-
if (readableBytes > m_bufferSize) {
215-
ReallocateBuffer(readableBytes);
241+
ELOG_DEBUG("Readable bytes: %d", readableBytes);
242+
if (m_isMedia) {
243+
// A new frame.
244+
if (m_currentFrameSize == 0 && m_readFrameSize == 0) {
245+
if (readableBytes >= 2) {
246+
const int headerSize = 4; // In bytes.
247+
uint8_t* frameSizeArray = new uint8_t[headerSize];
248+
memset(frameSizeArray, 0, headerSize * sizeof(uint8_t));
249+
m_stream->Read((uint8_t*)frameSizeArray, headerSize);
250+
// Usually only the last 2 bytes are used. The first two bits could be used for indicating frame size.
251+
for (int i = 0; i < headerSize; i++) {
252+
m_currentFrameSize <<= 8;
253+
m_currentFrameSize += frameSizeArray[i];
254+
}
255+
if (m_currentFrameSize > m_bufferSize) {
256+
ReallocateBuffer(m_currentFrameSize);
257+
}
258+
}
259+
continue;
260+
}
261+
if (m_readFrameSize < m_currentFrameSize) {
262+
// Append data to current frame.
263+
size_t readBytes = std::min(readableBytes, m_currentFrameSize - m_readFrameSize);
264+
m_stream->Read(m_buffer + m_readFrameSize, readBytes);
265+
m_readFrameSize += readBytes;
266+
}
267+
// Complete frame.
268+
if (m_readFrameSize == m_currentFrameSize) {
269+
owt_base::Frame frame;
270+
frame.format = owt_base::FRAME_FORMAT_I420;
271+
frame.length = m_currentFrameSize;
272+
frame.payload = m_buffer;
273+
// Transport layer doesn't know a frame's type. Video agent is able to parse the type of a frame from bistream. However, video agent doesn't feed the frame to decoder when a key frame is requested.
274+
frame.additionalInfo.video.isKeyFrame = "key";
275+
deliverFrame(frame);
276+
m_currentFrameSize = 0;
277+
m_readFrameSize = 0;
278+
}
279+
} else {
280+
if (readableBytes > m_bufferSize) {
281+
ReallocateBuffer(readableBytes);
282+
}
283+
owt_base::Frame frame;
284+
frame.format = owt_base::FRAME_FORMAT_DATA;
285+
frame.length = readableBytes;
286+
frame.payload = m_buffer;
287+
m_stream->Read(frame.payload, readableBytes);
288+
deliverFrame(frame);
216289
}
217-
owt_base::Frame frame;
218-
frame.format = owt_base::FRAME_FORMAT_DATA;
219-
frame.length = readableBytes;
220-
frame.payload = m_buffer;
221-
m_stream->Read(frame.payload, readableBytes);
222-
deliverFrame(frame);
223290
}
224291
}
225292

source/agent/addons/quic/QuicTransportStream.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
#ifndef QUIC_QUICTRANSPORTSTREAM_H_
88
#define QUIC_QUICTRANSPORTSTREAM_H_
99

10-
#include "owt/quic/web_transport_stream_interface.h"
1110
#include "../../core/owt_base/MediaFramePipeline.h"
1211
#include "../common/MediaFramePipelineWrapper.h"
12+
#include "owt/quic/web_transport_stream_interface.h"
1313
#include <logger.h>
14-
#include <nan.h>
1514
#include <mutex>
15+
#include <nan.h>
1616
#include <string>
1717

1818
class QuicTransportStream : public owt_base::FrameSource, public owt_base::FrameDestination, public NanFrameNode, public owt::quic::WebTransportStreamInterface::Visitor {
@@ -37,7 +37,7 @@ class QuicTransportStream : public owt_base::FrameSource, public owt_base::Frame
3737
static NAN_METHOD(addDestination);
3838
static NAN_METHOD(removeDestination);
3939
static NAUV_WORK_CB(onContentSessionId);
40-
static NAUV_WORK_CB(onData); // TODO: Move to pipe.
40+
static NAUV_WORK_CB(onData); // TODO: Move to pipe.
4141

4242
// Overrides owt_base::FrameSource.
4343
void onFeedback(const owt_base::FeedbackMsg&) override;
@@ -72,6 +72,11 @@ class QuicTransportStream : public owt_base::FrameSource, public owt_base::Frame
7272
uint8_t* m_buffer;
7373
size_t m_bufferSize;
7474

75+
// Indicates whether this is a media stream. If this is not a media stream, it can only be piped to another QUIC agent.
76+
bool m_isMedia;
77+
size_t m_currentFrameSize;
78+
size_t m_readFrameSize;
79+
7580
uv_async_t m_asyncOnContentSessionId;
7681
uv_async_t m_asyncOnData;
7782
};

source/agent/conference/quicController.js

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,20 @@ class Transport {
3434

3535
class Operation {
3636
constructor(id, transport, direction, tracks, data) {
37-
if(tracks){
38-
throw new Error('QUIC agent does not support media stream tracks so far.');
39-
}
4037
this.id = id;
4138
this.transport = transport;
4239
this.transportId = transport.id;
4340
this.direction = direction;
41+
this.tracks = tracks;
4442
this.data = data;
4543
this.promise = Promise.resolve();
44+
this.tracks = this.tracks ? this.tracks.map(t => {
45+
if (t.type === 'video') {
46+
t.format = { codec : 'h264', profile : 'B' };
47+
}
48+
return t;
49+
})
50+
: undefined;
4651
}
4752
}
4853

@@ -168,6 +173,7 @@ class QuicController extends EventEmitter {
168173

169174
// Return Promise
170175
terminate(sessionId, direction, reason) {
176+
console.trace();
171177
log.debug(`terminate, sessionId: ${sessionId} direction: ${direction}, ${reason}`);
172178

173179
if (!this.operations.has(sessionId)) {

source/agent/connections.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,11 @@ module.exports = function Connections () {
124124
if (!dest) {
125125
return Promise.reject({ type : 'failed', reason : 'Destination connection(' + name + ') is not ready' });
126126
}
127-
connections[from].connection.addDestination(name, dest);
127+
let isNanObj=false;
128+
if (dest.constructor.name === 'QuicTransportStream'){
129+
isNanObj=true;
130+
}
131+
connections[from].connection.addDestination(name, dest, isNanObj);
128132
connections[connectionId][name + 'From'] = from;
129133
}
130134
}

source/agent/internalConnectionRouter.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,11 @@ class StreamSource {
107107
addDestination(track, sink) {
108108
if (!this.dests[track].has(sink.id)) {
109109
this.dests[track].set(sink.id, sink);
110-
this.conn.addDestination(track, sink.conn);
110+
let isNanObject = false;
111+
if (sink instanceof QuicTransportStreamPipeline) {
112+
isNanObject = true;
113+
}
114+
this.conn.addDestination(track, sink.conn, isNanObject);
111115
sink._addSource(track, this);
112116
}
113117
}

0 commit comments

Comments
 (0)