@@ -12,8 +12,9 @@ use clap::Args;
1212use rand:: { SeedableRng , make_rng} ;
1313use rand_xoshiro:: Xoshiro256StarStar ;
1414use std:: {
15+ fmt:: Debug ,
1516 io:: { BufRead , Read , Write } ,
16- path:: PathBuf ,
17+ path:: { Path , PathBuf } ,
1718} ;
1819use zoe:: {
1920 data:: records:: HeaderReadable ,
@@ -89,7 +90,7 @@ pub fn sampler_process(args: SamplerArgs) -> Result<(), std::io::Error> {
8990 let ( io_args, rng, target, verbose) = parse_sampler_args ( args) ?;
9091
9192 // Get the population sequence count from one of the files if possible
92- let mut seq_count = get_paired_seq_count ( io_args. filepath1 , io_args . filepath2 , io_args . reader1 . inner_iter ( ) ) ?;
93+ let mut seq_count = get_paired_seq_count ( & io_args) ?;
9394
9495 let is_single = io_args. reader2 . is_none ( ) && matches ! ( io_args. writer, RecordWriters :: SingleEnd ( _) ) ;
9596
@@ -106,24 +107,41 @@ pub fn sampler_process(args: SamplerArgs) -> Result<(), std::io::Error> {
106107 ( SamplingTarget :: Percent ( percent) , None ) => SamplingTarget :: Percent ( percent) ,
107108 } ;
108109
109- let ( total_original, total_downsampled) = match ( io_args. reader1 . dispatch ( ) , io_args. reader2 . map ( |x| x. dispatch ( ) ) ) {
110- ( DispatchFastX :: Fastq ( reader) , None ) => sample_single_input ( reader, io_args. writer , target, seq_count, rng) ?,
111- ( DispatchFastX :: Fastq ( reader1) , Some ( DispatchFastX :: Fastq ( reader2) ) ) => {
112- sample_paired_input ( reader1, reader2, io_args. writer , target, seq_count, rng) ?
113- }
114- ( DispatchFastX :: Fasta ( reader) , None ) => sample_single_input ( reader, io_args. writer , target, seq_count, rng) ?,
115- ( DispatchFastX :: Fasta ( reader1) , Some ( DispatchFastX :: Fasta ( reader2) ) ) => {
116- sample_paired_input ( reader1, reader2, io_args. writer , target, seq_count, rng) ?
117- }
118- ( DispatchFastX :: Fastq ( _) , Some ( DispatchFastX :: Fasta ( _) ) ) => {
119- return Err ( std:: io:: Error :: other (
120- "Paired read inputs must be both FASTQ or both FASTA. Found FASTQ for first input and FASTA for second input." ,
121- ) ) ;
110+ let Reader {
111+ path : input_path1,
112+ iter : reader1,
113+ } = io_args. reader1 ;
114+
115+ let ( total_original, total_downsampled) = if let Some ( reader2) = io_args. reader2 {
116+ let Reader {
117+ path : input_path2,
118+ iter : reader2,
119+ } = reader2;
120+
121+ let input_paths = [ input_path1, input_path2] ;
122+
123+ match ( reader1. dispatch ( ) , reader2. dispatch ( ) ) {
124+ ( DispatchFastX :: Fastq ( reader1) , DispatchFastX :: Fastq ( reader2) ) => {
125+ sample_paired_input ( reader1, reader2, io_args. writer , target, seq_count, rng, input_paths) ?
126+ }
127+ ( DispatchFastX :: Fasta ( reader1) , DispatchFastX :: Fasta ( reader2) ) => {
128+ sample_paired_input ( reader1, reader2, io_args. writer , target, seq_count, rng, input_paths) ?
129+ }
130+ ( DispatchFastX :: Fastq ( _) , DispatchFastX :: Fasta ( _) ) => {
131+ return Err ( std:: io:: Error :: other (
132+ "Paired read inputs must be both FASTQ or both FASTA. Found FASTQ for first input and FASTA for second input." ,
133+ ) ) ;
134+ }
135+ ( DispatchFastX :: Fasta ( _) , DispatchFastX :: Fastq ( _) ) => {
136+ return Err ( std:: io:: Error :: other (
137+ "Paired read inputs must be both FASTQ or both FASTA. Found FASTA for first input and FASTQ for second input." ,
138+ ) ) ;
139+ }
122140 }
123- ( DispatchFastX :: Fasta ( _ ) , Some ( DispatchFastX :: Fastq ( _ ) ) ) => {
124- return Err ( std :: io :: Error :: other (
125- "Paired read inputs must be both FASTQ or both FASTA. Found FASTA for first input and FASTQ for second input." ,
126- ) ) ;
141+ } else {
142+ match reader1 . dispatch ( ) {
143+ DispatchFastX :: Fastq ( reader ) => sample_single_input ( reader , io_args . writer , target , seq_count , rng ) ? ,
144+ DispatchFastX :: Fasta ( reader ) => sample_single_input ( reader , io_args . writer , target , seq_count , rng ) ? ,
127145 }
128146 } ;
129147
@@ -188,13 +206,19 @@ where
188206/// Each pair of reads counts once.
189207fn sample_paired_input < R1 , R2 , W , A > (
190208 reader1 : R1 , reader2 : R2 , writer : RecordWriters < W > , target : SamplingTarget , seq_count : Option < usize > ,
191- rng : Xoshiro256StarStar ,
209+ rng : Xoshiro256StarStar , input_paths : [ PathBuf ; 2 ] ,
192210) -> std:: io:: Result < ( usize , usize ) >
193211where
194212 R1 : Iterator < Item = std:: io:: Result < A > > ,
195213 R2 : Iterator < Item = std:: io:: Result < A > > ,
196214 W : Write ,
197- A : HeaderReadable + WriteRecord < W > , {
215+ A : HeaderReadable + WriteRecord < W > + Debug + Sync + Send + ' static , {
216+ // Zip the paired reads, and add context including the paths to any zipping
217+ // errors
218+ let iterator = reader1
219+ . zip_paired_reads ( reader2)
220+ . map ( |res| res. map_err ( |e| e. add_path_context ( & input_paths[ 0 ] , & input_paths[ 1 ] ) ) ) ;
221+
198222 // Don't perform sampling if target is higher than population sequence count
199223 if let SamplingTarget :: Count ( target_count) = target
200224 && let Some ( seq_count) = seq_count
@@ -203,12 +227,10 @@ where
203227 eprintln ! (
204228 "Sampler Warning: Target sample size ({target_count}) was greater than population size ({seq_count}); no downsampling has occurred." ,
205229 ) ;
206- reader1 . zip_paired_reads ( reader2 ) . write_records ( writer) ?;
230+ iterator . write_records ( writer) ?;
207231 return Ok ( ( seq_count, seq_count) ) ;
208232 }
209233
210- // Determine the proper iterator type for the inputs, then dispatch
211- let iterator = reader1. zip_paired_reads ( reader2) ;
212234 sample_and_write_results ( iterator, writer, target, seq_count, rng)
213235}
214236
@@ -308,27 +330,41 @@ where
308330 Ok ( ( total_original, total_downsampled) )
309331}
310332
311- fn get_paired_seq_count < R : Read > (
312- fastq_path1 : Option < PathBuf > , fastq_path2 : Option < PathBuf > , reader1 : & FastXReader < R > ,
313- ) -> std:: io:: Result < Option < usize > > {
314- if let Some ( path) = fastq_path1 {
315- Ok ( Some ( get_seq_count ( & path, reader1) ?) )
316- } else if let Some ( path) = fastq_path2 {
317- Ok ( Some ( get_seq_count ( & path, reader1) ?) )
333+ /// Gets the number of input sequences, using whichever paired input exists, is
334+ /// a file, and is not zipped.
335+ ///
336+ /// If neither meets these conditions, `None` is returned.
337+ fn get_paired_seq_count ( io_args : & IOArgs ) -> std:: io:: Result < Option < usize > > {
338+ let IOArgs {
339+ reader1,
340+ reader2,
341+ writer : _,
342+ } = & io_args;
343+
344+ if reader1. path . is_file ( ) && !is_gz ( & reader1. path ) {
345+ Ok ( Some ( get_seq_count ( & reader1. path , reader1. iter . inner_iter ( ) ) ?) )
346+ } else if let Some ( reader2) = reader2
347+ && reader2. path . is_file ( )
348+ && !is_gz ( & reader2. path )
349+ {
350+ Ok ( Some ( get_seq_count ( & reader2. path , reader2. iter . inner_iter ( ) ) ?) )
318351 } else {
319352 Ok ( None )
320353 }
321354}
322355
356+ /// The type sampler uses for input, along with the input path for error
357+ /// context.
358+ struct Reader {
359+ path : PathBuf ,
360+ iter : IterWithContext < FastXReader < ReadFileZipPipe > > ,
361+ }
362+
363+ /// The IO arguments used by sampler, including up to two readers and writers.
323364struct IOArgs {
324- /// This is only `Some` if the path corresponds to a file and is not zipped
325- filepath1 : Option < PathBuf > ,
326- /// This is only `Some` if paired ends are used, the path corresponds to a
327- /// non-zipped file
328- filepath2 : Option < PathBuf > ,
329- reader1 : IterWithContext < FastXReader < ReadFileZipPipe > > ,
330- reader2 : Option < IterWithContext < FastXReader < ReadFileZipPipe > > > ,
331- writer : RecordWriters < WriteFileZipStdout > ,
365+ reader1 : Reader ,
366+ reader2 : Option < Reader > ,
367+ writer : RecordWriters < WriteFileZipStdout > ,
332368}
333369
334370/// The target number of sequences to sample
@@ -364,23 +400,13 @@ fn parse_sampler_args(args: SamplerArgs) -> Result<(IOArgs, Xoshiro256StarStar,
364400
365401 let RecordReaders { reader1, reader2 } = readers;
366402
367- let filepath1 = if args. input_file . is_file ( ) && !is_gz ( & args. input_file ) {
368- Some ( args. input_file )
369- } else {
370- None
371- } ;
372- let filepath2 = if let Some ( path) = args. input_file2
373- && path. is_file ( )
374- && !is_gz ( & path)
375- {
376- Some ( path)
377- } else {
378- None
403+ let reader1 = Reader {
404+ path : args. input_file ,
405+ iter : reader1,
379406 } ;
407+ let reader2 = args. input_file2 . zip ( reader2) . map ( |( path, iter) | Reader { path, iter } ) ;
380408
381409 let io_args = IOArgs {
382- filepath1,
383- filepath2,
384410 reader1,
385411 reader2,
386412 writer,
@@ -395,8 +421,13 @@ fn parse_sampler_args(args: SamplerArgs) -> Result<(IOArgs, Xoshiro256StarStar,
395421 Ok ( ( io_args, rng, target, args. verbose ) )
396422}
397423
398- fn get_seq_count < R : Read > ( fastq : & PathBuf , reader : & FastXReader < R > ) -> std:: io:: Result < usize > {
399- let input = InputOptions :: new_from_path ( fastq) . use_file ( ) . open ( ) ?;
424+ /// Gets the count of the number of records in `input_file`.
425+ ///
426+ /// This is achieved by counting the number of lines, and dividing it by the
427+ /// proper amount (2 for FASTA, and 4 for FASTQ). The input file must exist, be
428+ /// a file, and not be zipped.
429+ fn get_seq_count < R : Read > ( input_file : & Path , reader : & FastXReader < R > ) -> std:: io:: Result < usize > {
430+ let input = InputOptions :: new_from_path ( input_file) . use_file ( ) . open ( ) ?;
400431 let line_count = input. lines ( ) . process_results ( |iter| iter. count ( ) ) ?;
401432 match reader {
402433 FastXReader :: Fasta ( _) => Ok ( line_count / 2 ) ,
0 commit comments