33using  System . Collections . Generic ; 
44using  System . Threading ; 
55using  System . Threading . Tasks ; 
6+ using  Microsoft . Extensions . Logging ; 
67
78namespace  OmniSharp . Extensions . JsonRpc 
89{ 
910    public  class  ProcessScheduler  :  IScheduler 
1011    { 
11-         private  readonly  BlockingCollection < ( RequestProcessType  type ,  Func < Task >  request ) >  _queue ; 
12+         private  readonly  ILogger < ProcessScheduler >  _logger ; 
13+         private  readonly  BlockingCollection < ( RequestProcessType  type ,  string  name ,  Func < Task >  request ) >  _queue ; 
1214        private  readonly  CancellationTokenSource  _cancel ; 
1315        private  readonly  Thread  _thread ; 
1416
15-         public  ProcessScheduler ( ) 
17+         public  ProcessScheduler ( ILoggerFactory   loggerFactory ) 
1618        { 
17-             _queue  =  new  BlockingCollection < ( RequestProcessType  type ,  Func < Task >  request ) > ( ) ; 
19+             _logger  =  loggerFactory . CreateLogger < ProcessScheduler > ( ) ; 
20+             _queue  =  new  BlockingCollection < ( RequestProcessType  type ,  string  name ,  Func < Task >  request ) > ( ) ; 
1821            _cancel  =  new  CancellationTokenSource ( ) ; 
1922            _thread  =  new  Thread ( ProcessRequestQueue )  {  IsBackground  =  true ,  Name  =  "ProcessRequestQueue"  } ; 
2023        } 
@@ -24,9 +27,9 @@ public void Start()
2427            _thread . Start ( ) ; 
2528        } 
2629
27-         public  void  Add ( RequestProcessType  type ,  Func < Task >  request ) 
30+         public  void  Add ( RequestProcessType  type ,  string   name ,   Func < Task >  request ) 
2831        { 
29-             _queue . Add ( ( type ,  request ) ) ; 
32+             _queue . Add ( ( type ,  name ,   request ) ) ; 
3033        } 
3134
3235        private  Task  Start ( Func < Task >  request ) 
@@ -42,7 +45,7 @@ private List<Task> RemoveCompleteTasks(List<Task> list)
4245            if  ( list . Count  ==  0 )  return  list ; 
4346
4447            var  result  =  new  List < Task > ( ) ; 
45-             foreach ( var  t  in  list ) 
48+             foreach   ( var  t  in  list ) 
4649            { 
4750                if  ( t . IsFaulted ) 
4851                { 
@@ -69,20 +72,32 @@ private void ProcessRequestQueue()
6972                { 
7073                    if  ( _queue . TryTake ( out  var  item ,  Timeout . Infinite ,  token ) ) 
7174                    { 
72-                         var  ( type ,  request )  =  item ; 
73-                         if   ( type   ==   RequestProcessType . Serial ) 
75+                         var  ( type ,  name ,   request )  =  item ; 
76+                         try 
7477                        { 
75-                             Task . WaitAll ( waitables . ToArray ( ) ,  token ) ; 
76-                             Start ( request ) . Wait ( token ) ; 
78+                             if  ( type  ==  RequestProcessType . Serial ) 
79+                             { 
80+                                 Task . WaitAll ( waitables . ToArray ( ) ,  token ) ; 
81+                                 Start ( request ) . Wait ( token ) ; 
82+                             } 
83+                             else  if  ( type  ==  RequestProcessType . Parallel ) 
84+                             { 
85+                                 waitables . Add ( Start ( request ) ) ; 
86+                             } 
87+                             else 
88+                                 throw  new  NotImplementedException ( "Only Serial and Parallel execution types can be handled currently" ) ; 
89+                             waitables  =  RemoveCompleteTasks ( waitables ) ; 
90+                             Interlocked . Exchange ( ref  _TestOnly_NonCompleteTaskCount ,  waitables . Count ) ; 
7791                        } 
78-                         else   if   ( type  ==  RequestProcessType . Parallel ) 
92+                         catch   ( OperationCanceledException   ex )   when   ( ex . CancellationToken  ==  token ) 
7993                        { 
80-                             waitables . Add ( Start ( request ) ) ; 
94+                             throw ; 
95+                         } 
96+                         catch  ( Exception  e ) 
97+                         { 
98+                             // TODO: Create proper event ids 
99+                             _logger . LogCritical ( Events . UnhandledRequest ,  e ,  "Unhandled exception executing request {Name}" ,  name ) ; 
81100                        } 
82-                         else 
83-                             throw  new  NotImplementedException ( "Only Serial and Parallel execution types can be handled currently" ) ; 
84-                         waitables  =  RemoveCompleteTasks ( waitables ) ; 
85-                         Interlocked . Exchange ( ref  _TestOnly_NonCompleteTaskCount ,  waitables . Count ) ; 
86101                    } 
87102                } 
88103            } 
@@ -112,4 +127,9 @@ public void Dispose()
112127            _cancel . Dispose ( ) ; 
113128        } 
114129    } 
130+ 
131+     static class  Events 
132+     { 
133+         public  static EventId  UnhandledRequest  =  new  EventId ( 1337_100 ) ; 
134+     } 
115135} 
0 commit comments