Skip to content

Commit dfb6ed9

Browse files
authored
Merge pull request #79 from rhuygen/fix/kernel-concurrency
Fix kernel concurrency
2 parents 438008b + b7d5b72 commit dfb6ed9

13 files changed

Lines changed: 318 additions & 151 deletions

File tree

src/gui_executor/client.py

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,21 @@ def wait_for_ready(self, timeout: float = 60.0):
6464
"""Wait for kernel to be ready."""
6565
msg_id = self._client.kernel_info()
6666

67-
start = time.time()
67+
if VERBOSE_DEBUG:
68+
LOGGER.debug(f"{id(self)}: Waiting for kernel to be ready with msg_id {msg_id}...")
69+
70+
start = time.monotonic()
6871
while True:
6972
try:
70-
msg = self._client.get_shell_msg(timeout=1.0)
73+
msg = self._client.get_shell_msg(timeout=0.2)
74+
if VERBOSE_DEBUG:
75+
LOGGER.debug(f"{id(self)}: Received message while waiting for kernel to be ready: {msg}")
7176
if msg["msg_type"] == "kernel_info_reply" and msg["parent_header"].get("msg_id") == msg_id:
7277
return True
7378
except queue.Empty:
7479
pass
7580

76-
if time.time() - start > timeout:
81+
if time.monotonic() - start > timeout:
7782
raise TimeoutError("Kernel did not become ready within the specified timeout.")
7883

7984
def stop_channels(self):
@@ -85,7 +90,7 @@ def get_kernel_info(self) -> dict:
8590
if VERBOSE_DEBUG:
8691
LOGGER.debug(f"{id(self)}: {msg_id = }")
8792

88-
shell_msg = self._client.get_shell_msg()
93+
shell_msg = self._client.get_shell_msg(timeout=self._timeout)
8994
if VERBOSE_DEBUG:
9095
LOGGER.debug(f"{id(self)}: {shell_msg = }")
9196

@@ -136,12 +141,16 @@ def run_snippet(self, snippet: str, allow_stdin: bool = True):
136141
# fetch the output
137142

138143
output: List[str] = []
144+
reply = None
145+
saw_idle = False
146+
reply_received_at = None
139147

140148
while True:
141149
try:
142150
io_msg = self._client.get_iopub_msg(timeout=self._timeout)
143151
io_msg_type = io_msg["msg_type"]
144152
io_msg_content = io_msg["content"]
153+
io_parent_msg_id = io_msg.get("parent_header", {}).get("msg_id")
145154

146155
if VERBOSE_DEBUG:
147156
LOGGER.debug(f"{id(self)}: io_msg = {io_msg}")
@@ -150,12 +159,14 @@ def run_snippet(self, snippet: str, allow_stdin: bool = True):
150159
if VERBOSE_DEBUG:
151160
LOGGER.debug(f"{id(self)}: io_msg_content = {io_msg_content}")
152161

162+
if io_msg_type != "iopub_welcome" and io_parent_msg_id != msg_id:
163+
continue
164+
153165
if io_msg_type == "status":
154166
if io_msg_content["execution_state"] == "idle":
155-
# self.signals.data.emit("Execution State is Idle, terminating...")
167+
saw_idle = True
156168
if VERBOSE_DEBUG:
157169
LOGGER.debug(f"{id(self)}: Execution State is Idle, terminating...")
158-
break
159170
elif io_msg_type == "stream":
160171
if "text" in io_msg_content:
161172
text = io_msg_content["text"].rstrip()
@@ -167,21 +178,42 @@ def run_snippet(self, snippet: str, allow_stdin: bool = True):
167178
elif io_msg_type == "error":
168179
... # ignore this message type
169180
elif io_msg_type == "execute_result":
170-
... # ignore this message type
181+
data = io_msg_content.get("data", {})
182+
text = data.get("text/plain")
183+
if text is not None:
184+
output.append(str(text).rstrip())
171185
elif io_msg_type == "iopub_welcome":
172186
... # ignore this message type
173187
else:
174188
LOGGER.warning(f"{id(self)}: Unknown io_msg_type: {io_msg_type}")
175189
except queue.Empty:
176-
LOGGER.warning(f"{id(self)}: IOPub timed out waiting for idle — exiting loop")
190+
if VERBOSE_DEBUG:
191+
LOGGER.debug(f"{id(self)}: IOPub timed out while waiting for execution to complete; continuing")
192+
193+
try:
194+
shell_msg = self._client.get_shell_msg(timeout=0.05)
195+
if shell_msg["msg_type"] == "execute_reply" and shell_msg["parent_header"].get("msg_id") == msg_id:
196+
reply = shell_msg
197+
if reply_received_at is None:
198+
reply_received_at = time.monotonic()
199+
except queue.Empty:
200+
pass
201+
202+
if reply is not None and saw_idle:
177203
break
178204

