Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
385 changes: 385 additions & 0 deletions moq-transport/src/serve/subgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,3 +636,388 @@ impl Deref for SubgroupObjectReader {
&self.info
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::coding::TrackNamespace;

/// Helper: create a SubgroupsWriter/Reader pair with a dummy track.
fn make_subgroups() -> (SubgroupsWriter, SubgroupsReader) {
let track = Arc::new(Track::new(
TrackNamespace::from_utf8_path("test/ns"),
"test-track".to_string(),
));
Subgroups { track }.produce()
}

/// Helper: deterministic payload for (group_id, object_id).
fn payload(group_id: u64, object_id: u64) -> Bytes {
Bytes::from(format!("g{}-o{}", group_id, object_id))
}

// ---------------------------------------------------------------
// U1: Single group, single object
// ---------------------------------------------------------------
#[tokio::test]
async fn single_group_single_object() {
let (mut writer, mut reader) = make_subgroups();

let mut sg = writer.append(0).unwrap();
let group_id = sg.group_id;
sg.write(payload(group_id, 0)).unwrap();
drop(sg); // close the subgroup

let sub = reader.next().await.unwrap().expect("expected a subgroup");
assert_eq!(sub.group_id, group_id);

let mut obj = sub.clone();
let data = obj.read_next().await.unwrap().expect("expected an object");
assert_eq!(data, payload(group_id, 0));

// No more objects
assert!(obj.read_next().await.unwrap().is_none());
}

// ---------------------------------------------------------------
// U2: Single group, multiple objects
// ---------------------------------------------------------------
#[tokio::test]
async fn single_group_multiple_objects() {
let (mut writer, mut reader) = make_subgroups();

let mut sg = writer.append(0).unwrap();
let gid = sg.group_id;
for oid in 0..5u64 {
sg.write(payload(gid, oid)).unwrap();
}
drop(sg);

let mut sub = reader.next().await.unwrap().unwrap();
for oid in 0..5u64 {
let obj = sub.next().await.unwrap().expect("expected object");
assert_eq!(obj.object_id, oid);
let mut obj = obj;
let data = obj.read_all().await.unwrap();
assert_eq!(data, payload(gid, oid));
}
assert!(sub.next().await.unwrap().is_none());
}

// ---------------------------------------------------------------
// U3: Multiple groups, multiple objects
// ---------------------------------------------------------------
#[tokio::test]
async fn multiple_groups_multiple_objects() {
let (mut writer, mut reader) = make_subgroups();

let num_groups = 3u64;
let objects_per_group = 4u64;
let mut expected: Vec<(u64, u64, Bytes)> = Vec::new();

for _ in 0..num_groups {
let mut sg = writer.append(0).unwrap();
let gid = sg.group_id;
for oid in 0..objects_per_group {
let p = payload(gid, oid);
sg.write(p.clone()).unwrap();
expected.push((gid, oid, p));
}
drop(sg);
}
drop(writer); // close the subgroups writer

// Reader sees each group as the latest when epoch bumps.
// Because latest-only, we collect what we can.
let mut received: Vec<(u64, u64, Bytes)> = Vec::new();
while let Ok(Some(mut sub)) = reader.next().await {
let gid = sub.group_id;
while let Ok(Some(mut obj)) = sub.next().await {
let oid = obj.object_id;
let data = obj.read_all().await.unwrap();
received.push((gid, oid, data));
}
}

// Every received tuple must match what was published.
for (gid, oid, data) in &received {
assert_eq!(data, &payload(*gid, *oid));
}

// At minimum the last group must be received.
let last_group_id = num_groups - 1;
assert!(
received.iter().any(|(g, _, _)| *g == last_group_id),
"last group must be received"
);
}

// ---------------------------------------------------------------
// U4: Variable payload sizes
// ---------------------------------------------------------------
#[tokio::test]
async fn variable_payload_sizes() {
let (mut writer, mut reader) = make_subgroups();

let payloads: Vec<Bytes> = vec![
Bytes::new(), // 0-byte
Bytes::from_static(b"x"), // 1-byte
Bytes::from(vec![0xAB; 4096]), // 4 KB
Bytes::from(vec![0xCD; 65536]), // 64 KB
];

let mut sg = writer.append(0).unwrap();
for p in &payloads {
sg.write(p.clone()).unwrap();
}
drop(sg);

let mut sub = reader.next().await.unwrap().unwrap();
for expected in &payloads {
let mut obj = sub.next().await.unwrap().unwrap();
let data = obj.read_all().await.unwrap();
assert_eq!(data, *expected);
}
assert!(sub.next().await.unwrap().is_none());
}

// ---------------------------------------------------------------
// U5: Multi-chunk writes
// ---------------------------------------------------------------
#[tokio::test]
async fn multi_chunk_writes() {
let (mut writer, mut reader) = make_subgroups();

let chunk1 = Bytes::from_static(b"hello ");
let chunk2 = Bytes::from_static(b"world");
let full = Bytes::from_static(b"hello world");

let mut sg = writer.append(0).unwrap();
let mut obj_writer = sg.create(full.len(), None).unwrap();
obj_writer.write(chunk1).unwrap();
obj_writer.write(chunk2).unwrap();
drop(obj_writer);
drop(sg);

let mut sub = reader.next().await.unwrap().unwrap();
let mut obj = sub.next().await.unwrap().unwrap();
let data = obj.read_all().await.unwrap();
assert_eq!(data, full);
}

// ---------------------------------------------------------------
// U6: Latest-only semantics
// ---------------------------------------------------------------
#[tokio::test]
async fn latest_only_semantics() {
let (mut writer, reader) = make_subgroups();

// Write group 0
let mut sg0 = writer.append(0).unwrap();
sg0.write(payload(sg0.group_id, 0)).unwrap();
drop(sg0);

// Write group 1
let mut sg1 = writer.append(0).unwrap();
let gid1 = sg1.group_id;
sg1.write(payload(gid1, 0)).unwrap();
drop(sg1);

// A fresh clone of reader should only see group 1 (the latest).
let mut fresh = reader.clone();
let sub = fresh.next().await.unwrap().unwrap();
assert_eq!(sub.group_id, gid1);
}

// ---------------------------------------------------------------
// U7: Older group silently dropped
// ---------------------------------------------------------------
#[tokio::test]
async fn older_group_silently_dropped() {
let (mut writer, mut reader) = make_subgroups();

// Write group 5 via create()
let sg5 = writer
.create(Subgroup {
group_id: 5,
subgroup_id: 0,
priority: 0,
})
.unwrap();
drop(sg5);

// Write group 3 (older) -- should not become latest
let sg3 = writer
.create(Subgroup {
group_id: 3,
subgroup_id: 0,
priority: 0,
})
.unwrap();
drop(sg3);

// Reader should see group 5 as latest (epoch bumped once for group 5).
let sub = reader.next().await.unwrap().unwrap();
assert_eq!(sub.group_id, 5);
}

// ---------------------------------------------------------------
// U8: Duplicate group_id + subgroup_id rejected
// ---------------------------------------------------------------
#[tokio::test]
async fn duplicate_rejected() {
let (mut writer, _reader) = make_subgroups();

let _sg = writer
.create(Subgroup {
group_id: 5,
subgroup_id: 0,
priority: 0,
})
.unwrap();

let result = writer.create(Subgroup {
group_id: 5,
subgroup_id: 0,
priority: 0,
});

match result {
Err(ServeError::Duplicate) => {} // expected
Err(e) => panic!("expected Duplicate, got {:?}", e),
Ok(_) => panic!("expected Duplicate error, got Ok"),
}
}

// ---------------------------------------------------------------
// U9: Higher subgroup_id within same group
// ---------------------------------------------------------------
#[tokio::test]
async fn higher_subgroup_id_updates_latest() {
let (mut writer, mut reader) = make_subgroups();

let _sg0 = writer
.create(Subgroup {
group_id: 1,
subgroup_id: 0,
priority: 0,
})
.unwrap();

let _sg1 = writer
.create(Subgroup {
group_id: 1,
subgroup_id: 1,
priority: 0,
})
.unwrap();

// Reader should see subgroup_id=1 as latest (two epoch bumps).
// Drain to the latest.
let mut latest_sub = None;
// Read available updates (non-blocking after writer is done).
// We'll read what we can and the last one should be subgroup_id=1.
loop {
let sub = reader.next().await.unwrap();
match sub {
Some(s) => latest_sub = Some(s),
None => break,
}
// Break after we've seen subgroup_id=1 to avoid blocking forever
// since writer is still alive.
if latest_sub.as_ref().map(|s| s.subgroup_id) == Some(1) {
break;
}
}

assert_eq!(latest_sub.unwrap().subgroup_id, 1);
}

// ---------------------------------------------------------------
// U10: Writer close propagates
// ---------------------------------------------------------------
#[tokio::test]
async fn writer_close_propagates() {
let (mut writer, mut reader) = make_subgroups();

let mut sg = writer.append(0).unwrap();
sg.write(Bytes::from_static(b"data")).unwrap();
drop(sg);

writer.close(ServeError::Done).unwrap();

// Reader should get the subgroup, then Done error.
let _sub = reader.next().await.unwrap().unwrap();
let result = reader.next().await;
match result {
Err(ServeError::Done) => {} // expected
Err(e) => panic!("expected Done, got {:?}", e),
Ok(_) => panic!("expected Done error, got Ok"),
}
}

// ---------------------------------------------------------------
// U11: Object size mismatch
// ---------------------------------------------------------------
#[tokio::test]
async fn object_size_mismatch() {
let (mut writer, mut reader) = make_subgroups();

let mut sg = writer.append(0).unwrap();

// Declare size=10, write only 5 bytes, then drop writer
let mut obj_writer = sg.create(10, None).unwrap();
obj_writer.write(Bytes::from(vec![0u8; 5])).unwrap();
drop(obj_writer); // Drop triggers Size error because remain != 0

drop(sg);

let mut sub = reader.next().await.unwrap().unwrap();
let mut obj = sub.next().await.unwrap().unwrap();

// Read the 5 bytes that were written
let chunk = obj.read().await.unwrap();
assert!(chunk.is_some());

// Next read should get the Size error
let result = obj.read().await;
assert_eq!(result.unwrap_err(), ServeError::Size);
}

// ---------------------------------------------------------------
// U12: Concurrent readers (fan-out)
// ---------------------------------------------------------------
#[tokio::test]
async fn concurrent_readers_fanout() {
let (mut writer, mut reader1) = make_subgroups();
let mut reader2 = reader1.clone();

let mut sg = writer.append(0).unwrap();
let gid = sg.group_id;
for oid in 0..3u64 {
sg.write(payload(gid, oid)).unwrap();
}
drop(sg);
drop(writer);

// Both readers should see the same subgroup.
let mut sub1 = reader1.next().await.unwrap().unwrap();
let mut sub2 = reader2.next().await.unwrap().unwrap();

assert_eq!(sub1.group_id, sub2.group_id);

// Read all objects from both and compare.
let mut data1 = Vec::new();
while let Ok(Some(mut obj)) = sub1.next().await {
data1.push((obj.object_id, obj.read_all().await.unwrap()));
}

let mut data2 = Vec::new();
while let Ok(Some(mut obj)) = sub2.next().await {
data2.push((obj.object_id, obj.read_all().await.unwrap()));
}

assert_eq!(data1, data2);
assert_eq!(data1.len(), 3);
}
}
Loading
Loading