diff --git a/cli/Cargo.lock b/cli/Cargo.lock
index b2e618f..2e7ca15 100644
--- a/cli/Cargo.lock
+++ b/cli/Cargo.lock
@@ -111,6 +111,7 @@ dependencies = [
  "chrono",
  "clap",
  "csv",
+ "glob",
  "indicatif",
  "noodles",
  "rand 0.8.5",
@@ -472,6 +473,12 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "glob"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
+
 [[package]]
 name = "hashbrown"
 version = "0.14.5"
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index a47889f..bec81a4 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -29,5 +29,6 @@ csv = "1.3"
 rand = { version = "0.8", features = ["std"] }
 reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] }
 noodles = { version = "0.104.0", features = ["vcf", "tabix", "bgzf", "core"] }
+glob = "0.3"
 
 [dev-dependencies]
diff --git a/cli/src/commands/long_aggregate.rs b/cli/src/commands/long_aggregate.rs
new file mode 100644
index 0000000..30d1c6c
--- /dev/null
+++ b/cli/src/commands/long_aggregate.rs
@@ -0,0 +1,856 @@
+use std::cmp::Ordering;
+use std::collections::{BinaryHeap, HashMap, HashSet};
+use std::fs::{self, File};
+use std::hash::{Hash, Hasher};
+use std::io::{BufRead, BufReader, BufWriter, Write};
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
+use std::sync::Mutex;
+use std::time::{Instant, SystemTime, UNIX_EPOCH};
+
+use anyhow::{bail, Context, Result};
+use rayon::prelude::*;
+use rayon::ThreadPoolBuilder;
+
+use crate::long_rows::{LongRow, LongRowReader, LongRowWriter};
+use crate::AggregateLongArgs;
+
+pub fn run_long_aggregate(args: AggregateLongArgs) -> Result<()> {
+    let overall_start = Instant::now();
+    let thread_count = resolve_thread_count(args.threads)?;
+    let pool = ThreadPoolBuilder::new()
+        .num_threads(thread_count)
+        .build()
+        .context("build aggregate-long thread pool")?;
+
+    let input_files = collect_long_files(
+        &args.inputs,
+        args.input_list.as_ref(),
+        args.input_glob.as_deref(),
+    )?;
+    if input_files.is_empty() {
+        bail!(format_input_error(&args));
+    }
+    eprintln!(
+        "▶️ aggregate-long: {} input file(s), threads={}",
+        input_files.len(),
+        thread_count
+    );
+
+    let tmp_dir = args
+        .tmp_dir
+        .clone()
+        .unwrap_or_else(|| std::env::temp_dir().join("bvs-long"));
+    fs::create_dir_all(&tmp_dir).with_context(|| format!("Create temp dir {:?}", tmp_dir))?;
+
+    let chunk_records =
+        resolve_chunk_records(args.chunk_records, args.max_ram_percent, thread_count)?;
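+    // External-sort pipeline: one streaming pass hash-partitions rows into per-shard
+    // sorted chunk files (collecting participant ids along the way); each shard is then
+    // k-way merged, and the per-shard TSVs are concatenated behind a single header.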
+    eprintln!(
+        "🧵 aggregate-long: building chunks + participants (single pass), chunk_records={}",
+        chunk_records
+    );
+    let chunk_build_start = Instant::now();
+    let chunk_result = pool.install(|| {
+        build_sharded_chunks(
+            &input_files,
+            &tmp_dir,
+            thread_count,
+            chunk_records,
+            thread_count,
+        )
+    })?;
+    if chunk_result.total_rows == 0 {
+        bail!("No records found in long-row inputs");
+    }
+    eprintln!(
+        "📦 aggregate-long: scanned {} files, {} rows, {} shards",
+        input_files.len(),
+        chunk_result.total_rows,
+        chunk_result.chunk_files.len()
+    );
+    eprintln!(
+        "⏱️ aggregate-long: chunk build took {:.2}s",
+        chunk_build_start.elapsed().as_secs_f64()
+    );
+
+    let participant_list = {
+        let mut list: Vec<String> = chunk_result.participants.into_iter().collect();
+        list.sort();
+        list
+    };
+    let participant_index: HashMap<String, usize> = participant_list
+        .iter()
+        .enumerate()
+        .map(|(idx, pid)| (pid.clone(), idx))
+        .collect();
+
+    if participant_list.is_empty() {
+        bail!("No participants found in long-row inputs");
+    }
+
+    if thread_count <= 1 {
+        eprintln!("🧵 aggregate-long: single-threaded merge path");
+        let ctx = AggregateContext::new(&participant_list, &participant_index);
+        aggregate_chunks(
+            &ctx,
+            &chunk_result.chunk_files[0],
+            &args.matrix_tsv,
+            &args.allele_freq_tsv,
+        )?;
+    } else {
+        eprintln!("🧵 aggregate-long: parallel shard merge path");
+        let shard_start = Instant::now();
+        let part_outputs = pool.install(|| {
+            chunk_result
+                .chunk_files
+                .par_iter()
+                .enumerate()
+                .filter(|(_, chunks)| !chunks.is_empty())
+                .map(|(idx, shard_chunks)| {
+                    eprintln!(
+                        "🧩 aggregate-long: merging shard {} ({} chunks)",
+                        idx,
+                        shard_chunks.len()
+                    );
+                    let part_dir = tmp_dir.join(format!("part-{}", idx));
+                    fs::create_dir_all(&part_dir)
+                        .with_context(|| format!("Create partition output dir {:?}", part_dir))?;
+                    let matrix_path = part_dir.join("matrix.tsv");
+                    let allele_path = part_dir.join("allele.tsv");
+                    let ctx = AggregateContext::new(&participant_list, &participant_index);
+                    aggregate_chunks(&ctx, shard_chunks, &matrix_path, &allele_path)?;
+                    Ok(PartitionOutput {
+                        index: idx,
+                        matrix: matrix_path,
+                        allele: allele_path,
+                    })
+                })
+                .collect::<Result<Vec<PartitionOutput>>>()
+        })?;
+        eprintln!(
+            "⏱️ aggregate-long: shard aggregation took {:.2}s",
+            shard_start.elapsed().as_secs_f64()
+        );
+
+        let concat_start = Instant::now();
+        concat_partition_outputs(
+            &part_outputs,
+            &participant_list,
+            &args.matrix_tsv,
+            &args.allele_freq_tsv,
+        )?;
+        eprintln!(
+            "⏱️ aggregate-long: concatenation took {:.2}s",
+            concat_start.elapsed().as_secs_f64()
+        );
+    }
+
+    eprintln!(
+        "✅ aggregate-long: total elapsed {:.2}s",
+        overall_start.elapsed().as_secs_f64()
+    );
+    Ok(())
+}
+
+#[derive(Clone)]
+struct PartitionOutput {
+    index: usize,
+    matrix: PathBuf,
+    allele: PathBuf,
+}
+
+struct AggregateContext<'a> {
+    participant_list: &'a [String],
+    participant_index: &'a HashMap<String, usize>,
+}
+
+impl<'a> AggregateContext<'a> {
+    fn new(participant_list: &'a [String], participant_index: &'a HashMap<String, usize>) -> Self {
+        Self {
+            participant_list,
+            participant_index,
+        }
+    }
+}
+
+fn aggregate_chunks(
+    ctx: &AggregateContext<'_>,
+    chunk_files: &[PathBuf],
+    matrix_path: &Path,
+    allele_path: &Path,
+) -> Result<()> {
+    let mut matrix_writer = BufWriter::new(
+        File::create(matrix_path).with_context(|| format!("Create {:?}", matrix_path))?,
+    );
+    write_matrix_header(&mut matrix_writer, ctx.participant_list)?;
+
+    let mut allele_writer = BufWriter::new(
+        File::create(allele_path).with_context(|| format!("Create {:?}", allele_path))?,
+    );
+    write_allele_header(&mut allele_writer)?;
+
+    merge_chunks(
+        chunk_files,
+        ctx.participant_index,
+        ctx.participant_list,
+        &mut matrix_writer,
+        &mut allele_writer,
+    )?;
+
+    matrix_writer.flush()?;
+    allele_writer.flush()?;
+    Ok(())
+}
+
+struct ShardedChunks {
+    participants: HashSet<String>,
+    chunk_files: Vec<Vec<PathBuf>>,
+    total_rows: u64,
+}
+
+fn build_sharded_chunks(
+    input_files: &[PathBuf],
+    tmp_dir: &Path,
+    shard_count: usize,
+    chunk_records: usize,
+    thread_count: usize,
+) -> Result<ShardedChunks> {
+    let shard_count = shard_count.max(1);
+    let shard_chunks: Mutex<Vec<Vec<PathBuf>>> =
+        Mutex::new((0..shard_count).map(|_| Vec::new()).collect());
+    let participants: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
+    let total_rows = AtomicU64::new(0);
+    let next_log_at = AtomicU64::new(1_000_000);
+
+    input_files
+        .par_iter()
+        .enumerate()
+        .try_for_each(|(file_idx, path)| -> Result<()> {
+            let file = File::open(path).with_context(|| format!("Open {:?}", path))?;
+            let mut reader = LongRowReader::new(file);
+            let mut buffers: Vec<Vec<LongRow>> = (0..shard_count)
+                .map(|_| Vec::with_capacity(10_000))
+                .collect();
+            let mut local_participants: HashSet<String> = HashSet::new();
+            let mut chunk_indices: Vec<usize> = vec![0; shard_count];
+            let mut local_rows: u64 = 0;
+
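+            // One in-memory buffer per shard; a buffer is sorted and spilled to its own
+            // chunk file once it reaches chunk_records rows, bounding memory per worker.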
+            while let Some(row) = reader.read_row()? {
+                local_participants.insert(row.participant_id.clone());
+                let shard = shard_for(&row.locus_key, shard_count);
+                buffers[shard].push(row);
+                local_rows += 1;
+
+                if buffers[shard].len() >= chunk_records {
+                    let chunk_path = write_shard_chunk(
+                        tmp_dir,
+                        shard,
+                        file_idx,
+                        chunk_indices[shard],
+                        &mut buffers[shard],
+                        thread_count,
+                    )?;
+                    {
+                        let mut guard = shard_chunks.lock().expect("shard chunk mutex poisoned");
+                        guard[shard].push(chunk_path);
+                    }
+                    chunk_indices[shard] += 1;
+                }
+            }
+
+            for shard in 0..shard_count {
+                if !buffers[shard].is_empty() {
+                    let chunk_path = write_shard_chunk(
+                        tmp_dir,
+                        shard,
+                        file_idx,
+                        chunk_indices[shard],
+                        &mut buffers[shard],
+                        thread_count,
+                    )?;
+                    let mut guard = shard_chunks.lock().expect("shard chunk mutex poisoned");
+                    guard[shard].push(chunk_path);
+                    chunk_indices[shard] += 1;
+                }
+            }
+
+            {
+                let mut guard = participants.lock().expect("participants mutex poisoned");
+                guard.extend(local_participants);
+            }
+
+            let new_total = total_rows.fetch_add(local_rows, AtomicOrdering::Relaxed) + local_rows;
+            let mut next = next_log_at.load(AtomicOrdering::Relaxed);
+            while new_total >= next {
+                if next_log_at
+                    .compare_exchange(
+                        next,
+                        next + 1_000_000,
+                        AtomicOrdering::Relaxed,
+                        AtomicOrdering::Relaxed,
+                    )
+                    .is_ok()
+                {
+                    eprintln!("🧩 aggregate-long: partitioned {} rows", next);
+                    break;
+                }
+                next = next_log_at.load(AtomicOrdering::Relaxed);
+            }
+
+            Ok(())
+        })?;
+
+    let participants = participants
+        .into_inner()
+        .expect("participants mutex poisoned");
+    let chunk_files = shard_chunks
+        .into_inner()
+        .expect("shard chunk mutex poisoned");
+
+    Ok(ShardedChunks {
+        participants,
+        chunk_files,
+        total_rows: total_rows.load(AtomicOrdering::Relaxed),
+    })
+}
+
+fn shard_for(value: &str, shard_count: usize) -> usize {
+    let mut hasher = std::collections::hash_map::DefaultHasher::new();
+    value.hash(&mut hasher);
+    (hasher.finish() as usize) % shard_count
+}
+
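+// Shard outputs are concatenated in shard-index order. Rows are locus-sorted within a
+// shard, but shards are hash-partitioned, so the final TSVs are not globally sorted.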
+fn concat_partition_outputs(
+    outputs: &[PartitionOutput],
+    participants: &[String],
+    matrix_path: &Path,
+    allele_path: &Path,
+) -> Result<()> {
+    let mut ordered = outputs.to_vec();
+    ordered.sort_by_key(|out| out.index);
+
+    let mut matrix_writer = BufWriter::new(
+        File::create(matrix_path).with_context(|| format!("Create {:?}", matrix_path))?,
+    );
+    write_matrix_header(&mut matrix_writer, participants)?;
+    for part in &ordered {
+        append_without_header(&part.matrix, &mut matrix_writer)?;
+    }
+    matrix_writer.flush()?;
+
+    let mut allele_writer = BufWriter::new(
+        File::create(allele_path).with_context(|| format!("Create {:?}", allele_path))?,
+    );
+    write_allele_header(&mut allele_writer)?;
+    for part in &ordered {
+        append_without_header(&part.allele, &mut allele_writer)?;
+    }
+    allele_writer.flush()?;
+    Ok(())
+}
+
+fn append_without_header(path: &Path, writer: &mut BufWriter<File>) -> Result<()> {
+    let file = File::open(path).with_context(|| format!("Open {:?}", path))?;
+    let mut reader = BufReader::new(file);
+    let mut line = String::new();
+    let mut is_first = true;
+    loop {
+        line.clear();
+        let bytes = reader.read_line(&mut line)?;
+        if bytes == 0 {
+            break;
+        }
+        if is_first {
+            is_first = false;
+            continue;
+        }
+        writer.write_all(line.as_bytes())?;
+    }
+    Ok(())
+}
+
+fn collect_long_files(
+    inputs: &[PathBuf],
+    input_list: Option<&PathBuf>,
+    input_glob: Option<&str>,
+) -> Result<Vec<PathBuf>> {
+    let mut files = Vec::new();
+
+    if let Some(list_path) = input_list {
+        let list_files = read_input_list(list_path)?;
+        for path in list_files {
+            if path.is_file() {
+                if is_long_file(&path) {
+                    files.push(path);
+                }
+            } else if path.is_dir() {
+                collect_from_dir(&path, &mut files)?;
+            } else {
+                eprintln!("⚠️ Missing input path from list: {}", path.display());
+            }
+        }
+    }
+
+    if let Some(pattern) = input_glob {
+        match glob::glob(pattern) {
+            Ok(paths) => {
+                for entry in paths {
+                    match entry {
+                        Ok(path) => {
+                            if path.is_file() && is_long_file(&path) {
+                                files.push(path);
+                            } else if path.is_dir() {
+                                collect_from_dir(&path, &mut files)?;
+                            }
+                        }
+                        Err(err) => eprintln!("⚠️ Glob error: {err}"),
+                    }
+                }
+            }
+            Err(err) => eprintln!("⚠️ Invalid glob pattern '{pattern}': {err}"),
+        }
+    }
+
+    for input in inputs {
+        if input.is_file() {
+            if is_long_file(input) {
+                files.push(input.to_path_buf());
+            }
+            continue;
+        }
+        if input.is_dir() {
+            collect_from_dir(input, &mut files)?;
+            continue;
+        }
+        eprintln!("⚠️ Input path {:?} is not a file or directory", input);
+    }
+
+    if files.is_empty() && inputs.is_empty() && input_list.is_none() && input_glob.is_none() {
+        bail!("Provide --input, --input-list, or --input-glob");
+    }
+
+    files.sort();
+    files.dedup();
+    Ok(files)
+}
+
+fn is_long_file(path: &Path) -> bool {
+    path.extension().and_then(|ext| ext.to_str()) == Some("bvlr")
+}
+
+fn collect_from_dir(dir: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
+    for entry in walkdir::WalkDir::new(dir)
+        .into_iter()
+        .filter_map(|e| e.ok())
+    {
+        if !entry.file_type().is_file() {
+            continue;
+        }
+        let path = entry.path();
+        if is_long_file(path) {
+            files.push(path.to_path_buf());
+        }
+    }
+    Ok(())
+}
+
+fn read_input_list(path: &Path) -> Result<Vec<PathBuf>> {
+    let file = File::open(path).with_context(|| format!("Open input list {:?}", path))?;
+    let reader = BufReader::new(file);
+    let mut paths = Vec::new();
+    for line in reader.lines() {
+        let line = line?;
+        let trimmed = line.trim();
+        if trimmed.is_empty() || trimmed.starts_with('#') {
+            continue;
+        }
+        paths.push(PathBuf::from(trimmed));
+    }
+    Ok(paths)
+}
+
+fn format_input_error(args: &AggregateLongArgs) -> String {
+    let mut lines = Vec::new();
+    lines.push("No .bvlr files found.".to_string());
+    if !args.inputs.is_empty() {
+        lines.push(format!("--input count: {}", args.inputs.len()));
+    }
+    if let Some(list) = &args.input_list {
+        lines.push(format!("--input-list: {}", list.display()));
+    }
+    if let Some(pattern) = &args.input_glob {
+        lines.push(format!("--input-glob: {pattern}"));
+    }
+    lines.join(" ")
+}
+
+fn write_shard_chunk(
+    tmp_dir: &Path,
+    shard: usize,
+    file_idx: usize,
+    idx: usize,
+    buffer: &mut Vec<LongRow>,
+    thread_count: usize,
+) -> Result<PathBuf> {
+    if thread_count > 1 {
+        buffer.par_sort_by(compare_row);
+    } else {
+        buffer.sort_by(compare_row);
+    }
+    let filename = format!(
+        "shard-{shard}-file-{file_idx}-chunk-{idx}-{}.bvlr",
+        unique_suffix()
+    );
+    let path = tmp_dir.join(filename);
+    let file = File::create(&path).with_context(|| format!("Create {:?}", path))?;
+    let mut writer = LongRowWriter::new(BufWriter::new(file));
+    for row in buffer.iter() {
+        writer.write_row(row)?;
+    }
+    writer.flush()?;
+    buffer.clear();
+    Ok(path)
+}
+
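+// K-way merge of the sorted chunk files via a min-heap keyed on
+// (locus_key, participant_id): rows for one locus arrive contiguously, so matrix and
+// allele-frequency rows are accumulated and flushed whenever the locus changes.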
+fn merge_chunks(
+    chunks: &[PathBuf],
+    participant_index: &HashMap<String, usize>,
+    participants: &[String],
+    matrix_writer: &mut BufWriter<File>,
+    allele_writer: &mut BufWriter<File>,
+) -> Result<()> {
+    let mut readers: Vec<LongRowReader<File>> = Vec::new();
+    for path in chunks {
+        let file = File::open(path).with_context(|| format!("Open {:?}", path))?;
+        readers.push(LongRowReader::new(file));
+    }
+
+    let mut heap: BinaryHeap<HeapItem> = BinaryHeap::new();
+    for (idx, reader) in readers.iter_mut().enumerate() {
+        if let Some(row) = reader.read_row()? {
+            heap.push(HeapItem::new(idx, row));
+        }
+    }
+
+    let mut current_locus: Option<String> = None;
+    let mut current_rsid = String::new();
+    let mut current_row: Vec<i8> = Vec::new();
+
+    let mut allele_count: i64 = 0;
+    let mut n_obs: i64 = 0;
+    let mut num_homo: i64 = 0;
+    let mut num_hetero: i64 = 0;
+
+    let mut merged_rows: u64 = 0;
+    let mut next_log_at: u64 = 1_000_000;
+    while let Some(mut item) = heap.pop() {
+        let row = &item.row;
+        if current_locus.as_deref() != Some(&row.locus_key) {
+            if let Some(locus) = current_locus.take() {
+                flush_matrix_row(matrix_writer, &locus, &current_rsid, &current_row)?;
+                flush_allele_row(
+                    allele_writer,
+                    &locus,
+                    allele_count,
+                    n_obs,
+                    num_homo,
+                    num_hetero,
+                    &current_rsid,
+                )?;
+            }
+            current_locus = Some(row.locus_key.clone());
+            current_rsid = row.rsid.clone();
+            current_row = vec![-1; participants.len()];
+            allele_count = 0;
+            n_obs = 0;
+            num_homo = 0;
+            num_hetero = 0;
+        } else if current_rsid.is_empty() && !row.rsid.is_empty() {
+            current_rsid = row.rsid.clone();
+        }
+
+        if let Some(idx) = participant_index.get(&row.participant_id) {
+            if current_row[*idx] == -1 && row.dosage != -1 {
+                current_row[*idx] = row.dosage;
+            }
+        }
+
+        if row.dosage != -1 {
+            allele_count += row.dosage as i64;
+            n_obs += 1;
+            if row.dosage == 2 {
+                num_homo += 1;
+            } else if row.dosage == 1 {
+                num_hetero += 1;
+            }
+        }
+
+        if let Some(next_row) = readers[item.reader_idx].read_row()? {
+            item.row = next_row;
+            heap.push(item);
+        }
+        merged_rows += 1;
+        if merged_rows >= next_log_at {
+            eprintln!("🔀 aggregate-long: merged {} rows", merged_rows);
+            next_log_at += 1_000_000;
+        }
+    }
+
+    if let Some(locus) = current_locus.take() {
+        flush_matrix_row(matrix_writer, &locus, &current_rsid, &current_row)?;
+        flush_allele_row(
+            allele_writer,
+            &locus,
+            allele_count,
+            n_obs,
+            num_homo,
+            num_hetero,
+            &current_rsid,
+        )?;
+    }
+
+    eprintln!("✅ aggregate-long: merge complete ({} rows)", merged_rows);
+    Ok(())
+}
+
+fn write_matrix_header(writer: &mut BufWriter<File>, participants: &[String]) -> Result<()> {
+    write!(writer, "locus_key\trsid")?;
+    for pid in participants {
+        write!(writer, "\t{pid}")?;
+    }
+    writeln!(writer)?;
+    Ok(())
+}
+
+fn write_allele_header(writer: &mut BufWriter<File>) -> Result<()> {
+    writeln!(
+        writer,
+        "locus_key\tallele_count\tallele_number\tnum_homo\tnum_hetero\tallele_freq\trsid"
+    )?;
+    Ok(())
+}
+
+fn resolve_thread_count(requested: usize) -> Result<usize> {
+    if requested > 0 {
+        return Ok(requested);
+    }
+    let count = std::thread::available_parallelism()
+        .map(|n| n.get())
+        .unwrap_or(1);
+    Ok(count.max(1))
+}
+
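+// Worked example (assumed figures): with 16 GiB available and the default 80% cap, the
+// budget is ~13.7 GB; at 128 bytes/row that is ~107M rows, or ~13.4M records per chunk
+// when split across 8 threads.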
+fn resolve_chunk_records(requested: usize, max_ram_percent: u8, threads: usize) -> Result<usize> {
+    if requested > 0 {
+        return Ok(requested);
+    }
+    let available = detect_available_memory_bytes().unwrap_or(512 * 1024 * 1024);
+    let percent = max_ram_percent.clamp(10, 95) as u64;
+    let budget = available.saturating_mul(percent) / 100;
+    // Conservative row size estimate for BVLR rows (strings + overhead).
+    let bytes_per_row: u64 = 128;
+    let total_rows = budget / bytes_per_row;
+    let per_thread = (total_rows / threads.max(1) as u64).max(10_000);
+    Ok(per_thread as usize)
+}
+
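+// Detect a usable memory ceiling, preferring container limits over host totals:
+// cgroups v2, then cgroups v1, then /proc/meminfo MemAvailable. None means nothing was
+// readable, in which case the caller falls back to 512 MiB.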
+fn detect_available_memory_bytes() -> Option<u64> {
+    // cgroups v2
+    if let Ok(value) = std::fs::read_to_string("/sys/fs/cgroup/memory.max") {
+        let trimmed = value.trim();
+        if trimmed != "max" {
+            if let Ok(bytes) = trimmed.parse::<u64>() {
+                if bytes > 0 {
+                    return Some(bytes);
+                }
+            }
+        }
+    }
+    // cgroups v1
+    if let Ok(value) = std::fs::read_to_string("/sys/fs/cgroup/memory/memory.limit_in_bytes") {
+        if let Ok(bytes) = value.trim().parse::<u64>() {
+            if bytes > 0 {
+                return Some(bytes);
+            }
+        }
+    }
+    // /proc/meminfo fallback
+    if let Ok(meminfo) = std::fs::read_to_string("/proc/meminfo") {
+        for line in meminfo.lines() {
+            if let Some(rest) = line.strip_prefix("MemAvailable:") {
+                let parts: Vec<&str> = rest.split_whitespace().collect();
+                if parts.len() >= 2 {
+                    if let Ok(kb) = parts[0].parse::<u64>() {
+                        return Some(kb * 1024);
+                    }
+                }
+            }
+        }
+    }
+    None
+}
+
+fn flush_matrix_row(
+    writer: &mut BufWriter<File>,
+    locus: &str,
+    rsid: &str,
+    row: &[i8],
+) -> Result<()> {
+    write!(writer, "{locus}\t{rsid}")?;
+    for val in row {
+        write!(writer, "\t{val}")?;
+    }
+    writeln!(writer)?;
+    Ok(())
+}
+
+fn flush_allele_row(
+    writer: &mut BufWriter<File>,
+    locus: &str,
+    allele_count: i64,
+    n_obs: i64,
+    num_homo: i64,
+    num_hetero: i64,
+    rsid: &str,
+) -> Result<()> {
+    let allele_number = 2 * n_obs;
+    let allele_freq = if allele_number > 0 {
+        allele_count as f64 / allele_number as f64
+    } else {
+        0.0
+    };
+    writeln!(
+        writer,
+        "{locus}\t{allele_count}\t{allele_number}\t{num_homo}\t{num_hetero}\t{allele_freq:.6}\t{rsid}"
+    )?;
+    Ok(())
+}
+
+fn compare_row(a: &LongRow, b: &LongRow) -> Ordering {
+    let locus_cmp = a.locus_key.cmp(&b.locus_key);
+    if locus_cmp != Ordering::Equal {
+        return locus_cmp;
+    }
+    a.participant_id.cmp(&b.participant_id)
+}
+
+fn unique_suffix() -> u128 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .map(|d| d.as_nanos())
+        .unwrap_or(0)
+}
+
+#[derive(Debug)]
+struct HeapItem {
+    reader_idx: usize,
+    row: LongRow,
+}
+
+impl HeapItem {
+    fn new(reader_idx: usize, row: LongRow) -> Self {
+        Self { reader_idx, row }
+    }
+}
+
+impl Eq for HeapItem {}
+
+impl PartialEq for HeapItem {
+    fn eq(&self, other: &Self) -> bool {
+        compare_row(&self.row, &other.row) == Ordering::Equal
+    }
+}
+
+impl Ord for HeapItem {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // Reverse for min-heap behavior.
+        compare_row(&other.row, &self.row)
+    }
+}
+
+impl PartialOrd for HeapItem {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::Read;
+
+    fn write_rows(path: &Path, rows: &[LongRow]) {
+        let file = File::create(path).unwrap();
+        let mut writer = LongRowWriter::new(BufWriter::new(file));
+        for row in rows {
+            writer.write_row(row).unwrap();
+        }
+        writer.flush().unwrap();
+    }
+
+    #[test]
+    fn aggregate_emits_matrix_and_allele_freq() {
+        let tmp = std::env::temp_dir().join(format!("bvs-test-{}", unique_suffix()));
+        fs::create_dir_all(&tmp).unwrap();
+        let file1 = tmp.join("a.bvlr");
+        let file2 = tmp.join("b.bvlr");
+
+        write_rows(
+            &file1,
+            &[
+                LongRow {
+                    locus_key: "1-100-A-G".to_string(),
+                    rsid: "rs1".to_string(),
+                    participant_id: "P1".to_string(),
+                    dosage: 2,
+                },
+                LongRow {
+                    locus_key: "1-100-A-G".to_string(),
+                    rsid: "".to_string(),
+                    participant_id: "P2".to_string(),
+                    dosage: 1,
+                },
+            ],
+        );
+        write_rows(
+            &file2,
+            &[LongRow {
+                locus_key: "1-200-C-T".to_string(),
+                rsid: "rs2".to_string(),
+                participant_id: "P1".to_string(),
+                dosage: -1,
+            }],
+        );
+
+        let args = AggregateLongArgs {
+            inputs: vec![file1.clone(), file2.clone()],
+            input_list: None,
+            input_glob: None,
+            matrix_tsv: tmp.join("matrix.tsv"),
+            allele_freq_tsv: tmp.join("allele.tsv"),
+            tmp_dir: Some(tmp.join("tmp")),
+            chunk_records: 2,
+            threads: 0,
+            max_ram_percent: 80,
+        };
+        run_long_aggregate(args).unwrap();
+
+        let mut matrix = String::new();
+        File::open(tmp.join("matrix.tsv"))
+            .unwrap()
+            .read_to_string(&mut matrix)
+            .unwrap();
+        assert!(matrix.contains("locus_key\trsid\tP1\tP2"));
+        assert!(matrix.contains("1-100-A-G\trs1\t2\t1"));
+
+        let mut allele = String::new();
+        File::open(tmp.join("allele.tsv"))
+            .unwrap()
+            .read_to_string(&mut allele)
+            .unwrap();
+        assert!(allele.contains(
+            "locus_key\tallele_count\tallele_number\tnum_homo\tnum_hetero\tallele_freq\trsid"
+        ));
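+        // P1 dosage 2 + P2 dosage 1 → allele_count 3; two observed genotypes →
+        // allele_number 4; 3/4 = 0.750000.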
+        assert!(allele.contains("1-100-A-G\t3\t4\t1\t1\t0.750000\trs1"));
+    }
+}
diff --git a/cli/src/commands/long_dump.rs b/cli/src/commands/long_dump.rs
new file mode 100644
index 0000000..dd80962
--- /dev/null
+++ b/cli/src/commands/long_dump.rs
@@ -0,0 +1,38 @@
+use std::fs::File;
+use std::io::{BufWriter, Write};
+use std::time::Instant;
+
+use anyhow::{Context, Result};
+
+use crate::long_rows::LongRowReader;
+use crate::DumpLongArgs;
+
+pub fn run_long_dump(args: DumpLongArgs) -> Result<()> {
+    let start = Instant::now();
+    let input = File::open(&args.input).with_context(|| format!("Open {:?}", args.input))?;
+    let mut reader = LongRowReader::new(input);
+    let mut writer = BufWriter::new(
+        File::create(&args.output).with_context(|| format!("Create {:?}", args.output))?,
+    );
+
+    writeln!(writer, "locus_key\trsid\tparticipant_id\tdosage")?;
+    let mut rows: u64 = 0;
+    while let Some(row) = reader.read_row()? {
+        writeln!(
+            writer,
+            "{}\t{}\t{}\t{}",
+            row.locus_key, row.rsid, row.participant_id, row.dosage
+        )?;
+        rows += 1;
+        if rows.is_multiple_of(5_000_000) {
+            eprintln!("🔎 dump-long: wrote {} rows", rows);
+        }
+    }
+    writer.flush().context("flush long dump")?;
+    eprintln!("✅ dump-long: wrote {} rows", rows);
+    eprintln!(
+        "⏱️ dump-long: elapsed {:.2}s",
+        start.elapsed().as_secs_f64()
+    );
+    Ok(())
+}
diff --git a/cli/src/commands/long_emit.rs b/cli/src/commands/long_emit.rs
new file mode 100644
index 0000000..4234ab6
--- /dev/null
+++ b/cli/src/commands/long_emit.rs
@@ -0,0 +1,482 @@
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::{BufRead, BufReader, BufWriter, Read, Write};
+use std::path::{Path, PathBuf};
+use std::time::Instant;
+
+use anyhow::{bail, Context, Result};
+
+use crate::download::ensure_reference_db;
+use crate::genotype_reader::{detect_delimiter, RowOutcome, RowParser};
+use crate::long_rows::{
+    is_snp, locus_key, normalize_sequence, parse_alternates, parse_genotype_dosages,
+    parse_vcf_gt_dosages, vcf_gt_from_sample, vcf_sample_name, LongRow, LongRowWriter,
+};
+use crate::rsid_cache::normalize_rsid;
+use crate::stats::{ReferenceVariant, StatsStore};
+use crate::EmitLongArgs;
+use rusqlite::OptionalExtension;
+
+const LOOKAHEAD_LINES: usize = 2048;
+
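+// Two mutually exclusive modes: --vcf streams a single-sample VCF directly, while
+// --input parses genotype files and resolves each rsid to its reference via SQLite.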
+pub fn run_long_emit(args: EmitLongArgs) -> Result<()> {
+    let overall_start = Instant::now();
+    let has_vcf = args.vcf.is_some();
+    let has_inputs = !args.inputs.is_empty();
+    if has_vcf == has_inputs {
+        bail!("Provide either --vcf or --input (genotype files), but not both");
+    }
+
+    if has_vcf {
+        let vcf_path = args.vcf.as_ref().expect("vcf path");
+        let output_path = resolve_output_path(vcf_path, args.output.as_ref())?;
+        let stats = emit_from_vcf(vcf_path, &output_path, args.participant.as_ref())?;
+        eprintln!(
+            "✅ emit-long (vcf): {} variants, {} rows, {} missing-gt",
+            stats.variants_seen, stats.rows_emitted, stats.missing_gt
+        );
+        eprintln!(
+            "⏱️ emit-long: elapsed {:.2}s",
+            overall_start.elapsed().as_secs_f64()
+        );
+        println!("✅ Emitted long rows to {}", output_path.display());
+        return Ok(());
+    }
+
+    if args.inputs.len() > 1 && args.output.is_some() {
+        bail!("--output can only be used with a single --input file");
+    }
+    if args.inputs.len() > 1 && args.participant.is_some() {
+        bail!("--participant can only be used with a single --input file");
+    }
+    if args.inputs.len() > 1 && args.missing_ref_log.is_some() {
+        bail!("--missing-ref-log can only be used with a single --input file");
+    }
+
+    let sqlite_path = ensure_reference_db(Some(&args.sqlite), args.force_download)?;
+    let store = StatsStore::connect_read_only(&sqlite_path)?;
+    let mut resolver = ReferenceResolver::new(&store)?;
+
+    for input in &args.inputs {
+        let start = Instant::now();
+        let output_path = resolve_output_path(input, args.output.as_ref())?;
+        let participant = args
+            .participant
+            .clone()
+            .unwrap_or_else(|| default_participant(input));
+        let mut missing_logger = MissingRefLogger::new(args.missing_ref_log.as_deref())?;
+        let stats = emit_from_genotype(
+            input,
+            &output_path,
+            &participant,
+            &mut resolver,
+            &mut missing_logger,
+        )?;
+        eprintln!(
+            "✅ emit-long (genotype): {} parsed, {} rows, {} missing-ref, {} missing-gt",
+            stats.rows_parsed, stats.rows_emitted, stats.missing_reference, stats.missing_gt
+        );
+        eprintln!(
+            "⏱️ emit-long: {} took {:.2}s",
+            input.display(),
+            start.elapsed().as_secs_f64()
+        );
+        println!(
+            "✅ Emitted long rows to {} (participant {})",
+            output_path.display(),
+            participant
+        );
+    }
+    eprintln!(
+        "⏱️ emit-long: total elapsed {:.2}s",
+        overall_start.elapsed().as_secs_f64()
+    );
+    Ok(())
+}
+
+struct EmitStats {
+    variants_seen: u64,
+    rows_emitted: u64,
+    missing_gt: u64,
+    rows_parsed: u64,
+    missing_reference: u64,
+}
+
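+// Streams the VCF line by line: skips ## meta lines, takes the participant id from the
+// #CHROM header's first sample column (unless overridden), keeps SNPs only, and emits
+// one row per ALT allele; an unparseable GT becomes dosage -1 for every ALT.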
{ "" } else { rsid }; + + if let Some(dosages) = dosages { + for (alt, dosage) in alts.iter().zip(dosages.iter()) { + writer.write_row(&LongRow { + locus_key: locus_key(chrom, pos, &reference, alt), + rsid: rsid_val.to_string(), + participant_id: participant.clone(), + dosage: *dosage, + })?; + stats.rows_emitted += 1; + } + } else { + stats.missing_gt += 1; + for alt in &alts { + writer.write_row(&LongRow { + locus_key: locus_key(chrom, pos, &reference, alt), + rsid: rsid_val.to_string(), + participant_id: participant.clone(), + dosage: -1, + })?; + stats.rows_emitted += 1; + } + } + } + + writer.flush()?; + Ok(stats) +} + +fn emit_from_genotype( + input: &Path, + output: &Path, + participant: &str, + resolver: &mut ReferenceResolver, + missing_logger: &mut MissingRefLogger, +) -> Result { + let input_file = File::open(input).with_context(|| format!("Open {:?}", input))?; + let mut reader = BufReader::new(input_file); + let mut buffered_lines: Vec = Vec::new(); + let mut buffer = String::new(); + + while buffered_lines.len() < LOOKAHEAD_LINES { + buffer.clear(); + let bytes = reader.read_line(&mut buffer)?; + if bytes == 0 { + break; + } + buffered_lines.push(buffer.clone()); + } + + if buffered_lines.is_empty() { + bail!("Input file {:?} is empty", input); + } + + let delimiter = detect_delimiter(&buffered_lines); + let mut parser = RowParser::new(delimiter); + let mut writer = LongRowWriter::new(BufWriter::new( + File::create(output).with_context(|| format!("Create {:?}", output))?, + )); + let mut stats = EmitStats { + variants_seen: 0, + rows_emitted: 0, + missing_gt: 0, + rows_parsed: 0, + missing_reference: 0, + }; + + let mut line_number: u64 = 0; + let mut ctx = GenotypeEmitContext { + parser: &mut parser, + writer: &mut writer, + participant, + resolver, + stats: &mut stats, + missing_logger, + input, + }; + + for line in &buffered_lines { + line_number += 1; + consume_genotype_line(&mut ctx, line, line_number)?; + } + + buffer.clear(); + loop { + buffer.clear(); + let bytes = reader.read_line(&mut buffer)?; + if bytes == 0 { + break; + } + line_number += 1; + consume_genotype_line(&mut ctx, &buffer, line_number)?; + } + + writer.flush()?; + Ok(stats) +} + +struct GenotypeEmitContext<'a> { + parser: &'a mut RowParser, + writer: &'a mut LongRowWriter>, + participant: &'a str, + resolver: &'a mut ReferenceResolver, + stats: &'a mut EmitStats, + missing_logger: &'a mut MissingRefLogger, + input: &'a Path, +} + +fn consume_genotype_line( + ctx: &mut GenotypeEmitContext<'_>, + line: &str, + line_number: u64, +) -> Result<()> { + match ctx.parser.consume_line(line)? { + RowOutcome::Parsed(row) => { + ctx.stats.rows_parsed += 1; + let rsid_label = row.rsid.trim(); + if rsid_label.is_empty() { + return Ok(()); + } + let reference = match ctx.resolver.resolve(rsid_label)? 
+    while buffered_lines.len() < LOOKAHEAD_LINES {
+        buffer.clear();
+        let bytes = reader.read_line(&mut buffer)?;
+        if bytes == 0 {
+            break;
+        }
+        buffered_lines.push(buffer.clone());
+    }
+
+    if buffered_lines.is_empty() {
+        bail!("Input file {:?} is empty", input);
+    }
+
+    let delimiter = detect_delimiter(&buffered_lines);
+    let mut parser = RowParser::new(delimiter);
+    let mut writer = LongRowWriter::new(BufWriter::new(
+        File::create(output).with_context(|| format!("Create {:?}", output))?,
+    ));
+    let mut stats = EmitStats {
+        variants_seen: 0,
+        rows_emitted: 0,
+        missing_gt: 0,
+        rows_parsed: 0,
+        missing_reference: 0,
+    };
+
+    let mut line_number: u64 = 0;
+    let mut ctx = GenotypeEmitContext {
+        parser: &mut parser,
+        writer: &mut writer,
+        participant,
+        resolver,
+        stats: &mut stats,
+        missing_logger,
+        input,
+    };
+
+    for line in &buffered_lines {
+        line_number += 1;
+        consume_genotype_line(&mut ctx, line, line_number)?;
+    }
+
+    buffer.clear();
+    loop {
+        buffer.clear();
+        let bytes = reader.read_line(&mut buffer)?;
+        if bytes == 0 {
+            break;
+        }
+        line_number += 1;
+        consume_genotype_line(&mut ctx, &buffer, line_number)?;
+    }
+
+    writer.flush()?;
+    Ok(stats)
+}
+
+struct GenotypeEmitContext<'a> {
+    parser: &'a mut RowParser,
+    writer: &'a mut LongRowWriter<BufWriter<File>>,
+    participant: &'a str,
+    resolver: &'a mut ReferenceResolver,
+    stats: &'a mut EmitStats,
+    missing_logger: &'a mut MissingRefLogger,
+    input: &'a Path,
+}
+
+fn consume_genotype_line(
+    ctx: &mut GenotypeEmitContext<'_>,
+    line: &str,
+    line_number: u64,
+) -> Result<()> {
+    match ctx.parser.consume_line(line)? {
+        RowOutcome::Parsed(row) => {
+            ctx.stats.rows_parsed += 1;
+            let rsid_label = row.rsid.trim();
+            if rsid_label.is_empty() {
+                return Ok(());
+            }
+            let reference = match ctx.resolver.resolve(rsid_label)? {
+                Some(reference) => reference,
+                None => {
+                    ctx.stats.missing_reference += 1;
+                    ctx.missing_logger
+                        .log(ctx.input, line_number, rsid_label, line)?;
+                    return Ok(());
+                }
+            };
+            let reference_base = normalize_sequence(&reference.reference);
+            let alternates = parse_alternates(&reference.alternates);
+            if !is_snp(&reference_base, &alternates) {
+                return Ok(());
+            }
+            let dosages =
+                parse_genotype_dosages(row.genotype.as_deref(), &reference_base, &alternates);
+            if let Some(dosages) = dosages {
+                for (alt, dosage) in alternates.iter().zip(dosages.iter()) {
+                    ctx.writer.write_row(&LongRow {
+                        locus_key: locus_key(
+                            &reference.chromosome,
+                            reference.position,
+                            &reference_base,
+                            alt,
+                        ),
+                        rsid: rsid_label.to_string(),
+                        participant_id: ctx.participant.to_string(),
+                        dosage: *dosage,
+                    })?;
+                    ctx.stats.rows_emitted += 1;
+                }
+            } else {
+                ctx.stats.missing_gt += 1;
+                for alt in &alternates {
+                    ctx.writer.write_row(&LongRow {
+                        locus_key: locus_key(
+                            &reference.chromosome,
+                            reference.position,
+                            &reference_base,
+                            alt,
+                        ),
+                        rsid: rsid_label.to_string(),
+                        participant_id: ctx.participant.to_string(),
+                        dosage: -1,
+                    })?;
+                    ctx.stats.rows_emitted += 1;
+                }
+            }
+        }
+        RowOutcome::Skipped | RowOutcome::Ignored => {}
+    }
+    Ok(())
+}
+
+struct MissingRefLogger {
+    writer: Option<BufWriter<File>>,
+}
+
+impl MissingRefLogger {
+    fn new(path: Option<&Path>) -> Result<Self> {
+        let writer = if let Some(path) = path {
+            if let Some(parent) = path.parent() {
+                if !parent.as_os_str().is_empty() {
+                    std::fs::create_dir_all(parent)
+                        .with_context(|| format!("Create {:?}", parent))?;
+                }
+            }
+            let file = std::fs::OpenOptions::new()
+                .create(true)
+                .append(true)
+                .open(path)
+                .with_context(|| format!("Open {:?}", path))?;
+            Some(BufWriter::new(file))
+        } else {
+            None
+        };
+        Ok(Self { writer })
+    }
+
+    fn log(&mut self, input: &Path, line_number: u64, rsid: &str, raw_line: &str) -> Result<()> {
+        if let Some(writer) = self.writer.as_mut() {
+            let trimmed = raw_line.trim_end();
+            writeln!(
+                writer,
+                "{}\t{}\t{}\tmissing_ref\t{}",
+                input.display(),
+                line_number,
+                rsid,
+                trimmed
+            )?;
+        }
+        Ok(())
+    }
+}
+
+fn resolve_output_path(input: &Path, output: Option<&PathBuf>) -> Result<PathBuf> {
+    if let Some(out) = output {
+        return Ok(out.clone());
+    }
+    let stem = input
+        .file_stem()
+        .and_then(|s| s.to_str())
+        .unwrap_or("output");
+    let mut out = input.with_file_name(format!("{stem}.bvlr"));
+    if let Some(parent) = input.parent() {
+        if parent.as_os_str().is_empty() {
+            out = PathBuf::from(format!("{stem}.bvlr"));
+        }
+    }
+    Ok(out)
+}
+
+fn default_participant(input: &Path) -> String {
+    input
+        .file_stem()
+        .and_then(|stem| stem.to_str())
+        .map(|stem| stem.replace(' ', "_"))
+        .filter(|stem| !stem.is_empty())
+        .unwrap_or_else(|| "SAMPLE".to_string())
+}
+
+fn is_gz(path: &Path) -> bool {
+    path.extension().and_then(|ext| ext.to_str()) == Some("gz")
+}
+
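+// Resolves rsids against the SQLite reference, consulting the user-supplied table
+// (rsid_reference_user) before the bundled rsid_reference table, and memoizing every
+// lookup result — including misses — so repeated rsids hit the in-memory cache.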
+struct ReferenceResolver {
+    conn: rusqlite::Connection,
+    cache: HashMap<i64, Option<ReferenceVariant>>,
+}
+
+impl ReferenceResolver {
+    fn new(store: &StatsStore) -> Result<Self> {
+        let conn = store.open_connection()?;
+        Ok(Self {
+            conn,
+            cache: HashMap::new(),
+        })
+    }
+
+    fn resolve(&mut self, rsid: &str) -> Result<Option<ReferenceVariant>> {
+        let rsid_norm = normalize_rsid(rsid);
+        let rsid_int = match rsid_norm.trim_start_matches("rs").parse::<i64>() {
+            Ok(value) => value,
+            Err(_) => return Ok(None),
+        };
+        if let Some(cached) = self.cache.get(&rsid_int) {
+            return Ok(cached.clone());
+        }
+
+        let mut row = self
+            .conn
+            .prepare_cached(
+                "SELECT rsid, chromosome, position, reference, alternates
+                 FROM rsid_reference_user WHERE rsid = ?1",
+            )?
+            .query_row([rsid_int], |row| {
+                Ok(ReferenceVariant {
+                    rsid: row.get(0)?,
+                    chromosome: row.get(1)?,
+                    position: row.get(2)?,
+                    reference: row.get(3)?,
+                    alternates: row.get(4)?,
+                })
+            })
+            .optional()?;
+
+        if row.is_none() {
+            row = self
+                .conn
+                .prepare_cached(
+                    "SELECT rsid, chromosome, position, reference, alternates
+                     FROM rsid_reference WHERE rsid = ?1",
+                )?
+                .query_row([rsid_int], |row| {
+                    Ok(ReferenceVariant {
+                        rsid: row.get(0)?,
+                        chromosome: row.get(1)?,
+                        position: row.get(2)?,
+                        reference: row.get(3)?,
+                        alternates: row.get(4)?,
+                    })
+                })
+                .optional()?;
+        }
+
+        self.cache.insert(rsid_int, row.clone());
+        Ok(row)
+    }
+}
diff --git a/cli/src/long_rows.rs b/cli/src/long_rows.rs
new file mode 100644
index 0000000..955aae3
--- /dev/null
+++ b/cli/src/long_rows.rs
@@ -0,0 +1,308 @@
+use std::io::{self, Read, Write};
+
+use anyhow::{bail, Context, Result};
+
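+// On-disk BVLR format: the 4-byte magic "BVLR" and a version byte, then a flat stream
+// of records — three u32-LE length-prefixed UTF-8 strings (locus_key, rsid,
+// participant_id) followed by a single dosage byte (-1 means missing). EOF at a record
+// boundary ends the stream.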
+const MAGIC: &[u8; 4] = b"BVLR";
+const VERSION: u8 = 1;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct LongRow {
+    pub locus_key: String,
+    pub rsid: String,
+    pub participant_id: String,
+    pub dosage: i8,
+}
+
+pub struct LongRowWriter<W: Write> {
+    writer: W,
+    header_written: bool,
+}
+
+impl<W: Write> LongRowWriter<W> {
+    pub fn new(writer: W) -> Self {
+        Self {
+            writer,
+            header_written: false,
+        }
+    }
+
+    pub fn write_header(&mut self) -> Result<()> {
+        if self.header_written {
+            return Ok(());
+        }
+        self.writer.write_all(MAGIC)?;
+        self.writer.write_all(&[VERSION])?;
+        self.header_written = true;
+        Ok(())
+    }
+
+    pub fn write_row(&mut self, row: &LongRow) -> Result<()> {
+        self.write_header()?;
+        write_string(&mut self.writer, &row.locus_key)?;
+        write_string(&mut self.writer, &row.rsid)?;
+        write_string(&mut self.writer, &row.participant_id)?;
+        self.writer.write_all(&[row.dosage as u8])?;
+        Ok(())
+    }
+
+    pub fn flush(&mut self) -> Result<()> {
+        self.writer.flush().context("flush long-row writer")
+    }
+}
+
+pub struct LongRowReader<R: Read> {
+    reader: R,
+    header_read: bool,
+}
+
+impl<R: Read> LongRowReader<R> {
+    pub fn new(reader: R) -> Self {
+        Self {
+            reader,
+            header_read: false,
+        }
+    }
+
+    pub fn read_row(&mut self) -> Result<Option<LongRow>> {
+        if !self.header_read {
+            self.read_header()?;
+        }
+
+        let locus_key = match read_string(&mut self.reader)? {
+            Some(value) => value,
+            None => return Ok(None),
+        };
+        let rsid = read_string_required(&mut self.reader, "rsid")?;
+        let participant_id = read_string_required(&mut self.reader, "participant_id")?;
+        let dosage = read_i8(&mut self.reader)?;
+        Ok(Some(LongRow {
+            locus_key,
+            rsid,
+            participant_id,
+            dosage,
+        }))
+    }
+
+    fn read_header(&mut self) -> Result<()> {
+        let mut magic = [0u8; 4];
+        self.reader
+            .read_exact(&mut magic)
+            .context("read long-row header")?;
+        if &magic != MAGIC {
+            bail!("Invalid long-row file header");
+        }
+        let mut version = [0u8; 1];
+        self.reader.read_exact(&mut version)?;
+        if version[0] != VERSION {
+            bail!("Unsupported long-row version: {}", version[0]);
+        }
+        self.header_read = true;
+        Ok(())
+    }
+}
+
+fn write_string<W: Write>(writer: &mut W, value: &str) -> Result<()> {
+    let bytes = value.as_bytes();
+    let len = bytes.len() as u32;
+    writer.write_all(&len.to_le_bytes())?;
+    writer.write_all(bytes)?;
+    Ok(())
+}
+
+fn read_string<R: Read>(reader: &mut R) -> Result<Option<String>> {
+    let mut len_bytes = [0u8; 4];
+    match reader.read_exact(&mut len_bytes) {
+        Ok(_) => {}
+        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+        Err(err) => return Err(err).context("read string length"),
+    }
+    let len = u32::from_le_bytes(len_bytes) as usize;
+    let mut buffer = vec![0u8; len];
+    reader.read_exact(&mut buffer)?;
+    let value = String::from_utf8(buffer).context("decode long-row string")?;
+    Ok(Some(value))
+}
+
+fn read_string_required<R: Read>(reader: &mut R, label: &str) -> Result<String> {
+    read_string(reader)?.ok_or_else(|| anyhow::anyhow!("Unexpected EOF reading {label}"))
+}
+
+fn read_i8<R: Read>(reader: &mut R) -> Result<i8> {
+    let mut buffer = [0u8; 1];
+    reader.read_exact(&mut buffer)?;
+    Ok(buffer[0] as i8)
+}
+
+pub fn is_snp(reference: &str, alts: &[String]) -> bool {
+    reference.len() == 1 && alts.iter().all(|alt| alt.len() == 1)
+}
+
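+// Diploid dosage convention: one count per ALT allele, so "1/2" with two ALTs yields
+// [1, 1] and "2/2" yields [0, 2]; any missing or out-of-range call returns None.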
| ".") { + return None; + } + + let alleles = split_genotype_tokens(&genotype)?; + let idx1 = allele_index(&alleles.0, reference, alternates)?; + let idx2 = allele_index(&alleles.1, reference, alternates)?; + let mut counts = vec![0i8; alternates.len()]; + if idx1 > 0 { + counts[idx1 - 1] += 1; + } + if idx2 > 0 { + counts[idx2 - 1] += 1; + } + Some(counts) +} + +fn split_genotype_tokens(genotype: &str) -> Option<(String, String)> { + if genotype.contains('/') || genotype.contains('|') { + let parts: Vec<&str> = genotype.split(['/', '|']).collect(); + if parts.len() >= 2 { + return Some((parts[0].to_string(), parts[1].to_string())); + } + } + + let compact: String = genotype.chars().filter(|c| !c.is_whitespace()).collect(); + if compact.is_empty() { + return None; + } + if compact.len() == 1 { + return Some((compact.clone(), compact)); + } + if compact.len() == 2 { + let mut chars = compact.chars(); + let a1 = chars.next()?.to_string(); + let a2 = chars.next()?.to_string(); + return Some((a1, a2)); + } + let mid = compact.len() / 2; + Some((compact[..mid].to_string(), compact[mid..].to_string())) +} + +fn allele_index(allele: &str, reference: &str, alternates: &[String]) -> Option { + if let Ok(idx) = allele.parse::() { + if idx <= alternates.len() { + return Some(idx); + } + } + if allele == reference { + return Some(0); + } + alternates + .iter() + .position(|alt| alt == allele) + .map(|idx| idx + 1) +} + +pub fn vcf_sample_name(header_line: &str) -> Option { + let parts: Vec<&str> = header_line.trim_end().split('\t').collect(); + if parts.len() >= 10 { + Some(parts[9].to_string()) + } else { + None + } +} + +pub fn normalize_sequence(value: &str) -> String { + value.trim().to_ascii_uppercase() +} + +pub fn locus_key(chrom: &str, pos: i64, reference: &str, alternate: &str) -> String { + format!("{chrom}-{pos}-{reference}-{alternate}") +} + +pub fn parse_alternates(value: &str) -> Vec { + value + .split(',') + .map(normalize_sequence) + .filter(|alt| !alt.is_empty() && alt != ".") + .collect() +} + +pub fn vcf_gt_from_sample(sample: &str) -> Option<&str> { + sample.split(':').next() +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + + #[test] + fn long_row_roundtrip() { + let mut buf = Vec::new(); + { + let mut writer = LongRowWriter::new(&mut buf); + writer + .write_row(&LongRow { + locus_key: "1-100-A-G".to_string(), + rsid: "rs1".to_string(), + participant_id: "P1".to_string(), + dosage: 2, + }) + .unwrap(); + writer.flush().unwrap(); + } + let mut reader = LongRowReader::new(Cursor::new(buf)); + let row = reader.read_row().unwrap().unwrap(); + assert_eq!( + row, + LongRow { + locus_key: "1-100-A-G".to_string(), + rsid: "rs1".to_string(), + participant_id: "P1".to_string(), + dosage: 2, + } + ); + assert!(reader.read_row().unwrap().is_none()); + } + + #[test] + fn parse_vcf_gt() { + let dosages = parse_vcf_gt_dosages("1/2", 2).unwrap(); + assert_eq!(dosages, vec![1, 1]); + let dosages = parse_vcf_gt_dosages("2/2", 2).unwrap(); + assert_eq!(dosages, vec![0, 2]); + assert!(parse_vcf_gt_dosages("./.", 2).is_none()); + } + + #[test] + fn parse_genotype() { + let alts = vec!["C".to_string(), "G".to_string()]; + let dosages = parse_genotype_dosages(Some("A/G"), "A", &alts).unwrap(); + assert_eq!(dosages, vec![0, 1]); + let dosages = parse_genotype_dosages(Some("CC"), "A", &alts).unwrap(); + assert_eq!(dosages, vec![2, 0]); + assert!(parse_genotype_dosages(Some("--"), "A", &alts).is_none()); + } +} diff --git a/cli/src/main.rs b/cli/src/main.rs index b66869f..1191176 
+fn split_genotype_tokens(genotype: &str) -> Option<(String, String)> {
+    if genotype.contains('/') || genotype.contains('|') {
+        let parts: Vec<&str> = genotype.split(['/', '|']).collect();
+        if parts.len() >= 2 {
+            return Some((parts[0].to_string(), parts[1].to_string()));
+        }
+    }
+
+    let compact: String = genotype.chars().filter(|c| !c.is_whitespace()).collect();
+    if compact.is_empty() {
+        return None;
+    }
+    if compact.len() == 1 {
+        return Some((compact.clone(), compact));
+    }
+    if compact.len() == 2 {
+        let mut chars = compact.chars();
+        let a1 = chars.next()?.to_string();
+        let a2 = chars.next()?.to_string();
+        return Some((a1, a2));
+    }
+    let mid = compact.len() / 2;
+    Some((compact[..mid].to_string(), compact[mid..].to_string()))
+}
+
+fn allele_index(allele: &str, reference: &str, alternates: &[String]) -> Option<usize> {
+    if let Ok(idx) = allele.parse::<usize>() {
+        if idx <= alternates.len() {
+            return Some(idx);
+        }
+    }
+    if allele == reference {
+        return Some(0);
+    }
+    alternates
+        .iter()
+        .position(|alt| alt == allele)
+        .map(|idx| idx + 1)
+}
+
+pub fn vcf_sample_name(header_line: &str) -> Option<String> {
+    let parts: Vec<&str> = header_line.trim_end().split('\t').collect();
+    if parts.len() >= 10 {
+        Some(parts[9].to_string())
+    } else {
+        None
+    }
+}
+
+pub fn normalize_sequence(value: &str) -> String {
+    value.trim().to_ascii_uppercase()
+}
+
+pub fn locus_key(chrom: &str, pos: i64, reference: &str, alternate: &str) -> String {
+    format!("{chrom}-{pos}-{reference}-{alternate}")
+}
+
+pub fn parse_alternates(value: &str) -> Vec<String> {
+    value
+        .split(',')
+        .map(normalize_sequence)
+        .filter(|alt| !alt.is_empty() && alt != ".")
+        .collect()
+}
+
+pub fn vcf_gt_from_sample(sample: &str) -> Option<&str> {
+    sample.split(':').next()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::Cursor;
+
+    #[test]
+    fn long_row_roundtrip() {
+        let mut buf = Vec::new();
+        {
+            let mut writer = LongRowWriter::new(&mut buf);
+            writer
+                .write_row(&LongRow {
+                    locus_key: "1-100-A-G".to_string(),
+                    rsid: "rs1".to_string(),
+                    participant_id: "P1".to_string(),
+                    dosage: 2,
+                })
+                .unwrap();
+            writer.flush().unwrap();
+        }
+        let mut reader = LongRowReader::new(Cursor::new(buf));
+        let row = reader.read_row().unwrap().unwrap();
+        assert_eq!(
+            row,
+            LongRow {
+                locus_key: "1-100-A-G".to_string(),
+                rsid: "rs1".to_string(),
+                participant_id: "P1".to_string(),
+                dosage: 2,
+            }
+        );
+        assert!(reader.read_row().unwrap().is_none());
+    }
+
+    #[test]
+    fn parse_vcf_gt() {
+        let dosages = parse_vcf_gt_dosages("1/2", 2).unwrap();
+        assert_eq!(dosages, vec![1, 1]);
+        let dosages = parse_vcf_gt_dosages("2/2", 2).unwrap();
+        assert_eq!(dosages, vec![0, 2]);
+        assert!(parse_vcf_gt_dosages("./.", 2).is_none());
+    }
+
+    #[test]
+    fn parse_genotype() {
+        let alts = vec!["C".to_string(), "G".to_string()];
+        let dosages = parse_genotype_dosages(Some("A/G"), "A", &alts).unwrap();
+        assert_eq!(dosages, vec![0, 1]);
+        let dosages = parse_genotype_dosages(Some("CC"), "A", &alts).unwrap();
+        assert_eq!(dosages, vec![2, 0]);
+        assert!(parse_genotype_dosages(Some("--"), "A", &alts).is_none());
+    }
+}
diff --git a/cli/src/main.rs b/cli/src/main.rs
index b66869f..1191176 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -6,6 +6,7 @@ use clap::{ArgAction, Args, Parser, Subcommand};
 mod download;
 mod genotype;
 mod genotype_reader;
+mod long_rows;
 mod rsid_cache;
 mod stats;
 mod util;
@@ -15,6 +16,9 @@ use crate::commands::genostats::run_genostats;
 use crate::commands::genotype_to_vcf::run_genotype_to_vcf;
 use crate::commands::list_missing_cache::run_list_missing_cache;
 use crate::commands::list_missing_rsids::run_list_missing_rsids;
+use crate::commands::long_aggregate::run_long_aggregate;
+use crate::commands::long_dump::run_long_dump;
+use crate::commands::long_emit::run_long_emit;
 use crate::commands::reference_load::run_reference_load;
 use crate::commands::resolve_rsids::run_resolve_rsids;
 use crate::commands::sync_rsid_cache::run_sync_rsid_cache;
@@ -27,6 +31,9 @@ mod commands {
     pub mod genotype_to_vcf;
     pub mod list_missing_cache;
     pub mod list_missing_rsids;
+    pub mod long_aggregate;
+    pub mod long_dump;
+    pub mod long_emit;
     pub mod reference_load;
     pub mod resolve_rsids;
     pub mod sync_rsid_cache;
@@ -49,6 +56,12 @@ enum Commands {
     AlleleReport(AlleleReportArgs),
     /// Convert a genotype file into a single-sample VCF.
     GenotypeToVcf(GenotypeToVcfArgs),
+    /// Emit compact long-row records from a VCF or genotype file.
+    EmitLong(EmitLongArgs),
+    /// Aggregate long-row records into matrix and allele frequency TSVs.
+    AggregateLong(AggregateLongArgs),
+    /// Convert a .bvlr file into TSV for debugging.
+    DumpLong(DumpLongArgs),
     /// List rsids missing from the database based on a cache file.
     ListMissingCache(ListMissingCacheArgs),
     /// List rsids missing from the reference database.
@@ -144,6 +157,72 @@ pub struct GenotypeToVcfArgs {
     pub cache: Option<PathBuf>,
 }
 
+#[derive(Args, Clone)]
+pub struct EmitLongArgs {
+    /// Input genotype file(s) (tab/space/comma-delimited).
+    #[arg(short = 'i', long = "input")]
+    pub inputs: Vec<PathBuf>,
+    /// Input VCF file (plain or gz). Mutually exclusive with --input.
+    #[arg(long)]
+    pub vcf: Option<PathBuf>,
+    /// Path to the SQLite database containing rsid_reference data.
+    #[arg(long, default_value = "data/genostats.sqlite")]
+    pub sqlite: PathBuf,
+    /// Force re-download of the reference database.
+    #[arg(long, action = ArgAction::SetTrue)]
+    pub force_download: bool,
+    /// Output file (defaults to the input or VCF stem with a .bvlr extension).
+    #[arg(long)]
+    pub output: Option<PathBuf>,
+    /// Participant id (defaults to filename stem or VCF sample name if present).
+    #[arg(long)]
+    pub participant: Option<String>,
+    /// Optional log file for missing reference rows (appends).
+    #[arg(long)]
+    pub missing_ref_log: Option<PathBuf>,
+}
+
+#[derive(Args, Clone)]
+pub struct AggregateLongArgs {
+    /// Input files or directories containing .bvlr files.
+    #[arg(short = 'i', long = "input")]
+    pub inputs: Vec<PathBuf>,
+    /// File containing input paths (one per line, # comments allowed).
+    #[arg(long)]
+    pub input_list: Option<PathBuf>,
+    /// Glob pattern for input files (e.g. "*.bvlr").
+    #[arg(long)]
+    pub input_glob: Option<String>,
+    /// Output matrix TSV path.
+    #[arg(long)]
+    pub matrix_tsv: PathBuf,
+    /// Output allele frequency TSV path.
+    #[arg(long)]
+    pub allele_freq_tsv: PathBuf,
+    /// Optional temp directory (defaults to system temp).
+    #[arg(long)]
+    pub tmp_dir: Option<PathBuf>,
+    /// Number of records per chunk during external sort (0 = auto).
+    #[arg(long, default_value = "0")]
+    pub chunk_records: usize,
+    /// Max RAM percent to use when auto-sizing chunk records.
+    #[arg(long, default_value = "80")]
+    pub max_ram_percent: u8,
+    /// Number of worker threads to use (0 = auto/all cores).
+    #[arg(long, default_value = "0")]
+    pub threads: usize,
+}
+
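+// Illustrative pipeline (invocation assumed; subcommand and flag names come from the
+// clap definitions above):
+//   $ cargo run -- emit-long -i sample1.txt --missing-ref-log missing.tsv
+//   $ cargo run -- aggregate-long --input-glob "*.bvlr" \
+//       --matrix-tsv matrix.tsv --allele-freq-tsv allele_freq.tsv
+//   $ cargo run -- dump-long --input sample1.bvlr --output sample1.tsv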
+#[derive(Args, Clone)]
+pub struct DumpLongArgs {
+    /// Input .bvlr file to dump.
+    #[arg(long)]
+    pub input: PathBuf,
+    /// Output TSV path.
+    #[arg(long)]
+    pub output: PathBuf,
+}
+
 #[derive(Args, Clone)]
 pub struct ListMissingRsidsArgs {
     /// Input file or directory paths to scan.
@@ -298,6 +377,9 @@ fn main() -> Result<()> {
         Commands::Genostats(args) => run_genostats(args),
         Commands::AlleleReport(args) => run_allele_report(args),
         Commands::GenotypeToVcf(args) => run_genotype_to_vcf(args),
+        Commands::EmitLong(args) => run_long_emit(args),
+        Commands::AggregateLong(args) => run_long_aggregate(args),
+        Commands::DumpLong(args) => run_long_dump(args),
         Commands::ListMissingCache(args) => run_list_missing_cache(args),
         Commands::ListMissingRsids(args) => run_list_missing_rsids(args),
         Commands::ResolveRsids(args) => run_resolve_rsids(args),