205+
# In rare cases, idle can be delayed or dropped; once we have the matching
206+
# execute_reply, don't block forever waiting for idle.
207+
if reply is not None and reply_received_at is not None:
208+
if time.monotonic() - reply_received_at > self._timeout:
209+
LOGGER.warning(f"{id(self)}: Execute reply received but no idle status observed; proceeding")
210+
break
211+
179212
if VERBOSE_DEBUG:
180213
LOGGER.debug(f"{id(self)}: {output = }")
181214

182-
# fetch the reply message
183-
184-
reply = self._client.get_shell_msg(timeout=1.0)
215+
if reply is None:
216+
raise TimeoutError("Did not receive execute_reply for the submitted snippet.")
185217

186218
if VERBOSE_DEBUG:
187219
LOGGER.debug(f"{id(self)}: {type(reply) = }")

src/gui_executor/utils.py

Lines changed: 58 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import binascii
44
import contextlib
5+
from dataclasses import dataclass
56
import datetime
67
import functools
7-
import importlib
88
import inspect
99
import logging
1010
import os
@@ -16,15 +16,10 @@
1616
from enum import Enum
1717
from io import StringIO
1818
from pathlib import Path
19-
from typing import Any
20-
from typing import Callable
21-
from typing import Dict
22-
from typing import List
23-
from typing import Tuple
19+
from typing import Any, Callable, Dict, List, Tuple, Type
2420

2521
import rich
26-
from PyQt5.QtWidgets import QComboBox
27-
from PyQt5.QtWidgets import QFileDialog
22+
from PyQt5.QtWidgets import QComboBox, QFileDialog
2823
from rich import box
2924
from rich.panel import Panel
3025
from rich.syntax import Syntax
@@ -41,7 +36,7 @@ def bool_env(var_name: str, default: bool = False) -> bool:
4136
return default
4237

4338

44-
def replace_environment_variable(input_string: str) -> str:
39+
def replace_environment_variable(input_string: str) -> str | None:
4540
"""Returns the `input_string` with all occurrences of ENV['var'] expanded.
4641
4742
>>> replace_environment_variable("ENV['HOME']/data/CSL")
@@ -87,9 +82,15 @@ def expand_path(path: Path | str) -> Path:
8782
8883
Returns:
8984
An absolute path.
85+
86+
Raises:
87+
ValueError: when the path contains an environment variable that is not found.
9088
"""
91-
path = replace_environment_variable(str(path))
92-
path = Path(path).expanduser()
89+
path_or_env = replace_environment_variable(str(path))
90+
if path_or_env is None:
91+
raise ValueError(f"Environment variable not found for path: {str(path)}")
92+
93+
path = Path(path_or_env).expanduser()
9394

9495
return path.resolve()
9596

@@ -134,25 +135,25 @@ def copy_func(func, module_display_name=None, function_display_name=None):
134135
func.__closure__,
135136
)
136137

137-
new_func.__wrapped__ = func.__wrapped__
138+
new_func.__wrapped__ = func.__wrapped__ # type: ignore
138139

139140
for ui_attr in func.__dict__:
140141
if ui_attr.startswith("__ui"):
141142
setattr(new_func, ui_attr, getattr(func, ui_attr))
142143

143144
if module_display_name:
144-
new_func.__ui_module_display_name__ = module_display_name
145+
new_func.__ui_module_display_name__ = module_display_name # type: ignore
145146

146147
if function_display_name:
147-
new_func.__ui_display_name__ = function_display_name
148+
new_func.__ui_display_name__ = function_display_name # type: ignore
148149

149150
# Update the lineno of the function source, this shall be the lineno where the copy_func() is called.
150151
# This will overwrite the lineno of the original function as set by for wrapper
151152

152-
caller = inspect.currentframe().f_back
153+
caller = inspect.currentframe().f_back # type: ignore
153154
# module_name = inspect.getmodule(caller).__name__
154155
# print(f"{module_name} {func.__name__ = } {caller.f_lineno = }")
155-
new_func.__ui_lineno__ = caller.f_lineno
156+
new_func.__ui_lineno__ = caller.f_lineno # type: ignore
156157

157158
return new_func
158159

@@ -202,16 +203,17 @@ def replace_required_args(code: List | str, args: List) -> List | str:
202203
for match in matches:
203204
print(f"{match = }")
204205
name, expected_type = match.split(":") if ":" in match else (match, None)
205-
line = line.replace(f"<<{match}>>", f"****")
206+
line = line.replace(f"<<{match}>>", "****")
206207
new_code_lines.append(line)
207208
return new_code_lines
208209

209210

210211
def var_exists(var_name: str):
211212
frame = inspect.currentframe()
213+
assert frame is not None # this should never happen, but mypy doesn't know that
212214

213215
try:
214-
return var_name in frame.f_back.f_locals or var_name in frame.f_back.f_globals
216+
return var_name in frame.f_back.f_locals or var_name in frame.f_back.f_globals # type: ignore
215217
finally:
216218
del frame
217219

