diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py index 17942c8df0731a..af68edff625192 100644 --- a/Lib/_pyrepl/windows_console.py +++ b/Lib/_pyrepl/windows_console.py @@ -64,6 +64,8 @@ def __init__(self, err: int | None, descr: str | None = None) -> None: if TYPE_CHECKING: from typing import IO +INPUT_BUFFER_LEN = 10 * 1024 + # Virtual-Key Codes: https://learn.microsoft.com/en-us/windows/win32/inputdev/virtual-key-codes VK_MAP: dict[int, str] = { 0x23: "end", # VK_END @@ -165,6 +167,10 @@ def __init__( # Console I/O is redirected, fallback... self.out = None + self.input_buffer = (INPUT_RECORD * INPUT_BUFFER_LEN)() + self.input_buffer_pos = 0 + self.input_buffer_count = 0 + def refresh(self, screen: list[str], c_xy: tuple[int, int]) -> None: """ Refresh the console screen. @@ -415,16 +421,27 @@ def _getscrollbacksize(self) -> int: return info.srWindow.Bottom # type: ignore[no-any-return] + def more_in_buffer(self) -> bool: + return self.input_buffer_pos < self.input_buffer_count - 1 + def _read_input(self, block: bool = True) -> INPUT_RECORD | None: if not block and not self.wait(timeout=0): return None - rec = INPUT_RECORD() + if self.more_in_buffer(): + self.input_buffer_pos += 1 + return self.input_buffer[self.input_buffer_pos] + + # read next chunk + self.input_buffer_pos = 0 + self.input_buffer_count = 0 read = DWORD() - if not ReadConsoleInput(InHandle, rec, 1, read): + + if not ReadConsoleInput(InHandle, self.input_buffer, INPUT_BUFFER_LEN, read): raise WinError(GetLastError()) - return rec + self.input_buffer_count = read.value + return self.input_buffer[0] def get_event(self, block: bool = True) -> Event | None: """Return an Event instance. Returns None if |block| is false @@ -521,9 +538,15 @@ def forgetinput(self) -> None: def getpending(self) -> Event: """Return the characters that have been typed but not yet processed.""" - return Event("key", "", b"") + e = Event("key", "", b"") + + while not self.event_queue.empty(): + e2 = self.event_queue.get() + e.data += e2.data - def wait(self, timeout: float | None) -> bool: + return e + + def wait_for_event(self, timeout: float | None) -> bool: """Wait for an event.""" if timeout is None: timeout = INFINITE @@ -536,6 +559,16 @@ def wait(self, timeout: float | None) -> bool: return False return True + def wait(self, timeout: float | None = None) -> bool: + """ + Wait for events on the console. + """ + return ( + not self.event_queue.empty() + or self.more_in_buffer() + or self.wait_for_event(timeout) + ) + def repaint(self) -> None: raise NotImplementedError("No repaint support")