Skip to content

Commit 1a7238b

Browse files
committed
Fix tests
1 parent 2d06cc5 commit 1a7238b

File tree

7 files changed

+60
-96
lines changed

7 files changed

+60
-96
lines changed

jupyter_client/asynchronous/client.py

+4-20
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,6 @@ class AsyncKernelClient(KernelClient):
2323
get_stdin_msg = KernelClient._async_get_stdin_msg
2424
get_control_msg = KernelClient._async_get_control_msg
2525

26-
#@property
27-
#def hb_channel(self):
28-
# """Get the hb channel object for this kernel."""
29-
# if self._hb_channel is None:
30-
# url = self._make_url('hb')
31-
# self.log.debug("connecting heartbeat channel to %s", url)
32-
# loop = asyncio.new_event_loop()
33-
# self._hb_channel = self.hb_channel_class(
34-
# self.context, self.session, url, loop
35-
# )
36-
# return self._hb_channel
37-
3826
wait_for_ready = KernelClient._async_wait_for_ready
3927

4028
# The classes to use for the various channels
@@ -45,7 +33,7 @@ class AsyncKernelClient(KernelClient):
4533
control_channel_class = Type(ZMQSocketChannel)
4634

4735

48-
_recv_reply = KernelClient._async__recv_reply
36+
_recv_reply = KernelClient._async_recv_reply
4937

5038

5139
# replies come on the shell channel
@@ -55,14 +43,10 @@ class AsyncKernelClient(KernelClient):
5543
inspect = reqrep(KernelClient._async_inspect)
5644
kernel_info = reqrep(KernelClient._async_kernel_info)
5745
comm_info = reqrep(KernelClient._async_comm_info)
58-
59-
# replies come on the control channel
60-
shutdown = reqrep(KernelClient._async_shutdown, channel='control')
61-
6246
is_alive = KernelClient._async_is_alive
63-
6447
execute_interactive = KernelClient._async_execute_interactive
65-
6648
stop_channels = KernelClient._async_stop_channels
67-
6849
channels_running = property(KernelClient._async_channels_running)
50+
51+
# replies come on the control channel
52+
shutdown = reqrep(KernelClient._async_shutdown, channel='control')

jupyter_client/blocking/client.py

+4-8
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class BlockingKernelClient(KernelClient):
3737
control_channel_class = Type(ZMQSocketChannel)
3838

3939

40-
_recv_reply = run_sync(KernelClient._async__recv_reply)
40+
_recv_reply = run_sync(KernelClient._async_recv_reply)
4141

4242

4343
# replies come on the shell channel
@@ -47,14 +47,10 @@ class BlockingKernelClient(KernelClient):
4747
inspect = run_sync(reqrep(KernelClient._async_inspect))
4848
kernel_info = run_sync(reqrep(KernelClient._async_kernel_info))
4949
comm_info = run_sync(reqrep(KernelClient._async_comm_info))
50-
51-
# replies come on the control channel
52-
shutdown = run_sync(reqrep(KernelClient._async_shutdown, channel='control'))
53-
5450
is_alive = run_sync(KernelClient._async_is_alive)
55-
5651
execute_interactive = run_sync(KernelClient._async_execute_interactive)
57-
5852
stop_channels = run_sync(KernelClient._async_stop_channels)
59-
6053
channels_running = property(run_sync(KernelClient._async_channels_running))
54+
55+
# replies come on the control channel
56+
shutdown = run_sync(reqrep(KernelClient._async_shutdown, channel='control'))

jupyter_client/channels.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class HBChannel(Thread):
4747
_pause = None
4848
_beating = None
4949

