diff --git a/Cargo.lock b/Cargo.lock
index ef27c5dc..807401f2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2181,6 +2181,7 @@ version = "0.1.0"
 dependencies = [
  "boxfnonce 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "byte-slice-cast 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "euclid 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "glib 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "glib-sys 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "gstreamer 0.15.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2258,7 +2259,9 @@ dependencies = [
 name = "servo-media-streams"
 version = "0.1.0"
 dependencies = [
+ "euclid 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
  "uuid 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
@@ -2708,6 +2711,7 @@ version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
diff --git a/backends/dummy/lib.rs b/backends/dummy/lib.rs
index 35d0a30e..bb75f2a4 100644
--- a/backends/dummy/lib.rs
+++ b/backends/dummy/lib.rs
@@ -21,7 +21,7 @@ use servo_media_player::{audio, video, Player, PlayerError, PlayerEvent, StreamT
 use servo_media_streams::capture::MediaTrackConstraintSet;
 use servo_media_streams::device_monitor::{MediaDeviceInfo, MediaDeviceMonitor};
 use servo_media_streams::registry::{register_stream, unregister_stream, MediaStreamId};
-use servo_media_streams::{MediaOutput, MediaSocket, MediaStream, MediaStreamType};
+use servo_media_streams::{MediaOutput, MediaSocket, MediaSource, MediaStream, MediaStreamType};
 use servo_media_traits::{ClientContextId, MediaInstance};
 use servo_media_webrtc::{
     thread, BundlePolicy, DataChannelId, DataChannelInit, DataChannelMessage, IceCandidate,
@@ -74,7 +74,11 @@ impl Backend for DummyBackend {
         (Box::new(DummySocket), id)
     }
 
-    fn create_videoinput_stream(&self, _: MediaTrackConstraintSet) -> Option<MediaStreamId> {
+    fn create_videoinput_stream(
+        &self,
+        _: MediaTrackConstraintSet,
+        _: MediaSource,
+    ) -> Option<MediaStreamId> {
         Some(register_stream(Arc::new(Mutex::new(DummyMediaStream {
             id: MediaStreamId::new(),
         }))))
@@ -118,6 +122,8 @@ impl Backend for DummyBackend {
     fn get_device_monitor(&self) -> Box<dyn MediaDeviceMonitor> {
         Box::new(DummyMediaDeviceMonitor {})
     }
+
+    fn push_stream_data(&self, _: &MediaStreamId, _: Vec<u8>) {}
 }
 
 impl AudioBackend for DummyBackend {
@@ -242,6 +248,8 @@ impl MediaStream for DummyMediaStream {
     fn ty(&self) -> MediaStreamType {
         MediaStreamType::Audio
     }
+
+    fn push_data(&self, _: Vec<u8>) {}
 }
 
 impl Drop for DummyMediaStream {
diff --git a/backends/gstreamer/Cargo.toml b/backends/gstreamer/Cargo.toml
index 21c96723..eff77d91 100644
--- a/backends/gstreamer/Cargo.toml
+++ b/backends/gstreamer/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
 
 [dependencies]
 boxfnonce = "0.1.0"
+euclid = "0.20"
 mime = "0.3.13"
 log = "0.4"
diff --git a/backends/gstreamer/lib.rs b/backends/gstreamer/lib.rs
index 44be69a8..77b3aecf 100644
--- a/backends/gstreamer/lib.rs
+++ b/backends/gstreamer/lib.rs
@@ -1,6 +1,7 @@
 #![feature(nll)]
 extern crate boxfnonce;
 extern crate byte_slice_cast;
+extern crate euclid;
 extern crate mime;
 
 extern crate glib_sys as glib_ffi;
@@ -64,7 +65,7 @@ use servo_media_player::{Player, PlayerEvent, StreamType};
 use servo_media_streams::capture::MediaTrackConstraintSet;
 use servo_media_streams::device_monitor::MediaDeviceMonitor;
 use servo_media_streams::registry::MediaStreamId;
-use servo_media_streams::{MediaOutput, MediaSocket, MediaStreamType};
+use servo_media_streams::{MediaOutput, MediaSocket, MediaSource, MediaStreamType};
 use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
 use servo_media_webrtc::{WebRtcBackend, WebRtcController, WebRtcSignaller};
 use std::collections::HashMap;
@@ -245,12 +246,20 @@ impl Backend for GStreamerBackend {
         media_capture::create_audioinput_stream(set)
     }
 
-    fn create_videoinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
+    fn create_videoinput_stream(
+        &self,
+        set: MediaTrackConstraintSet,
+        source: MediaSource,
+    ) -> Option<MediaStreamId> {
         if self.capture_mocking.load(Ordering::Acquire) {
             // XXXManishearth we should caps filter this
             return Some(self.create_videostream());
         }
-        media_capture::create_videoinput_stream(set)
+        media_capture::create_videoinput_stream(set, source)
+    }
+
+    fn push_stream_data(&self, stream: &MediaStreamId, data: Vec<u8>) {
+        GStreamerMediaStream::push_data(stream, data);
     }
 
     fn can_play_type(&self, media_type: &str) -> SupportsMediaType {
diff --git a/backends/gstreamer/media_capture.rs b/backends/gstreamer/media_capture.rs
index 1ef476ad..050de058 100644
--- a/backends/gstreamer/media_capture.rs
+++ b/backends/gstreamer/media_capture.rs
@@ -1,9 +1,10 @@
 use crate::media_stream::GStreamerMediaStream;
+
 use gst;
 use gst::prelude::*;
 use servo_media_streams::capture::*;
 use servo_media_streams::registry::MediaStreamId;
-use servo_media_streams::MediaStreamType;
+use servo_media_streams::{MediaSource, MediaStreamType};
 use std::i32;
 
 trait AddToCaps {
@@ -150,23 +151,34 @@ pub struct GstMediaTrack {
 fn create_input_stream(
     stream_type: MediaStreamType,
     constraint_set: MediaTrackConstraintSet,
+    source: MediaSource,
 ) -> Option<MediaStreamId> {
     let devices = GstMediaDevices::new();
     devices
         .get_track(stream_type == MediaStreamType::Video, constraint_set)
-        .map(|track| {
-            let f = match stream_type {
-                MediaStreamType::Audio => GStreamerMediaStream::create_audio_from,
-                MediaStreamType::Video => GStreamerMediaStream::create_video_from,
-            };
-            f(track.element)
+        .map(|track| match stream_type {
+            MediaStreamType::Audio => GStreamerMediaStream::create_audio_from(match source {
+                MediaSource::Device => track.element,
+                MediaSource::App(_) => unimplemented!(),
+            }),
+            MediaStreamType::Video => match source {
+                MediaSource::Device => GStreamerMediaStream::create_video_from(track.element, None),
+                MediaSource::App(size) => {
+                    let appsrc =
+                        gst::ElementFactory::make("appsrc", None).expect("appsrc creation failed");
+                    GStreamerMediaStream::create_video_from(appsrc, Some(size))
+                }
+            },
         })
 }
 
 pub fn create_audioinput_stream(constraint_set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
-    create_input_stream(MediaStreamType::Audio, constraint_set)
+    create_input_stream(MediaStreamType::Audio, constraint_set, MediaSource::Device)
 }
 
-pub fn create_videoinput_stream(constraint_set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
-    create_input_stream(MediaStreamType::Video, constraint_set)
+pub fn create_videoinput_stream(
+    constraint_set: MediaTrackConstraintSet,
+    source: MediaSource,
+) -> Option<MediaStreamId> {
+    create_input_stream(MediaStreamType::Video, constraint_set, source)
 }
diff --git a/backends/gstreamer/media_stream.rs b/backends/gstreamer/media_stream.rs
index 6215d34c..cde23d7e 100644
--- a/backends/gstreamer/media_stream.rs
+++ b/backends/gstreamer/media_stream.rs
@@ -1,7 +1,10 @@
 use super::BACKEND_BASE_TIME;
+
+use euclid::default::Size2D;
 use glib::prelude::*;
 use gst;
 use gst::prelude::*;
+use gst_app::AppSrc;
 use servo_media_streams::registry::{
     get_stream, register_stream, unregister_stream, MediaStreamId,
 };
@@ -29,6 +32,7 @@ pub struct GStreamerMediaStream {
     type_: MediaStreamType,
     elements: Vec<gst::Element>,
     pipeline: Option<gst::Pipeline>,
+    video_app_source: Option<AppSrc>,
 }
 
 impl MediaStream for GStreamerMediaStream {
@@ -47,6 +51,15 @@ impl MediaStream for GStreamerMediaStream {
     fn ty(&self) -> MediaStreamType {
         self.type_
     }
+
+    fn push_data(&self, data: Vec<u8>) {
+        if let Some(ref appsrc) = self.video_app_source {
+            let buffer = gst::Buffer::from_slice(data);
+            if let Err(error) = appsrc.push_buffer(buffer) {
+                warn!("{}", error);
+            }
+        }
+    }
 }
 
 impl GStreamerMediaStream {
@@ -56,6 +69,7 @@ impl GStreamerMediaStream {
             type_,
             elements,
             pipeline: None,
+            video_app_source: None,
         }
     }
 
@@ -91,6 +105,10 @@ impl GStreamerMediaStream {
         self.elements.last().unwrap().clone()
     }
 
+    pub fn first_element(&self) -> gst::Element {
+        self.elements.first().unwrap().clone()
+    }
+
     pub fn attach_to_pipeline(&mut self, pipeline: &gst::Pipeline) {
         assert!(self.pipeline.is_none());
         let elements: Vec<_> = self.elements.iter().collect();
@@ -123,7 +141,7 @@ impl GStreamerMediaStream {
             .set_property("is-live", &true)
             .expect("videotestsrc doesn't have expected 'is-live' property");
 
-        Self::create_video_from(videotestsrc)
+        Self::create_video_from(videotestsrc, None)
     }
 
     /// Attaches encoding adapters to the stream, returning the source element
@@ -174,14 +192,74 @@ impl GStreamerMediaStream {
         }
     }
 
-    pub fn create_video_from(source: gst::Element) -> MediaStreamId {
+    pub fn set_video_app_source(&mut self, source: &AppSrc) {
+        self.video_app_source = Some(source.clone());
+    }
+
+    pub fn create_video_from(source: gst::Element, size: Option<Size2D<u32>>) -> MediaStreamId {
+        let src = gst::ElementFactory::make("proxysrc", None).unwrap();
         let videoconvert = gst::ElementFactory::make("videoconvert", None).unwrap();
         let queue = gst::ElementFactory::make("queue", None).unwrap();
-
-        register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
+        let stream = Arc::new(Mutex::new(GStreamerMediaStream::new(
             MediaStreamType::Video,
-            vec![source, videoconvert, queue],
-        ))))
+            vec![src, videoconvert, queue],
+        )));
+
+        let pipeline = gst::Pipeline::new(Some("video pipeline"));
+        let clock = gst::SystemClock::obtain();
+        pipeline.set_start_time(gst::ClockTime::none());
+        pipeline.set_base_time(*BACKEND_BASE_TIME);
+        pipeline.use_clock(Some(&clock));
+
+        let decodebin = gst::ElementFactory::make("decodebin", None).unwrap();
+
+        let stream_ = stream.clone();
+        let video_pipeline = pipeline.clone();
+        decodebin.connect_pad_added(move |decodebin, _| {
+            // Append a proxysink to the video pipeline.
+            let proxy_sink = gst::ElementFactory::make("proxysink", None).unwrap();
+            video_pipeline.add(&proxy_sink).unwrap();
+            gst::Element::link_many(&[decodebin, &proxy_sink]).unwrap();
+
+            // And connect the video and media stream pipelines.
+            let stream = stream_.lock().unwrap();
+            let first_element = stream.first_element();
+            first_element
+                .set_property("proxysink", &proxy_sink)
+                .unwrap();
+
+            proxy_sink.sync_state_with_parent().unwrap();
+            decodebin.sync_state_with_parent().unwrap();
+        });
+
+        if let Some(size) = size {
+            let caps = gst::Caps::builder("video/x-raw")
+                .field("format", &gst_video::VideoFormat::Bgra.to_string())
+                .field("pixel-aspect-ratio", &gst::Fraction::from((1, 1)))
+                .field("width", &(size.width as i32))
+                .field("height", &(size.height as i32))
+                .build();
+            source
+                .set_property("caps", &caps)
+                .expect("source doesn't have expected 'caps' property");
+        }
+
+        if let Some(appsrc) = source.downcast_ref::<AppSrc>() {
+            appsrc.set_property_format(gst::Format::Time);
+            stream.lock().unwrap().set_video_app_source(appsrc);
+        }
+
+        pipeline.add_many(&[&source, &decodebin]).unwrap();
+        gst::Element::link_many(&[&source, &decodebin]).unwrap();
+
+        pipeline.set_state(gst::State::Playing).unwrap();
+
+        #[cfg(debug_assertions)]
+        pipeline
+            .upcast::<gst::Bin>()
+            .debug_to_dot_file(gst::DebugGraphDetails::all(), "VideoPipeline_PLAYING");
+
+        register_stream(stream)
     }
 
     pub fn create_audio() -> MediaStreamId {
@@ -212,11 +290,21 @@ impl GStreamerMediaStream {
         proxy_src.set_property("proxysink", &proxy_sink).unwrap();
         let stream = match ty {
             MediaStreamType::Audio => Self::create_audio_from(proxy_src),
-            MediaStreamType::Video => Self::create_video_from(proxy_src),
+            MediaStreamType::Video => Self::create_video_from(proxy_src, None),
         };
 
         (stream, GstreamerMediaSocket { proxy_sink })
     }
+
+    pub fn push_data(stream: &MediaStreamId, data: Vec<u8>) {
+        let stream = get_stream(stream).expect("Media streams registry does not contain such ID");
+        let mut stream = stream.lock().unwrap();
+        let stream = stream
+            .as_mut_any()
+            .downcast_mut::<GStreamerMediaStream>()
+            .unwrap();
+        stream.push_data(data);
+    }
 }
 
 impl Drop for GStreamerMediaStream {
diff --git a/backends/gstreamer/media_stream_source.rs b/backends/gstreamer/media_stream_source.rs
index 8dc49176..629c891d 100644
--- a/backends/gstreamer/media_stream_source.rs
+++ b/backends/gstreamer/media_stream_source.rs
@@ -65,7 +65,7 @@ mod imp {
         // Append a proxysink to the media stream pipeline.
         let pipeline = stream.pipeline_or_new();
-        let last_element = stream.src_element();
+        let last_element = stream.encoded();
         let sink = gst::ElementFactory::make("proxysink", None).unwrap();
         pipeline.add(&sink).unwrap();
         gst::Element::link_many(&[&last_element, &sink][..]).unwrap();
@@ -77,6 +77,11 @@ mod imp {
         sink.sync_state_with_parent().unwrap();
 
         pipeline.set_state(gst::State::Playing).unwrap();
+
+        #[cfg(debug_assertions)]
+        pipeline
+            .upcast::<gst::Bin>()
+            .debug_to_dot_file(gst::DebugGraphDetails::all(), "ServoMediaStreamSrc_PLAYING");
     }
 
     fn setup_proxy_src(
diff --git a/backends/gstreamer/webrtc.rs b/backends/gstreamer/webrtc.rs
index fe393bc5..6689c219 100644
--- a/backends/gstreamer/webrtc.rs
+++ b/backends/gstreamer/webrtc.rs
@@ -744,7 +744,7 @@ fn on_incoming_decodebin_stream(
     let (stream, ty) = if name == "video" {
         (
-            GStreamerMediaStream::create_video_from(proxy_src),
+            GStreamerMediaStream::create_video_from(proxy_src, None),
             MediaStreamType::Video,
         )
     } else {
diff --git a/examples/simple_webrtc.rs b/examples/simple_webrtc.rs
index c24e1342..b906b5b4 100644
--- a/examples/simple_webrtc.rs
+++ b/examples/simple_webrtc.rs
@@ -132,7 +132,7 @@ impl State {
         let (video, audio) = if !self.peer_id.is_some() {
             (
                 self.media
-                    .create_videoinput_stream(Default::default())
+                    .create_videoinput_stream(Default::default(), MediaSource::Device)
                     .unwrap_or_else(|| self.media.create_videostream()),
                 self.media
                     .create_audioinput_stream(Default::default())
diff --git a/examples/videoinput_stream.rs b/examples/videoinput_stream.rs
index 7bf25c0b..50313778 100644
--- a/examples/videoinput_stream.rs
+++ b/examples/videoinput_stream.rs
@@ -1,12 +1,15 @@
 extern crate servo_media;
 extern crate servo_media_auto;
 
+use servo_media::streams::MediaSource;
 use servo_media::ServoMedia;
 use std::sync::Arc;
 use std::{thread, time};
 
 fn run_example(servo_media: Arc<ServoMedia>) {
-    if let Some(stream) = servo_media.create_videoinput_stream(Default::default()) {
+    if let Some(stream) =
+        servo_media.create_videoinput_stream(Default::default(), MediaSource::Device)
+    {
         let mut output = servo_media.create_stream_output();
         output.add_stream(&stream);
         thread::sleep(time::Duration::from_millis(6000));
diff --git a/servo-media/lib.rs b/servo-media/lib.rs
index 88cb35b2..01508621 100644
--- a/servo-media/lib.rs
+++ b/servo-media/lib.rs
@@ -18,7 +18,7 @@ use player::{Player, PlayerEvent, StreamType};
 use streams::capture::MediaTrackConstraintSet;
 use streams::device_monitor::MediaDeviceMonitor;
 use streams::registry::MediaStreamId;
-use streams::{MediaOutput, MediaSocket, MediaStreamType};
+use streams::{MediaOutput, MediaSocket, MediaSource, MediaStreamType};
 use webrtc::{WebRtcController, WebRtcSignaller};
 
 pub struct ServoMedia(Box<dyn Backend>);
@@ -48,7 +48,12 @@ pub trait Backend: Send + Sync {
         ty: MediaStreamType,
     ) -> (Box<dyn MediaSocket>, MediaStreamId);
     fn create_audioinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId>;
-    fn create_videoinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId>;
+    fn create_videoinput_stream(
+        &self,
+        set: MediaTrackConstraintSet,
+        source: MediaSource,
+    ) -> Option<MediaStreamId>;
+    fn push_stream_data(&self, stream: &MediaStreamId, data: Vec<u8>);
     fn create_audio_context(
         &self,
         id: &ClientContextId,
@@ -76,7 +81,6 @@ pub trait Backend: Send + Sync {
     /// and the media instances created for these contexts.
     /// The client context identifier is currently an abstraction of Servo's PipelineId.
     fn resume(&self, _id: &ClientContextId) {}
-
     fn get_device_monitor(&self) -> Box<dyn MediaDeviceMonitor>;
 }
 
diff --git a/streams/Cargo.toml b/streams/Cargo.toml
index 62c661c3..0f3b8a2a 100644
--- a/streams/Cargo.toml
+++ b/streams/Cargo.toml
@@ -9,5 +9,7 @@ name = "servo_media_streams"
 path = "lib.rs"
 
 [dependencies]
+euclid = "0.20"
 lazy_static = "1.0"
-uuid = { version = "0.8", features = ["v4"] }
+serde = "1.0.66"
+uuid = { version = "0.8", features = ["v4", "serde"] }
diff --git a/streams/lib.rs b/streams/lib.rs
index ba192984..aeae7d81 100644
--- a/streams/lib.rs
+++ b/streams/lib.rs
@@ -1,10 +1,14 @@
+extern crate euclid;
 #[macro_use]
 extern crate lazy_static;
+#[macro_use]
+extern crate serde;
 
 pub mod capture;
 pub mod device_monitor;
 pub mod registry;
 
+use euclid::default::Size2D;
 use std::any::Any;
 
 pub use registry::*;
@@ -14,6 +18,7 @@ pub trait MediaStream: Any + Send {
     fn as_mut_any(&mut self) -> &mut dyn Any;
     fn set_id(&mut self, id: registry::MediaStreamId);
     fn ty(&self) -> MediaStreamType;
+    fn push_data(&self, data: Vec<u8>);
 }
 
 /// A MediaSocket is a way for a backend to represent a
@@ -22,6 +27,16 @@ pub trait MediaSocket: Any + Send {
     fn as_any(&self) -> &dyn Any;
 }
 
+/// Determines the source of the media stream.
+pub enum MediaSource {
+    // The media stream source is a capture device.
+    // i.e. getUserMedia
+    Device,
+    // The media stream source is the client application.
+    // i.e. captureStream
+    App(Size2D<u32>),
+}
+
 /// This isn't part of the webrtc spec; it's a leaky abstaction while media streams
 /// are under development and example consumers need to be able to inspect them.
 pub trait MediaOutput: Send {
diff --git a/streams/registry.rs b/streams/registry.rs
index 1215e222..42bb0324 100644
--- a/streams/registry.rs
+++ b/streams/registry.rs
@@ -8,7 +8,7 @@ lazy_static! {
         Mutex::new(HashMap::new());
 }
 
-#[derive(Clone, Copy, Hash, Eq, PartialEq)]
+#[derive(Clone, Copy, Debug, Deserialize, Hash, Eq, PartialEq, Serialize)]
 pub struct MediaStreamId(Uuid);
 impl MediaStreamId {
     pub fn new() -> MediaStreamId {
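
A minimal usage sketch of the new MediaSource::App path, assuming ServoMedia forwards Backend::push_stream_data the same way it forwards create_videoinput_stream, and that pushed buffers are raw BGRA frames matching the caps configured on the appsrc in GStreamerMediaStream::create_video_from. The push_app_frames helper and the 320x240 size are illustrative only; the Device path stays as shown in examples/videoinput_stream.rs, apart from the extra argument.

extern crate euclid;
extern crate servo_media;

use euclid::default::Size2D;
use servo_media::streams::MediaSource;
use servo_media::ServoMedia;
use std::sync::Arc;

// Hypothetical helper: feed client-generated frames into a stream created
// with MediaSource::App (the captureStream-style source).
fn push_app_frames(servo_media: Arc<ServoMedia>) {
    // Illustrative size; the appsrc caps are built from whatever size is passed.
    let size = Size2D::new(320u32, 240u32);
    if let Some(stream) =
        servo_media.create_videoinput_stream(Default::default(), MediaSource::App(size))
    {
        // One raw BGRA frame is width * height * 4 bytes, matching the caps
        // set on the appsrc in create_video_from.
        let frame = vec![0u8; (size.width * size.height * 4) as usize];
        // Assumes push_stream_data is exposed on ServoMedia, mirroring the
        // Backend trait method added in this patch.
        servo_media.push_stream_data(&stream, frame);
    }
}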