diff --git a/jupyter_server/services/kernels/connection/channels.py b/jupyter_server/services/kernels/connection/channels.py index bde7f2fc9..5287f75f7 100644 --- a/jupyter_server/services/kernels/connection/channels.py +++ b/jupyter_server/services/kernels/connection/channels.py @@ -11,6 +11,7 @@ from textwrap import dedent from jupyter_client import protocol_version as client_protocol_version # type:ignore[attr-defined] +from packaging.version import Version from tornado import gen, web from tornado.ioloop import IOLoop from tornado.websocket import WebSocketClosedError @@ -120,6 +121,12 @@ def _default_close_future(self): """The default close future.""" return Future() + kernel_protocol_version = Instance(klass=Version) + + @default("kernel_protocol_version") + def _default_kernel_protocol_version(self): + return Version("0.0") + session_key = Unicode("") _iopub_window_msg_count = Int() @@ -155,6 +162,42 @@ def create_stream(self): self.channels[channel] = stream = meth(identity=identity) stream.channel = channel + def wait_for_iopub_welcome(self): + """Waits for an iopub_welcome message from the kernel's IOPub channel. + Used for kernels supporting protocol version >=5.4 that send explicit + welcome messages instead of requiring the nudge() handshake. Returns a + Future that resolves with the message type when a welcome is received, + or an empty dict on parsing errors.""" + + # The IOPub channel used by the client, where a welcome is expected. + iopub_channel = self.channels["iopub"] + iopub_future: Future[t.Any] = Future() + + def on_iopub(msg): + """Handle iopub replies.""" + _idents, msg = self.session.feed_identities(msg) + + try: + msg = self.session.deserialize(msg) + except BaseException: + self.log.error("Bad iopub reply", exc_info=True) + iopub_future.set_result({}) + return + else: + self.log.debug("Received iopub message: %s", msg) + if msg["msg_type"] == "iopub_welcome": + iopub_future.set_result(msg["msg_type"]) + + iopub_channel.on_recv(on_iopub) + + def cleanup(_=None): + """Common cleanup""" + iopub_channel.stop_on_recv() + + iopub_future.add_done_callback(cleanup) + + return _ensure_future(iopub_future) + def nudge(self): """Nudge the zmq connections with kernel_info_requests Returns a Future that will resolve when we have received @@ -336,6 +379,10 @@ def connect(self): """Handle a connection.""" self.multi_kernel_manager.notify_connect(self.kernel_id) + # Check if the kernel protocol supports XPUB; if so, a welcome message + # is to be expected. + wait_for_welcome = self.kernel_protocol_version >= Version("5.4") + # on new connections, flush the message buffer buffer_info = self.multi_kernel_manager.get_buffer(self.kernel_id, self.session_key) if buffer_info and buffer_info["session_key"] == self.session_key: @@ -348,7 +395,7 @@ def connect(self): # The kernel's ports have not changed; use the channels captured in the buffer self.channels = buffer_info["channels"] - connected = self.nudge() + connected = self.wait_for_iopub_welcome() if wait_for_welcome else self.nudge() def replay(value): replay_buffer = buffer_info["buffer"] @@ -362,7 +409,7 @@ def replay(value): else: try: self.create_stream() - connected = self.nudge() + connected = self.wait_for_iopub_welcome() if wait_for_welcome else self.nudge() except web.HTTPError as e: # Do not log error if the kernel is already shutdown, # as it's normal that it's not responding @@ -626,6 +673,8 @@ def _finish_kernel_info(self, info): and signal that connection can continue. """ protocol_version = info.get("protocol_version", client_protocol_version) + self.kernel_protocol_version = Version(protocol_version) + if protocol_version != client_protocol_version: self.session.adapt_version = int(protocol_version.split(".")[0]) self.log.info(