@@ -228,8 +230,10 @@ def sys_path(path: Path | str):
228230
sys.path.pop(0)
229231

230232

231-
class Data(object):
232-
pass
233+
@dataclass
234+
class Data:
235+
stdout: str = ""
236+
stderr: str = ""
233237

234238

235239
@contextlib.contextmanager
@@ -257,9 +261,12 @@ def custom_repr(arg: Any):
257261
if not isinstance(arg, Enum):
258262
return repr(arg)
259263

260-
m = re.fullmatch(r"<([\w.]+): (.*)>", repr(arg))
264+
match = re.fullmatch(r"<([\w.]+): (.*)>", repr(arg))
261265

262-
return m[1]
266+
if match is None:
267+
raise ValueError(f"Unexpected repr format for Enum: {repr(arg)}")
268+
269+
return match[1]
263270

264271

265272
def stringify_args(args):
@@ -300,8 +307,8 @@ def extract_var_name_args_and_kwargs(ui_args: dict):
300307
Positional arguments are returned in a list, keyword arguments are returned in a dictionary.
301308
302309
"""
303-
from gui_executor.utypes import VariableName
304310
from gui_executor.exec import ArgumentKind
311+
from gui_executor.utypes import VariableName
305312

306313
args = [
307314
v.annotation.get_value()
@@ -335,13 +342,13 @@ def create_code_snippet(func: Callable, args: List, kwargs: Dict, call_func: boo
335342
{stringify_imports(args, kwargs)}
336343
error = False
337344
{stringify_var_name_checks(args, kwargs)}
338-
345+
339346
def main():
340347
response = {func.__name__}({stringify_args(args)}{", " if args else ""}{stringify_kwargs(kwargs)}) # [3405691582]
341348
if response is not None:
342349
print(response)
343350
return response
344-
351+
345352
if not error:
346353
{f"{func.__ui_capture_response__} = main()" if call_func else "pass"}
347354
"""
@@ -351,12 +358,12 @@ def main():
351358

352359

353360
def create_code_snippet_renderable(func: Callable, args: List, kwargs: Dict):
354-
snippet = f"{func.__ui_capture_response__} = {func.__name__}({stringify_args(args)}{', ' if args else ''}{stringify_kwargs(kwargs)})"
361+
snippet = f"{func.__ui_capture_response__} = {func.__name__}({stringify_args(args)}{', ' if args else ''}{stringify_kwargs(kwargs)})" # type: ignore
355362

356363
return Panel(Syntax(snippet, "python", theme="default", word_wrap=True), box=box.HORIZONTALS)
357364

358365

359-
def select_directory(directory: str = None) -> str:
366+
def select_directory(directory: str | None = None) -> str:
360367
dialog = QFileDialog()
361368
dialog.setOption(QFileDialog.ShowDirsOnly, True)
362369
dialog.setOption(QFileDialog.ReadOnly, True)
@@ -371,7 +378,7 @@ def select_directory(directory: str = None) -> str:
371378
return filenames[0] if filenames is not None else ""
372379

373380

374-
def select_file(filename: str = None, full_path: bool = True) -> str:
381+
def select_file(filename: str | None = None, full_path: bool = True) -> str:
375382
dialog = QFileDialog()
376383
dialog.setDirectory(filename)
377384
dialog.setOption(QFileDialog.ReadOnly, True)
@@ -384,7 +391,7 @@ def select_file(filename: str = None, full_path: bool = True) -> str:
384391
return filenames[0] if filenames is not None else ""
385392

386393

387-
def combo_box_from_enum(enumeration: Enum) -> QComboBox:
394+
def combo_box_from_enum(enumeration: Type[Enum]) -> QComboBox:
388395
cb = QComboBox()
389396
cb.addItems([x.name for x in enumeration])
390397
return cb
@@ -517,7 +524,7 @@ def b64decode(s, altchars=None, validate=False):
517524

518525
def print_system_info():
519526
import sys
520-
import rich
527+
521528
import distro
522529

523530
rich.print(f"distro: {distro.name()}, {distro.version(pretty=True)}")
@@ -650,3 +657,24 @@ def wrapper_timer(*args, **kwargs):
650657
return wrapper_timer
651658

652659
return actual_decorator
660+
661+
662+
def borg(cls):
663+
"""
664+
Use the Borg pattern to make a class with a shared state between its instances and subclasses.
665+
666+
from:
667+
[we don't need no singleton](
668+
http://code.activestate.com/recipes/66531-singleton-we-dont-need-no-stinkin-singleton-the-bo/)
669+
"""
670+
671+
cls._shared_state = {}
672+
orig_init = cls.__init__
673+
674+
def new_init(self, *args, **kwargs):
675+
self.__dict__ = cls._shared_state
676+
orig_init(self, *args, **kwargs)
677+
678+
cls.__init__ = new_init
679+
680+
return cls

0 commit comments

Comments
 (0)