Skip to content

Commit 7df16d5

Browse files
authored
Merge pull request #620 from Carreau/sigterm
Interrupt and sigterm before sigkill kernel.
2 parents 987f3e1 + 60aa969 commit 7df16d5

File tree

4 files changed

+237
-17
lines changed

4 files changed

+237
-17
lines changed

docs/changelog.rst

+7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@
44
Changes in Jupyter Client
55
=========================
66

7+
dev
8+
===
9+
10+
- Shutdown request sequence has been modified to be more graceful, it now is
11+
preceded by interrupt, and will also send a ``SIGTERM`` before forcibly
12+
killing the kernel. :ghpull:`620`
13+
714
6.1.11
815
======
916
- Move jedi pinning to test requirements (:ghpull:`599`)

jupyter_client/manager.py

+134-7
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import time
1313
import warnings
1414

15+
from enum import Enum
16+
1517
import zmq
1618

1719
from ipython_genutils.importstring import import_item
@@ -29,13 +31,30 @@
2931
KernelManagerABC
3032
)
3133

34+
class _ShutdownStatus(Enum):
35+
"""
36+
37+
This is so far used only for testing in order to track the internal state of
38+
the shutdown logic, and verifying which path is taken for which
39+
missbehavior.
40+
41+
"""
42+
Unset = None
43+
ShutdownRequest = "ShutdownRequest"
44+
SigtermRequest = "SigtermRequest"
45+
SigkillRequest = "SigkillRequest"
46+
3247

3348
class KernelManager(ConnectionFileMixin):
3449
"""Manages a single kernel in a subprocess on this host.
3550
3651
This version starts kernels with Popen.
3752
"""
3853

54+
def __init__(self, *args, **kwargs):
55+
super().__init__(*args, **kwargs)
56+
self._shutdown_status = _ShutdownStatus.Unset
57+
3958
_created_context = Bool(False)
4059

4160
# The PyZMQ Context to use for communication with the kernel.
@@ -71,7 +90,13 @@ def _kernel_spec_manager_changed(self, change):
7190
shutdown_wait_time = Float(
7291
5.0, config=True,
7392
help="Time to wait for a kernel to terminate before killing it, "
74-
"in seconds.")
93+
"in seconds. When a shutdown request is initiated, the kernel "
94+
"will be immediately send and interrupt (SIGINT), followed"
95+
"by a shutdown_request message, after 1/2 of `shutdown_wait_time`"
96+
"it will be sent a terminate (SIGTERM) request, and finally at "
97+
"the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
98+
"and kill may be equivalent on windows.",
99+
)
75100

76101
kernel_name = Unicode(kernelspec.NATIVE_KERNEL_NAME)
77102

