From 375761724c7333d79c80ecd720630b0aa773f07c Mon Sep 17 00:00:00 2001 From: Manish Kumar Date: Wed, 1 Apr 2026 14:24:18 +0530 Subject: [PATCH 1/3] feat: unit test on subgroup and tracks --- moq-transport/src/serve/subgroup.rs | 385 ++++++++++++++++++++++++++++ moq-transport/src/serve/tracks.rs | 215 ++++++++++++++++ 2 files changed, 600 insertions(+) diff --git a/moq-transport/src/serve/subgroup.rs b/moq-transport/src/serve/subgroup.rs index 2d0fc0c0..6fd8ede9 100644 --- a/moq-transport/src/serve/subgroup.rs +++ b/moq-transport/src/serve/subgroup.rs @@ -627,3 +627,388 @@ impl Deref for SubgroupObjectReader { &self.info } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::coding::TrackNamespace; + + /// Helper: create a SubgroupsWriter/Reader pair with a dummy track. + fn make_subgroups() -> (SubgroupsWriter, SubgroupsReader) { + let track = Arc::new(Track::new( + TrackNamespace::from_utf8_path("test/ns"), + "test-track".to_string(), + )); + Subgroups { track }.produce() + } + + /// Helper: deterministic payload for (group_id, object_id). + fn payload(group_id: u64, object_id: u64) -> Bytes { + Bytes::from(format!("g{}-o{}", group_id, object_id)) + } + + // --------------------------------------------------------------- + // U1: Single group, single object + // --------------------------------------------------------------- + #[tokio::test] + async fn single_group_single_object() { + let (mut writer, mut reader) = make_subgroups(); + + let mut sg = writer.append(0).unwrap(); + let group_id = sg.group_id; + sg.write(payload(group_id, 0)).unwrap(); + drop(sg); // close the subgroup + + let sub = reader.next().await.unwrap().expect("expected a subgroup"); + assert_eq!(sub.group_id, group_id); + + let mut obj = sub.clone(); + let data = obj.read_next().await.unwrap().expect("expected an object"); + assert_eq!(data, payload(group_id, 0)); + + // No more objects + assert!(obj.read_next().await.unwrap().is_none()); + } + + // --------------------------------------------------------------- + // U2: Single group, multiple objects + // --------------------------------------------------------------- + #[tokio::test] + async fn single_group_multiple_objects() { + let (mut writer, mut reader) = make_subgroups(); + + let mut sg = writer.append(0).unwrap(); + let gid = sg.group_id; + for oid in 0..5u64 { + sg.write(payload(gid, oid)).unwrap(); + } + drop(sg); + + let mut sub = reader.next().await.unwrap().unwrap(); + for oid in 0..5u64 { + let obj = sub.next().await.unwrap().expect("expected object"); + assert_eq!(obj.object_id, oid); + let mut obj = obj; + let data = obj.read_all().await.unwrap(); + assert_eq!(data, payload(gid, oid)); + } + assert!(sub.next().await.unwrap().is_none()); + } + + // --------------------------------------------------------------- + // U3: Multiple groups, multiple objects + // --------------------------------------------------------------- + #[tokio::test] + async fn multiple_groups_multiple_objects() { + let (mut writer, mut reader) = make_subgroups(); + + let num_groups = 3u64; + let objects_per_group = 4u64; + let mut expected: Vec<(u64, u64, Bytes)> = Vec::new(); + + for _ in 0..num_groups { + let mut sg = writer.append(0).unwrap(); + let gid = sg.group_id; + for oid in 0..objects_per_group { + let p = payload(gid, oid); + sg.write(p.clone()).unwrap(); + expected.push((gid, oid, p)); + } + drop(sg); + } + drop(writer); // close the subgroups writer + + // Reader sees each group as the latest when epoch bumps. + // Because latest-only, we collect what we can. + let mut received: Vec<(u64, u64, Bytes)> = Vec::new(); + while let Ok(Some(mut sub)) = reader.next().await { + let gid = sub.group_id; + while let Ok(Some(mut obj)) = sub.next().await { + let oid = obj.object_id; + let data = obj.read_all().await.unwrap(); + received.push((gid, oid, data)); + } + } + + // Every received tuple must match what was published. + for (gid, oid, data) in &received { + assert_eq!(data, &payload(*gid, *oid)); + } + + // At minimum the last group must be received. + let last_group_id = num_groups - 1; + assert!( + received.iter().any(|(g, _, _)| *g == last_group_id), + "last group must be received" + ); + } + + // --------------------------------------------------------------- + // U4: Variable payload sizes + // --------------------------------------------------------------- + #[tokio::test] + async fn variable_payload_sizes() { + let (mut writer, mut reader) = make_subgroups(); + + let payloads: Vec = vec![ + Bytes::new(), // 0-byte + Bytes::from_static(b"x"), // 1-byte + Bytes::from(vec![0xAB; 4096]), // 4 KB + Bytes::from(vec![0xCD; 65536]), // 64 KB + ]; + + let mut sg = writer.append(0).unwrap(); + for p in &payloads { + sg.write(p.clone()).unwrap(); + } + drop(sg); + + let mut sub = reader.next().await.unwrap().unwrap(); + for expected in &payloads { + let mut obj = sub.next().await.unwrap().unwrap(); + let data = obj.read_all().await.unwrap(); + assert_eq!(data, *expected); + } + assert!(sub.next().await.unwrap().is_none()); + } + + // --------------------------------------------------------------- + // U5: Multi-chunk writes + // --------------------------------------------------------------- + #[tokio::test] + async fn multi_chunk_writes() { + let (mut writer, mut reader) = make_subgroups(); + + let chunk1 = Bytes::from_static(b"hello "); + let chunk2 = Bytes::from_static(b"world"); + let full = Bytes::from_static(b"hello world"); + + let mut sg = writer.append(0).unwrap(); + let mut obj_writer = sg.create(full.len(), None).unwrap(); + obj_writer.write(chunk1).unwrap(); + obj_writer.write(chunk2).unwrap(); + drop(obj_writer); + drop(sg); + + let mut sub = reader.next().await.unwrap().unwrap(); + let mut obj = sub.next().await.unwrap().unwrap(); + let data = obj.read_all().await.unwrap(); + assert_eq!(data, full); + } + + // --------------------------------------------------------------- + // U6: Latest-only semantics + // --------------------------------------------------------------- + #[tokio::test] + async fn latest_only_semantics() { + let (mut writer, reader) = make_subgroups(); + + // Write group 0 + let mut sg0 = writer.append(0).unwrap(); + sg0.write(payload(sg0.group_id, 0)).unwrap(); + drop(sg0); + + // Write group 1 + let mut sg1 = writer.append(0).unwrap(); + let gid1 = sg1.group_id; + sg1.write(payload(gid1, 0)).unwrap(); + drop(sg1); + + // A fresh clone of reader should only see group 1 (the latest). + let mut fresh = reader.clone(); + let sub = fresh.next().await.unwrap().unwrap(); + assert_eq!(sub.group_id, gid1); + } + + // --------------------------------------------------------------- + // U7: Older group silently dropped + // --------------------------------------------------------------- + #[tokio::test] + async fn older_group_silently_dropped() { + let (mut writer, mut reader) = make_subgroups(); + + // Write group 5 via create() + let sg5 = writer + .create(Subgroup { + group_id: 5, + subgroup_id: 0, + priority: 0, + }) + .unwrap(); + drop(sg5); + + // Write group 3 (older) -- should not become latest + let sg3 = writer + .create(Subgroup { + group_id: 3, + subgroup_id: 0, + priority: 0, + }) + .unwrap(); + drop(sg3); + + // Reader should see group 5 as latest (epoch bumped once for group 5). + let sub = reader.next().await.unwrap().unwrap(); + assert_eq!(sub.group_id, 5); + } + + // --------------------------------------------------------------- + // U8: Duplicate group_id + subgroup_id rejected + // --------------------------------------------------------------- + #[tokio::test] + async fn duplicate_rejected() { + let (mut writer, _reader) = make_subgroups(); + + let _sg = writer + .create(Subgroup { + group_id: 5, + subgroup_id: 0, + priority: 0, + }) + .unwrap(); + + let result = writer.create(Subgroup { + group_id: 5, + subgroup_id: 0, + priority: 0, + }); + + match result { + Err(ServeError::Duplicate) => {} // expected + Err(e) => panic!("expected Duplicate, got {:?}", e), + Ok(_) => panic!("expected Duplicate error, got Ok"), + } + } + + // --------------------------------------------------------------- + // U9: Higher subgroup_id within same group + // --------------------------------------------------------------- + #[tokio::test] + async fn higher_subgroup_id_updates_latest() { + let (mut writer, mut reader) = make_subgroups(); + + let _sg0 = writer + .create(Subgroup { + group_id: 1, + subgroup_id: 0, + priority: 0, + }) + .unwrap(); + + let _sg1 = writer + .create(Subgroup { + group_id: 1, + subgroup_id: 1, + priority: 0, + }) + .unwrap(); + + // Reader should see subgroup_id=1 as latest (two epoch bumps). + // Drain to the latest. + let mut latest_sub = None; + // Read available updates (non-blocking after writer is done). + // We'll read what we can and the last one should be subgroup_id=1. + loop { + let sub = reader.next().await.unwrap(); + match sub { + Some(s) => latest_sub = Some(s), + None => break, + } + // Break after we've seen subgroup_id=1 to avoid blocking forever + // since writer is still alive. + if latest_sub.as_ref().map(|s| s.subgroup_id) == Some(1) { + break; + } + } + + assert_eq!(latest_sub.unwrap().subgroup_id, 1); + } + + // --------------------------------------------------------------- + // U10: Writer close propagates + // --------------------------------------------------------------- + #[tokio::test] + async fn writer_close_propagates() { + let (mut writer, mut reader) = make_subgroups(); + + let mut sg = writer.append(0).unwrap(); + sg.write(Bytes::from_static(b"data")).unwrap(); + drop(sg); + + writer.close(ServeError::Done).unwrap(); + + // Reader should get the subgroup, then Done error. + let _sub = reader.next().await.unwrap().unwrap(); + let result = reader.next().await; + match result { + Err(ServeError::Done) => {} // expected + Err(e) => panic!("expected Done, got {:?}", e), + Ok(_) => panic!("expected Done error, got Ok"), + } + } + + // --------------------------------------------------------------- + // U11: Object size mismatch + // --------------------------------------------------------------- + #[tokio::test] + async fn object_size_mismatch() { + let (mut writer, mut reader) = make_subgroups(); + + let mut sg = writer.append(0).unwrap(); + + // Declare size=10, write only 5 bytes, then drop writer + let mut obj_writer = sg.create(10, None).unwrap(); + obj_writer.write(Bytes::from(vec![0u8; 5])).unwrap(); + drop(obj_writer); // Drop triggers Size error because remain != 0 + + drop(sg); + + let mut sub = reader.next().await.unwrap().unwrap(); + let mut obj = sub.next().await.unwrap().unwrap(); + + // Read the 5 bytes that were written + let chunk = obj.read().await.unwrap(); + assert!(chunk.is_some()); + + // Next read should get the Size error + let result = obj.read().await; + assert_eq!(result.unwrap_err(), ServeError::Size); + } + + // --------------------------------------------------------------- + // U12: Concurrent readers (fan-out) + // --------------------------------------------------------------- + #[tokio::test] + async fn concurrent_readers_fanout() { + let (mut writer, mut reader1) = make_subgroups(); + let mut reader2 = reader1.clone(); + + let mut sg = writer.append(0).unwrap(); + let gid = sg.group_id; + for oid in 0..3u64 { + sg.write(payload(gid, oid)).unwrap(); + } + drop(sg); + drop(writer); + + // Both readers should see the same subgroup. + let mut sub1 = reader1.next().await.unwrap().unwrap(); + let mut sub2 = reader2.next().await.unwrap().unwrap(); + + assert_eq!(sub1.group_id, sub2.group_id); + + // Read all objects from both and compare. + let mut data1 = Vec::new(); + while let Ok(Some(mut obj)) = sub1.next().await { + data1.push((obj.object_id, obj.read_all().await.unwrap())); + } + + let mut data2 = Vec::new(); + while let Ok(Some(mut obj)) = sub2.next().await { + data2.push((obj.object_id, obj.read_all().await.unwrap())); + } + + assert_eq!(data1, data2); + assert_eq!(data1.len(), 3); + } +} diff --git a/moq-transport/src/serve/tracks.rs b/moq-transport/src/serve/tracks.rs index ad3970e1..1b65ce3d 100644 --- a/moq-transport/src/serve/tracks.rs +++ b/moq-transport/src/serve/tracks.rs @@ -273,6 +273,32 @@ impl Deref for TracksReader { #[cfg(test)] mod tests { use super::*; + use crate::serve::TrackReaderMode; + use bytes::Bytes; + + /// Helper: deterministic payload for (group_id, object_id). + fn payload(group_id: u64, object_id: u64) -> Bytes { + Bytes::from(format!("g{}-o{}", group_id, object_id)) + } + + /// Helper: drain all objects from a TrackReader in Subgroups mode. + async fn drain_subgroups(track_reader: TrackReader) -> Vec<(u64, u64, Bytes)> { + let mode = track_reader.mode().await.unwrap(); + let mut received: Vec<(u64, u64, Bytes)> = Vec::new(); + if let TrackReaderMode::Subgroups(mut subgroups) = mode { + while let Ok(Some(mut subgroup)) = subgroups.next().await { + let gid = subgroup.group_id; + while let Ok(Some(mut obj)) = subgroup.next().await { + let oid = obj.object_id; + let data: Bytes = obj.read_all().await.unwrap(); + received.push((gid, oid, data)); + } + } + } else { + panic!("expected Subgroups mode"); + } + received + } /// Regression test for the stale track caching bug. /// @@ -398,4 +424,193 @@ mod tests { assert_eq!(track_reader_1.name, track_reader_2.name); assert_eq!(track_reader_1.namespace, track_reader_2.namespace); } + + // --------------------------------------------------------------- + // I1: Write-read round-trip through Track + // --------------------------------------------------------------- + #[tokio::test] + async fn track_round_trip() { + let namespace = TrackNamespace::from_utf8_path("test/ns"); + let (track_writer, track_reader) = + Track::new(namespace, "data-track".to_string()).produce(); + + let num_objects = 10u64; + + // Writer: create subgroups, write 1 group x 10 objects + let write_handle = tokio::spawn(async move { + let mut subgroups = track_writer.subgroups().unwrap(); + let mut sg = subgroups.append(0).unwrap(); + for oid in 0..num_objects { + sg.write(payload(0, oid)).unwrap(); + } + drop(sg); + subgroups.close(ServeError::Done).ok(); + }); + + // Reader: drain all data + let received = drain_subgroups(track_reader).await; + write_handle.await.unwrap(); + + assert_eq!(received.len(), num_objects as usize); + for (gid, oid, data) in &received { + assert_eq!(*gid, 0); + assert_eq!(data, &payload(0, *oid)); + } + } + + // --------------------------------------------------------------- + // I2: Tracks subscribe round-trip + // --------------------------------------------------------------- + #[tokio::test] + async fn tracks_subscribe_round_trip() { + let namespace = TrackNamespace::from_utf8_path("test/ns"); + let track_name = "data-track"; + let num_objects = 5u64; + + let (writer, mut request, mut reader) = Tracks::new(namespace.clone()).produce(); + + // Subscriber side: request the track + let track_reader = reader + .subscribe(namespace.clone(), track_name) + .expect("subscribe should succeed"); + + // Publisher side: receive request, write data + let pub_handle = tokio::spawn(async move { + let track_writer = request + .next() + .await + .expect("should receive track request"); + + assert_eq!(track_writer.name, track_name); + + let mut subgroups = track_writer.subgroups().unwrap(); + let mut sg = subgroups.append(0).unwrap(); + for oid in 0..num_objects { + sg.write(payload(0, oid)).unwrap(); + } + drop(sg); + subgroups.close(ServeError::Done).ok(); + + // Keep writer alive until data is consumed + drop(writer); + }); + + // Subscriber side: read data + let received = drain_subgroups(track_reader).await; + pub_handle.await.unwrap(); + + assert_eq!(received.len(), num_objects as usize); + for (gid, oid, data) in &received { + assert_eq!(*gid, 0); + assert_eq!(data, &payload(0, *oid)); + } + } + + // --------------------------------------------------------------- + // I3: Multiple tracks independence + // --------------------------------------------------------------- + #[tokio::test] + async fn multiple_tracks_independence() { + let namespace = TrackNamespace::from_utf8_path("test/ns"); + + // Create two independent tracks directly (no Tracks layer needed). + let (track_a_writer, track_a_reader) = + Track::new(namespace.clone(), "track-a".to_string()).produce(); + let (track_b_writer, track_b_reader) = + Track::new(namespace.clone(), "track-b".to_string()).produce(); + + // Write different data to each + let handle_a = tokio::spawn(async move { + let mut subgroups = track_a_writer.subgroups().unwrap(); + let mut sg = subgroups.append(0).unwrap(); + sg.write(Bytes::from_static(b"alpha-0")).unwrap(); + sg.write(Bytes::from_static(b"alpha-1")).unwrap(); + drop(sg); + subgroups.close(ServeError::Done).ok(); + }); + + let handle_b = tokio::spawn(async move { + let mut subgroups = track_b_writer.subgroups().unwrap(); + let mut sg = subgroups.append(0).unwrap(); + sg.write(Bytes::from_static(b"beta-0")).unwrap(); + sg.write(Bytes::from_static(b"beta-1")).unwrap(); + sg.write(Bytes::from_static(b"beta-2")).unwrap(); + drop(sg); + subgroups.close(ServeError::Done).ok(); + }); + + let data_a = drain_subgroups(track_a_reader).await; + let data_b = drain_subgroups(track_b_reader).await; + + handle_a.await.unwrap(); + handle_b.await.unwrap(); + + // Verify no cross-contamination + assert_eq!(data_a.len(), 2); + assert_eq!(data_a[0].2, Bytes::from_static(b"alpha-0")); + assert_eq!(data_a[1].2, Bytes::from_static(b"alpha-1")); + + assert_eq!(data_b.len(), 3); + assert_eq!(data_b[0].2, Bytes::from_static(b"beta-0")); + assert_eq!(data_b[1].2, Bytes::from_static(b"beta-1")); + assert_eq!(data_b[2].2, Bytes::from_static(b"beta-2")); + } + + // --------------------------------------------------------------- + // I4: Stale track re-subscribe with data integrity + // --------------------------------------------------------------- + #[tokio::test] + async fn stale_resubscribe_data_integrity() { + let namespace = TrackNamespace::from_utf8_path("test/ns"); + let track_name = "data-track"; + + let (_writer, mut request, mut reader) = Tracks::new(namespace.clone()).produce(); + + // First subscription + let _track_reader_1 = reader + .subscribe(namespace.clone(), track_name) + .expect("first subscribe"); + + // Publisher receives and closes (simulates failure) + let track_writer_1 = request.next().await.expect("first request"); + track_writer_1.close(ServeError::Cancel).unwrap(); + + // Wait for close to propagate + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Second subscription (stale eviction should happen) + let track_reader_2 = reader + .subscribe(namespace.clone(), track_name) + .expect("second subscribe"); + + // Publisher receives the new request and writes real data + let track_writer_2 = tokio::time::timeout( + std::time::Duration::from_millis(100), + request.next(), + ) + .await + .expect("should receive second request") + .expect("second request should be Some"); + + let write_handle = tokio::spawn(async move { + let mut subgroups = track_writer_2.subgroups().unwrap(); + let mut sg = subgroups.append(0).unwrap(); + for oid in 0..3u64 { + sg.write(payload(0, oid)).unwrap(); + } + drop(sg); + subgroups.close(ServeError::Done).ok(); + }); + + // Read data from the second subscription + let received = drain_subgroups(track_reader_2).await; + write_handle.await.unwrap(); + + // All 3 objects from the second subscription must be intact + assert_eq!(received.len(), 3); + for (gid, oid, data) in &received { + assert_eq!(*gid, 0); + assert_eq!(data, &payload(0, *oid)); + } + } } From 8fb11dd18357dc4ec54977f2168572f78f4569ae Mon Sep 17 00:00:00 2001 From: Manish Kumar Date: Wed, 1 Apr 2026 15:10:14 +0530 Subject: [PATCH 2/3] fix: typo on closing delimiter in tracks.rs --- moq-transport/src/serve/tracks.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/moq-transport/src/serve/tracks.rs b/moq-transport/src/serve/tracks.rs index 2e261ac8..9016375d 100644 --- a/moq-transport/src/serve/tracks.rs +++ b/moq-transport/src/serve/tracks.rs @@ -615,6 +615,8 @@ mod tests { assert_eq!(*gid, 0); assert_eq!(data, &payload(0, *oid)); } + } + /// Test that a track is NOT considered stale after the writer transitions to /// subgroups mode. This is the core regression: TrackWriter::subgroups() /// consumes self, dropping the Track-level State, but the SubgroupsWriter From b5f1d7b1ee5e23078dc921af34ee985408eca94e Mon Sep 17 00:00:00 2001 From: Manish Kumar Date: Wed, 1 Apr 2026 15:13:07 +0530 Subject: [PATCH 3/3] fix: cargo fmt --- moq-transport/src/serve/tracks.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/moq-transport/src/serve/tracks.rs b/moq-transport/src/serve/tracks.rs index 9016375d..16209862 100644 --- a/moq-transport/src/serve/tracks.rs +++ b/moq-transport/src/serve/tracks.rs @@ -479,10 +479,7 @@ mod tests { // Publisher side: receive request, write data let pub_handle = tokio::spawn(async move { - let track_writer = request - .next() - .await - .expect("should receive track request"); + let track_writer = request.next().await.expect("should receive track request"); assert_eq!(track_writer.name, track_name); @@ -587,13 +584,11 @@ mod tests { .expect("second subscribe"); // Publisher receives the new request and writes real data - let track_writer_2 = tokio::time::timeout( - std::time::Duration::from_millis(100), - request.next(), - ) - .await - .expect("should receive second request") - .expect("second request should be Some"); + let track_writer_2 = + tokio::time::timeout(std::time::Duration::from_millis(100), request.next()) + .await + .expect("should receive second request") + .expect("second request should be Some"); let write_handle = tokio::spawn(async move { let mut subgroups = track_writer_2.subgroups().unwrap();