@@ -129,26 +129,29 @@ public void Stop()
129
129
processingQueue . CompleteAdding ( ) ;
130
130
}
131
131
132
- private async Task ParseLoop ( ) {
133
- using ( var rateLimitParsing = new SemaphoreSlim ( MaxThreads ) ) {
134
- while ( await processingQueue . OutputAvailableAsync ( ) ) {
135
- try {
136
- var file = await processingQueue . TakeAsync ( ) ;
137
- await rateLimitParsing . Locked ( ( ) => {
138
- _ = WorkerPool . RunBackground ( async ( ) => {
139
- var replay = _analyzer . Analyze ( file ) ;
140
- if ( replay != null && file . UploadStatus == UploadStatus . Preprocessed ) {
141
- await FingerprintingQueue . EnqueueAsync ( ( replay , file ) ) ;
142
- }
143
- } ) ;
144
- } ) ;
145
- }
146
- catch ( Exception ex ) {
147
- _log . Error ( ex , "Error in parse loop" ) ;
148
- }
132
+ private async Task ParseLoop ( )
133
+ {
134
+ //OutputAvailableAsync will keep returning true
135
+ //untill all data is processed and processQueue.CompleteAdding is called
136
+ while ( await processingQueue . OutputAvailableAsync ( ) ) {
137
+ try {
138
+ var file = await processingQueue . TakeAsync ( ) ;
139
+ //don't wait for completion of background pool task.
140
+ //it's internally limited to a fixed number of low-priority threads
141
+ //so we can throw as much work on there as we want without choking it
142
+ _ = WorkerPool . RunBackground ( async ( ) => {
143
+ var replay = _analyzer . Analyze ( file ) ;
144
+ if ( replay != null && file . UploadStatus == UploadStatus . Preprocessed ) {
145
+ await FingerprintingQueue . EnqueueAsync ( ( replay , file ) ) ;
146
+ }
147
+ } ) ;
148
+ }
149
+ catch ( Exception ex ) {
150
+ _log . Error ( ex , "Error in parse loop" ) ;
149
151
}
150
152
}
151
153
}
154
+
152
155
private async Task FingerprintLoop ( ) {
153
156
while ( true ) {
154
157
var UnFingerprinted = await FingerprintingQueue . DequeueAsync ( ) ;
0 commit comments