@@ -333,20 +358,45 @@ def finish_shutdown(self, waittime=None, pollinterval=0.1):
333358
"""
334359
if waittime is None:
335360
waittime = max(self.shutdown_wait_time, 0)
336-
for i in range(int(waittime/pollinterval)):
361+
self._shutdown_status = _ShutdownStatus.ShutdownRequest
362+
363+
def poll_or_sleep_to_kernel_gone():
364+
"""
365+
Poll until the kernel is not responding,
366+
then wait (the subprocess), until process gone.
367+
368+
After this function the kernel is either:
369+
- still responding; or
370+
- subprocess has been culled.
371+
"""
337372
if self.is_alive():
338373
time.sleep(pollinterval)
339374
else:
340375
# If there's still a proc, wait and clear
341376
if self.has_kernel:
342377
self.kernel.wait()
343378
self.kernel = None
379+
return True
380+
381+
# wait 50% of the shutdown timeout...
382+
for i in range(int(waittime / 2 / pollinterval)):
383+
if poll_or_sleep_to_kernel_gone():
344384
break
345385
else:
346-
# OK, we've waited long enough.
347-
if self.has_kernel:
348-
self.log.debug("Kernel is taking too long to finish, killing")
349-
self._kill_kernel()
386+
# if we've exited the loop normally (no break)
387+
# send sigterm and wait the other 50%.
388+
self.log.debug("Kernel is taking too long to finish, terminating")
389+
self._shutdown_status = _ShutdownStatus.SigtermRequest
390+
self._send_kernel_sigterm()
391+
for i in range(int(waittime / 2 / pollinterval)):
392+
if poll_or_sleep_to_kernel_gone():
393+
break
394+
else:
395+
# OK, we've waited long enough.
396+
if self.has_kernel:
397+
self.log.debug("Kernel is taking too long to finish, killing")
398+
self._shutdown_status = _ShutdownStatus.SigkillRequest
399+
self._kill_kernel()
350400

351401
def cleanup_resources(self, restart=False):
352402
"""Clean up resources when the kernel is shut down"""
@@ -388,6 +438,8 @@ def shutdown_kernel(self, now=False, restart=False):
388438
# Stop monitoring for restarting while we shutdown.
389439
self.stop_restarter()
390440

441+
self.interrupt_kernel()
442+
391443
if now:
392444
self._kill_kernel()
393445
else:
@@ -462,6 +514,36 @@ def has_kernel(self):
462514
"""Has a kernel been started that we are managing."""
463515
return self.kernel is not None
464516

517+
def _send_kernel_sigterm(self):
518+
"""similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
519+
if self.has_kernel:
520+
# Signal the kernel to terminate (sends SIGTERM on Unix and
521+
# if the kernel is a subprocess and we are on windows; this is
522+
# equivalent to kill
523+
try:
524+
if hasattr(self.kernel, "terminate"):
525+
self.kernel.terminate()
526+
elif hasattr(signal, "SIGTERM"):
527+
self.signal_kernel(signal.SIGTERM)
528+
else:
529+
self.log.debug(
530+
"Cannot set term signal to kernel, no"
531+
" `.terminate()` method and no values for SIGTERM"
532+
)
533+
except OSError as e:
534+
# In Windows, we will get an Access Denied error if the process
535+
# has already terminated. Ignore it.
536+
if sys.platform == "win32":
537+
if e.winerror != 5:
538+
raise
539+
# On Unix, we may get an ESRCH error if the process has already
540+
# terminated. Ignore it.
541+
else:
542+
from errno import ESRCH
543+
544+
if e.errno != ESRCH:
545+
raise
546+
465547
def _kill_kernel(self):
466548
"""Kill the running kernel.
467549
@@ -587,10 +669,23 @@ async def finish_shutdown(self, waittime=None, pollinterval=0.1):
587669
"""
588670
if waittime is None:
589671
waittime = max(self.shutdown_wait_time, 0)
672+
self._shutdown_status = _ShutdownStatus.ShutdownRequest
590673
try:
591-
await asyncio.wait_for(self._async_wait(pollinterval=pollinterval), timeout=waittime)
674+
await asyncio.wait_for(
675+
self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
676+
)
677+
except asyncio.TimeoutError:
678+
self.log.debug("Kernel is taking too long to finish, terminating")
679+
self._shutdown_status = _ShutdownStatus.SigtermRequest
680+
await self._send_kernel_sigterm()
681+
682+
try:
683+
await asyncio.wait_for(
684+
self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
685+
)
592686
except asyncio.TimeoutError:
593687
self.log.debug("Kernel is taking too long to finish, killing")
688+
self._shutdown_status = _ShutdownStatus.SigkillRequest
594689
await self._kill_kernel()
595690
else:
596691
# Process is no longer alive, wait and clear
@@ -620,6 +715,8 @@ async def shutdown_kernel(self, now=False, restart=False):
620715
# Stop monitoring for restarting while we shutdown.
621716
self.stop_restarter()
622717

718+
await self.interrupt_kernel()
719+
623720
if now:
624721
await self._kill_kernel()
625722
else:
@@ -678,6 +775,36 @@ async def restart_kernel(self, now=False, newports=False, **kw):
678775
await self.start_kernel(**self._launch_args)
679776
return None
680777

778+
async def _send_kernel_sigterm(self):
779+
"""similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
780+
if self.has_kernel:
781+
# Signal the kernel to terminate (sends SIGTERM on Unix and
782+
# if the kernel is a subprocess and we are on windows; this is
783+
# equivalent to kill
784+
try:
785+
if hasattr(self.kernel, "terminate"):
786+
self.kernel.terminate()
787+
elif hasattr(signal, "SIGTERM"):
788+
await self.signal_kernel(signal.SIGTERM)
789+
else:
790+
self.log.debug(
791+
"Cannot set term signal to kernel, no"
792+
" `.terminate()` method and no values for SIGTERM"
793+
)
794+
except OSError as e:
795+
# In Windows, we will get an Access Denied error if the process
796+
# has already terminated. Ignore it.
797+
if sys.platform == "win32":
798+
if e.winerror != 5:
799+
raise
800+
# On Unix, we may get an ESRCH error if the process has already
801+
# terminated. Ignore it.
802+
else:
803+
from errno import ESRCH
804+
805+
if e.errno != ESRCH:
806+
raise
807+
681808
async def _kill_kernel(self):
682809
"""Kill the running kernel.
683810

jupyter_client/tests/signalkernel.py

+12
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
from ipykernel.kernelbase import Kernel
1414
from ipykernel.kernelapp import IPKernelApp
1515

16+
from tornado.web import gen
17+
18+
import signal
19+
1620

1721
class SignalTestKernel(Kernel):
1822
"""Kernel for testing subprocess signaling"""
@@ -25,6 +29,14 @@ def __init__(self, **kwargs):
2529
super().__init__(**kwargs)
2630
self.children = []
2731

