diff --git a/Cargo.toml b/Cargo.toml index ec651ca405e..ac459ae375e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ members = [ "components/sources/cu_vlp16", "components/sources/cu_wt901", "components/sources/cu_rp_encoder", + "components/sources/cu_ryuw122", "components/sources/cu_sen0682", "components/tasks/cu_aligner", "components/tasks/cu_ahrs", @@ -102,6 +103,7 @@ members = [ "examples/cu_runtime_matrix", "examples/cu_parallel_mandelbrot", "examples/cu_remote_debug_session", + "examples/cu_ryuw122_probe", "support/cargo_cunew", ] diff --git a/components/payloads/cu_sensor_payloads/Cargo.toml b/components/payloads/cu_sensor_payloads/Cargo.toml index bef97b6c144..ef47bcc024f 100644 --- a/components/payloads/cu_sensor_payloads/Cargo.toml +++ b/components/payloads/cu_sensor_payloads/Cargo.toml @@ -39,3 +39,4 @@ textlogs = ["cu29/textlogs"] image = ["std", "dep:image"] kornia = ["std", "dep:kornia-image"] rerun = ["std", "dep:rerun"] +reflect = ["cu29/reflect"] diff --git a/components/payloads/cu_sensor_payloads/src/lib.rs b/components/payloads/cu_sensor_payloads/src/lib.rs index 9a439f8b1ed..343ad5a75a9 100644 --- a/components/payloads/cu_sensor_payloads/src/lib.rs +++ b/components/payloads/cu_sensor_payloads/src/lib.rs @@ -10,6 +10,7 @@ mod image; mod imu; #[cfg(feature = "std")] mod pointcloud; +mod ranging; #[cfg(feature = "rerun")] mod rerun_components; @@ -22,3 +23,4 @@ pub use image::*; pub use imu::*; #[cfg(feature = "std")] pub use pointcloud::*; +pub use ranging::*; diff --git a/components/payloads/cu_sensor_payloads/src/ranging.rs b/components/payloads/cu_sensor_payloads/src/ranging.rs new file mode 100644 index 00000000000..902139f7c4b --- /dev/null +++ b/components/payloads/cu_sensor_payloads/src/ranging.rs @@ -0,0 +1,412 @@ +use bincode::de::Decoder; +use bincode::enc::Encoder; +use bincode::error::{DecodeError, EncodeError}; +use bincode::{Decode, Encode}; +use core::fmt; +use cu29::clock::{CuTime, CuTimeRange}; +use cu29::prelude::*; +use cu29::units::si::f32::Length; +use cu29::units::si::length::meter; +use serde::{Deserialize, Deserializer, Serialize, Serializer, de, ser::SerializeStruct}; + +pub const RANGE_PEER_ID_CAPACITY: usize = 24; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum RangePeerIdError { + Empty, + TooLong { len: usize, max: usize }, +} + +impl fmt::Display for RangePeerIdError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Empty => write!(f, "range peer id must not be empty"), + Self::TooLong { len, max } => { + write!(f, "range peer id length {len} exceeds {max} bytes") + } + } + } +} + +impl core::error::Error for RangePeerIdError {} + +/// Fixed-capacity peer identifier used by range observations. +/// +/// This stays allocation-free on the runtime path while remaining flexible +/// enough for common tag, anchor, beacon, or node identifiers. +#[derive( + Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize, Encode, Decode, Reflect, +)] +pub struct RangePeerId { + bytes: [u8; RANGE_PEER_ID_CAPACITY], + len: u8, +} + +impl RangePeerId { + pub fn new(peer_id: &str) -> Result { + Self::try_from(peer_id) + } + + pub fn as_str(&self) -> &str { + let len = self.len as usize; + core::str::from_utf8(&self.bytes[..len]).expect("RangePeerId stores valid UTF-8") + } + + pub fn len(&self) -> usize { + self.len as usize + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } +} + +impl TryFrom<&str> for RangePeerId { + type Error = RangePeerIdError; + + fn try_from(peer_id: &str) -> Result { + if peer_id.is_empty() { + return Err(RangePeerIdError::Empty); + } + if peer_id.len() > RANGE_PEER_ID_CAPACITY { + return Err(RangePeerIdError::TooLong { + len: peer_id.len(), + max: RANGE_PEER_ID_CAPACITY, + }); + } + + let mut bytes = [0_u8; RANGE_PEER_ID_CAPACITY]; + bytes[..peer_id.len()].copy_from_slice(peer_id.as_bytes()); + + Ok(Self { + bytes, + len: peer_id.len() as u8, + }) + } +} + +/// Standardized distance measurement to an identified peer. +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Encode, Decode, Reflect)] +pub struct PeerRangeObservation { + pub peer_id: RangePeerId, + pub distance: Length, + pub rssi_dbm: Option, +} + +impl Default for PeerRangeObservation { + fn default() -> Self { + Self { + peer_id: RangePeerId::default(), + distance: Length::new::(0.0), + rssi_dbm: None, + } + } +} + +impl PeerRangeObservation { + pub fn from_meters(peer_id: RangePeerId, distance_m: f32, rssi_dbm: Option) -> Self { + Self { + peer_id, + distance: Length::new::(distance_m), + rssi_dbm, + } + } + + pub fn from_centimeters(peer_id: RangePeerId, distance_cm: u32, rssi_dbm: Option) -> Self { + Self::from_meters(peer_id, distance_cm as f32 / 100.0, rssi_dbm) + } +} + +/// A peer range measurement with its own time of validity. +/// +/// Use this inside aggregate payloads when samples were not captured at the same instant. For a +/// single `CuMsg`, prefer carrying the time in the Copper message envelope. +#[derive( + Clone, Copy, Debug, Default, PartialEq, Serialize, Deserialize, Encode, Decode, Reflect, +)] +pub struct PeerRangeSample { + pub tov: CuTime, + pub observation: PeerRangeObservation, +} + +impl PeerRangeSample { + pub fn new(tov: CuTime, observation: PeerRangeObservation) -> Self { + Self { tov, observation } + } +} + +/// Bounded collection of timestamped peer ranges. +/// +/// The enclosing Copper message should use `Tov::Range` when the samples span more than one +/// validity instant. +#[derive(Clone, Copy, Debug, PartialEq, Reflect)] +pub struct PeerRangeSnapshot { + pub len: usize, + pub samples: [PeerRangeSample; N], +} + +impl Encode for PeerRangeSnapshot { + fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { + Encode::encode(&self.samples().len(), encoder)?; + for sample in self.samples() { + Encode::encode(sample, encoder)?; + } + Ok(()) + } +} + +impl Decode<()> for PeerRangeSnapshot { + fn decode>(decoder: &mut D) -> Result { + let len = >::decode(decoder)?; + if len > N { + return Err(DecodeError::ArrayLengthMismatch { + required: N, + found: len, + }); + } + + let mut snapshot = Self::default(); + for _ in 0..len { + let sample = >::decode(decoder)?; + snapshot + .push(sample) + .map_err(|_| DecodeError::ArrayLengthMismatch { + required: N, + found: len, + })?; + } + + Ok(snapshot) + } +} + +impl Serialize for PeerRangeSnapshot { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("PeerRangeSnapshot", 2)?; + state.serialize_field("len", &self.samples().len())?; + state.serialize_field("samples", self.samples())?; + state.end() + } +} + +impl<'de, const N: usize> Deserialize<'de> for PeerRangeSnapshot { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + struct PeerRangeSnapshotSerde { + len: usize, + samples: alloc::vec::Vec, + } + + let decoded = PeerRangeSnapshotSerde::deserialize(deserializer)?; + if decoded.len != decoded.samples.len() { + return Err(de::Error::custom( + "peer range snapshot len does not match samples", + )); + } + if decoded.samples.len() > N { + return Err(de::Error::custom("peer range snapshot exceeds capacity")); + } + + let mut snapshot = Self::default(); + for sample in decoded.samples { + snapshot + .push(sample) + .map_err(|_| de::Error::custom("peer range snapshot exceeds capacity"))?; + } + + Ok(snapshot) + } +} + +impl Default for PeerRangeSnapshot { + fn default() -> Self { + Self { + len: 0, + samples: [PeerRangeSample::default(); N], + } + } +} + +impl PeerRangeSnapshot { + pub fn new() -> Self { + Self::default() + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + pub fn is_full(&self) -> bool { + self.len >= N + } + + pub fn samples(&self) -> &[PeerRangeSample] { + &self.samples[..self.len.min(N)] + } + + pub fn push(&mut self, sample: PeerRangeSample) -> Result<(), PeerRangeSnapshotFull> { + if self.is_full() { + return Err(PeerRangeSnapshotFull { capacity: N }); + } + + self.samples[self.len] = sample; + self.len += 1; + Ok(()) + } + + pub fn tov_range(&self) -> Option { + let samples = self.samples(); + if samples.is_empty() { + return None; + } + + let mut start = samples[0].tov; + let mut end = samples[0].tov; + for sample in &samples[1..] { + start = start.min(sample.tov); + end = end.max(sample.tov); + } + + Some(CuTimeRange { start, end }) + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct PeerRangeSnapshotFull { + pub capacity: usize, +} + +impl fmt::Display for PeerRangeSnapshotFull { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "peer range snapshot capacity {} is full", self.capacity) + } +} + +impl core::error::Error for PeerRangeSnapshotFull {} + +#[cfg(test)] +mod tests { + use super::*; + use bincode::config; + use cu29::units::si::length::meter; + + #[test] + fn range_peer_id_round_trip() { + let peer_id = RangePeerId::new("DAVID123").unwrap(); + assert_eq!(peer_id.as_str(), "DAVID123"); + assert_eq!(peer_id.len(), 8); + } + + #[test] + fn range_peer_id_rejects_too_long_values() { + let err = RangePeerId::new("1234567890123456789012345").unwrap_err(); + assert!(matches!(err, RangePeerIdError::TooLong { .. })); + } + + #[test] + fn peer_range_observation_round_trip_encode_decode() { + let payload = PeerRangeObservation::from_centimeters( + RangePeerId::new("TAG-01").unwrap(), + 245, + Some(-71), + ); + + let cfg = config::standard(); + let mut buffer = [0u8; 128]; + let len = bincode::encode_into_slice(payload, &mut buffer, cfg).unwrap(); + let (decoded, used) = + bincode::decode_from_slice::(&buffer[..len], cfg).unwrap(); + + assert_eq!(used, len); + assert_eq!(decoded.peer_id.as_str(), "TAG-01"); + assert_eq!(decoded.rssi_dbm, Some(-71)); + assert_eq!(decoded.distance.get::(), 2.45); + } + + #[test] + fn peer_range_snapshot_tracks_sample_time_range() { + let mut snapshot = PeerRangeSnapshot::<2>::new(); + snapshot + .push(PeerRangeSample::new( + CuTime::from_nanos(20), + PeerRangeObservation::from_centimeters( + RangePeerId::new("TAG-02").unwrap(), + 245, + None, + ), + )) + .unwrap(); + snapshot + .push(PeerRangeSample::new( + CuTime::from_nanos(10), + PeerRangeObservation::from_centimeters( + RangePeerId::new("TAG-01").unwrap(), + 120, + Some(-71), + ), + )) + .unwrap(); + + let range = snapshot.tov_range().unwrap(); + assert_eq!(range.start, CuTime::from_nanos(10)); + assert_eq!(range.end, CuTime::from_nanos(20)); + assert_eq!(snapshot.samples()[1].observation.peer_id.as_str(), "TAG-01"); + } + + #[test] + fn peer_range_snapshot_rejects_over_capacity_push() { + let mut snapshot = PeerRangeSnapshot::<1>::new(); + let sample = PeerRangeSample::new( + CuTime::from_nanos(1), + PeerRangeObservation::from_centimeters(RangePeerId::new("TAG-01").unwrap(), 100, None), + ); + + snapshot.push(sample).unwrap(); + assert_eq!( + snapshot.push(sample), + Err(PeerRangeSnapshotFull { capacity: 1 }) + ); + } + + #[test] + fn peer_range_snapshot_round_trip_encode_decode() { + let mut snapshot = PeerRangeSnapshot::<4>::new(); + snapshot + .push(PeerRangeSample::new( + CuTime::from_nanos(10), + PeerRangeObservation::from_centimeters( + RangePeerId::new("TAG-01").unwrap(), + 100, + Some(-70), + ), + )) + .unwrap(); + snapshot + .push(PeerRangeSample::new( + CuTime::from_nanos(20), + PeerRangeObservation::from_centimeters( + RangePeerId::new("TAG-02").unwrap(), + 200, + None, + ), + )) + .unwrap(); + + let cfg = config::standard(); + let mut buffer = [0u8; 256]; + let len = bincode::encode_into_slice(snapshot, &mut buffer, cfg).unwrap(); + let (decoded, used) = + bincode::decode_from_slice::, _>(&buffer[..len], cfg).unwrap(); + + assert_eq!(used, len); + assert_eq!(decoded.len, 2); + assert_eq!(decoded.samples()[0].observation.peer_id.as_str(), "TAG-01"); + assert_eq!(decoded.samples()[1].observation.peer_id.as_str(), "TAG-02"); + } +} diff --git a/components/sources/cu_ryuw122/Cargo.toml b/components/sources/cu_ryuw122/Cargo.toml new file mode 100644 index 00000000000..28cc9945854 --- /dev/null +++ b/components/sources/cu_ryuw122/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "cu-ryuw122" +description = "Copper initiator-side ranging source for the REYAX RYUW122 UWB modem" +readme = "README.md" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true +homepage.workspace = true +repository.workspace = true + +[package.metadata.copper] +kind = "source" +domains = ["sensor/uwb", "sensor/ranging"] +environments = ["host", "embedded"] + +[dependencies] +cu29 = { path = "../../../core/cu29", version = "1.0.0-rc2", default-features = false } +cu-linux-resources = { workspace = true, optional = true } +cu-sensor-payloads = { path = "../../payloads/cu_sensor_payloads", version = "1.0.0-rc2", default-features = false } +embedded-io = "0.7.1" + +[features] +default = ["std"] +std = ["cu29/std", "cu-sensor-payloads/std", "dep:cu-linux-resources"] +textlogs = ["cu29/textlogs", "cu-sensor-payloads/textlogs"] +reflect = ["cu29/reflect", "cu-sensor-payloads/reflect"] diff --git a/components/sources/cu_ryuw122/README.md b/components/sources/cu_ryuw122/README.md new file mode 100644 index 00000000000..dae1f3ee6f6 --- /dev/null +++ b/components/sources/cu_ryuw122/README.md @@ -0,0 +1,107 @@ +# cu-ryuw122 + +Initiator-side Copper range source for the REYAX `RYUW122` UWB modem. + +This source runs on the modem that uses the REYAX vendor `ANCHOR` role. That is the side that +can issue `AT+ANCHOR_SEND` and receive `+ANCHOR_RCV=...` lines with distance over UART. + +In robotics localization terms, this modem is typically mounted on the moving robot. The fixed +anchors in the space usually run the vendor `TAG` role and respond to the robot's initiator +requests. + +Use it when you want: + +- a robot-side distance feed to fixed anchors +- a generic UWB range source for downstream estimation tasks +- a clean Copper boundary between modem I/O and localization logic + +This source does not solve 2D or 3D position itself. It only emits single-anchor range +observations. Snapshotting, safety decisions, and multilateration belong in downstream Copper +tasks. + +## Behavior + +The source runs one outstanding initiator request at a time: + +- send `AT+ANCHOR_SEND` for the current anchor +- watch later runtime cycles for the matching `+ANCHOR_RCV` +- emit a `PeerRangeObservation` when a response arrives +- immediately move on to the next anchor +- if an anchor stays silent, advance on timeout and continue polling + +This keeps the driver non-blocking and suitable for regular Copper source execution. + +## Current assumptions + +- the modem that runs this source is already configured in REYAX vendor `ANCHOR` mode +- network id, address, and CPIN are already set appropriately +- the serial resource timeout is kept low enough for runtime use + +## Output + +The source emits: + +- `cu_sensor_payloads::PeerRangeObservation` + +Each observation contains: + +- `peer_id`: fixed-capacity identifier for the responding anchor +- `distance`: meters +- `rssi_dbm`: optional RSSI when the modem is configured to include it + +## Configuration + +The task binds a single `serial` resource and expects these config keys: + +- `anchor_ids`: list of fixed anchor addresses to poll +- `poll_payload`: ASCII payload sent with each `AT+ANCHOR_SEND` +- `response_timeout_ms`: timeout before the source advances to the next anchor +- `read_buffer_bytes`: serial read scratch size +- `max_pending_observations`: queue depth for parsed observations + +`anchor_ids` and `poll_payload` must be ASCII. RYUW122 modem addresses are 8 bytes max, and the +poll payload is 12 bytes max. + +## Example + +On Linux, bind the source through `cu_linux_resources::LinuxResources`: + +```ron +( + resources: [ + ( + id: "linux", + provider: "cu_linux_resources::LinuxResources", + config: { + "serial3_dev": "/dev/ttyACM0", + "serial3_baudrate": 115200, + "serial3_timeout_ms": 20, + }, + ), + ], + tasks: [ + ( + id: "uwb_ranges", + type: "cu_ryuw122::Ryuw122InitiatorSource", + resources: { + "serial": "linux.serial3", + }, + config: { + "anchor_ids": ["ANCH0001", "ANCH0002"], + "poll_payload": "PING", + "response_timeout_ms": 250, + "read_buffer_bytes": 512, + "max_pending_observations": 32, + }, + ), + ], +) +``` + +## Downstream usage + +Typical next steps are: + +- threshold the output for safety logic +- accumulate the most recent ranges by anchor id +- feed those snapshots into a separate multilateration task diff --git a/components/sources/cu_ryuw122/docs/AT_Command_RYUW122.pdf b/components/sources/cu_ryuw122/docs/AT_Command_RYUW122.pdf new file mode 100644 index 00000000000..771bf963376 Binary files /dev/null and b/components/sources/cu_ryuw122/docs/AT_Command_RYUW122.pdf differ diff --git a/components/sources/cu_ryuw122/docs/RYUW122_EN.pdf b/components/sources/cu_ryuw122/docs/RYUW122_EN.pdf new file mode 100644 index 00000000000..ae4f370fb25 Binary files /dev/null and b/components/sources/cu_ryuw122/docs/RYUW122_EN.pdf differ diff --git a/components/sources/cu_ryuw122/src/lib.rs b/components/sources/cu_ryuw122/src/lib.rs new file mode 100644 index 00000000000..48dad6c17b2 --- /dev/null +++ b/components/sources/cu_ryuw122/src/lib.rs @@ -0,0 +1,624 @@ +#![cfg_attr(not(feature = "std"), no_std)] + +extern crate alloc; + +mod protocol; + +use alloc::collections::VecDeque; +use alloc::format; +use alloc::string::{String, ToString}; +use alloc::vec::Vec; +use core::fmt; +#[cfg(feature = "std")] +use cu_linux_resources::LinuxSerialPort; +use cu_sensor_payloads::{PeerRangeObservation, RangePeerId}; +use cu29::clock::{CuTime, Tov}; +use cu29::prelude::*; +use cu29::resource::{Owned, ResourceBindingMap, ResourceBindings, ResourceManager}; +use embedded_io::{ErrorKind, ErrorType, Read, Write}; + +use crate::protocol::{ModemEvent, parse_line}; + +const DEFAULT_READ_BUFFER_BYTES: usize = 512; +const DEFAULT_MAX_PENDING_OBSERVATIONS: usize = 32; +const DEFAULT_RESPONSE_TIMEOUT_MS: u64 = 250; +const DEFAULT_POLL_PAYLOAD: &str = "PING"; +const DEFAULT_LINE_BUFFER_BYTES: usize = 256; +const MODEM_ADDRESS_MAX_BYTES: usize = 8; +const MODEM_PAYLOAD_MAX_BYTES: usize = 12; + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum Binding { + Serial, +} + +pub struct Ryuw122ResourcesT { + pub serial: Owned, +} + +#[cfg(feature = "std")] +pub type Ryuw122Resources = Ryuw122ResourcesT; + +impl<'r, S: 'static + Send + Sync> ResourceBindings<'r> for Ryuw122ResourcesT { + type Binding = Binding; + + fn from_bindings( + manager: &'r mut ResourceManager, + mapping: Option<&ResourceBindingMap>, + ) -> CuResult { + let mapping = mapping.ok_or_else(|| { + CuError::from("Ryuw122InitiatorSourceTask requires a `serial` resource mapping") + })?; + let path = mapping.get(Binding::Serial).ok_or_else(|| { + CuError::from( + "Ryuw122InitiatorSourceTask resources must include `serial: `", + ) + })?; + + let serial = manager + .take::(path.typed()) + .map_err(|e| e.add_cause("Failed to fetch RYUW122 serial resource"))?; + + Ok(Self { serial }) + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +struct InFlightRequest { + anchor_index: usize, + sent_at_ns: u64, +} + +#[derive(Clone, Copy, Debug, PartialEq)] +struct PendingObservation { + observed_at: CuTime, + observation: PeerRangeObservation, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum DecodedLine { + Observation { + anchor_id: RangePeerId, + distance_cm: u32, + rssi_dbm: Option, + }, + Error, + Ignore, +} + +#[derive(Reflect)] +#[reflect(no_field_bounds, from_reflect = false, type_path = false)] +pub struct Ryuw122InitiatorSourceTask { + #[reflect(ignore)] + serial: S, + #[reflect(ignore)] + read_buffer: Vec, + #[reflect(ignore)] + line_buffer: Vec, + #[reflect(ignore)] + pending_observations: VecDeque, + max_pending_observations: usize, + #[reflect(ignore)] + anchor_ids: Vec, + #[reflect(ignore)] + anchor_commands: Vec>, + poll_payload: String, + response_timeout_ns: u64, + next_anchor_index: usize, + #[reflect(ignore)] + in_flight: Option, +} + +#[cfg(feature = "std")] +pub type Ryuw122InitiatorSource = Ryuw122InitiatorSourceTask; + +impl TypePath for Ryuw122InitiatorSourceTask { + fn type_path() -> &'static str { + "cu_ryuw122::Ryuw122InitiatorSourceTask" + } + + fn short_type_path() -> &'static str { + "Ryuw122InitiatorSourceTask" + } + + fn type_ident() -> Option<&'static str> { + Some("Ryuw122InitiatorSourceTask") + } + + fn crate_name() -> Option<&'static str> { + Some("cu_ryuw122") + } + + fn module_path() -> Option<&'static str> { + Some("cu_ryuw122") + } +} + +impl fmt::Debug for Ryuw122InitiatorSourceTask { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Ryuw122InitiatorSourceTask") + .field("anchor_count", &self.anchor_ids.len()) + .field("poll_payload", &self.poll_payload) + .field("response_timeout_ns", &self.response_timeout_ns) + .field("max_pending_observations", &self.max_pending_observations) + .field("next_anchor_index", &self.next_anchor_index) + .field("in_flight", &self.in_flight) + .finish() + } +} + +impl Freezable for Ryuw122InitiatorSourceTask {} + +impl CuSrcTask for Ryuw122InitiatorSourceTask +where + S: Read + Write + ErrorType + Send + Sync + 'static, + ::Error: embedded_io::Error + fmt::Debug + 'static, +{ + type Resources<'r> = Ryuw122ResourcesT; + type Output<'m> = output_msg!(PeerRangeObservation); + + fn new(config: Option<&ComponentConfig>, resources: Self::Resources<'_>) -> CuResult + where + Self: Sized, + { + let anchor_ids = config_anchor_ids(config)?; + let poll_payload = config_string(config, "poll_payload", DEFAULT_POLL_PAYLOAD)?; + validate_poll_payload(&poll_payload)?; + + let read_buffer_bytes = config_u32( + config, + "read_buffer_bytes", + DEFAULT_READ_BUFFER_BYTES as u32, + )? as usize; + let max_pending_observations = config_u32( + config, + "max_pending_observations", + DEFAULT_MAX_PENDING_OBSERVATIONS as u32, + )? as usize; + let response_timeout_ms = + config_u64(config, "response_timeout_ms", DEFAULT_RESPONSE_TIMEOUT_MS)?; + + Ok(Self { + serial: resources.serial.0, + read_buffer: alloc::vec![0_u8; read_buffer_bytes.max(64)], + line_buffer: Vec::with_capacity(read_buffer_bytes.max(DEFAULT_LINE_BUFFER_BYTES)), + pending_observations: VecDeque::with_capacity(max_pending_observations.max(1)), + max_pending_observations: max_pending_observations.max(1), + anchor_commands: build_anchor_commands(&anchor_ids, &poll_payload), + anchor_ids, + poll_payload, + response_timeout_ns: response_timeout_ms.max(1).saturating_mul(1_000_000), + next_anchor_index: 0, + in_flight: None, + }) + } + + fn start(&mut self, ctx: &CuContext) -> CuResult<()> { + self.line_buffer.clear(); + self.pending_observations.clear(); + self.next_anchor_index = 0; + self.in_flight = None; + self.drive_request_cycle(ctx.now().as_nanos()) + } + + fn process(&mut self, ctx: &CuContext, output: &mut Self::Output<'_>) -> CuResult<()> { + let observed_at = ctx.now(); + self.read_and_decode(observed_at)?; + self.drive_request_cycle(observed_at.as_nanos())?; + + if let Some(pending) = self.pending_observations.pop_front() { + output.tov = Tov::Time(pending.observed_at); + output.set_payload(pending.observation); + } + + Ok(()) + } + + fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> { + self.in_flight = None; + self.pending_observations.clear(); + self.line_buffer.clear(); + Ok(()) + } +} + +impl Ryuw122InitiatorSourceTask +where + S: Read + Write + ErrorType + Send + Sync + 'static, + ::Error: embedded_io::Error + fmt::Debug + 'static, +{ + fn drive_request_cycle(&mut self, now_ns: u64) -> CuResult<()> { + if self.anchor_ids.is_empty() { + return Ok(()); + } + + if let Some(in_flight) = self.in_flight { + if now_ns.saturating_sub(in_flight.sent_at_ns) < self.response_timeout_ns { + return Ok(()); + } + self.next_anchor_index = (in_flight.anchor_index + 1) % self.anchor_ids.len(); + self.in_flight = None; + } + + self.send_request(self.next_anchor_index, now_ns) + } + + fn send_request(&mut self, anchor_index: usize, now_ns: u64) -> CuResult<()> { + let serial = &mut self.serial; + let command = &self.anchor_commands[anchor_index]; + write_all(serial, command)?; + serial + .flush() + .map_err(|e| CuError::from(format!("RYUW122 flush failed: {e:?}")))?; + self.in_flight = Some(InFlightRequest { + anchor_index, + sent_at_ns: now_ns, + }); + Ok(()) + } + + fn read_and_decode(&mut self, observed_at: CuTime) -> CuResult<()> { + loop { + match self.serial.read(&mut self.read_buffer) { + Ok(0) => break, + Ok(n) => { + let (line_buffer, read_buffer) = (&mut self.line_buffer, &self.read_buffer); + append_read_bytes(line_buffer, &read_buffer[..n]); + self.decode_from_buffer(observed_at); + + if n < self.read_buffer.len() { + break; + } + } + Err(e) + if matches!( + embedded_io::Error::kind(&e), + ErrorKind::TimedOut | ErrorKind::Interrupted + ) => + { + break; + } + Err(e) => return Err(CuError::from(format!("RYUW122 serial read failed: {e:?}"))), + } + } + + Ok(()) + } + + fn decode_from_buffer(&mut self, observed_at: CuTime) { + while let Some(newline_idx) = self.line_buffer.iter().position(|byte| *byte == b'\n') { + let decoded = { + let line_bytes = &self.line_buffer[..=newline_idx]; + let line = match core::str::from_utf8(line_bytes) { + Ok(line) => line, + Err(_) => { + self.line_buffer.drain(..=newline_idx); + continue; + } + }; + + match parse_line(line) { + Ok(ModemEvent::RangeResponse(event)) => { + match RangePeerId::try_from(event.peer_id) { + Ok(anchor_id) => DecodedLine::Observation { + anchor_id, + distance_cm: event.distance_cm, + rssi_dbm: event.rssi_dbm, + }, + Err(_) => DecodedLine::Ignore, + } + } + Ok(ModemEvent::Error) => DecodedLine::Error, + Ok(_) | Err(_) => DecodedLine::Ignore, + } + }; + + self.line_buffer.drain(..=newline_idx); + + match decoded { + DecodedLine::Observation { + anchor_id, + distance_cm, + rssi_dbm, + } => self.handle_range_response(observed_at, anchor_id, distance_cm, rssi_dbm), + DecodedLine::Error => { + if let Some(in_flight) = self.in_flight.take() { + self.next_anchor_index = + (in_flight.anchor_index + 1) % self.anchor_ids.len(); + } + } + DecodedLine::Ignore => {} + } + } + } + + fn handle_range_response( + &mut self, + observed_at: CuTime, + anchor_id: RangePeerId, + distance_cm: u32, + rssi_dbm: Option, + ) { + self.push_pending_observation(PendingObservation { + observed_at, + observation: PeerRangeObservation::from_centimeters(anchor_id, distance_cm, rssi_dbm), + }); + + if let Some(in_flight) = self.in_flight + && self.anchor_ids[in_flight.anchor_index] == anchor_id + { + self.next_anchor_index = (in_flight.anchor_index + 1) % self.anchor_ids.len(); + self.in_flight = None; + } + } + + fn push_pending_observation(&mut self, observation: PendingObservation) { + if self.pending_observations.len() >= self.max_pending_observations { + self.pending_observations.pop_front(); + } + self.pending_observations.push_back(observation); + } +} + +fn config_u32(config: Option<&ComponentConfig>, key: &str, default: u32) -> CuResult { + if let Some(cfg) = config { + Ok(cfg.get::(key)?.unwrap_or(default)) + } else { + Ok(default) + } +} + +fn config_u64(config: Option<&ComponentConfig>, key: &str, default: u64) -> CuResult { + if let Some(cfg) = config { + Ok(cfg.get::(key)?.unwrap_or(default)) + } else { + Ok(default) + } +} + +fn config_string(config: Option<&ComponentConfig>, key: &str, default: &str) -> CuResult { + if let Some(cfg) = config { + Ok(cfg + .get::(key)? + .unwrap_or_else(|| default.to_string())) + } else { + Ok(default.to_string()) + } +} + +fn config_anchor_ids(config: Option<&ComponentConfig>) -> CuResult> { + let config = config.ok_or_else(|| { + CuError::from("Ryuw122InitiatorSourceTask requires a config with non-empty `anchor_ids`") + })?; + + let raw_anchor_ids: Vec = config.get_value("anchor_ids")?.ok_or_else(|| { + CuError::from("Ryuw122InitiatorSourceTask config must include `anchor_ids`") + })?; + + if raw_anchor_ids.is_empty() { + return Err(CuError::from( + "Ryuw122InitiatorSourceTask config must include at least one anchor id", + )); + } + + raw_anchor_ids + .into_iter() + .map(|anchor_id| { + validate_anchor_id(&anchor_id)?; + RangePeerId::try_from(anchor_id.as_str()).map_err(|err| { + CuError::from(format!("anchor id `{anchor_id}` is not valid: {err}")) + }) + }) + .collect() +} + +fn build_anchor_commands(anchor_ids: &[RangePeerId], payload: &str) -> Vec> { + let payload_len = payload.len(); + anchor_ids + .iter() + .map(|anchor_id| { + format!( + "AT+ANCHOR_SEND={},{},{}\r\n", + anchor_id.as_str(), + payload_len, + payload + ) + .into_bytes() + }) + .collect() +} + +fn append_read_bytes(line_buffer: &mut Vec, bytes: &[u8]) { + if line_buffer.len() + bytes.len() > line_buffer.capacity() { + line_buffer.clear(); + } + + debug_assert!(bytes.len() <= line_buffer.capacity()); + line_buffer.extend_from_slice(bytes); +} + +fn write_all(serial: &mut S, bytes: &[u8]) -> CuResult<()> +where + S: Write + ErrorType, + ::Error: embedded_io::Error + fmt::Debug + 'static, +{ + let mut written = 0; + while written < bytes.len() { + let n = serial + .write(&bytes[written..]) + .map_err(|e| CuError::from(format!("RYUW122 write failed: {e:?}")))?; + if n == 0 { + return Err(CuError::from("RYUW122 write failed: zero-byte write")); + } + written += n; + } + Ok(()) +} + +fn validate_anchor_id(anchor_id: &str) -> CuResult<()> { + if anchor_id.is_empty() { + return Err(CuError::from("RYUW122 anchor ids must not be empty")); + } + if !anchor_id.is_ascii() { + return Err(CuError::from("RYUW122 anchor ids must be ASCII")); + } + if anchor_id.len() > MODEM_ADDRESS_MAX_BYTES { + return Err(CuError::from(format!( + "RYUW122 anchor id `{anchor_id}` exceeds {MODEM_ADDRESS_MAX_BYTES} bytes" + ))); + } + Ok(()) +} + +fn validate_poll_payload(payload: &str) -> CuResult<()> { + if !payload.is_ascii() { + return Err(CuError::from("RYUW122 poll payload must be ASCII")); + } + if payload.len() > MODEM_PAYLOAD_MAX_BYTES { + return Err(CuError::from(format!( + "RYUW122 poll payload exceeds {MODEM_PAYLOAD_MAX_BYTES} bytes" + ))); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloc::collections::VecDeque; + use cu29::clock::CuDuration; + + #[derive(Default)] + struct FakeSerial { + reads: VecDeque, std::io::Error>>, + writes: Vec>, + } + + impl FakeSerial { + fn with_reads(reads: impl IntoIterator, std::io::Error>>) -> Self { + Self { + reads: reads.into_iter().collect(), + writes: Vec::new(), + } + } + + fn writes_as_strings(&self) -> Vec { + self.writes + .iter() + .map(|bytes| String::from_utf8(bytes.clone()).unwrap()) + .collect() + } + } + + impl ErrorType for FakeSerial { + type Error = std::io::Error; + } + + impl Read for FakeSerial { + fn read(&mut self, buf: &mut [u8]) -> Result { + let Some(next) = self.reads.pop_front() else { + return Err(std::io::Error::from(std::io::ErrorKind::TimedOut)); + }; + match next { + Ok(chunk) => { + let len = chunk.len().min(buf.len()); + buf[..len].copy_from_slice(&chunk[..len]); + Ok(len) + } + Err(err) => Err(err), + } + } + } + + impl Write for FakeSerial { + fn write(&mut self, buf: &[u8]) -> Result { + self.writes.push(buf.to_vec()); + Ok(buf.len()) + } + + fn flush(&mut self) -> Result<(), Self::Error> { + Ok(()) + } + } + + fn build_task(serial: FakeSerial) -> Ryuw122InitiatorSourceTask { + Ryuw122InitiatorSourceTask { + serial, + read_buffer: alloc::vec![0_u8; 64], + line_buffer: Vec::with_capacity(64), + pending_observations: VecDeque::new(), + max_pending_observations: 8, + anchor_ids: alloc::vec![ + RangePeerId::new("ANCH0001").unwrap(), + RangePeerId::new("ANCH0002").unwrap(), + ], + anchor_commands: alloc::vec![ + b"AT+ANCHOR_SEND=ANCH0001,4,PING\r\n".to_vec(), + b"AT+ANCHOR_SEND=ANCH0002,4,PING\r\n".to_vec(), + ], + poll_payload: "PING".to_string(), + response_timeout_ns: 50_000_000, + next_anchor_index: 0, + in_flight: None, + } + } + + #[test] + fn start_sends_first_request_immediately() { + let mut task = build_task(FakeSerial::default()); + let (ctx, _mock) = CuContext::new_mock_clock(); + + task.start(&ctx).unwrap(); + + assert_eq!( + task.serial.writes_as_strings(), + alloc::vec!["AT+ANCHOR_SEND=ANCH0001,4,PING\r\n".to_string()] + ); + } + + #[test] + fn advances_to_next_anchor_after_timeout() { + let mut task = build_task(FakeSerial::default()); + let (ctx, mock) = CuContext::new_mock_clock(); + + task.start(&ctx).unwrap(); + mock.increment(CuDuration::from_millis(60)); + + let mut output = as CuSrcTask>::Output::default(); + task.process(&ctx, &mut output).unwrap(); + + assert_eq!( + task.serial.writes_as_strings(), + alloc::vec![ + "AT+ANCHOR_SEND=ANCH0001,4,PING\r\n".to_string(), + "AT+ANCHOR_SEND=ANCH0002,4,PING\r\n".to_string() + ] + ); + } + + #[test] + fn response_emits_observation_and_immediately_sends_next_request() { + let response = b"+ANCHOR_RCV=ANCH0001,5,HELLO,40 cm,-71 dBm\r\n".to_vec(); + let mut task = build_task(FakeSerial::with_reads([Ok(response)])); + let (ctx, _mock) = CuContext::new_mock_clock(); + + task.start(&ctx).unwrap(); + let mut output = as CuSrcTask>::Output::default(); + task.process(&ctx, &mut output).unwrap(); + + let payload = output.payload().expect("observation payload"); + assert_eq!(payload.peer_id.as_str(), "ANCH0001"); + assert_eq!( + payload.distance.get::(), + 0.4 + ); + assert_eq!(payload.rssi_dbm, Some(-71)); + assert_eq!( + task.serial.writes_as_strings(), + alloc::vec![ + "AT+ANCHOR_SEND=ANCH0001,4,PING\r\n".to_string(), + "AT+ANCHOR_SEND=ANCH0002,4,PING\r\n".to_string() + ] + ); + } +} diff --git a/components/sources/cu_ryuw122/src/protocol.rs b/components/sources/cu_ryuw122/src/protocol.rs new file mode 100644 index 00000000000..6137161e4c5 --- /dev/null +++ b/components/sources/cu_ryuw122/src/protocol.rs @@ -0,0 +1,155 @@ +use core::fmt; + +const RANGE_RESPONSE_PREFIX: &str = "+ANCHOR_RCV="; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct RangeResponseEvent<'a> { + pub peer_id: &'a str, + pub distance_cm: u32, + pub rssi_dbm: Option, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ModemEvent<'a> { + RangeResponse(RangeResponseEvent<'a>), + CommandOk, + Ready, + Reset, + Error, + Other(&'a str), +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ParseError { + MissingComma, + InvalidUtf8, + InvalidPayloadLength, + InvalidPayloadBytes, + InvalidDistance, + InvalidRssi, +} + +impl fmt::Display for ParseError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::MissingComma => write!(f, "missing comma in modem line"), + Self::InvalidUtf8 => write!(f, "modem line is not valid UTF-8"), + Self::InvalidPayloadLength => write!(f, "invalid modem payload length"), + Self::InvalidPayloadBytes => write!(f, "modem payload bytes do not match length"), + Self::InvalidDistance => write!(f, "invalid modem distance field"), + Self::InvalidRssi => write!(f, "invalid modem RSSI field"), + } + } +} + +impl core::error::Error for ParseError {} + +pub fn parse_line(line: &str) -> Result, ParseError> { + let line = line.trim(); + + if line.is_empty() { + return Ok(ModemEvent::Other(line)); + } + if line == "+OK" { + return Ok(ModemEvent::CommandOk); + } + if line == "+READY" { + return Ok(ModemEvent::Ready); + } + if line == "+RESET" { + return Ok(ModemEvent::Reset); + } + if line.starts_with("+ERR") { + return Ok(ModemEvent::Error); + } + if let Some(rest) = line.strip_prefix(RANGE_RESPONSE_PREFIX) { + return parse_range_response(rest).map(ModemEvent::RangeResponse); + } + + Ok(ModemEvent::Other(line)) +} + +fn parse_range_response(rest: &str) -> Result, ParseError> { + let first_comma = rest.find(',').ok_or(ParseError::MissingComma)?; + let peer_id = &rest[..first_comma]; + + let remaining = &rest[first_comma + 1..]; + let second_comma = remaining.find(',').ok_or(ParseError::MissingComma)?; + let payload_len: usize = remaining[..second_comma] + .trim() + .parse() + .map_err(|_| ParseError::InvalidPayloadLength)?; + + let payload_and_tail = &remaining[second_comma + 1..]; + let payload_and_tail_bytes = payload_and_tail.as_bytes(); + if payload_and_tail_bytes.len() < payload_len + 1 { + return Err(ParseError::InvalidPayloadBytes); + } + if payload_and_tail_bytes[payload_len] != b',' { + return Err(ParseError::InvalidPayloadBytes); + } + let _payload = core::str::from_utf8(&payload_and_tail_bytes[..payload_len]) + .map_err(|_| ParseError::InvalidUtf8)?; + + let tail = core::str::from_utf8(&payload_and_tail_bytes[payload_len + 1..]) + .map_err(|_| ParseError::InvalidUtf8)?; + let (distance_field, rssi_field) = match tail.split_once(',') { + Some((distance, rssi)) => (distance, Some(rssi)), + None => (tail, None), + }; + + let distance_cm = parse_distance_cm(distance_field)?; + let rssi_dbm = rssi_field.map(parse_rssi_dbm).transpose()?; + + Ok(RangeResponseEvent { + peer_id: peer_id.trim(), + distance_cm, + rssi_dbm, + }) +} + +fn parse_distance_cm(field: &str) -> Result { + let field = field.trim(); + let field = field.strip_suffix("cm").unwrap_or(field); + let field = field.trim(); + field.parse().map_err(|_| ParseError::InvalidDistance) +} + +fn parse_rssi_dbm(field: &str) -> Result { + let field = field.trim(); + let field = field.strip_suffix("dBm").unwrap_or(field); + let field = field.strip_suffix("dbm").unwrap_or(field); + let field = field.trim(); + field.parse().map_err(|_| ParseError::InvalidRssi) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_range_response_without_rssi() { + let event = parse_line("+ANCHOR_RCV=DAVID123,5,HELLO,40 cm").unwrap(); + assert_eq!( + event, + ModemEvent::RangeResponse(RangeResponseEvent { + peer_id: "DAVID123", + distance_cm: 40, + rssi_dbm: None, + }) + ); + } + + #[test] + fn parses_range_response_with_rssi_and_comma_payload() { + let event = parse_line("+ANCHOR_RCV=DAVID123,5,HE,LO,40 cm,-71 dBm").unwrap(); + assert_eq!( + event, + ModemEvent::RangeResponse(RangeResponseEvent { + peer_id: "DAVID123", + distance_cm: 40, + rssi_dbm: Some(-71), + }) + ); + } +} diff --git a/examples/cu_ryuw122_probe/Cargo.toml b/examples/cu_ryuw122_probe/Cargo.toml new file mode 100644 index 00000000000..24f5d1c5462 --- /dev/null +++ b/examples/cu_ryuw122_probe/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "cu-ryuw122-probe" +description = "Small robot-side probe for the REYAX RYUW122 Copper source" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true +homepage.workspace = true +repository.workspace = true +default-run = "cu-ryuw122-probe" + +publish = false + +[[bin]] +name = "cu-ryuw122-probe" +path = "src/main.rs" + +[dependencies] +clap = { workspace = true } +ctrlc = { workspace = true } +cu-linux-resources = { workspace = true } +cu-ryuw122 = { path = "../../components/sources/cu_ryuw122", version = "1.0.0-rc2" } +cu-sensor-payloads = { workspace = true } +cu29 = { workspace = true } +serialport = { workspace = true } diff --git a/examples/cu_ryuw122_probe/README.md b/examples/cu_ryuw122_probe/README.md new file mode 100644 index 00000000000..b5c5f4a1e40 --- /dev/null +++ b/examples/cu_ryuw122_probe/README.md @@ -0,0 +1,54 @@ +# cu_ryuw122_probe + +Minimal robot-side Copper example for two REYAX `RYUW122` modules. + +It gives you three short flows: + +- configure the robot module that runs the Copper source +- configure the second module as the fixed localization anchor +- run the Copper source against the robot serial port and print live ranges + +The Copper source runs on the modem that uses the REYAX vendor `ANCHOR` role, because that is +the side that reports distance over UART. The fixed space anchor uses the vendor `TAG` role. + +The example keeps the test setup intentionally fixed: + +- robot address: `ROBOT001` +- fixed anchor address: `ANCH0001` +- network id: `REYAX123` +- CPIN: `FABC0002EEDCAA90FABC0002EEDCAA90` + +## Quick start + +From this directory: + +```bash +just robot +just anchor +just probe +``` + +Default serial ports: + +- robot / Copper source: `/dev/ttyACM0` +- fixed anchor: `/dev/ttyACM1` + +Override them by passing a port explicitly: + +```bash +just robot /dev/ttyUSB0 +just anchor /dev/ttyUSB1 +just probe /dev/ttyUSB0 +``` + +The `probe` command writes a normal Copper log to: + +```text +logs/cu_ryuw122_probe.copper +``` + +If no ranges appear, check that: + +- both modules are powered and visible as serial devices +- the second module was configured with `just anchor` +- the robot modem is the port passed to `just probe` diff --git a/examples/cu_ryuw122_probe/build.rs b/examples/cu_ryuw122_probe/build.rs new file mode 100644 index 00000000000..68691d2cad8 --- /dev/null +++ b/examples/cu_ryuw122_probe/build.rs @@ -0,0 +1,6 @@ +fn main() { + println!( + "cargo:rustc-env=LOG_INDEX_DIR={}", + std::env::var("OUT_DIR").unwrap() + ); +} diff --git a/examples/cu_ryuw122_probe/copperconfig.ron b/examples/cu_ryuw122_probe/copperconfig.ron new file mode 100644 index 00000000000..ddf92b22318 --- /dev/null +++ b/examples/cu_ryuw122_probe/copperconfig.ron @@ -0,0 +1,44 @@ +( + logging: ( + enable_task_logging: true, + slab_size_mib: 16, + section_size_mib: 4, + keyframe_interval: 3, + ), + resources: [ + ( + id: "linux", + provider: "cu_linux_resources::LinuxResources", + config: { + "serial0_dev": "/dev/ttyACM0", + "serial0_baudrate": 115200, + "serial0_timeout_ms": 20, + }, + ), + ], + tasks: [ + ( + id: "ranges", + type: "cu_ryuw122::Ryuw122InitiatorSource", + resources: {"serial": "linux.serial0"}, + config: { + "anchor_ids": ["ANCH0001"], + "poll_payload": "PING", + "response_timeout_ms": 250, + "read_buffer_bytes": 512, + "max_pending_observations": 32, + }, + ), + ( + id: "sink", + type: "tasks::RangeCaptureSink", + ), + ], + cnx: [ + ( + src: "ranges", + dst: "sink", + msg: "cu_sensor_payloads::PeerRangeObservation", + ), + ], +) \ No newline at end of file diff --git a/examples/cu_ryuw122_probe/justfile b/examples/cu_ryuw122_probe/justfile new file mode 100644 index 00000000000..d00deb7120d --- /dev/null +++ b/examples/cu_ryuw122_probe/justfile @@ -0,0 +1,26 @@ +import "../../justfile" + +# Configure the robot-side modem. This uses the REYAX vendor initiator role. +robot port="/dev/ttyACM0": + #!/usr/bin/env bash + set -euo pipefail + cd "{{ROOT}}" + cargo run -p cu-ryuw122-probe -- robot "{{port}}" + +# Configure a fixed localization anchor. This uses the REYAX vendor responder role. +anchor port="/dev/ttyACM1": + #!/usr/bin/env bash + set -euo pipefail + cd "{{ROOT}}" + cargo run -p cu-ryuw122-probe -- anchor "{{port}}" + +# Run the Copper source against the robot serial port. +probe port="/dev/ttyACM0" duration_s="": + #!/usr/bin/env bash + set -euo pipefail + cd "{{ROOT}}" + if [[ -n "{{duration_s}}" ]]; then + cargo run -p cu-ryuw122-probe -- probe "{{port}}" --duration-s "{{duration_s}}" + else + cargo run -p cu-ryuw122-probe -- probe "{{port}}" + fi diff --git a/examples/cu_ryuw122_probe/src/lib.rs b/examples/cu_ryuw122_probe/src/lib.rs new file mode 100644 index 00000000000..24618492d66 --- /dev/null +++ b/examples/cu_ryuw122_probe/src/lib.rs @@ -0,0 +1,476 @@ +use clap::{Args, Parser, Subcommand}; +use cu_linux_resources::{SERIAL0_BAUDRATE_KEY, SERIAL0_DEV_KEY, SERIAL0_TIMEOUT_MS_KEY}; +use cu_sensor_payloads::PeerRangeObservation; +use cu29::prelude::*; +use cu29::units::si::length::meter; +use serialport::SerialPort; +use std::fs; +use std::io::ErrorKind; +use std::path::{Path, PathBuf}; +use std::sync::{ + Arc, LazyLock, Mutex, + atomic::{AtomicBool, Ordering}, +}; +use std::thread; +use std::time::{Duration, Instant}; + +const DEFAULT_BAUDRATE: u32 = 115_200; +const DEFAULT_SERIAL_TIMEOUT_MS: u64 = 20; +const DEFAULT_SETUP_TIMEOUT_MS: u64 = 100; +const DEFAULT_NETWORK_ID: &str = "REYAX123"; +const DEFAULT_CPIN: &str = "FABC0002EEDCAA90FABC0002EEDCAA90"; +const DEFAULT_ROBOT_ADDRESS: &str = "ROBOT001"; +const DEFAULT_FIXED_ANCHOR_ADDRESS: &str = "ANCH0001"; +const LOG_SLAB_SIZE: Option = Some(16 * 1024 * 1024); + +#[derive(Clone, Copy, Debug, Default, PartialEq)] +pub struct ProbeSnapshot { + pub count: u64, + pub last: Option, +} + +static SNAPSHOT: LazyLock> = + LazyLock::new(|| Mutex::new(ProbeSnapshot::default())); + +pub fn reset_probe_snapshot() { + *SNAPSHOT.lock().expect("snapshot mutex poisoned") = ProbeSnapshot::default(); +} + +pub fn probe_snapshot() -> ProbeSnapshot { + *SNAPSHOT.lock().expect("snapshot mutex poisoned") +} + +pub mod tasks { + use super::*; + + #[derive(Default, Reflect)] + pub struct RangeCaptureSink; + + impl Freezable for RangeCaptureSink {} + + impl CuSinkTask for RangeCaptureSink { + type Resources<'r> = (); + type Input<'m> = input_msg!(PeerRangeObservation); + + fn new( + _config: Option<&ComponentConfig>, + _resources: Self::Resources<'_>, + ) -> CuResult { + Ok(Self) + } + + fn process(&mut self, _ctx: &CuContext, input: &Self::Input<'_>) -> CuResult<()> { + let Some(observation) = input.payload() else { + return Ok(()); + }; + + let mut snapshot = SNAPSHOT.lock().expect("snapshot mutex poisoned"); + snapshot.count = snapshot.count.saturating_add(1); + snapshot.last = Some(*observation); + Ok(()) + } + } +} + +#[copper_runtime(config = "copperconfig.ron")] +struct Ryuw122ProbeApp {} + +#[derive(Parser, Debug)] +#[command( + name = "cu-ryuw122-probe", + about = "Configure and probe a robot-side REYAX RYUW122 ranging link" +)] +struct Cli { + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand, Debug)] +enum Command { + Robot(SerialPortArgs), + Anchor(SerialPortArgs), + Probe(ProbeArgs), +} + +#[derive(Args, Debug, Clone)] +struct SerialPortArgs { + #[arg(value_name = "PORT")] + port: PathBuf, + + #[arg(long, default_value_t = DEFAULT_BAUDRATE)] + baudrate: u32, +} + +#[derive(Args, Debug, Clone)] +struct ProbeArgs { + #[command(flatten)] + serial: SerialPortArgs, + + #[arg(long)] + duration_s: Option, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum DeviceProfile { + Robot, + Anchor, +} + +impl DeviceProfile { + fn label(self) -> &'static str { + match self { + Self::Robot => "robot", + Self::Anchor => "fixed anchor", + } + } + + fn vendor_mode_value(self) -> u8 { + match self { + Self::Robot => 1, + Self::Anchor => 0, + } + } + + fn vendor_mode_label(self) -> &'static str { + match self { + Self::Robot => "initiator", + Self::Anchor => "responder", + } + } + + fn address(self) -> &'static str { + match self { + Self::Robot => DEFAULT_ROBOT_ADDRESS, + Self::Anchor => DEFAULT_FIXED_ANCHOR_ADDRESS, + } + } +} + +pub fn run_cli() -> CuResult<()> { + match Cli::parse().command { + Command::Robot(args) => configure_modem(DeviceProfile::Robot, &args), + Command::Anchor(args) => configure_modem(DeviceProfile::Anchor, &args), + Command::Probe(args) => run_probe(&args), + } +} + +fn configure_modem(profile: DeviceProfile, args: &SerialPortArgs) -> CuResult<()> { + let mut port = open_serial_port(&args.port, args.baudrate, DEFAULT_SETUP_TIMEOUT_MS)?; + let _ = read_response( + port.as_mut(), + Duration::from_millis(40), + Duration::from_millis(120), + ); + + expect_contains(&send_command(port.as_mut(), "AT")?, "+OK", "probing modem")?; + + for command in setup_commands(profile) { + let response = send_command(port.as_mut(), &command)?; + expect_contains(&response, "+OK", &format!("running `{command}`"))?; + println!("{command} -> {}", one_line(&response)); + } + + for (command, expected) in verify_queries(profile) { + let response = send_command(port.as_mut(), &command)?; + expect_contains(&response, &expected, &format!("verifying `{command}`"))?; + println!("{command} -> {}", one_line(&response)); + } + + println!( + "configured {} {} as vendor {} ({}) on network {}", + profile.label(), + args.port.display(), + profile.vendor_mode_label(), + profile.address(), + DEFAULT_NETWORK_ID, + ); + Ok(()) +} + +fn run_probe(args: &ProbeArgs) -> CuResult<()> { + change_to_manifest_dir()?; + reset_probe_snapshot(); + + let mut config = cu29::read_configuration("copperconfig.ron")?; + override_serial_resource( + &mut config, + &args.serial.port, + args.serial.baudrate, + DEFAULT_SERIAL_TIMEOUT_MS, + ); + + let log_path = prepare_log_path()?; + let mut app = Ryuw122ProbeApp::builder() + .with_config(config) + .with_log_path(&log_path, LOG_SLAB_SIZE)? + .build()?; + + let running = Arc::new(AtomicBool::new(true)); + let running_for_signal = Arc::clone(&running); + ctrlc::set_handler(move || { + running_for_signal.store(false, Ordering::Relaxed); + }) + .map_err(|err| CuError::new_with_cause("failed to install Ctrl-C handler", err))?; + + println!( + "probing fixed anchor {} from robot modem {}", + DEFAULT_FIXED_ANCHOR_ADDRESS, + args.serial.port.display(), + ); + println!("writing Copper log to {}", log_path.display()); + if let Some(duration_s) = args.duration_s { + println!("running for {duration_s}s"); + } else { + println!("press Ctrl-C to stop"); + } + + let started_at = Instant::now(); + let deadline = args.duration_s.map(Duration::from_secs); + + app.start_all_tasks()?; + + let mut last_count = 0; + let mut run_error = None; + while running.load(Ordering::Relaxed) + && deadline + .map(|duration| started_at.elapsed() < duration) + .unwrap_or(true) + { + if let Err(err) = app.run_one_iteration() { + run_error = Some(err); + break; + } + + let snapshot = probe_snapshot(); + if snapshot.count != last_count { + if let Some(last) = snapshot.last { + println!("[{:>4}] {}", snapshot.count, format_observation(last),); + } + last_count = snapshot.count; + } + + thread::sleep(Duration::from_millis(5)); + } + + app.stop_all_tasks()?; + + if let Some(err) = run_error { + return Err(err); + } + + let snapshot = probe_snapshot(); + let Some(last) = snapshot.last else { + return Err(CuError::from(format!( + "no range observations captured; configure the robot with `just robot [port]`, configure the fixed anchor with `just anchor [port]`, and re-run `just probe [port]` for {}", + args.serial.port.display() + ))); + }; + + println!( + "captured {} observations; last {}", + snapshot.count, + format_observation(last), + ); + Ok(()) +} + +fn setup_commands(profile: DeviceProfile) -> Vec { + let mut commands = vec![ + format!("AT+MODE={}", profile.vendor_mode_value()), + format!("AT+NETWORKID={DEFAULT_NETWORK_ID}"), + format!("AT+ADDRESS={}", profile.address()), + format!("AT+CPIN={DEFAULT_CPIN}"), + ]; + + match profile { + DeviceProfile::Robot => commands.push("AT+RSSI=1".to_string()), + DeviceProfile::Anchor => commands.push("AT+TAGD=0,0".to_string()), + } + + commands +} + +fn verify_queries(profile: DeviceProfile) -> Vec<(String, String)> { + let mut queries = vec![ + ( + "AT+MODE?".to_string(), + format!("+MODE={}", profile.vendor_mode_value()), + ), + ( + "AT+NETWORKID?".to_string(), + format!("+NETWORKID={DEFAULT_NETWORK_ID}"), + ), + ( + "AT+ADDRESS?".to_string(), + format!("+ADDRESS={}", profile.address()), + ), + ("AT+CPIN?".to_string(), "+CPIN=".to_string()), + ]; + + match profile { + DeviceProfile::Robot => queries.push(("AT+RSSI?".to_string(), "+RSSI=1".to_string())), + DeviceProfile::Anchor => queries.push(("AT+TAGD?".to_string(), "+TAGD=0,0".to_string())), + } + + queries +} + +fn open_serial_port(port: &Path, baudrate: u32, timeout_ms: u64) -> CuResult> { + serialport::new(port.display().to_string(), baudrate) + .timeout(Duration::from_millis(timeout_ms)) + .open() + .map_err(|err| CuError::new_with_cause("failed to open serial port", err)) +} + +fn send_command(port: &mut dyn SerialPort, command: &str) -> CuResult { + let mut bytes = command.as_bytes().to_vec(); + bytes.extend_from_slice(b"\r\n"); + port.write_all(&bytes) + .map_err(|err| CuError::new_with_cause("failed to write modem command", err))?; + port.flush() + .map_err(|err| CuError::new_with_cause("failed to flush modem command", err))?; + + read_response(port, Duration::from_millis(50), Duration::from_millis(500)) +} + +fn read_response( + port: &mut dyn SerialPort, + quiet_period: Duration, + overall_timeout: Duration, +) -> CuResult { + let started_at = Instant::now(); + let mut last_data_at = None; + let mut response = Vec::new(); + let mut scratch = [0_u8; 256]; + + while started_at.elapsed() < overall_timeout { + match port.read(&mut scratch) { + Ok(0) => {} + Ok(n) => { + response.extend_from_slice(&scratch[..n]); + last_data_at = Some(Instant::now()); + } + Err(err) + if matches!( + err.kind(), + ErrorKind::TimedOut | ErrorKind::WouldBlock | ErrorKind::Interrupted + ) => {} + Err(err) => { + return Err(CuError::new_with_cause( + "failed to read modem response", + err, + )); + } + } + + if !response.is_empty() + && last_data_at + .map(|last| last.elapsed() >= quiet_period) + .unwrap_or(false) + { + break; + } + } + + Ok(String::from_utf8_lossy(&response).trim().to_string()) +} + +fn expect_contains(response: &str, needle: &str, context: &str) -> CuResult<()> { + if response.contains("+ERR") { + return Err(CuError::from(format!( + "{context} failed with modem error: {}", + one_line(response) + ))); + } + if !response.contains(needle) { + return Err(CuError::from(format!( + "{context} expected `{needle}` but got `{}`", + one_line(response) + ))); + } + Ok(()) +} + +fn one_line(response: &str) -> String { + response + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()) + .collect::>() + .join(" | ") +} + +fn change_to_manifest_dir() -> CuResult<()> { + std::env::set_current_dir(env!("CARGO_MANIFEST_DIR")) + .map_err(|err| CuError::new_with_cause("failed to switch to example directory", err)) +} + +fn prepare_log_path() -> CuResult { + let log_path = PathBuf::from("logs/cu_ryuw122_probe.copper"); + if let Some(parent) = log_path.parent() { + fs::create_dir_all(parent) + .map_err(|err| CuError::new_with_cause("failed to create log directory", err))?; + } + Ok(log_path) +} + +fn override_serial_resource(config: &mut CuConfig, port: &Path, baudrate: u32, timeout_ms: u64) { + let Some(bundle) = config + .resources + .iter_mut() + .find(|bundle| bundle.id == "linux") + else { + return; + }; + + let bundle_config = bundle.config.get_or_insert_with(ComponentConfig::new); + bundle_config.set(SERIAL0_DEV_KEY, port.display().to_string()); + bundle_config.set(SERIAL0_BAUDRATE_KEY, baudrate); + bundle_config.set(SERIAL0_TIMEOUT_MS_KEY, timeout_ms); +} + +fn format_observation(observation: PeerRangeObservation) -> String { + let distance_m = observation.distance.get::(); + let rssi = observation + .rssi_dbm + .map(|value| format!("{value} dBm")) + .unwrap_or_else(|| "n/a".to_string()); + + format!( + "anchor={} distance={distance_m:.2}m rssi={rssi}", + observation.peer_id.as_str(), + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn robot_setup_matches_probe_defaults() { + assert_eq!( + setup_commands(DeviceProfile::Robot), + vec![ + "AT+MODE=1".to_string(), + format!("AT+NETWORKID={DEFAULT_NETWORK_ID}"), + format!("AT+ADDRESS={DEFAULT_ROBOT_ADDRESS}"), + format!("AT+CPIN={DEFAULT_CPIN}"), + "AT+RSSI=1".to_string(), + ] + ); + } + + #[test] + fn anchor_setup_matches_probe_target() { + assert_eq!( + setup_commands(DeviceProfile::Anchor), + vec![ + "AT+MODE=0".to_string(), + format!("AT+NETWORKID={DEFAULT_NETWORK_ID}"), + format!("AT+ADDRESS={DEFAULT_FIXED_ANCHOR_ADDRESS}"), + format!("AT+CPIN={DEFAULT_CPIN}"), + "AT+TAGD=0,0".to_string(), + ] + ); + } +} diff --git a/examples/cu_ryuw122_probe/src/main.rs b/examples/cu_ryuw122_probe/src/main.rs new file mode 100644 index 00000000000..c5fddb58f1a --- /dev/null +++ b/examples/cu_ryuw122_probe/src/main.rs @@ -0,0 +1,8 @@ +use cu_ryuw122_probe::run_cli; + +fn main() { + if let Err(err) = run_cli() { + eprintln!("cu-ryuw122-probe failed: {err}"); + std::process::exit(1); + } +}