Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions jupyter_server/services/kernels/connection/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from textwrap import dedent

from jupyter_client import protocol_version as client_protocol_version # type:ignore[attr-defined]
from packaging.version import Version
from tornado import gen, web
from tornado.ioloop import IOLoop
from tornado.websocket import WebSocketClosedError
Expand Down Expand Up @@ -120,6 +121,12 @@ def _default_close_future(self):
"""The default close future."""
return Future()

kernel_protocol_version = Instance(klass=Version)

@default("kernel_protocol_version")
def _default_kernel_protocol_version(self):
return Version("0.0")

session_key = Unicode("")

_iopub_window_msg_count = Int()
Expand Down Expand Up @@ -155,6 +162,42 @@ def create_stream(self):
self.channels[channel] = stream = meth(identity=identity)
stream.channel = channel

def wait_for_iopub_welcome(self):
"""Waits for an iopub_welcome message from the kernel's IOPub channel.
Used for kernels supporting protocol version >=5.4 that send explicit
welcome messages instead of requiring the nudge() handshake. Returns a
Future that resolves with the message type when a welcome is received,
or an empty dict on parsing errors."""

# The IOPub channel used by the client, where a welcome is expected.
iopub_channel = self.channels["iopub"]
iopub_future: Future[t.Any] = Future()

def on_iopub(msg):
"""Handle iopub replies."""
_idents, msg = self.session.feed_identities(msg)

try:
msg = self.session.deserialize(msg)
except BaseException:
self.log.error("Bad iopub reply", exc_info=True)
iopub_future.set_result({})
return
else:
self.log.debug("Received iopub message: %s", msg)
if msg["msg_type"] == "iopub_welcome":
iopub_future.set_result(msg["msg_type"])

iopub_channel.on_recv(on_iopub)

def cleanup(_=None):
"""Common cleanup"""
iopub_channel.stop_on_recv()

iopub_future.add_done_callback(cleanup)

return _ensure_future(iopub_future)

def nudge(self):
"""Nudge the zmq connections with kernel_info_requests
Returns a Future that will resolve when we have received
Expand Down Expand Up @@ -336,6 +379,10 @@ def connect(self):
"""Handle a connection."""
self.multi_kernel_manager.notify_connect(self.kernel_id)

# Check if the kernel protocol supports XPUB; if so, a welcome message
# is to be expected.
wait_for_welcome = self.kernel_protocol_version >= Version("5.4")

# on new connections, flush the message buffer
buffer_info = self.multi_kernel_manager.get_buffer(self.kernel_id, self.session_key)
if buffer_info and buffer_info["session_key"] == self.session_key:
Expand All @@ -348,7 +395,7 @@ def connect(self):
# The kernel's ports have not changed; use the channels captured in the buffer
self.channels = buffer_info["channels"]

connected = self.nudge()
connected = self.wait_for_iopub_welcome() if wait_for_welcome else self.nudge()

def replay(value):
replay_buffer = buffer_info["buffer"]
Expand All @@ -362,7 +409,7 @@ def replay(value):
else:
try:
self.create_stream()
connected = self.nudge()
connected = self.wait_for_iopub_welcome() if wait_for_welcome else self.nudge()
except web.HTTPError as e:
# Do not log error if the kernel is already shutdown,
# as it's normal that it's not responding
Expand Down Expand Up @@ -626,6 +673,8 @@ def _finish_kernel_info(self, info):
and signal that connection can continue.
"""
protocol_version = info.get("protocol_version", client_protocol_version)
self.kernel_protocol_version = Version(protocol_version)

if protocol_version != client_protocol_version:
self.session.adapt_version = int(protocol_version.split(".")[0])
self.log.info(
Expand Down
Loading