32+
if os.environ.get("NO_SIGTERM_REPLY", None) == "1":
33+
signal.signal(signal.SIGTERM, signal.SIG_IGN)
34+
35+
@gen.coroutine
36+
def shutdown_request(self, stream, ident, parent):
37+
if os.environ.get("NO_SHUTDOWN_REPLY") != "1":
38+
yield gen.maybe_future(super().shutdown_request(stream, ident, parent))
39+
2840
def do_execute(self, code, silent, store_history=True, user_expressions=None,
2941
allow_stdin=False):
3042
code = code.strip()

jupyter_client/tests/test_kernelmanager.py

+84-10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from subprocess import PIPE
2222

2323
from ..manager import start_new_kernel, start_new_async_kernel
24+
from ..manager import _ShutdownStatus
2425
from .utils import test_env, skip_win32, AsyncKernelManagerSubclass, AsyncKernelManagerWithCleanup
2526

2627
pjoin = os.path.join
@@ -51,18 +52,42 @@ def config(transport):
5152
return c
5253

5354

55+
def _install_kernel(name="signaltest", extra_env=None):
56+
if extra_env is None:
57+
extra_env = dict()
58+
kernel_dir = pjoin(paths.jupyter_data_dir(), "kernels", name)
59+
os.makedirs(kernel_dir)
60+
with open(pjoin(kernel_dir, "kernel.json"), "w") as f:
61+
f.write(
62+
json.dumps(
63+
{
64+
"argv": [
65+
sys.executable,
66+
"-m",
67+
"jupyter_client.tests.signalkernel",
68+
"-f",
69+
"{connection_file}",
70+
],
71+
"display_name": "Signal Test Kernel",
72+
"env": {"TEST_VARS": "${TEST_VARS}:test_var_2", **extra_env},
73+
}
74+
)
75+
)
76+
77+
5478
@pytest.fixture
5579
def install_kernel():
56-
kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
57-
os.makedirs(kernel_dir)
58-
with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f:
59-
f.write(json.dumps({
60-
'argv': [sys.executable,
61-
'-m', 'jupyter_client.tests.signalkernel',
62-
'-f', '{connection_file}'],
63-
'display_name': "Signal Test Kernel",
64-
'env': {'TEST_VARS': '${TEST_VARS}:test_var_2'},
65-
}))
80+
return _install_kernel()
81+
82+
83+
def install_kernel_dont_shutdown():
84+
_install_kernel("signaltest-no-shutdown", {"NO_SHUTDOWN_REPLY": "1"})
85+
86+
87+
def install_kernel_dont_terminate():
88+
return _install_kernel(
89+
"signaltest-no-terminate", {"NO_SHUTDOWN_REPLY": "1", "NO_SIGTERM_REPLY": "1"}
90+
)
6691

6792

6893
@pytest.fixture
@@ -104,6 +129,55 @@ async def start_async_kernel():
104129
assert km.context.closed
105130

106131

132+
class TestKernelManagerShutDownGracefully:
133+
parameters = (
134+
"name, install, expected",
135+
[
136+
("signaltest", _install_kernel, _ShutdownStatus.ShutdownRequest),
137+
(
138+
"signaltest-no-shutdown",
139+
install_kernel_dont_shutdown,
140+
_ShutdownStatus.SigtermRequest,
141+
),
142+
(
143+
"signaltest-no-terminate",
144+
install_kernel_dont_terminate,
145+
_ShutdownStatus.SigkillRequest,
146+
),
147+
],
148+
)
149+
150+
@pytest.mark.skipif(
151+
sys.platform == "win32", reason="Windows doesn't support signals"
152+
)
153+
@pytest.mark.parametrize(*parameters)
154+
def test_signal_kernel_subprocesses(self, name, install, expected):
155+
install()
156+
km, kc = start_new_kernel(kernel_name=name)
157+
assert km._shutdown_status == _ShutdownStatus.Unset
158+
assert km.is_alive()
159+
# kc.execute("1")
160+
kc.stop_channels()
161+
km.shutdown_kernel()
162+
163+
assert km._shutdown_status == expected
164+
165+
@pytest.mark.skipif(
166+
sys.platform == "win32", reason="Windows doesn't support signals"
167+
)
168+
@pytest.mark.parametrize(*parameters)
169+
async def test_async_signal_kernel_subprocesses(self, name, install, expected):
170+
install()
171+
km, kc = await start_new_async_kernel(kernel_name=name)
172+
assert km._shutdown_status == _ShutdownStatus.Unset
173+
assert await km.is_alive()
174+
# kc.execute("1")
175+
kc.stop_channels()
176+
await km.shutdown_kernel()
177+
178+
assert km._shutdown_status == expected
179+
180+
107181
class TestKernelManager:
108182

109183
def test_lifecycle(self, km):

0 commit comments

Comments
 (0)