@@ -149,6 +149,7 @@ def _default_kernel_buffers(self):
149
149
150
150
def __init__ (self , ** kwargs ):
151
151
self .pinned_superclass = MultiKernelManager
152
+ self ._pending_kernel_tasks = {}
152
153
self .pinned_superclass .__init__ (self , ** kwargs )
153
154
self .last_kernel_activity = utcnow ()
154
155
@@ -216,9 +217,11 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
216
217
kwargs ["kernel_id" ] = kernel_id
217
218
kernel_id = await ensure_async (self .pinned_superclass .start_kernel (self , ** kwargs ))
218
219
self ._kernel_connections [kernel_id ] = 0
219
- fut = asyncio .ensure_future (self ._finish_kernel_start (kernel_id ))
220
+ task = asyncio .create_task (self ._finish_kernel_start (kernel_id ))
220
221
if not getattr (self , "use_pending_kernels" , None ):
221
- await fut
222
+ await task
223
+ else :
224
+ self ._pending_kernel_tasks [kernel_id ] = task
222
225
# add busy/activity markers:
223
226
kernel = self .get_kernel (kernel_id )
224
227
kernel .execution_state = "starting"
@@ -245,8 +248,8 @@ async def _finish_kernel_start(self, kernel_id):
245
248
if hasattr (km , "ready" ):
246
249
try :
247
250
await km .ready
248
- except Exception :
249
- self .log .exception (km . ready . exception () )
251
+ except Exception as e :
252
+ self .log .exception (e )
250
253
return
251
254
252
255
self ._kernel_ports [kernel_id ] = km .ports
@@ -372,7 +375,7 @@ def stop_buffering(self, kernel_id):
372
375
buffer_info = self ._kernel_buffers .pop (kernel_id )
373
376
# close buffering streams
374
377
for stream in buffer_info ["channels" ].values ():
375
- if not stream .closed () :
378
+ if not stream .socket . closed :
376
379
stream .on_recv (None )
377
380
stream .close ()
378
381
@@ -387,13 +390,18 @@ def stop_buffering(self, kernel_id):
387
390
def shutdown_kernel (self , kernel_id , now = False , restart = False ):
388
391
"""Shutdown a kernel by kernel_id"""
389
392
self ._check_kernel_id (kernel_id )
390
- self .stop_watching_activity (kernel_id )
391
- self .stop_buffering (kernel_id )
392
393
393
394
# Decrease the metric of number of kernels
394
395
# running for the relevant kernel type by 1
395
396
KERNEL_CURRENTLY_RUNNING_TOTAL .labels (type = self ._kernels [kernel_id ].kernel_name ).dec ()
396
397
398
+ if kernel_id in self ._pending_kernel_tasks :
399
+ task = self ._pending_kernel_tasks .pop (kernel_id )
400
+ task .cancel ()
401
+ else :
402
+ self .stop_watching_activity (kernel_id )
403
+ self .stop_buffering (kernel_id )
404
+
397
405
self .pinned_superclass .shutdown_kernel (self , kernel_id , now = now , restart = restart )
398
406
399
407
async def restart_kernel (self , kernel_id , now = False ):
@@ -533,7 +541,8 @@ def stop_watching_activity(self, kernel_id):
533
541
"""Stop watching IOPub messages on a kernel for activity."""
534
542
kernel = self ._kernels [kernel_id ]
535
543
if getattr (kernel , "_activity_stream" , None ):
536
- kernel ._activity_stream .close ()
544
+ if not kernel ._activity_stream .socket .closed :
545
+ kernel ._activity_stream .close ()
537
546
kernel ._activity_stream = None
538
547
539
548
def initialize_culler (self ):
@@ -638,19 +647,24 @@ def __init__(self, **kwargs):
638
647
self .pinned_superclass = AsyncMultiKernelManager
639
648
self .pinned_superclass .__init__ (self , ** kwargs )
640
649
self .last_kernel_activity = utcnow ()
650
+ self ._pending_kernel_tasks = {}
641
651
642
652
async def shutdown_kernel (self , kernel_id , now = False , restart = False ):
643
653
"""Shutdown a kernel by kernel_id"""
644
654
self ._check_kernel_id (kernel_id )
645
- self .stop_watching_activity (kernel_id )
646
- self .stop_buffering (kernel_id )
647
655
648
656
# Decrease the metric of number of kernels
649
657
# running for the relevant kernel type by 1
650
658
KERNEL_CURRENTLY_RUNNING_TOTAL .labels (type = self ._kernels [kernel_id ].kernel_name ).dec ()
651
659
660
+ if kernel_id in self ._pending_kernel_tasks :
661
+ task = self ._pending_kernel_tasks .pop (kernel_id )
662
+ task .cancel ()
663
+ else :
664
+ self .stop_watching_activity (kernel_id )
665
+ self .stop_buffering (kernel_id )
666
+
652
667
# Finish shutting down the kernel before clearing state to avoid a race condition.
653
- ret = await self .pinned_superclass .shutdown_kernel (
668
+ return await self .pinned_superclass .shutdown_kernel (
654
669
self , kernel_id , now = now , restart = restart
655
670
)
656
- return ret
0 commit comments