diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..44045cd --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,26 @@ +# AGENTS.md - Hub for fsdr-blocks + +Building blocks for FutureSDR signal processing library for SDR and real-time DSP. + +## Tech Stack +- **Language:** Rust (Edition 2024, **Nightly channel**) +- **Core Library:** [FutureSDR](https://www.futuresdr.org) +- **Acceleration:** Explicit SIMD via `std::simd` (portable SIMD) +- **Serialization:** Serde, custom PMT (Polymorphic Types) +- **Testing:** Cargo test, QuickCheck, Criterion (benchmarks) + +## Critical Commands +- **Install:** `cargo build` +- **Lint:** `./check.sh` (Runs fmt, clippy, and tests with all features) +- **Test:** `cargo test --all-features` +- **Bench:** `cargo bench --all-features` + +## Documentation Index +- [Architecture](agent_docs/architecture.md): **Trigger:** Designing new blocks or understanding flowgraph connectivity. +- [Conventions](agent_docs/conventions.md): **Trigger:** Before writing any code to ensure alignment with Rust 2024 and FutureSDR idioms. +- [SDR & DSP](agent_docs/sdr_dsp.md): **Trigger:** Modifying signal processing logic, gain control, or frequency shifts. +- [SigMF](agent_docs/sigmf.md): **Trigger:** Working with Signal Metadata Format (SigMF) recordings or collections. + +## Verification Loop +You MUST run `./check.sh` and ensure all tests pass before declaring a task "done." +Always verify that your changes didn't break conditional feature flags (`crossbeam`, `async-channel`, `cw`). diff --git a/Cargo.toml b/Cargo.toml index 22ded0a..24664dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,3 +70,8 @@ name = "shared" path = "benches/cw/shared.rs" harness = false required-features = ["cw"] + +[[bench]] +name = "deinterleave" +path = "benches/stream/deinterleave.rs" +harness = false diff --git a/agent_docs/architecture.md b/agent_docs/architecture.md new file mode 100644 index 0000000..c7177c0 --- /dev/null +++ b/agent_docs/architecture.md @@ -0,0 +1,20 @@ +# Architecture - fsdr-blocks + +This project follows the **Flowgraph/Block** paradigm provided by FutureSDR. + +## Core Concepts +- **Blocks:** The fundamental units of processing. Defined using `#[derive(Block)]`. +- **Flowgraph:** A directed acyclic graph (usually) where blocks are nodes and streams/messages are edges. +- **Kernels:** The execution logic of a block. Most blocks in this repository are CPU-based and implement the `Kernel` trait. + +## Data Movement +- **Streams:** High-throughput data (e.g., IQ samples) passed via `CpuBufferReader`/`CpuBufferWriter`. +- **Messages:** Asynchronous control signals or metadata passed as `Pmt` (Polymorphic Types). + +## Dependency on FutureSDR +This project is tightly coupled with `futuresdr`. It uses a local path dependency in `Cargo.toml` by default (`../FutureSDR`), which indicates it is often developed alongside the core library. + +## Block Types +- **Sources/Sinks:** Handle I/O (e.g., `StdinSink`, `SigmfSource`). +- **Processing Blocks:** Transform data (e.g., `Agc`, `FreqShift`). +- **Adapters:** Connect different async runtimes or channel types (e.g., `CrossbeamSink`). diff --git a/agent_docs/conventions.md b/agent_docs/conventions.md new file mode 100644 index 0000000..304d4e8 --- /dev/null +++ b/agent_docs/conventions.md @@ -0,0 +1,32 @@ +# Conventions - fsdr-blocks + +## Rust Standards +- **Edition:** 2024. Use modern idioms (e.g., `async fn` in traits, let-else). +- **Formatting:** Strictly adhere to `cargo fmt`. + +## Block Implementation Boilerplate +Every block should typically include: +1. **Struct Definition:** Use `#[derive(Block)]` and specify `#[message_inputs(...)]` if applicable. +2. **Implementation:** A `new` method and message handler methods (returning `Result`). +3. **Kernel Trait:** Implement `async fn work` to handle the data processing loop. +4. **Builder Pattern:** Use a `BlockBuilder` struct for complex configuration (see `src/agc.rs`). + +## Performance Acceleration +For blocks in hot loops (AGC, Frequency Shift, Deinterleave), prefer explicit SIMD over compiler-dependent autovectorization: +1. **Feature Flags:** Ensure `portable_simd` and `specialization` are enabled in `src/lib.rs`. +2. **Specialization Pattern:** Define a `*Supported` trait (e.g., `DeinterleaveSupported`) with a `default` scalar implementation and specialized SIMD implementations for `f32`, `u8`, `i8`, `i16`. +3. **Macro Reuse:** Use macros to implement SIMD logic across different types to avoid code duplication. +4. **Benchmarking:** Every accelerated block MUST have a corresponding `Criterion` benchmark in `benches/`. + +## Error Handling +- Use `futuresdr::anyhow::Result` for block operations. +- Prefer `Context` from `anyhow` for descriptive error messages in I/O operations. + +## Feature Management +- Use `#[cfg(feature = "...")]` for blocks that depend on optional crates like `crossbeam-channel` or `async-channel`. +- Always test with `--all-features` to ensure no regressions in optional components. + +## Testing +- **Unit Tests:** Located in `tests/`. +- **Property-Based Testing:** Use `quickcheck` for robust validation of DSP algorithms. +- **Benchmarks:** Located in `benches/`, using `Criterion`. diff --git a/agent_docs/sdr_dsp.md b/agent_docs/sdr_dsp.md new file mode 100644 index 0000000..05a1599 --- /dev/null +++ b/agent_docs/sdr_dsp.md @@ -0,0 +1,16 @@ +# SDR & DSP - fsdr-blocks + +## Signal Processing Idioms +- **Complex Numbers:** Use `num_complex::Complex32` (or generic `T: ComplexFloat`). +- **Buffers:** Always use `input.slice()` and `output.slice()` in the `work` function. +- **Consumption/Production:** Explicitly call `input.consume(n)` and `output.produce(n)` after processing. + +## Key Blocks +- **AGC (Automatic Gain Control):** Implements a feedback loop to maintain target power. See `src/agc.rs`. +- **FreqShift:** Performs digital down-conversion/up-conversion. See `src/math/freq_shift.rs`. +- **Type Converters:** Crucial for translating between raw bytes and SDR-specific types. + +## Math Operations +- **Prefer Explicit SIMD:** Use `std::simd` and specialization (see `DeinterleaveSupported`) for performance-critical blocks in hot loops. This avoids dependency on brittle compiler autovectorization. +- **Precision:** Be mindful of floating-point precision and squelch thresholds. +- **Error Accumulation:** Periodically re-calculate phase in recurrence relations to prevent drift (e.g., in `FreqShift`). diff --git a/agent_docs/sigmf.md b/agent_docs/sigmf.md new file mode 100644 index 0000000..71f9d4c --- /dev/null +++ b/agent_docs/sigmf.md @@ -0,0 +1,16 @@ +# SigMF - fsdr-blocks + +## Overview +The `sigmf` crate (in `crates/sigmf`) provides a Rust implementation of the [Signal Metadata Format](https://github.com/sigmf/SigMF). + +## Structure +- **Global:** Top-level metadata about the recording. +- **Captures:** Segment-specific metadata (sample rate, frequency). +- **Annotations:** Time/frequency-indexed labels. + +## Usage in fsdr-blocks +- `SigmfSource`: Reads `.sigmf-meta` and `.sigmf-data` files into a FutureSDR flowgraph. +- `SigmfSink`: Records flowgraph data into SigMF-compliant files. + +## Extensions +Supports SigMF extensions (e.g., `AntennaExtension`). New extensions should be added as modules in `crates/sigmf/src/`. diff --git a/benches/stream/deinterleave.rs b/benches/stream/deinterleave.rs new file mode 100644 index 0000000..b2e12b9 --- /dev/null +++ b/benches/stream/deinterleave.rs @@ -0,0 +1,28 @@ +use criterion::{Criterion, Throughput, criterion_group, criterion_main}; +use fsdr_blocks::stream::Deinterleave; +use futuresdr::runtime::mocker::{Mocker, Reader, Writer}; +use rand::RngExt; + +pub fn deinterleave_f32(c: &mut Criterion) { + let n_samp = 8192; + let mut rng = rand::rng(); + let input: Vec = (0..n_samp).map(|_| rng.random()).collect(); + + let mut group = c.benchmark_group("deinterleave"); + group.throughput(Throughput::Elements(n_samp as u64)); + + group.bench_function("deinterleave_f32", |b| { + b.iter(|| { + let block: Deinterleave, Writer, Writer> = + Deinterleave::new(); + let mut mocker = Mocker::new(block); + mocker.input().set(input.clone()); + mocker.run(); + }); + }); + + group.finish(); +} + +criterion_group!(benches, deinterleave_f32); +criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs index 5f08f41..70350bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ -//! This library acts as a toolbox on top of [FutureSDR][`futuresdr`] to easily build your own flowgraph. +#![feature(portable_simd)] +#![feature(specialization)] +//! This library acts as a toolbox on top of [`futuresdr`] to easily build your own flowgraph. //! It is made by the community for the community. // #![feature(async_fn_in_trait)] diff --git a/src/stream/deinterleave.rs b/src/stream/deinterleave.rs index 389c3d6..80c9ac5 100644 --- a/src/stream/deinterleave.rs +++ b/src/stream/deinterleave.rs @@ -1,3 +1,4 @@ +use core::simd::Simd; use futuresdr::prelude::*; /// This blocks deinterleave a unique stream into two separate stream. @@ -41,6 +42,115 @@ where } } +pub trait DeinterleaveSupported: Copy { + fn deinterleave( + first: &mut bool, + input: &[Self], + out0: &mut [Self], + out1: &mut [Self], + ) -> (usize, usize, usize); +} + +fn deinterleave_scalar_logic( + first: &mut bool, + input: &[A], + out0: &mut [A], + out1: &mut [A], +) -> (usize, usize, usize) { + let mut n_in = input.len(); + let n_o0 = out0.len(); + let n_o1 = out1.len(); + + let mut i_ptr = 0; + let mut o0_ptr = 0; + let mut o1_ptr = 0; + + if !*first && n_in > 0 && n_o1 > 0 { + out1[o1_ptr] = input[i_ptr]; + i_ptr += 1; + o1_ptr += 1; + n_in -= 1; + *first = true; + } + + let n = (n_in / 2).min(n_o0 - o0_ptr).min(n_o1 - o1_ptr); + for j in 0..n { + out0[o0_ptr + j] = input[i_ptr + 2 * j]; + out1[o1_ptr + j] = input[i_ptr + 2 * j + 1]; + } + + i_ptr += 2 * n; + o0_ptr += n; + o1_ptr += n; + n_in -= 2 * n; + + if *first && n_in > 0 && out0.len() > o0_ptr { + out0[o0_ptr] = input[i_ptr]; + i_ptr += 1; + o0_ptr += 1; + *first = false; + } + (i_ptr, o0_ptr, o1_ptr) +} + +impl DeinterleaveSupported for A { + default fn deinterleave( + first: &mut bool, + input: &[Self], + out0: &mut [Self], + out1: &mut [Self], + ) -> (usize, usize, usize) { + deinterleave_scalar_logic(first, input, out0, out1) + } +} + +macro_rules! impl_deinterleave_simd { + ($($t:ty),*) => { + $( + impl DeinterleaveSupported for $t { + fn deinterleave(first: &mut bool, input: &[Self], out0: &mut [Self], out1: &mut [Self]) -> (usize, usize, usize) { + let mut n_in = input.len(); + let n_o0 = out0.len(); + let n_o1 = out1.len(); + + let mut i_ptr = 0; + let mut o0_ptr = 0; + let mut o1_ptr = 0; + + if !*first && n_in > 0 && n_o1 > 0 { + out1[o1_ptr] = input[i_ptr]; + i_ptr += 1; + o1_ptr += 1; + n_in -= 1; + *first = true; + } + + const LANES: usize = 8; + let n_simd = (n_in / (2 * LANES)).min((n_o0 - o0_ptr) / LANES).min((n_o1 - o1_ptr) / LANES); + + for _ in 0..n_simd { + let v0 = Simd::<$t, LANES>::from_slice(&input[i_ptr..i_ptr + LANES]); + let v1 = Simd::<$t, LANES>::from_slice(&input[i_ptr + LANES..i_ptr + 2 * LANES]); + + let (even, odd) = v0.deinterleave(v1); + even.copy_to_slice(&mut out0[o0_ptr..o0_ptr + LANES]); + odd.copy_to_slice(&mut out1[o1_ptr..o1_ptr + LANES]); + + i_ptr += 2 * LANES; + o0_ptr += LANES; + o1_ptr += LANES; + } + + let (i_rem, o0_rem, o1_rem) = deinterleave_scalar_logic(first, &input[i_ptr..], &mut out0[o0_ptr..], &mut out1[o1_ptr..]); + (i_ptr + i_rem, o0_ptr + o0_rem, o1_ptr + o1_rem) + } + } + )* + }; +} + +impl_deinterleave_simd!(f32, u8, i8, i16); + impl Default for Deinterleave where A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy, @@ -56,7 +166,7 @@ where #[doc(hidden)] impl Kernel for Deinterleave where - A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy, + A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy + DeinterleaveSupported, I: CpuBufferReader, O0: CpuBufferWriter, O1: CpuBufferWriter, @@ -68,35 +178,11 @@ where _meta: &mut BlockMeta, ) -> Result<()> { let (m, m0, m1) = { - let i0 = self.input.slice(); + let i = self.input.slice(); let o0 = self.out0.slice(); let o1 = self.out1.slice(); - let mut m0 = 0; - let mut m1 = 0; - - let mut it0 = o0.iter_mut(); - let mut it1 = o1.iter_mut(); - - for x in i0.iter() { - if self.first { - if let Some(d) = it0.next() { - *d = *x; - m0 += 1; - } else { - break; - } - } else { - if let Some(d) = it1.next() { - *d = *x; - m1 += 1; - } else { - break; - } - } - self.first = !self.first; - } - (m0 + m1, m0, m1) + A::deinterleave(&mut self.first, i, o0, o1) }; self.input.consume(m); diff --git a/tests/stream/deinterleave.rs b/tests/stream/deinterleave.rs index d60f284..d50ca79 100644 --- a/tests/stream/deinterleave.rs +++ b/tests/stream/deinterleave.rs @@ -12,7 +12,7 @@ fn deinterleave_u8() -> Result<()> { let deinterleaver = Deinterleave::::new(); - let orig: Vec = vec![0, 1, 0, 1, 0, 1, 0, 1, 0, 1]; + let orig: Vec = (0..100).map(|i| (i % 2) as u8).collect(); let src = VectorSource::::new(orig.clone()); let vect_sink_0 = VectorSink::::new(1024); let vect_sink_1 = VectorSink::::new(1024); @@ -30,10 +30,42 @@ fn deinterleave_u8() -> Result<()> { let snk_1 = vect_sink_1.get()?; let snk_1 = snk_1.items(); - assert_eq!(snk_0.len(), orig.len() / 2); - assert_eq!(snk_0.len(), snk_1.len()); + assert_eq!(snk_0.len(), 50); + assert_eq!(snk_1.len(), 50); assert!(snk_0.iter().all(|v| *v == 0)); assert!(snk_1.iter().all(|v| *v == 1)); Ok(()) } + +#[test] +fn deinterleave_odd_f32() -> Result<()> { + let mut fg = Flowgraph::new(); + + let deinterleaver = Deinterleave::::new(); + + let orig: Vec = vec![0.0, 1.0, 2.0, 3.0, 4.0]; + let src = VectorSource::::new(orig.clone()); + let vect_sink_0 = VectorSink::::new(1024); + let vect_sink_1 = VectorSink::::new(1024); + + connect!(fg, + src > deinterleaver; + deinterleaver.out0 > vect_sink_0; + deinterleaver.out1 > vect_sink_1; + ); + Runtime::new().run(fg)?; + + let snk_0 = vect_sink_0.get()?; + let snk_0 = snk_0.items(); + + let snk_1 = vect_sink_1.get()?; + let snk_1 = snk_1.items(); + + assert_eq!(snk_0.len(), 3); + assert_eq!(snk_1.len(), 2); + assert_eq!(snk_0, &[0.0, 2.0, 4.0]); + assert_eq!(snk_1, &[1.0, 3.0]); + + Ok(()) +}