1+ package org .qcmg .qmule ;
2+
3+ import htsjdk .samtools .*;
4+ import htsjdk .samtools .util .CloseableIterator ;
5+ import org .qcmg .common .log .QLogger ;
6+ import org .qcmg .common .log .QLoggerFactory ;
7+
8+ import java .io .File ;
9+ import java .io .IOException ;
10+ import java .io .InputStream ;
11+ import java .util .AbstractQueue ;
12+ import java .util .concurrent .*;
13+
14+ public class AsyncCRAMReader {
15+
16+ private static final QLogger logger = QLoggerFactory .getLogger (AsyncCRAMReader .class );
17+
18+ public static final int CHECK_SIZE_INTERVAL = 2000000 ;
19+
20+ private final ExecutorService executor ;
21+ private final SamReader samReader ;
22+ private final AbstractQueue <SAMRecord > recordQueue ;
23+ private final int queueCapacity ;
24+ private volatile boolean isIteratorEmpty = false ;
25+
26+ public AsyncCRAMReader (InputStream cramStream , File referenceFile , int queueCapacity ) {
27+ this .executor = Executors .newSingleThreadExecutor ();
28+ this .samReader = SamReaderFactory .makeDefault ()
29+ .referenceSequence (referenceFile )
30+ .validationStringency (ValidationStringency .DEFAULT_STRINGENCY )
31+ .open (SamInputResource .of (cramStream ));
32+ this .recordQueue = new ConcurrentLinkedQueue <>();
33+ this .queueCapacity = queueCapacity ;
34+ startReading ();
35+ }
36+
37+ private void startReading () {
38+ logger .info ("Start reading" );
39+ executor .submit (() -> {
40+ try (CloseableIterator <SAMRecord > iterator = samReader .iterator ()) {
41+ int i = 0 ;
42+ int j = 1 ;
43+ while (iterator .hasNext ()) {
44+ SAMRecord record = iterator .next ();
45+ recordQueue .add (record );
46+ if (++i >= CHECK_SIZE_INTERVAL ) {
47+ logger .info ("Processed " + (i * j ) + " records" );
48+ if (recordQueue .size () >= queueCapacity ) {
49+ logger .info ("Queue is full, waiting for it to be emptied" );
50+ while (recordQueue .size () >= queueCapacity ) {
51+ logger .info ("Queue is full, waiting for it to be emptied: " + recordQueue .size ());
52+ Thread .sleep (100 );
53+ }
54+ }
55+ i = 0 ;
56+ j ++;
57+ }
58+ }
59+ logger .info ("iterator is empty, processed " + (j * CHECK_SIZE_INTERVAL + i ) + " records" );
60+ } catch (InterruptedException e ) {
61+ throw new RuntimeException (e );
62+ } finally {
63+ executor .shutdown ();
64+ isIteratorEmpty = true ;
65+ logger .info ("iterator is empty (in finally)" );
66+ }
67+ });
68+ }
69+
70+ public SAMRecord take () throws InterruptedException {
71+ return recordQueue .poll ();
72+ }
73+
74+ public boolean isIteratorEmpty () {
75+ return isIteratorEmpty && recordQueue .isEmpty ();
76+ }
77+
78+ public void close () throws IOException {
79+ samReader .close ();
80+ executor .shutdownNow ();
81+ }
82+ }
0 commit comments