50-
def __init__(self, context=None, session=None, address=None, loop=None):
50+
def __init__(self, context=None, session=None, address=None):
5151
"""Create the heartbeat monitor thread.
5252
5353
Parameters
@@ -62,8 +62,6 @@ def __init__(self, context=None, session=None, address=None, loop=None):
6262
super().__init__()
6363
self.daemon = True
6464

65-
self.loop = loop
66-
6765
self.context = context
6866
self.session = session
6967
if isinstance(address, tuple):
@@ -93,6 +91,12 @@ def _create_socket(self):
9391
# close previous socket, before opening a new one
9492
self.poller.unregister(self.socket)
9593
self.socket.close()
94+
try:
95+
loop = asyncio.get_event_loop()
96+
except RuntimeError:
97+
loop = asyncio.new_event_loop()
98+
asyncio.set_event_loop(loop)
99+
96100
self.socket = self.context.socket(zmq.REQ)
97101
self.socket.linger = 1000
98102
self.socket.connect(self.address)
@@ -134,8 +138,6 @@ def _poll(self, start_time):
134138

135139
def run(self):
136140
"""The thread's main activity. Call start() instead."""
137-
if self.loop is not None:
138-
asyncio.set_event_loop(self.loop)
139141
self._create_socket()
140142
self._running = True
141143
self._beating = True

jupyter_client/client.py

+4-16
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ async def wrapped(self, *args, **kwargs):
5151
if not reply:
5252
return msg_id
5353

54-
return await self._async__recv_reply(msg_id, timeout=timeout, channel=channel)
54+
return await self._async_recv_reply(msg_id, timeout=timeout, channel=channel)
5555

5656
if not meth.__doc__:
5757
# python -OO removes docstrings,
@@ -199,7 +199,7 @@ async def _async_wait_for_ready(self, timeout=None):
199199
except Empty:
200200
break
201201

202-
async def _async__recv_reply(self, msg_id, timeout=None, channel='shell'):
202+
async def _async_recv_reply(self, msg_id, timeout=None, channel='shell'):
203203
"""Receive and return the reply for a given request"""
204204
if timeout is not None:
205205
deadline = time.monotonic() + timeout
@@ -359,23 +359,11 @@ def hb_channel(self):
359359
if self._hb_channel is None:
360360
url = self._make_url('hb')
361361
self.log.debug("connecting heartbeat channel to %s", url)
362-
loop = asyncio.new_event_loop()
363362
self._hb_channel = self.hb_channel_class(
364-
self.context, self.session, url, loop
363+
self.context, self.session, url
365364
)
366365
return self._hb_channel
367366

