diff --git a/Cargo.lock b/Cargo.lock index 952831a4..4d127fe0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,7 +26,7 @@ dependencies = [ "cfg-if", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -759,6 +759,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "event-listener" version = "5.4.0" @@ -938,10 +948,25 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.3.0-rc.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a78f88e84d239c7f2619ae8b091603c26208e1cb322571f5a29d6806f56ee5e" +dependencies = [ + "cfg-if", + "js-sys", + "libc", + "rustix", + "wasi 0.13.3+wasi-0.2.2", + "wasm-bindgen", + "windows-targets", +] + [[package]] name = "gimli" version = "0.31.1" @@ -1277,8 +1302,8 @@ dependencies = [ "num_enum", "once_cell", "pnet", - "rand", - "rand_core", + "rand 0.8.5", + "rand_core 0.6.4", "test-case", "thiserror 2.0.11", "tokio", @@ -1296,6 +1321,17 @@ dependencies = [ "lightway-core", ] +[[package]] +name = "lightway-raptor" +version = "0.1.0" +dependencies = [ + "anyhow", + "rand 0.9.0-beta.3", + "raptorq", + "time", + "tokio", +] + [[package]] name = "lightway-server" version = "0.1.0" @@ -1320,7 +1356,7 @@ dependencies = [ "pnet", "ppp", "pwhash", - "rand", + "rand 0.8.5", "serde", "serde_json", "socket2", @@ -1342,6 +1378,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "log" version = "0.4.25" @@ -1399,7 +1441,7 @@ dependencies = [ "ordered-float", "quanta", "radix_trie", - "rand", + "rand 0.8.5", "rand_xoshiro", "sketches-ddsketch", ] @@ -1426,7 +1468,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -1738,7 +1780,7 @@ version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" dependencies = [ - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -1791,7 +1833,7 @@ dependencies = [ "byteorder", "hmac", "md-5", - "rand", + "rand 0.8.5", "sha-1", "sha2", ] @@ -1806,7 +1848,7 @@ dependencies = [ "libc", "once_cell", "raw-cpuid", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "web-sys", "winapi", ] @@ -1837,8 +1879,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.0-beta.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"6fccbfebb3972a41a31c605a59207d9fba5489b9a87d9d87024cb6df73a32ec7" +dependencies = [ + "rand_chacha 0.9.0-beta.1", + "rand_core 0.9.0-beta.1", + "zerocopy 0.8.14", ] [[package]] @@ -1848,7 +1901,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0-beta.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f16da77124f4ee9fabd55ce6540866e9101431863b4876de58b68797f331adf2" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.0-beta.1", ] [[package]] @@ -1857,7 +1920,17 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.15", +] + +[[package]] +name = "rand_core" +version = "0.9.0-beta.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a98fa0b8309344136abe6244130311e76997e546f76fae8054422a7539b43df7" +dependencies = [ + "getrandom 0.3.0-rc.0", + "zerocopy 0.8.14", ] [[package]] @@ -1866,7 +1939,16 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" dependencies = [ - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "raptorq" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90b1b1fad69672f0b901b5004863ea4307f03d168a3db5f2bcba4d3dfed88e97" +dependencies = [ + "serde", ] [[package]] @@ -1915,7 +1997,7 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", "cfg-if", - "getrandom", + "getrandom 0.2.15", "libc", "spin", "untrusted", @@ -1934,6 +2016,19 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7fb8039b3032c191086b10f11f319a6e99e1e82889c5cc6046f515c9db1d497" +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.59.0", +] + [[package]] name = "rustversion" version = "1.0.19" @@ -2460,9 +2555,9 @@ dependencies = [ [[package]] name = "tun" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b5ea2466ffcdd0be0831f7d3981daa0b953586c0062f6d33398cb374689b090" +checksum = "0df2e279123d6a96b1611b1d2bc126323900f970819fea3c5c2ed0c657fa2132" dependencies = [ "bytes", "cfg-if", @@ -2549,6 +2644,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.13.3+wasi-0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -2765,6 +2869,15 @@ dependencies = [ "winreg", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" +dependencies = [ + "bitflags", +] + [[package]] name = "wolfssl" version = "3.0.0" @@ -2802,7 +2915,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a367f292d93d4eab890745e75a778da40909cab4d6ff8173693812f79c4a2468" +dependencies = [ + "zerocopy-derive 0.8.14", ] [[package]] @@ -2815,3 +2937,14 @@ dependencies = [ "quote", "syn 2.0.96", ] + +[[package]] +name = "zerocopy-derive" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3931cb58c62c13adec22e38686b559c86a30565e16ad6e8510a337cedc611e1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] diff --git a/Cargo.toml b/Cargo.toml index 6afc7e2d..67d31e5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "lightway-app-utils", "lightway-client", "lightway-server", + "lightway-raptor", ] resolver = "2" diff --git a/Earthfile b/Earthfile index 91509c17..69938a09 100644 --- a/Earthfile +++ b/Earthfile @@ -40,7 +40,7 @@ source: FROM +install-build-dependencies COPY --keep-ts Cargo.toml Cargo.lock ./ COPY --keep-ts deny.toml ./ - COPY --keep-ts --dir lightway-core lightway-app-utils lightway-client lightway-server .cargo ./ + COPY --keep-ts --dir lightway-core lightway-app-utils lightway-client lightway-server lightway-raptor .cargo ./ # build builds with the Cargo release profile build: diff --git a/lightway-raptor/Cargo.toml b/lightway-raptor/Cargo.toml new file mode 100644 index 00000000..6febed00 --- /dev/null +++ b/lightway-raptor/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "lightway-raptor" +version = "0.1.0" +edition = "2021" + +[dependencies] +raptorq = { version = "2.0.0", features = ["serde", "serde_support"] } +rand = "0.9.0-beta.0" +anyhow = "1.0.94" +tokio = "1.42.0" +time = "0.3.37" + +[lints] +workspace = true diff --git a/lightway-raptor/examples/basic_usage.rs b/lightway-raptor/examples/basic_usage.rs new file mode 100644 index 00000000..39f3e38e --- /dev/null +++ b/lightway-raptor/examples/basic_usage.rs @@ -0,0 +1,59 @@ +use lightway_raptor::LightwayDataFrame; +use rand::seq::SliceRandom; +use raptorq::{Decoder, Encoder, EncodingPacket}; + +fn main() -> Result<(), Box> { + // Create a LightwayDataFrame and add packets. + let mut frame = LightwayDataFrame::new(Vec::new()); + frame.add_packet(vec![1, 2, 3]); + frame.add_packet(vec![4, 5, 6, 7]); + + // Serialize the frame. + let serialized = frame.serialize()?; + println!("Serialized Frame: {:?}", serialized); + + // Deserialize the frame. + let deserialized_frame = LightwayDataFrame::deserialize(&serialized)?; + println!("Deserialized Frame: {:?}", deserialized_frame); + + // Retrieve and print individual packets. 
+    if let Some(packet) = deserialized_frame.get_packet(0) {
+        println!("First Packet: {:?}", packet);
+    }
+
+    // Create an encoder
+    let encoder = Encoder::with_defaults(&serialized, 10);
+
+    // Perform the encoding, and serialize to Vec<u8> for transmission
+    let mut packets: Vec<Vec<u8>> = encoder
+        .get_encoded_packets(15)
+        .iter()
+        .map(|packet| packet.serialize())
+        .collect();
+
+    println!("Encoded packets: {:?}", packets);
+
+    packets.shuffle(&mut rand::rng());
+    // Erase 10 packets at random
+    let length = packets.len();
+    packets.truncate(length - 10);
+
+    println!("Encoded packets after loss: {:?}", packets);
+
+    println!("We have {} packets.", packets.len());
+    let mut decoder = Decoder::new(encoder.get_config());
+
+    let mut counter = 0;
+    // Perform the decoding
+    let mut result = None;
+    while !packets.is_empty() {
+        counter += 1;
+        result = decoder.decode(EncodingPacket::deserialize(&packets.pop().unwrap()));
+        if result.is_some() {
+            break;
+        }
+    }
+
+    println!("Packets needed to decode: {counter}");
+    Ok(())
+}
diff --git a/lightway-raptor/src/lib.rs b/lightway-raptor/src/lib.rs
new file mode 100644
index 00000000..13d4619c
--- /dev/null
+++ b/lightway-raptor/src/lib.rs
@@ -0,0 +1,7 @@
+//! This is the main library entry point for the `lightway-raptor` crate.
+
+pub mod lightway_data_frame;
+pub mod lightway_raptor;
+
+pub use lightway_data_frame::LightwayDataFrame;
+pub use lightway_raptor::Raptor;
diff --git a/lightway-raptor/src/lightway_data_frame.rs b/lightway-raptor/src/lightway_data_frame.rs
new file mode 100644
index 00000000..e038a6ed
--- /dev/null
+++ b/lightway-raptor/src/lightway_data_frame.rs
@@ -0,0 +1,221 @@
+use std::io::{self, Read};
+
+/// A lightweight data frame for encapsulating and transferring packets.
+#[derive(Debug)]
+pub struct LightwayDataFrame {
+    packets: Vec<Vec<u8>>, // A vector of packets, where each packet is a Vec<u8>
+    number: u16,           // The frame counter
+}
+
+#[derive(Debug)]
+pub struct LightwayRaptorFrame {
+    number: u16,
+    encoder_config: [u8; 12],
+}
+
+impl LightwayRaptorFrame {
+    pub fn new(number: u16, encoder_config: [u8; 12]) -> Self {
+        Self {
+            number,
+            encoder_config,
+        }
+    }
+
+    pub fn number(&self) -> u16 {
+        self.number
+    }
+
+    pub fn encoder_config(&self) -> [u8; 12] {
+        self.encoder_config
+    }
+}
+
+impl LightwayDataFrame {
+    /// Creates a new `LightwayDataFrame` from the given packets.
+    pub fn new(packets: Vec<Vec<u8>>) -> Self {
+        Self { packets, number: 0 }
+    }
+
+    pub fn new_empty() -> Self {
+        Self {
+            packets: vec![],
+            number: 0,
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        let mut len = 0;
+        for packet in &self.packets {
+            len += packet.len();
+        }
+
+        len
+    }
+
+    pub fn clear(&mut self) {
+        self.packets.clear();
+        self.number = self.number.wrapping_add(1);
+    }
+
+    pub fn number(&self) -> u16 {
+        self.number
+    }
+    pub fn is_empty(&self) -> bool {
+        self.packets.is_empty()
+    }
+
+    pub fn packet_count(&self) -> usize {
+        self.packets.len()
+    }
+
+    pub fn largest_packet_length(&self) -> usize {
+        let mut largest_packet_length = 0;
+        for packet in &self.packets {
+            if packet.len() > largest_packet_length {
+                largest_packet_length = packet.len();
+            }
+        }
+        largest_packet_length
+    }
+
+    /// Adds a new packet to the `LightwayDataFrame`.
+    pub fn add_packet(&mut self, packet: Vec<u8>) {
+        self.packets.push(packet);
+    }
+
+    /// Retrieves a packet by its index.
+    pub fn get_packet(&self, index: usize) -> Option<&[u8]> {
+        self.packets.get(index).map(|packet| packet.as_slice())
+    }
+
+    /// Retrieves all packets as slices.
+    pub fn get_all_packets(&self) -> Vec<&[u8]> {
+        self.packets
+            .iter()
+            .map(|packet| packet.as_slice())
+            .collect()
+    }
+
+    /// Serializes the `LightwayDataFrame` into a contiguous byte vector.
+    pub fn serialize(&self) -> Result<Vec<u8>, io::Error> {
+        let mut buffer = Vec::new();
+
+        let num_packets = self.packets.len() as u16;
+        buffer.extend(&num_packets.to_be_bytes());
+
+        for packet in &self.packets {
+            let packet_len = packet.len() as u16;
+            buffer.extend(&packet_len.to_be_bytes());
+        }
+
+        for packet in &self.packets {
+            buffer.extend(packet);
+        }
+
+        Ok(buffer)
+    }
+
+    /// Deserializes a byte slice into a `LightwayDataFrame`.
+    pub fn deserialize(data: &[u8]) -> Result<Self, io::Error> {
+        let mut cursor = io::Cursor::new(data);
+
+        let mut num_packets_bytes = [0u8; 2];
+        cursor.read_exact(&mut num_packets_bytes)?;
+        let num_packets = u16::from_be_bytes(num_packets_bytes);
+
+        let mut lengths = Vec::new();
+
+        for _ in 0..num_packets {
+            let mut len_bytes = [0u8; 2];
+            cursor.read_exact(&mut len_bytes)?;
+            let length = u16::from_be_bytes(len_bytes);
+            lengths.push(length as usize);
+        }
+
+        let mut packets = Vec::new();
+
+        for length in lengths {
+            let mut packet = vec![0u8; length];
+            cursor.read_exact(&mut packet)?;
+            packets.push(packet);
+        }
+
+        Ok(Self { packets, number: 0 })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_create_empty_frame() {
+        let frame = LightwayDataFrame::new(Vec::new());
+        assert!(frame.packets.is_empty());
+    }
+
+    #[test]
+    fn test_add_packet() {
+        let mut frame = LightwayDataFrame::new(Vec::new());
+        frame.add_packet(vec![1, 2, 3]);
+        frame.add_packet(vec![4, 5, 6, 7]);
+
+        assert_eq!(frame.packets.len(), 2);
+        assert_eq!(frame.packets[0], vec![1, 2, 3]);
+        assert_eq!(frame.packets[1], vec![4, 5, 6, 7]);
+    }
+
+    #[test]
+    fn test_serialize_and_deserialize() {
+        let mut frame = LightwayDataFrame::new(Vec::new());
+        frame.add_packet(vec![1, 2, 3]);
+        frame.add_packet(vec![4, 5, 6, 7]);
+
+        let serialized = frame.serialize().expect("Serialization failed");
+        let deserialized_frame =
+            LightwayDataFrame::deserialize(&serialized).expect("Deserialization failed");
+
+        assert_eq!(deserialized_frame.packets.len(), 2);
+        assert_eq!(deserialized_frame.packets[0], vec![1, 2, 3]);
+        assert_eq!(deserialized_frame.packets[1], vec![4, 5, 6, 7]);
+    }
+
+    #[test]
+    fn test_get_packet() {
+        let mut frame = LightwayDataFrame::new(Vec::new());
+        frame.add_packet(vec![1, 2, 3]);
+        frame.add_packet(vec![4, 5, 6, 7]);
+
+        let first_packet = frame.get_packet(0);
+        assert_eq!(first_packet, Some(&[1, 2, 3][..]));
+
+        let second_packet = frame.get_packet(1);
+        assert_eq!(second_packet, Some(&[4, 5, 6, 7][..]));
+
+        let out_of_bounds_packet = frame.get_packet(2);
+        assert_eq!(out_of_bounds_packet, None);
+    }
+
+    #[test]
+    fn test_get_all_packets() {
+        let mut frame = LightwayDataFrame::new(Vec::new());
+        frame.add_packet(vec![1, 2, 3]);
+        frame.add_packet(vec![4, 5, 6, 7]);
+
+        let packets = frame.get_all_packets();
+
+        assert_eq!(packets.len(), 2);
+        assert_eq!(packets[0], &[1, 2, 3][..]);
+        assert_eq!(packets[1], &[4, 5, 6, 7][..]);
+    }
+
+    #[test]
+    fn test_empty_serialization() {
+        let frame = LightwayDataFrame::new(Vec::new());
+        let serialized = frame.serialize().expect("Serialization failed");
+        let deserialized_frame =
+            LightwayDataFrame::deserialize(&serialized).expect("Deserialization failed");
+
+        assert!(deserialized_frame.packets.is_empty());
+    }
+}
diff --git a/lightway-raptor/src/lightway_raptor.rs b/lightway-raptor/src/lightway_raptor.rs
new file mode 100644
index 00000000..473f8be1
--- /dev/null
+++ b/lightway-raptor/src/lightway_raptor.rs
@@ -0,0 +1,289 @@
+use crate::LightwayDataFrame;
+use anyhow::Result;
+use raptorq::{Decoder, Encoder, EncodingPacket, ObjectTransmissionInformation};
+
+use std::collections::HashMap;
+use tokio::{
+    net::UdpSocket,
+    time::{Duration, Instant},
+};
+
+#[derive(Debug)]
+struct DecoderState {
+    decoder: Decoder,
+    last_updated: Instant,
+    completed: bool,
+}
+
+/// A single struct that can *send* (encode) and *receive* (decode)
+/// Lightway frames over a UDP socket.
+pub struct Raptor {
+    /// Maximum Transmission Unit (MTU) size. Used to control the size of symbols.
+    mtu: Option<u16>,
+    /// The underlying UDP socket. We can both send & receive on this socket.
+    socket: UdpSocket,
+
+    /// The remote to which we send frames. If you’re truly P2P,
+    /// you might store multiple remotes or discover them dynamically.
+    remote_addr: Option<std::net::SocketAddr>,
+
+    /// Outgoing aggregator that stores packets to be encoded.
+    outgoing_frame: LightwayDataFrame,
+
+    /// Size limit (in bytes) before we decide to flush.
+    x_kb_limit: usize,
+
+    /// Time in ms before a flush is triggered due to inactivity.
+    y_ms_timeout: u64,
+
+    /// Internal counter for sending frames (frame IDs).
+    next_frame_id: u16,
+
+    /// RaptorQ decoders, keyed by frame_id.
+    decoders: HashMap<u16, DecoderState>,
+
+    /// How long to keep incomplete decoders before discarding.
+    decode_timeout_secs: u64,
+}
+
+impl Raptor {
+    /// Runs the main Raptor engine loop to process incoming packets and handle send/timeout logic.
+    ///
+    /// This future must be awaited in your asynchronous runtime.
+    pub async fn run_engine(&mut self) -> Result<()> {
+        let mut deadline = Instant::now() + Duration::from_millis(self.y_ms_timeout);
+
+        loop {
+            println!("[Raptor] Waiting for incoming data...");
+            let mut pkt = vec![0u8; 1500]; // MTU-sized receive buffer
+
+            tokio::select! {
+                result = self.socket.recv_from(&mut pkt) => {
+                    let (len, _addr) = result?;
+                    pkt.truncate(len);
+
+                    // Add the received data to the outgoing aggregator
+                    self.outgoing_frame.add_packet(pkt);
+
+                    // If size limit is reached, flush the aggregator
+                    if self.outgoing_frame.len() >= self.x_kb_limit {
+                        self.flush().await?;
+                        deadline = Instant::now() + Duration::from_millis(self.y_ms_timeout);
+                    } else {
+                        // Reset the deadline as we received data
+                        deadline = Instant::now() + Duration::from_millis(self.y_ms_timeout);
+                    }
+                }
+                _ = tokio::time::sleep_until(deadline) => {
+                    println!("[Raptor] Timeout reached!");
+                    // Timeout occurred, check if there is data to flush
+                    if !self.outgoing_frame.is_empty() {
+                        self.flush().await?;
+                    }
+                    // Reset the deadline for the next timeout
+                    deadline = Instant::now() + Duration::from_millis(self.y_ms_timeout);
+                }
+            }
+        }
+    }
+
+    /// Create a new Raptor instance.
+    ///
+    /// * `socket` is a bound UdpSocket that we’ll use to both send & receive.
+    /// * `remote_addr` is optional if you might discover peers dynamically (you can pass `None`).
+    /// * `x_kb_limit` is the max aggregator size before flush (like 64 * 1024).
+    /// * `y_ms_timeout` is the flush timeout in milliseconds.
+    /// * `decode_timeout_secs` is how long to keep a partial decoder alive.
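+    /// * `mtu` optionally caps the RaptorQ symbol size used by `flush`; when `None`, the largest aggregated packet length is used.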
+    pub fn new(
+        socket: UdpSocket,
+        remote_addr: Option<std::net::SocketAddr>,
+        x_kb_limit: usize,
+        y_ms_timeout: u64,
+        decode_timeout_secs: u64,
+        mtu: Option<u16>,
+    ) -> Self {
+        Self {
+            socket,
+            remote_addr,
+            outgoing_frame: LightwayDataFrame::new_empty(),
+            x_kb_limit,
+            y_ms_timeout,
+            next_frame_id: 0,
+            decoders: HashMap::new(),
+            decode_timeout_secs,
+            mtu,
+        }
+    }
+
+    // -------------------------------------------------
+    // Sending part
+    // -------------------------------------------------
+
+    /// Add a single IP packet (or any arbitrary data packet) to our aggregator.
+    /// We’ll flush later if we reach x_kb_limit or after y_ms_timeout.
+    pub fn send_packet(&mut self, data: &[u8]) {
+        self.outgoing_frame.add_packet(data.to_vec());
+    }
+
+    /// Encode and send the current aggregator as a single RaptorQ frame.
+    ///
+    /// You can call this manually or from a background loop after a timeout.
+    pub async fn flush(&mut self) -> Result<()> {
+        // If empty, do nothing
+        if self.outgoing_frame.is_empty() {
+            return Ok(());
+        }
+
+        let serialized_data = self.outgoing_frame.serialize()?;
+
+        // Determine the maximum symbol size, using the smaller of mtu (if set) and largest_packet_length
+        let max_symbol_size = match self.mtu {
+            Some(mtu) => mtu.min(self.outgoing_frame.largest_packet_length() as u16),
+            None => self.outgoing_frame.largest_packet_length() as u16,
+        };
+
+        // Create RaptorQ encoder with the determined max symbol size
+        let encoder = Encoder::with_defaults(&serialized_data, max_symbol_size);
+
+        // Decide how many encoding packets to send
+        let symbol_count = self.outgoing_frame.packet_count() as u32 + 4;
+        let encoded_symbols: Vec<Vec<u8>> = encoder
+            .get_encoded_packets(symbol_count)
+            .iter()
+            .map(|sym| sym.serialize())
+            .collect();
+
+        // For each symbol, prepend frame_id + OTI (12 bytes) to form one UDP packet
+        let frame_id = self.next_frame_id;
+        self.next_frame_id = self.next_frame_id.wrapping_add(1);
+
+        for symbol in encoded_symbols {
+            let mut buf = Vec::with_capacity(symbol.len() + 14);
+            // 2 bytes frame_id (LE)
+            buf.extend_from_slice(&frame_id.to_le_bytes());
+            // 12 bytes OTI
+            buf.extend_from_slice(&encoder.get_config().serialize());
+            // symbol
+            buf.extend_from_slice(&symbol);
+
+            // Send out
+            if let Some(remote) = self.remote_addr {
+                self.socket.send_to(&buf, remote).await?;
+            } else {
+                // If remote_addr is None, you might handle that differently:
+                // e.g. broadcast, or store a list of remote peers, etc.
+                eprintln!("No remote_addr configured for sending!");
+            }
+        }
+
+        println!(
+            "[Raptor] Flushed frame_id={} with {} symbols (packets={})",
+            frame_id,
+            symbol_count as usize,
+            self.outgoing_frame.packet_count()
+        );
+
+        // Clear aggregator
+        self.outgoing_frame.clear();
+        Ok(())
+    }
+
+    // -------------------------------------------------
+    // Receiving part
+    // -------------------------------------------------
+
+    /// Process an incoming Raptor-encoded symbol from `data`.
+    /// If a frame completes, returns `Ok(Some(Vec<Vec<u8>>))`
+    /// containing the original packets that were encoded.
+    /// If the frame is not yet complete, returns `Ok(None)`.
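+    ///
+    /// Each datagram is expected to use the layout written by `flush`:
+    /// a 2-byte little-endian frame_id, the 12-byte serialized
+    /// `ObjectTransmissionInformation`, then one RaptorQ encoding symbol.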
+    pub fn process_incoming(&mut self, data: &[u8]) -> Result<Option<Vec<Vec<u8>>>> {
+        // Minimal check
+        if data.len() < 14 {
+            eprintln!("Incoming packet too short: len={}", data.len());
+            return Ok(None);
+        }
+
+        // Extract frame_id
+        let frame_id = u16::from_le_bytes([data[0], data[1]]);
+        // Extract OTI
+        let oti_data = &data[2..14];
+        let symbol_data = &data[14..];
+
+        // Build OTI
+        let oti = ObjectTransmissionInformation::deserialize(oti_data.try_into()?);
+
+        // Get or create a DecoderState
+        let entry = self
+            .decoders
+            .entry(frame_id)
+            .or_insert_with(|| DecoderState {
+                decoder: Decoder::new(oti),
+                last_updated: Instant::now(),
+                completed: false,
+            });
+
+        if entry.completed {
+            // Already done? Possibly a duplicate
+            return Ok(None);
+        }
+
+        entry.last_updated = Instant::now();
+
+        // Decode
+        let maybe_data = entry
+            .decoder
+            .decode(EncodingPacket::deserialize(symbol_data.into()).into());
+
+        if let Some(decoded_bytes) = maybe_data {
+            // Mark as complete
+            entry.completed = true;
+            println!("[Raptor] Decoded complete frame_id={}", frame_id);
+
+            // Attempt to parse as LightwayDataFrame
+            match LightwayDataFrame::deserialize(&decoded_bytes) {
+                Ok(lw_frame) => {
+                    // Extract all original packets
+                    let packets = lw_frame
+                        .get_all_packets()
+                        .into_iter()
+                        .map(|p| p.to_vec())
+                        .collect();
+                    // Optionally remove from the map
+                    // self.decoders.remove(&frame_id);
+                    return Ok(Some(packets));
+                }
+                Err(e) => {
+                    eprintln!("Failed to deserialize LightwayDataFrame: {e}");
+                    // We have the raw bytes, but can’t parse them.
+                    // Return None (or Some(...) if you want to just pass raw).
+                    return Ok(None);
+                }
+            }
+        }
+
+        // Not yet complete
+        Ok(None)
+    }
+
+    /// Remove stale decoders older than `decode_timeout_secs`.
+    /// You might call this periodically in a background task.
+    pub fn cleanup_decoders(&mut self) {
+        let now = Instant::now();
+        let timeout = Duration::from_secs(self.decode_timeout_secs);
+        let before = self.decoders.len();
+        self.decoders.retain(|_frame_id, st| {
+            let age = now.duration_since(st.last_updated);
+            if age > timeout {
+                println!("[Raptor] Removing stale decoder after {age:?}");
+                false
+            } else {
+                true
+            }
+        });
+        let after = self.decoders.len();
+        if after < before {
+            println!("[Raptor] Cleaned up {} old decoders", before - after);
+        }
+    }
+}