diff --git a/src/source/range.rs b/src/source/range.rs index 84c5cc2..6e9a273 100644 --- a/src/source/range.rs +++ b/src/source/range.rs @@ -10,6 +10,8 @@ use crate::matcher::Matcher; use crate::output::Output; use crate::transform::{Input, Transform}; +const BATCH_SIZE: u64 = 1000; + /// Generate keys from a numeric range pub struct RangeSource { pub start: u64, @@ -34,14 +36,18 @@ impl Source for RangeSource { let pb = ProgressBar::new(count); pb.set_style(crate::default_progress_style()); - let range: Vec = (self.start..=self.end).collect(); - - let stats = std::sync::atomic::AtomicU64::new(0); let matches = std::sync::atomic::AtomicU64::new(0); - range.par_chunks(1000).for_each(|chunk| { - let inputs: Vec = chunk.iter().map(|&v| Input::from_u64(v)).collect(); + let num_batches = count / BATCH_SIZE + u64::from(count % BATCH_SIZE != 0); + + (0..num_batches).into_par_iter().for_each(|batch_idx| { + let batch_start = self.start + batch_idx * BATCH_SIZE; + let batch_end = batch_start.saturating_add(BATCH_SIZE - 1).min(self.end); + + let inputs: Vec = (batch_start..=batch_end) + .map(Input::from_u64) + .collect(); let mut buffer = Vec::with_capacity(inputs.len() * 3); for transform in transforms { @@ -59,7 +65,6 @@ impl Source for RangeSource { matches.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } } else { - // No matcher - output all keys output.key(source, transform.name(), &derived).ok(); } @@ -67,7 +72,7 @@ impl Source for RangeSource { } } - pb.inc(chunk.len() as u64); + pb.inc((batch_end - batch_start + 1) as u64); }); pb.finish_and_clear(); diff --git a/src/source/timestamps.rs b/src/source/timestamps.rs index 73fba38..5f018e5 100644 --- a/src/source/timestamps.rs +++ b/src/source/timestamps.rs @@ -58,12 +58,10 @@ impl Source for TimestampSource { let pb = ProgressBar::new(total); pb.set_style(crate::default_progress_style()); - let timestamps: Vec = (self.start..=self.end).collect(); - let stats = std::sync::atomic::AtomicU64::new(0); let matches = std::sync::atomic::AtomicU64::new(0); - timestamps.par_iter().for_each(|&ts| { + (self.start..=self.end).into_par_iter().for_each(|ts| { // Process base timestamp process_timestamp(ts, transforms, &deriver, matcher, output, &stats, &matches);