368-
#@property
369-
#def hb_channel(self):
370-
# """Get the hb channel object for this kernel."""
371-
# if self._hb_channel is None:
372-
# url = self._make_url('hb')
373-
# self.log.debug("connecting heartbeat channel to %s", url)
374-
# self._hb_channel = self.hb_channel_class(
375-
# self.context, self.session, url
376-
# )
377-
# return self._hb_channel
378-
379367
@property
380368
def control_channel(self):
381369
"""Get the control channel object for this kernel."""
@@ -540,7 +528,7 @@ async def _async_execute_interactive(self, code, silent=False, store_history=Tru
540528
# output is done, get the reply
541529
if timeout is not None:
542530
timeout = max(0, deadline - time.monotonic())
543-
return await self._async__recv_reply(msg_id, timeout=timeout)
531+
return await self._async_recv_reply(msg_id, timeout=timeout)
544532

545533

546534
# Methods to send specific messages on channels

jupyter_client/manager.py

+18-18
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from .managerabc import (
3434
KernelManagerABC
3535
)
36-
from .util import run_sync
36+
from .util import run_sync, ensure_async
3737

3838
class _ShutdownStatus(Enum):
3939
"""
@@ -272,7 +272,7 @@ def from_ns(match):
272272

273273
return [pat.sub(from_ns, arg) for arg in cmd]
274274

275-
async def _async__launch_kernel(
275+
async def _async_launch_kernel(
276276
self,
277277
kernel_cmd: t.List[str],
278278
**kw
@@ -283,7 +283,7 @@ async def _async__launch_kernel(
283283
"""
284284
return launch_kernel(kernel_cmd, **kw)
285285

286-
_launch_kernel = run_sync(_async__launch_kernel)
286+
_launch_kernel = run_sync(_async_launch_kernel)
287287

288288
# Control socket used for polite kernel shutdown
289289

@@ -380,7 +380,7 @@ async def _async_start_kernel(self, **kw):
380380

381381
# launch the kernel subprocess
382382
self.log.debug("Starting kernel: %s", kernel_cmd)
383-
self.kernel = await self._async__launch_kernel(kernel_cmd, **kw)
383+
self.kernel = await ensure_async(self._launch_kernel(kernel_cmd, **kw))
384384
self.post_start_kernel(**kw)
385385

386386
start_kernel = run_sync(_async_start_kernel)
@@ -417,7 +417,7 @@ async def _async_finish_shutdown(
417417
except asyncio.TimeoutError:
418418
self.log.debug("Kernel is taking too long to finish, terminating")
419419
self._shutdown_status = _ShutdownStatus.SigtermRequest
420-
await self._async__send_kernel_sigterm()
420+
await self._async_send_kernel_sigterm()
421421

422422
try:
423423
await asyncio.wait_for(
@@ -426,7 +426,7 @@ async def _async_finish_shutdown(
426426
except asyncio.TimeoutError:
427427
self.log.debug("Kernel is taking too long to finish, killing")
428428
self._shutdown_status = _ShutdownStatus.SigkillRequest
429-
await self._async__kill_kernel()
429+
await ensure_async(self._kill_kernel())
430430
else:
431431
# Process is no longer alive, wait and clear
432432
if self.kernel is not None:
@@ -485,16 +485,16 @@ async def _async_shutdown_kernel(
485485
# Stop monitoring for restarting while we shutdown.
486486
self.stop_restarter()
487487

488-
await self._async_interrupt_kernel()
488+
await ensure_async(self.interrupt_kernel())
489489

490490
if now:
491-
await self._async__kill_kernel()
491+
await ensure_async(self._kill_kernel())
492492
else:
493493
self.request_shutdown(restart=restart)
494494
# Don't send any additional kernel kill messages immediately, to give
495495
# the kernel a chance to properly execute shutdown actions. Wait for at
496496
# most 1s, checking every 0.1s.
497-
await self._async_finish_shutdown()
497+
await ensure_async(self.finish_shutdown())
498498

499499
# In 6.1.5, a new method, cleanup_resources(), was introduced to address
500500
# a leak issue (https://github.com/jupyter/jupyter_client/pull/548) and
@@ -554,14 +554,14 @@ async def _async_restart_kernel(
554554
"No previous call to 'start_kernel'.")
555555
else:
556556
# Stop currently running kernel.
557-
await self._async_shutdown_kernel(now=now, restart=True)
557+
await ensure_async(self.shutdown_kernel(now=now, restart=True))
558558

559559
if newports:
560560
self.cleanup_random_ports()
561561

562562
# Start new kernel.
563563
self._launch_args.update(kw)
564-
await self._async_start_kernel(**self._launch_args)
564+
await ensure_async(self.start_kernel(**self._launch_args))
565565

566566
restart_kernel = run_sync(_async_restart_kernel)
567567

@@ -570,7 +570,7 @@ def has_kernel(self) -> bool:
570570
"""Has a kernel been started that we are managing."""
571571
return self.kernel is not None
572572

573-
async def _async__send_kernel_sigterm(self) -> None:
573+
async def _async_send_kernel_sigterm(self) -> None:
574574
"""similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
575575
if self.has_kernel:
576576
# Signal the kernel to terminate (sends SIGTERM on Unix and
@@ -600,9 +600,9 @@ async def _async__send_kernel_sigterm(self) -> None:
600600
if e.errno != ESRCH:
601601
raise
602602

603-
_send_kernel_sigterm = run_sync(_async__send_kernel_sigterm)
603+
_send_kernel_sigterm = run_sync(_async_send_kernel_sigterm)
604604

605-
async def _async__kill_kernel(self) -> None:
605+
async def _async_kill_kernel(self) -> None:
606606
"""Kill the running kernel.
607607
608608
This is a private method, callers should use shutdown_kernel(now=True).
@@ -641,7 +641,7 @@ async def _async__kill_kernel(self) -> None:
641641
self.kernel.wait()
642642
self.kernel = None
643643

644-
_kill_kernel = run_sync(_async__kill_kernel)
644+
_kill_kernel = run_sync(_async_kill_kernel)
645645

646646
async def _async_interrupt_kernel(self) -> None:
647647
"""Interrupts the kernel by sending it a signal.
@@ -723,13 +723,13 @@ class AsyncKernelManager(KernelManager):
723723
client_class: DottedObjectName = DottedObjectName('jupyter_client.asynchronous.AsyncKernelClient')
724724
client_factory: Type = Type(klass='jupyter_client.asynchronous.AsyncKernelClient')
725725

726-
_launch_kernel = KernelManager._async__launch_kernel
726+
_launch_kernel = KernelManager._async_launch_kernel
727727
start_kernel = KernelManager._async_start_kernel
728728
finish_shutdown = KernelManager._async_finish_shutdown
729729
shutdown_kernel = KernelManager._async_shutdown_kernel
730730
restart_kernel = KernelManager._async_restart_kernel
731-
_send_kernel_sigterm = KernelManager._async__send_kernel_sigterm
732-
_kill_kernel = KernelManager._async__kill_kernel
731+
_send_kernel_sigterm = KernelManager._async_send_kernel_sigterm
732+
_kill_kernel = KernelManager._async_kill_kernel
733733
interrupt_kernel = KernelManager._async_interrupt_kernel
734734
signal_kernel = KernelManager._async_signal_kernel
735735
is_alive = KernelManager._async_is_alive

jupyter_client/tests/test_kernelmanager.py

+18-27
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010
import signal
1111
import sys
1212
import time
13-
import threading
14-
import multiprocessing as mp
13+
import concurrent.futures
1514
import pytest
1615

16+
import nest_asyncio
1717
from async_generator import async_generator, yield_
1818
from traitlets.config.loader import Config
1919
from jupyter_core import paths
@@ -349,41 +349,32 @@ def test_start_parallel_thread_kernels(self, config, install_kernel):
349349
pytest.skip("IPC transport is currently not working for this test!")
350350
self._run_signaltest_lifecycle(config)
351351

352-
thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
353-
thread2 = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
354-
try:
355-
thread.start()
356-
thread2.start()
357-
finally:
358-
thread.join()
359-
thread2.join()
352+
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_executor:
353+
future1 = thread_executor.submit(self._run_signaltest_lifecycle, config)
354+
future2 = thread_executor.submit(self._run_signaltest_lifecycle, config)
355+
future1.result()
356+
future2.result()
360357

361358
@pytest.mark.timeout(TIMEOUT)
362359
def test_start_parallel_process_kernels(self, config, install_kernel):
363360
if config.KernelManager.transport == 'ipc': # FIXME
364361
pytest.skip("IPC transport is currently not working for this test!")
365362
self._run_signaltest_lifecycle(config)
366-
thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
367-
proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
368-
try:
369-
thread.start()
370-
proc.start()
371-
finally:
372-
thread.join()
373-
proc.join()
374-
375-
assert proc.exitcode == 0
363+
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_executor:
364+
future1 = thread_executor.submit(self._run_signaltest_lifecycle, config)
365+
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as process_executor:
366+
future2 = process_executor.submit(self._run_signaltest_lifecycle, config)
367+
future2.result()
368+
future1.result()
376369

377370
@pytest.mark.timeout(TIMEOUT)
378371
def test_start_sequence_process_kernels(self, config, install_kernel):
372+
if config.KernelManager.transport == 'ipc': # FIXME
373+
pytest.skip("IPC transport is currently not working for this test!")
379374
self._run_signaltest_lifecycle(config)
380-
proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
381-
try:
382-
proc.start()
383-
finally:
384-
proc.join()
385-
386-
assert proc.exitcode == 0
375+
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool_executor:
376+
future = pool_executor.submit(self._run_signaltest_lifecycle, config)
377+
future.result()
387378

388379
def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs):
389380
km.start_kernel(**kwargs)

jupyter_client/util.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
import nest_asyncio
44
nest_asyncio.apply()
55

6-
loop = asyncio.get_event_loop()
7-
86
def run_sync(coro):
97
def wrapped(*args, **kwargs):
8+
try:
9+
loop = asyncio.get_event_loop()
10+
except RuntimeError:
11+
loop = asyncio.new_event_loop()
12+
asyncio.set_event_loop(loop)
1013
return loop.run_until_complete(coro(*args, **kwargs))
1114
wrapped.__doc__ = coro.__doc__
1215
return wrapped

0 commit comments

Comments
 (0)