@@ -250,3 +250,120 @@ where
250250 R :: iter ( record_set) . map ( |r| Ok ( r?) )
251251 }
252252}
253+
254+ #[ cfg( test) ]
255+ mod tests {
256+ use std:: io:: Cursor ;
257+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
258+ use std:: sync:: Arc ;
259+
260+ use crate :: fastq;
261+ use crate :: parallel:: single:: ParallelReader ;
262+ use crate :: parallel:: { ParallelProcessor , ProcessError } ;
263+ use crate :: Record ;
264+
265+ fn make_fastq ( n : usize ) -> Vec < u8 > {
266+ ( 0 ..n)
267+ . flat_map ( |i| format ! ( "@seq{i}\n ACGT\n +\n IIII\n " ) . into_bytes ( ) )
268+ . collect ( )
269+ }
270+
271+ #[ derive( Clone , Default ) ]
272+ struct CountingProcessor {
273+ local_count : usize ,
274+ global_count : Arc < AtomicUsize > ,
275+ }
276+
277+ impl CountingProcessor {
278+ fn count ( & self ) -> usize {
279+ self . global_count . load ( Ordering :: Relaxed )
280+ }
281+ }
282+
283+ impl < Rf : Record > ParallelProcessor < Rf > for CountingProcessor {
284+ fn process_record ( & mut self , _record : Rf ) -> Result < ( ) , ProcessError > {
285+ self . local_count += 1 ;
286+ Ok ( ( ) )
287+ }
288+
289+ fn on_batch_complete ( & mut self ) -> Result < ( ) , ProcessError > {
290+ self . global_count
291+ . fetch_add ( self . local_count , Ordering :: Relaxed ) ;
292+ self . local_count = 0 ;
293+ Ok ( ( ) )
294+ }
295+ }
296+
297+ const N_RECORDS : usize = 500 ;
298+ const BATCH_SIZE : usize = 10 ;
299+ const LIMIT : usize = 50 ;
300+
301+ fn make_limited_reader ( data : Vec < u8 > , limit : usize ) -> fastq:: Reader < Cursor < Vec < u8 > > > {
302+ let mut reader =
303+ fastq:: Reader :: with_batch_size ( Cursor :: new ( data) , BATCH_SIZE ) . unwrap ( ) ;
304+ reader. set_record_limit ( limit) ;
305+ reader
306+ }
307+
308+ #[ test]
309+ fn test_record_limit_sequential ( ) {
310+ let reader = make_limited_reader ( make_fastq ( N_RECORDS ) , LIMIT ) ;
311+ let mut processor = CountingProcessor :: default ( ) ;
312+
313+ reader. process_parallel ( & mut processor, 1 ) . unwrap ( ) ;
314+
315+ assert_eq ! ( processor. count( ) , LIMIT ) ;
316+ }
317+
318+ #[ test]
319+ fn test_record_limit_parallel ( ) {
320+ let reader = make_limited_reader ( make_fastq ( N_RECORDS ) , LIMIT ) ;
321+ let mut processor = CountingProcessor :: default ( ) ;
322+
323+ reader. process_parallel ( & mut processor, 4 ) . unwrap ( ) ;
324+
325+ assert_eq ! ( processor. count( ) , LIMIT ) ;
326+ }
327+
328+ #[ test]
329+ fn test_record_limit_non_multiple_of_batch ( ) {
330+ // 45 is not a multiple of BATCH_SIZE (10), so the last batch is truncated.
331+ let reader = make_limited_reader ( make_fastq ( N_RECORDS ) , 45 ) ;
332+ let mut processor = CountingProcessor :: default ( ) ;
333+
334+ reader. process_parallel ( & mut processor, 4 ) . unwrap ( ) ;
335+
336+ assert_eq ! ( processor. count( ) , 45 ) ;
337+ }
338+
339+ #[ test]
340+ fn test_no_limit_processes_all_sequential ( ) {
341+ let reader = fastq:: Reader :: with_batch_size ( Cursor :: new ( make_fastq ( N_RECORDS ) ) , BATCH_SIZE ) . unwrap ( ) ;
342+ let mut processor = CountingProcessor :: default ( ) ;
343+
344+ reader. process_parallel ( & mut processor, 1 ) . unwrap ( ) ;
345+
346+ assert_eq ! ( processor. count( ) , N_RECORDS ) ;
347+ }
348+
349+ #[ test]
350+ fn test_no_limit_processes_all_parallel ( ) {
351+ let reader = fastq:: Reader :: with_batch_size ( Cursor :: new ( make_fastq ( N_RECORDS ) ) , BATCH_SIZE ) . unwrap ( ) ;
352+ let mut processor = CountingProcessor :: default ( ) ;
353+
354+ reader. process_parallel ( & mut processor, 4 ) . unwrap ( ) ;
355+
356+ assert_eq ! ( processor. count( ) , N_RECORDS ) ;
357+ }
358+
359+ #[ test]
360+ fn test_record_limit_larger_than_file ( ) {
361+ // Limit larger than file: process all available records.
362+ let reader = make_limited_reader ( make_fastq ( N_RECORDS ) , N_RECORDS * 2 ) ;
363+ let mut processor = CountingProcessor :: default ( ) ;
364+
365+ reader. process_parallel ( & mut processor, 4 ) . unwrap ( ) ;
366+
367+ assert_eq ! ( processor. count( ) , N_RECORDS ) ;
368+ }
369+ }
0 commit comments