diff --git a/docker/Dockerfile b/docker/Dockerfile index 272e576..c1a00ee 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -85,9 +85,8 @@ RUN git clone -b 0.x https://github.com/meetecho/janus-gateway.git && \ cd / && rm -rf janus-gateway -RUN git clone -b master https://github.com/networked-aframe/janus-plugin-sfu.git && \ +RUN git clone -b reenable-subscribe https://github.com/networked-aframe/janus-plugin-sfu.git && \ cd janus-plugin-sfu && \ - git checkout 1914dfa7e22c793f4a684ebeb002304661270519 && \ echo version 2 increment this line to invalidate cache of this layer while iterating build during development && \ cargo build --release && \ mkdir -p /usr/lib/janus/plugins && \ diff --git a/docs/api.md b/docs/api.md index 6af7be1..0e43135 100644 --- a/docs/api.md +++ b/docs/api.md @@ -12,8 +12,7 @@ expect consumers of this plugin to use WebSockets, but you can probably use what 1. Signal your attachment to the Janus plugin. See the [Janus documentation][janus-transports] on how to attach to a plugin. This plugin's name is `janus.plugin.sfu`. -2. Determine your user ID. This should be a unique ID that nobody else is likely to share. In the future, we will actually - have authentication; as it stands just pick a big random ID and pray for no collisions. +2. Determine your user ID. This should be a unique ID that nobody else is likely to share. Pick a big random ID and pray for no collisions. 3. Create an RTC connection. @@ -55,13 +54,48 @@ join a room. You can only join one room with any connection. "kind": "join", "room_id": room ID, "user_id": user ID, - "subscribe": [none|subscription object] + "subscribe": { + "notifications": [none|boolean], + "data": [none|boolean], + "media": [none|user ID] + }, + "token": [none|string] } ``` -If `subscription: {...}` is passed, you will synchronously configure an initial subscription to the traffic that you -want to get pushed through your connection. The format of the subscription should be identical to that in the -[subscribe](#subscribe) message, below. +If `notifications` is `true`, you will get websocket events corresponding to every time someone joins or leaves the server, someone blocked or unblocked you. + +If `data` is `true`, you will get all data traffic from other users (publishers) in your room. You can also send data. Currently this flag is used to register the +connection as a publisher, if false or not defined the connection is registered as a subscriber. + +If `media` is a user ID, the server will respond with a JSEP offer which you can use to establish a connection suitable to receive audio and video RTP data coming from that user ID. + +Although `subscribe: {...}` can be omitted and is valid, it doesn't +make much sense to register a connection that does nothing. + +This is usually used as follow for a publisher connection: + + "subscribe": { + "notifications": true, + "data": true + } + +to send data, receive data, and subscribe to notifications in the currently-joined room. + +And for a subscriber connection: + + "subscribe": { + "media": user ID + } + +`token` is a JWT to verify you allowed to connect to the room `room_id`, this is verified if you specified an `auth_key` to a public RSA key in DER format in `janus.plugin.sfu.cfg`. The JWT contains the following claims: + +``` +{kick_users: [true|false], join_hub: true, room_ids: ["room_alpha"]} +``` + +`room_ids` is optional. If `room_ids` is not specified, you can connect to the room `room_id` if `join_hub` is `true`. +If `room_ids` is specified, `room_id` needs to be listed in `room_ids` to be able to connect to the room. The response will return the users on the server in the room you joined, as below, including yourself. If you `subscribe`d to a user's media, you will also get a JSEP offer you can use to get that user's RTP traffic. @@ -69,30 +103,53 @@ The response will return the users on the server in the room you joined, as belo { "success": true, "response": { - "users": {room_alpha: ["123", "789"]} + "users": {"room_alpha": ["123", "789"]} } } ``` ### Subscribe -Subscribes to some kind of traffic coming from the server. +Subscribes to audio/video of a user (publisher) if you know the user ID. +This message is only useful if you're using an external user presence system +to know the participants currently in the room. +With this plugin alone, you can't know a publisher user ID without +being a publisher in the room to get the users in the room, and in this case +you use the Join message to subscribe to other users, not this message. ``` { "kind": "subscribe", - "notifications": [none|boolean], - "data": [none|boolean], - "media": [none|user ID] + "what": { + "notifications": [none|boolean], + "data": [none|boolean], + "media": [none|user ID] + }, + "token": [none|string] } ``` -If `notifications` is `true`, you will get websocket events corresponding to every time someone joins or leaves the server. - -If `data` is `true`, you will get all data traffic from other users in your room, if you've joined a room. +`notifications` and `data` are completely ignored here. Those flags are only +checked for a publisher connection with the Join message. If `media` is a user ID, the server will respond with a JSEP offer which you can use to establish a connection suitable to receive audio and video RTP data coming from that user ID. +If `token` is specified, `room_ids` claim is present in the JWT and `auth_key` configured, the following security check is done: +the user is allowed to subscribe to a publisher user ID (specified in `media`) if this publisher is connected to a room listed in the JWT `room_ids`. + +### Kick + +Kick another user from the room. You need a token with the `kick_users: true` claim. + +``` +{ + "kind": "kick", + "room_id": room ID, + "user_id": user ID, + "token": string +} +``` + ### Block Blocks another user. Blocks are bidirectional; the targeted user won't get your data, audio, or video, and you won't get @@ -101,7 +158,7 @@ theirs. That user will get a `blocked` event letting them know. ``` { "kind": "block", - "whom": [user ID] + "whom": user ID } ``` @@ -114,7 +171,7 @@ Unblock a user who you previously blocked. That user will get an `unblocked` eve ``` { "kind": "block", - "whom": [user ID] + "whom": user ID } ``` diff --git a/src/lib.rs b/src/lib.rs index 43a434d..36183f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -568,27 +568,62 @@ fn process_unblock(from: &Arc, whom: UserId) -> MessageResult { } } -// fn process_subscribe(from: &Arc, what: &Subscription) -> MessageResult { -// janus_info!("Processing subscription from {:p}: {:?}", from.handle, what); -// if let Err(_existing) = from.subscription.set(what.clone()) { -// return Err(From::from("Users may only subscribe once!")); -// } - -// let mut switchboard = SWITCHBOARD.write()?; -// if let Some(ref publisher_id) = what.media { -// let publisher = switchboard -// .get_publisher(publisher_id) -// .ok_or("Can't subscribe to a nonexistent publisher.")? -// .clone(); -// let jsep = json!({ -// "type": "offer", -// "sdp": publisher.subscriber_offer.lock().unwrap().as_ref().unwrap() -// }); -// switchboard.subscribe_to_user(from.clone(), publisher); -// return Ok(MessageResponse::new(json!({}), jsep)); -// } -// Ok(MessageResponse::msg(json!({}))) -// } +fn process_subscribe(from: &Arc, what: Subscription, token: Option) -> MessageResult { + janus_info!("Processing subscription from {:p}: {:?}", from.handle, what); + if let Err(_existing) = from.subscription.set(what.clone()) { + return Err(From::from("Users may only subscribe once!")); + } + + let mut switchboard = SWITCHBOARD.write()?; + if let Some(ref publisher_id) = what.media { + let publisher = switchboard + .get_publisher(publisher_id) + .ok_or("Can't subscribe to a nonexistent publisher.")? + .clone(); + + let config = CONFIG.get().unwrap(); + match (&config.auth_key, token) { + (None, _) => { + janus_verb!( + "No auth_key configured. Allowing subscription from {:p} to publisher {}.", + from.handle, + publisher_id + ); + } + (Some(_), None) => { + janus_warn!("Rejecting anonymous subscription from {:p} to publisher {}.", from.handle, publisher_id); + return Err(From::from("Rejecting anonymous subscription!")); + } + (Some(key), Some(ref token)) => match ValidatedToken::from_str(token, key) { + Ok(ref claims) => { + if let Some(joined) = publisher.join_state.get() { + if claims.may_join(&joined.room_id) { + janus_verb!("Allowing subscription from {:p} to publisher {}.", from.handle, publisher_id); + } else { + janus_warn!("Rejecting subscription from {:p} to publisher {}.", from.handle, publisher_id); + return Err(From::from("Rejecting subscription without permission!")); + } + } else { + janus_warn!("Cannot subscribe from {:p} to a publisher {} not in a room.", from.handle, publisher_id); + return Err(From::from("Cannot subscribe to a publisher not in a room.")); + } + } + Err(e) => { + janus_warn!("Rejecting subscription from {:p} to publisher {}. Error: {}", from.handle, publisher_id, e); + return Err(From::from("Rejecting subscription with invalid token!")); + } + }, + } + + let jsep = json!({ + "type": "offer", + "sdp": publisher.subscriber_offer.lock().unwrap().as_ref().unwrap() + }); + switchboard.subscribe_to_user(from.clone(), publisher); + return Ok(MessageResponse::new(json!({}), jsep)); + } + Ok(MessageResponse::msg(json!({}))) +} fn process_data(from: &Arc, whom: Option, body: &str) -> MessageResult { janus_huge!("Processing data message from {:p}: {:?}", from.handle, body); @@ -616,9 +651,7 @@ fn process_message(from: &Arc, msg: MessageKind) -> MessageResult { token, } => process_join(from, room_id, user_id, subscribe, token), MessageKind::Kick { room_id, user_id, token } => process_kick(from, room_id, user_id, token), - // process_subscribe doesn't check the JWT, we need to change the API to add room_id and token, - // comment it for now. - // MessageKind::Subscribe { what } => process_subscribe(from, &what), + MessageKind::Subscribe { what, token } => process_subscribe(from, what, token), MessageKind::Block { whom } => process_block(from, whom), MessageKind::Unblock { whom } => process_unblock(from, whom), MessageKind::Data { whom, body } => process_data(from, whom, &body), diff --git a/src/messages.rs b/src/messages.rs index 632fba0..6d93763 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -71,7 +71,7 @@ pub enum MessageKind { Kick { room_id: RoomId, user_id: UserId, token: String }, /// Indicates that a client wishes to subscribe to traffic described by the given subscription specification. - // Subscribe { what: Subscription }, + Subscribe { what: Subscription, token: Option }, /// Indicates that a given user should be blocked from receiving your traffic, and that you should not /// receive their traffic (superseding any subscriptions you have.) @@ -88,7 +88,7 @@ pub enum MessageKind { #[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)] #[serde(default)] pub struct Subscription { - /// Whether to subscribe to server-wide notifications (e.g. user joins and leaves, room creates and destroys). + /// Whether to subscribe to server-wide notifications (e.g. user joins and leaves, someone blocked/unblocked you). pub notifications: bool, /// Whether to subscribe to data in the currently-joined room. @@ -163,20 +163,21 @@ mod tests { ); } - // #[test] - // fn parse_subscribe() { - // let json = r#"{"kind": "subscribe", "what": {"notifications": false, "data": true, "media": "steve"}}"#; - // let result: MessageKind = serde_json::from_str(json).unwrap(); - // assert_eq!( - // result, - // MessageKind::Subscribe { - // what: Subscription { - // notifications: false, - // data: true, - // media: Some("steve".into()) - // } - // } - // ); - // } + #[test] + fn parse_subscribe() { + let json = r#"{"kind": "subscribe", "what": {"notifications": false, "data": true, "media": "steve"}}"#; + let result: MessageKind = serde_json::from_str(json).unwrap(); + assert_eq!( + result, + MessageKind::Subscribe { + what: Subscription { + notifications: false, + data: true, + media: Some("steve".into()) + }, + token: None + } + ); + } } }