From aba01f0bc52581e9040be51dbe21d7d52f38b3d3 Mon Sep 17 00:00:00 2001
From: "DESKTOP-6UR65OP\\Administrator"
Date: Fri, 6 Jun 2025 16:48:50 -0400
Subject: [PATCH] feat: Implement ETL processor for JSONL to CSV conversion

---
 .gitignore  |  23 ++++++++++
 Cargo.toml  |  12 +++++
 README.md   |  54 ++++++++++++++++++++++
 src/main.rs | 129 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 218 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Cargo.toml
 create mode 100644 src/main.rs

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b23a4d0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,23 @@
+# Generated by Cargo
+/target/
+
+# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
+# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
+Cargo.lock
+
+# These are backup files generated by rustfmt
+**/*.rs.bk
+
+# MSVC Windows builds of rustc generate these, which store debugging information
+*.pdb
+
+# Output files
+*.csv
+output/
+
+# IDE specific files
+.idea/
+.vscode/
+*.swp
+*.swo
+.DS_Store
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..3b2be1b
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "rust-etl-code-test"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+csv = "1.2"
+clap = { version = "4.4", features = ["derive"] }
+tokio = { version = "1.0", features = ["full"] }
+futures = "0.3"
\ No newline at end of file
diff --git a/README.md b/README.md
index 1003148..edf503a 100644
--- a/README.md
+++ b/README.md
@@ -11,3 +11,57 @@ Given the sample data provided, convert to csv in the format specified:
 - The program should accept inputs of unbounded size.
 - The program should accept input from a file or STDIN.
 - Output should be written to a file or STDOUT.
+
+## Project Documentation
+
+### Requirements
+
+- Rust 1.70 or higher
+- Cargo (Rust's package manager)
+
+### Usage
+
+The program can be run in several ways:
+
+1. Process a file and output to a file:
+
+```bash
+cargo run --release -- -i sample.jsonl -o output.csv
+```
+
+2. Process from STDIN and output to STDOUT:
+
+```bash
+cargo run --release
+```
+
+3. Process a file and output to STDOUT:
+
+```bash
+cargo run --release -- -i sample.jsonl
+```
+
+4. Process from STDIN and output to a file:
+
+```bash
+cargo run --release -- -o output.csv
+```
+
+#### Performance Optimizations
+
+- **Streaming Processing**: Processes data line by line
+- **Buffered I/O**: Uses buffered readers and writers
+- **Memory Management**: Minimal memory footprint through streaming
+
+#### Error Handling
+
+- Gracefully handles JSON parsing errors
+- Provides detailed error reporting
+- Continues processing despite individual record errors
+
+### Dependencies
+
+- `serde`: For JSON deserialization
+- `csv`: For CSV output
+- `clap`: For command-line argument parsing
+- `tokio`: For async runtime support
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..3fca96b
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,129 @@
+use clap::Parser;
+use csv::Writer;
+use serde::{Deserialize, Serialize};
+use std::{
+    fs::File,
+    io::{self, BufRead, BufReader, Write},
+    path::PathBuf,
+};
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Args {
+    /// Input file path (if not provided, reads from stdin)
+    #[arg(short, long)]
+    input: Option<PathBuf>,
+
+    /// Output file path (if not provided, writes to stdout)
+    #[arg(short, long)]
+    output: Option<PathBuf>,
+}
+
+#[derive(Debug, Deserialize)]
+struct InputRecord {
+    name: String,
+    billing_code: String,
+    negotiated_rates: Vec<NegotiatedRate>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NegotiatedRate {
+    negotiated_prices: Vec<NegotiatedPrice>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NegotiatedPrice {
+    negotiated_rate: f64,
+}
+
+#[derive(Debug, Serialize)]
+struct OutputRecord {
+    name: String,
+    billing_code: String,
+    avg_rate: f64,
+}
+
+fn calculate_average_rate(record: &InputRecord) -> Option<f64> {
+    let mut total_rate = 0.0;
+    let mut rate_count = 0;
+
+    for negotiated_rate in &record.negotiated_rates {
+        for price in &negotiated_rate.negotiated_prices {
+            total_rate += price.negotiated_rate;
+            rate_count += 1;
+        }
+    }
+
+    if rate_count > 0 {
+        Some(total_rate / rate_count as f64)
+    } else {
+        None
+    }
+}
+
+fn process_records(reader: Box<dyn BufRead>, writer: &mut Box<dyn Write>) -> io::Result<()> {
+    let mut csv_writer = Writer::from_writer(writer);
+    let mut stats = ProcessingStats::default();
+
+    for line in reader.lines() {
+        stats.total_lines += 1;
+        let line = line?;
+
+        match serde_json::from_str::<InputRecord>(&line) {
+            Ok(record) => {
+                if let Some(avg_rate) = calculate_average_rate(&record) {
+                    if avg_rate <= 30.0 {
+                        csv_writer.serialize(OutputRecord {
+                            name: record.name,
+                            billing_code: record.billing_code,
+                            avg_rate,
+                        })?;
+                        stats.successful_records += 1;
+                    }
+                }
+            }
+            Err(e) => {
+                stats.error_count += 1;
+                eprintln!("Error parsing line {}: {}", stats.total_lines, e);
+            }
+        }
+    }
+
+    print_processing_summary(&stats);
+
+    csv_writer.flush()?;
+    Ok(())
+}
+
+#[derive(Default)]
+struct ProcessingStats {
+    total_lines: usize,
+    error_count: usize,
+    successful_records: usize,
+}
+
+fn print_processing_summary(stats: &ProcessingStats) {
+    eprintln!("\nProcessing Summary:");
+    eprintln!("Total lines processed: {}", stats.total_lines);
+    eprintln!("Parsing errors: {}", stats.error_count);
+    eprintln!("Records with avg_rate <= 30: {}", stats.successful_records);
+}
+
+#[tokio::main]
+async fn main() -> io::Result<()> {
+    let args = Args::parse();
+
+    // Setup input
+    let input: Box<dyn BufRead> = match args.input {
+        Some(path) => Box::new(BufReader::new(File::open(path)?)),
+        None => Box::new(BufReader::new(io::stdin())),
+    };
+
+    // Setup output
+    let mut output: Box<dyn Write> = match args.output {
+        Some(path) => Box::new(File::create(path)?),
+        None => Box::new(io::stdout()),
+    };
+
+    process_records(input, &mut output)
+}
\ No newline at end of file
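
Note (illustrative, not part of the patch): a minimal unit-test sketch for the averaging logic that feeds the `avg_rate <= 30.0` filter in `process_records`. It assumes the module is appended to `src/main.rs`, where `InputRecord`, `NegotiatedRate`, `NegotiatedPrice`, and `calculate_average_rate` are defined; the record values below are made up for the example.

```rust
// Hypothetical test module sketch -- assumes it sits at the bottom of
// src/main.rs so the structs and calculate_average_rate are in scope.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn averages_every_negotiated_price() {
        // Two prices (10.0 and 20.0) in a single negotiated_rates entry
        // should average to 15.0.
        let record = InputRecord {
            name: "Example procedure".to_string(), // made-up value
            billing_code: "0001".to_string(),      // made-up value
            negotiated_rates: vec![NegotiatedRate {
                negotiated_prices: vec![
                    NegotiatedPrice { negotiated_rate: 10.0 },
                    NegotiatedPrice { negotiated_rate: 20.0 },
                ],
            }],
        };
        assert_eq!(calculate_average_rate(&record), Some(15.0));
    }

    #[test]
    fn record_without_prices_yields_none() {
        // No prices at all: the function returns None, so process_records
        // skips the record instead of emitting a bogus average.
        let record = InputRecord {
            name: "No rates".to_string(),     // made-up value
            billing_code: "0002".to_string(), // made-up value
            negotiated_rates: vec![],
        };
        assert_eq!(calculate_average_rate(&record), None);
    }
}
```

With such a module in place, `cargo test` would exercise the averaging behaviour without touching the CLI or I/O paths.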