diff --git a/latch_cli/menus/__init__.py b/latch_cli/menus/__init__.py new file mode 100644 index 00000000..ea2099ea --- /dev/null +++ b/latch_cli/menus/__init__.py @@ -0,0 +1,8 @@ +import sys + +if sys.platform == "win32": + from .win32 import raw_input, select_tui +else: + from .vt100 import raw_input, select_tui + +from .common import * diff --git a/latch_cli/menus/common.py b/latch_cli/menus/common.py new file mode 100644 index 00000000..9d29cde8 --- /dev/null +++ b/latch_cli/menus/common.py @@ -0,0 +1,37 @@ +import sys +from typing import Callable, Generic, Tuple + +from git import Optional +from typing_extensions import TypedDict, TypeVar + +old_print = print + + +def buffered_print() -> Tuple[Callable, Callable]: + buffer = [] + + def __print(*args): + for arg in args: + buffer.append(arg) + + def __show(): + nonlocal buffer + sys.stdout.write("".join(buffer)) + sys.stdout.flush() + + buffer = [] + + return __print, __show + + +# Allows for exactly one print per render, removing any weird flashing +# behavior and also speeding things up considerably +print, show = buffered_print() + + +T = TypeVar("T") + + +class SelectOption(TypedDict, Generic[T]): + display_name: str + value: T diff --git a/latch_cli/menus.py b/latch_cli/menus/vt100.py similarity index 74% rename from latch_cli/menus.py rename to latch_cli/menus/vt100.py index 9190881b..cfbf3faf 100644 --- a/latch_cli/menus.py +++ b/latch_cli/menus/vt100.py @@ -1,37 +1,25 @@ import os import sys -from typing import Any, Callable, Generic, List, Optional, Tuple, TypeVar +import termios +import tty +from functools import wraps +from typing import Callable, List, Optional, Tuple, TypeVar -from typing_extensions import TypedDict +from typing_extensions import ParamSpec from latch_cli.click_utils import AnsiCodes +from . import common -def buffered_print() -> Tuple[Callable, Callable]: - buffer = [] - - def __print(*args): - for arg in args: - buffer.append(arg) - - def __show(): - nonlocal buffer - print("".join(buffer), flush=True, end="") - buffer = [] - - return __print, __show - - -# Allows for exactly one print per render, removing any weird flashing -# behavior and also speeding things up considerably -_print, _show = buffered_print() +P = ParamSpec("P") +T = TypeVar("T") def clear(k: int): """ - Clear `k` lines below the cursor, returning the cursor to its original position + Clear `k` lines below the cursor, returning the cursor to the start of its original line """ - _print(f"\x1b[2K\x1b[1E" * (k) + f"\x1b[{k}F") + print(f"\x1b[2K\x1b[1E" * (k) + f"\x1b[{k}F") def draw_box( @@ -50,15 +38,15 @@ def draw_box( def clear_screen(): - _print("\x1b[2J") + print("\x1b[2J") def remove_cursor(): - _print("\x1b[?25l") + print("\x1b[?25l") def reveal_cursor(): - _print("\x1b[?25h") + print("\x1b[?25h") def move_cursor(pos: Tuple[int, int]): @@ -68,55 +56,45 @@ def move_cursor(pos: Tuple[int, int]): x, y = pos if x < 0 or y < 0: return - _print(f"\x1b[{y};{x}H") + print(f"\x1b[{y};{x}H") def move_cursor_up(n: int): if n <= 0: return - _print(f"\x1b[{n}A") + print(f"\x1b[{n}A") def line_up(n: int): """Moves to the start of the destination line""" if n <= 0: return - _print(f"\x1b[{n}F") + print(f"\x1b[{n}F") def move_cursor_down(n: int): if n <= 0: return - _print(f"\x1b[{n}B") + print(f"\x1b[{n}B") def line_down(n: int): """Moves to the start of the destination line""" if n <= 0: return - _print(f"\x1b[{n}E") + print(f"\x1b[{n}E") def move_cursor_right(n: int): if n <= 0: return - _print(f"\x1b[{n}C") + print(f"\x1b[{n}C") def move_cursor_left(n: int): if n <= 0: return - _print(f"\x1b[{n}D") - - -def current_cursor_position() -> Tuple[int, int]: - res = b"" - sys.stdout.write("\x1b[6n") - sys.stdout.flush() - while not res.endswith(b"R"): - res += sys.stdin.buffer.read(1) - y, x = res.strip(b"\x1b[R").split(b";") - return int(x), int(y) + print(f"\x1b[{n}D") def draw_vertical_line( @@ -134,16 +112,16 @@ def draw_vertical_line( return if color is not None: - _print(color) + print(color) sep = "\x1b[1A" if up else "\x1b[1B" for i in range(height): if i == 0 and make_corner: corner = "\u2514" if up else "\u2510" - _print(f"{corner}\x1b[1D{sep}") + print(f"{corner}\x1b[1D{sep}") else: - _print(f"\u2502\x1b[1D{sep}") + print(f"\u2502\x1b[1D{sep}") if color is not None: - _print("\x1b[0m") + print("\x1b[0m") def draw_horizontal_line( @@ -161,16 +139,30 @@ def draw_horizontal_line( return if color is not None: - _print(color) + print(color) sep = "\x1b[2D" if left else "" for i in range(width): if i == 0 and make_corner: corner = "\u2518" if left else "\u250c" - _print(f"{corner}{sep}") + print(f"{corner}{sep}") else: - _print(f"\u2500{sep}") + print(f"\u2500{sep}") if color is not None: - _print("\x1b[0m") + print("\x1b[0m") + + +def raw_input(f: Callable[P, T]) -> Callable[P, T]: + @wraps(f) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + old_settings = termios.tcgetattr(sys.stdin.fileno()) + tty.setraw(sys.stdin.fileno()) + + try: + return f(*args, **kwargs) + finally: + termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, old_settings) + + return wrapper def read_next_byte() -> bytes: @@ -194,16 +186,9 @@ def read_bytes(num_bytes: int) -> bytes: return result -T = TypeVar("T") - - -class SelectOption(TypedDict, Generic[T]): - display_name: str - value: T - - +@raw_input def select_tui( - title: str, options: List[SelectOption[T]], clear_terminal: bool = True + title: str, options: List[common.SelectOption[T]], clear_terminal: bool = True ) -> Optional[T]: """ Renders a terminal UI that allows users to select one of the options @@ -228,7 +213,7 @@ def render( if curr_selected < 0 or curr_selected >= len(options): curr_selected = 0 - _print(title) + print(title) line_down(2) num_lines_rendered = 4 # 4 "extra" lines for header + footer @@ -244,27 +229,21 @@ def render( prefix = indent[:-2] + "> " - _print(f"{color}{bold}{prefix}{name}{reset}\x1b[1E") + print(f"{color}{bold}{prefix}{name}{reset}\x1b[1E") else: - _print(f"{indent}{name}\x1b[1E") + print(f"{indent}{name}\x1b[1E") num_lines_rendered += 1 line_down(1) control_str = "[ARROW-KEYS] Navigate\t[ENTER] Select\t[Q] Quit" - _print(control_str) + print(control_str) line_up(num_lines_rendered - 1) - _show() + common.show() return num_lines_rendered - import termios - import tty - - old_settings = termios.tcgetattr(sys.stdin.fileno()) - tty.setraw(sys.stdin.fileno()) - curr_selected = 0 start_index = 0 _, term_height = os.get_terminal_size() @@ -319,5 +298,4 @@ def render( finally: clear(num_lines_rendered) reveal_cursor() - _show() - termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, old_settings) + common.show() diff --git a/latch_cli/menus/win32.py b/latch_cli/menus/win32.py new file mode 100644 index 00000000..28de5010 --- /dev/null +++ b/latch_cli/menus/win32.py @@ -0,0 +1,412 @@ +import msvcrt +import os +import sys +from ctypes import byref, c_char, c_long, c_ulong, pointer, windll +from ctypes.wintypes import DWORD, HANDLE +from dataclasses import dataclass +from enum import Enum +from functools import wraps +from typing import Callable, List, Optional, Tuple, TypeVar + +from typing_extensions import ParamSpec + +from latch_cli.click_utils import AnsiCodes + +from . import common +from .win32_types import ( + CONSOLE_SCREEN_BUFFER_INFO, + COORD, + INPUT_RECORD, + KEY_EVENT_RECORD, + MOUSE_EVENT_RECORD, + STD_INPUT_HANDLE, + STD_OUTPUT_HANDLE, + EventTypes, +) + +P = ParamSpec("P") +T = TypeVar("T") + + +hconsole = HANDLE(windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)) + + +# convenience dataclasses for readability +@dataclass +class Pair: + x: int + y: int + + +@dataclass +class ConsoleScreenBufferInfo: + raw: CONSOLE_SCREEN_BUFFER_INFO + buffer_size: Pair + cursor_position: Pair + attributes: int + window_top_left: Pair + window_bottom_right: Pair + max_window_size: Pair + + def window_dimensions(self) -> Pair: + return Pair( + x=self.window_bottom_right.x - self.window_top_left.x, + y=self.window_bottom_right.y - self.window_top_left.y, + ) + + +def get_win32_screen_buffer_info() -> ConsoleScreenBufferInfo: + """ + https://github.com/prompt-toolkit/python-prompt-toolkit/blob/669541123c9a72da1fda662cbd0a18ffe9e6d113/src/prompt_toolkit/output/win32.py#L185 + """ + + sbinfo = CONSOLE_SCREEN_BUFFER_INFO() + + success = windll.kernel32.GetConsoleScreenBufferInfo(hconsole, byref(sbinfo)) + + if success: + return ConsoleScreenBufferInfo( + raw=sbinfo, + buffer_size=Pair( + x=sbinfo.dwSize.X, + y=sbinfo.dwSize.Y, + ), + cursor_position=Pair( + x=sbinfo.dwCursorPosition.X, + y=sbinfo.dwCursorPosition.Y, + ), + attributes=sbinfo.wAttributes, + window_top_left=Pair( + x=sbinfo.srWindow.Left, + y=sbinfo.srWindow.Top, + ), + window_bottom_right=Pair( + x=sbinfo.srWindow.Right, + y=sbinfo.srWindow.Bottom, + ), + max_window_size=Pair( + x=sbinfo.dwMaximumWindowSize.X, + y=sbinfo.dwMaximumWindowSize.Y, + ), + ) + else: + raise RuntimeError("No windows console found") + + +def _erase(start: COORD, length: int) -> None: + """ + https://github.com/prompt-toolkit/python-prompt-toolkit/blob/669541123c9a72da1fda662cbd0a18ffe9e6d113/src/prompt_toolkit/output/win32.py#L255 + """ + chars_written = c_ulong() + + windll.kernel32.FillConsoleOutputCharacterA( + hconsole, c_char(b" "), DWORD(length), start, byref(chars_written) + ) + + # Reset attributes. + sbinfo = get_win32_screen_buffer_info() + windll.kernel32.FillConsoleOutputAttribute( + hconsole, sbinfo.attributes, length, start, byref(chars_written) + ) + + +def clear(k: int): + """ + Clear `k` lines below the cursor, returning the cursor to the start of its original line + """ + sbinfo = get_win32_screen_buffer_info() + + length = sbinfo.window_dimensions().x * k + _erase(sbinfo.raw.dwCursorPosition, length) + + # does the cursor move at all here? do we need to move back to the original line? + + +def clear_screen(): + sbinfo = get_win32_screen_buffer_info() + dims = sbinfo.window_dimensions() + + start = COORD(X=sbinfo.window_top_left.x, Y=sbinfo.window_top_left.y) + length = dims.x * dims.y + + _erase(start, length) + + +def remove_cursor(): + pass # not supported in windows afaict + + +def reveal_cursor(): + pass # not supported in windows afaict + + +def move_cursor(x: int, y: int): + """ + Move the cursor to a given (x, y) coordinate + """ + if x < 0 or y < 0: + return + + windll.kernel32.SetConsoleCursorPosition(hconsole, COORD(X=x, Y=y)) + + +def move_cursor_up(n: int): + if n <= 0: + return + + sbinfo = get_win32_screen_buffer_info() + + x = sbinfo.cursor_position.x + y = sbinfo.cursor_position.y - n + move_cursor(x, y) + + +def line_up(n: int): + """Moves to the start of the destination line""" + sbinfo = get_win32_screen_buffer_info() + + x = 0 + y = sbinfo.cursor_position.y - n + move_cursor(x, y) + + +def line_down(n: int): + """Moves to the start of the destination line""" + + line_up(-n) + + +def raw_input(f: Callable[P, T]) -> Callable[P, T]: + # ayush: got most of this from + # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/669541123c9a72da1fda662cbd0a18ffe9e6d113/src/prompt_toolkit/input/win32.py + @wraps(f) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + + original_mode = DWORD() + handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) + windll.kernel32.GetConsoleMode(handle, pointer(original_mode)) + + enable_echo_input = 0x0004 + enable_line_input = 0x0002 + enable_processed_input = 0x0001 + + windll.kernel32.SetConsoleMode( + handle, + original_mode.value + & ~(enable_echo_input | enable_line_input | enable_processed_input), + ) + + try: + return f(*args, **kwargs) + finally: + windll.kernel32.SetConsoleMode(handle, original_mode) + + return wrapper + + +class Special(Enum): + left = "left" + right = "right" + up = "up" + down = "down" + enter = "enter" + ctrlc = "ctrlc" + ctrld = "ctrld" + + +@dataclass +class KeyInput: + special: Optional[Special] + value: str + + +# ignoring most things except for arrow keys / enter +def get_key_input() -> Optional[KeyInput]: + # ayush: + # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/669541123c9a72da1fda662cbd0a18ffe9e6d113/src/prompt_toolkit/input/win32.py#L127 + handle: HANDLE + if sys.stdin.isatty(): + handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) + else: + _fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY) + handle = HANDLE(msvcrt.get_osfhandle(_fdcon)) + + max_count = 2048 # Max events to read at the same time. + + read = DWORD(0) + arrtype = INPUT_RECORD * max_count + input_records = arrtype() + + windll.kernel32.ReadConsoleInputW( + handle, pointer(input_records), max_count, pointer(read) + ) + + for i in range(read.value): + ir = input_records[i] + + if ir.EventType not in EventTypes: + continue + + ev = getattr(ir.Event, EventTypes[ir.EventType]) + + if not (isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown): + continue + + u_char = ev.uChar.UnicodeChar + + special: Optional[Special] = None + if u_char == "\x00": # special keys, e.g. arrow keys + if ev.VirtualKeyCode == 37: + special = Special.left + elif ev.VirtualKeyCode == 38: + special = Special.up + elif ev.VirtualKeyCode == 39: + special = Special.right + elif ev.VirtualKeyCode == 40: + special = Special.down + elif u_char == "\x0d": + special = Special.enter + elif u_char == "\x03": + special = Special.ctrlc + elif u_char == "\x04": + special = Special.ctrld + + return KeyInput(special, u_char) + + +def set_attrs(color: int = 0xB, bold: bool = True) -> int: + common.show() + + sbinfo = get_win32_screen_buffer_info() + attrs = sbinfo.attributes + + attrs = attrs & ~0xF + attrs |= color + + if bold: + attrs |= 1 << 8 # bold + else: + attrs &= ~(1 << 8) + + windll.kernel32.SetConsoleTextAttribute(hconsole, attrs) + + return sbinfo.attributes + + +def reset_attrs(attrs: int): + windll.kernel32.SetConsoleTextAttribute(hconsole, attrs) + + +@raw_input +def select_tui( + title: str, options: List[common.SelectOption[T]], clear_terminal: bool = True +) -> Optional[T]: + """ + Renders a terminal UI that allows users to select one of the options + listed in `options` + + Args: + title: The title of the selection window. + options: A list of names for each of the options. + clear_terminal: Whether or not to clear the entire terminal window + before displaying - default False + """ + + if len(options) == 0: + raise ValueError("No options given") + + def render( + curr_selected: int, + start_index: int = 0, + max_per_page: int = 10, + indent: str = " ", + ) -> int: + if curr_selected < 0 or curr_selected >= len(options): + curr_selected = 0 + + print(title) + line_down(1) + + num_lines_rendered = 5 # 4 "extra" lines for header + footer + + for i in range(start_index, start_index + max_per_page): + if i >= len(options): + break + + name = options[i]["display_name"] + + if i == curr_selected: + prefix = indent[:-2] + "> " + + old = set_attrs() + print(f"{prefix}{name}") + reset_attrs(old) + else: + print(f"{indent}{name}") + + num_lines_rendered += 1 + + line_down(1) + + control_str = "[ARROW-KEYS] Navigate\t[ENTER] Select\t[Q] Quit" + print(control_str) + line_up(num_lines_rendered - 1) + + return num_lines_rendered + + curr_selected = 0 + start_index = 0 + _, term_height = os.get_terminal_size() + remove_cursor() + + max_per_page = min(len(options), term_height - 4) + + if clear_terminal: + clear_screen() + move_cursor(0, 0) + else: + print("\n" * (max_per_page + 3)) + move_cursor_up(max_per_page + 4) + + num_lines_rendered = render( + curr_selected, + start_index=start_index, + max_per_page=max_per_page, + ) + + try: + while True: + k = get_key_input() + if k is None: + continue + + if k.special == Special.enter: + return options[curr_selected]["value"] + elif k.special == Special.up: # Up Arrow + curr_selected = max(curr_selected - 1, 0) + if curr_selected - start_index < max_per_page // 2 and start_index > 0: + start_index -= 1 + elif k.special == Special.down: # Down Arrow + curr_selected = min(curr_selected + 1, len(options) - 1) + if ( + curr_selected - start_index > max_per_page // 2 + and start_index < len(options) - max_per_page + ): + start_index += 1 + elif k.special in {Special.ctrlc, Special.ctrld} or k.value in {"q", "Q"}: + return + else: + continue + + clear(num_lines_rendered) + num_lines_rendered = render( + curr_selected, + start_index=start_index, + max_per_page=max_per_page, + ) + except KeyboardInterrupt: + ... + finally: + clear(num_lines_rendered) + reveal_cursor() + common.show() diff --git a/latch_cli/menus/win32_types.py b/latch_cli/menus/win32_types.py new file mode 100644 index 00000000..e949168e --- /dev/null +++ b/latch_cli/menus/win32_types.py @@ -0,0 +1,232 @@ +# ayush: this code is lifted from prompt-toolkit +# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/669541123c9a72da1fda662cbd0a18ffe9e6d113/src/prompt_toolkit/win32_types.py + +from __future__ import annotations + +from ctypes import Structure, Union, c_char, c_long, c_short, c_ulong +from ctypes.wintypes import BOOL, DWORD, LPVOID, WCHAR, WORD +from typing import TYPE_CHECKING + +# Input/Output standard device numbers. Note that these are not handle objects. +# It's the `windll.kernel32.GetStdHandle` system call that turns them into a +# real handle object. +STD_INPUT_HANDLE = c_ulong(-10) +STD_OUTPUT_HANDLE = c_ulong(-11) +STD_ERROR_HANDLE = c_ulong(-12) + + +class COORD(Structure): + """ + Struct in wincon.h + http://msdn.microsoft.com/en-us/library/windows/desktop/ms682119(v=vs.85).aspx + """ + + if TYPE_CHECKING: + X: int + Y: int + + _fields_ = [ + ("X", c_short), # Short + ("Y", c_short), # Short + ] + + def __repr__(self) -> str: + return "{}(X={!r}, Y={!r}, type_x={!r}, type_y={!r})".format( + self.__class__.__name__, + self.X, + self.Y, + type(self.X), + type(self.Y), + ) + + +class UNICODE_OR_ASCII(Union): + if TYPE_CHECKING: + AsciiChar: bytes + UnicodeChar: str + + _fields_ = [ + ("AsciiChar", c_char), + ("UnicodeChar", WCHAR), + ] + + +class KEY_EVENT_RECORD(Structure): + """ + http://msdn.microsoft.com/en-us/library/windows/desktop/ms684166(v=vs.85).aspx + """ + + if TYPE_CHECKING: + KeyDown: int + RepeatCount: int + VirtualKeyCode: int + VirtualScanCode: int + uChar: UNICODE_OR_ASCII + ControlKeyState: int + + _fields_ = [ + ("KeyDown", c_long), # bool + ("RepeatCount", c_short), # word + ("VirtualKeyCode", c_short), # word + ("VirtualScanCode", c_short), # word + ("uChar", UNICODE_OR_ASCII), # Unicode or ASCII. + ("ControlKeyState", c_long), # double word + ] + + +class MOUSE_EVENT_RECORD(Structure): + """ + http://msdn.microsoft.com/en-us/library/windows/desktop/ms684239(v=vs.85).aspx + """ + + if TYPE_CHECKING: + MousePosition: COORD + ButtonState: int + ControlKeyState: int + EventFlags: int + + _fields_ = [ + ("MousePosition", COORD), + ("ButtonState", c_long), # dword + ("ControlKeyState", c_long), # dword + ("EventFlags", c_long), # dword + ] + + +class WINDOW_BUFFER_SIZE_RECORD(Structure): + """ + http://msdn.microsoft.com/en-us/library/windows/desktop/ms687093(v=vs.85).aspx + """ + + if TYPE_CHECKING: + Size: COORD + + _fields_ = [("Size", COORD)] + + +class MENU_EVENT_RECORD(Structure): + """ + http://msdn.microsoft.com/en-us/library/windows/desktop/ms684213(v=vs.85).aspx + """ + + if TYPE_CHECKING: + CommandId: int + + _fields_ = [("CommandId", c_long)] # uint + + +class FOCUS_EVENT_RECORD(Structure): + """ + http://msdn.microsoft.com/en-us/library/windows/desktop/ms683149(v=vs.85).aspx + """ + + if TYPE_CHECKING: + SetFocus: int + + _fields_ = [("SetFocus", c_long)] # bool + + +class EVENT_RECORD(Union): + if TYPE_CHECKING: + KeyEvent: KEY_EVENT_RECORD + MouseEvent: MOUSE_EVENT_RECORD + WindowBufferSizeEvent: WINDOW_BUFFER_SIZE_RECORD + MenuEvent: MENU_EVENT_RECORD + FocusEvent: FOCUS_EVENT_RECORD + + _fields_ = [ + ("KeyEvent", KEY_EVENT_RECORD), + ("MouseEvent", MOUSE_EVENT_RECORD), + ("WindowBufferSizeEvent", WINDOW_BUFFER_SIZE_RECORD), + ("MenuEvent", MENU_EVENT_RECORD), + ("FocusEvent", FOCUS_EVENT_RECORD), + ] + + +class INPUT_RECORD(Structure): + """ + http://msdn.microsoft.com/en-us/library/windows/desktop/ms683499(v=vs.85).aspx + """ + + if TYPE_CHECKING: + EventType: int + Event: EVENT_RECORD + + _fields_ = [("EventType", c_short), ("Event", EVENT_RECORD)] # word # Union. + + +EventTypes = { + 1: "KeyEvent", + 2: "MouseEvent", + 4: "WindowBufferSizeEvent", + 8: "MenuEvent", + 16: "FocusEvent", +} + + +class SMALL_RECT(Structure): + """struct in wincon.h.""" + + if TYPE_CHECKING: + Left: int + Top: int + Right: int + Bottom: int + + _fields_ = [ + ("Left", c_short), + ("Top", c_short), + ("Right", c_short), + ("Bottom", c_short), + ] + + +class CONSOLE_SCREEN_BUFFER_INFO(Structure): + """struct in wincon.h.""" + + if TYPE_CHECKING: + dwSize: COORD + dwCursorPosition: COORD + wAttributes: int + srWindow: SMALL_RECT + dwMaximumWindowSize: COORD + + _fields_ = [ + ("dwSize", COORD), + ("dwCursorPosition", COORD), + ("wAttributes", WORD), + ("srWindow", SMALL_RECT), + ("dwMaximumWindowSize", COORD), + ] + + def __repr__(self) -> str: + return "CONSOLE_SCREEN_BUFFER_INFO({!r},{!r},{!r},{!r},{!r},{!r},{!r},{!r},{!r},{!r},{!r})".format( + self.dwSize.Y, + self.dwSize.X, + self.dwCursorPosition.Y, + self.dwCursorPosition.X, + self.wAttributes, + self.srWindow.Top, + self.srWindow.Left, + self.srWindow.Bottom, + self.srWindow.Right, + self.dwMaximumWindowSize.Y, + self.dwMaximumWindowSize.X, + ) + + +class SECURITY_ATTRIBUTES(Structure): + """ + http://msdn.microsoft.com/en-us/library/windows/desktop/aa379560(v=vs.85).aspx + """ + + if TYPE_CHECKING: + nLength: int + lpSecurityDescriptor: int + bInheritHandle: int # BOOL comes back as 'int'. + + _fields_ = [ + ("nLength", DWORD), + ("lpSecurityDescriptor", LPVOID), + ("bInheritHandle", BOOL), + ] diff --git a/latch_cli/services/execute/main.py b/latch_cli/services/execute/main.py index b98219d0..2092a444 100644 --- a/latch_cli/services/execute/main.py +++ b/latch_cli/services/execute/main.py @@ -4,13 +4,14 @@ import os import signal import sys -from typing import Generic, Literal, Optional, Tuple, TypedDict, TypeVar, Union +from typing import Literal, Optional, TypedDict, Union from urllib.parse import urljoin, urlparse import websockets.client as websockets from latch_sdk_config.latch import NUCLEUS_URL from typing_extensions import TypeAlias +from latch_cli.menus import raw_input from latch_cli.services.execute.utils import ( ContainerNode, EGNNode, @@ -172,6 +173,7 @@ async def connect(egn_info: EGNNode, container_info: Optional[ContainerNode]): pass +@raw_input def exec( execution_id: Optional[str] = None, egn_id: Optional[str] = None, @@ -184,13 +186,4 @@ def exec( egn_info = get_egn_info(execution_info, egn_id) container_info = get_container_info(egn_info, container_index) - import termios - import tty - - old_settings_stdin = termios.tcgetattr(sys.stdin.fileno()) - tty.setraw(sys.stdin) - - try: - asyncio.run(connect(egn_info, container_info)) - finally: - termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, old_settings_stdin) + asyncio.run(connect(egn_info, container_info)) diff --git a/latch_cli/services/get_executions.py b/latch_cli/services/get_executions.py index 377dbd8b..d8344fec 100644 --- a/latch_cli/services/get_executions.py +++ b/latch_cli/services/get_executions.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Dict, List +import click from apscheduler.schedulers.background import BackgroundScheduler from latch_sdk_config.latch import config @@ -26,6 +27,12 @@ def get_executions(): setting will likely cause the interface to behave in an unexpected way. """ + # todo(ayush): re-implement the other tui fns to work in win32 as well + if sys.platform == "win32": + click.secho("This function is not supported on Windows systems.", fg="red") + + raise click.exceptions.Exit(1) + token = retrieve_or_login() context = current_workspace() headers = {"Authorization": f"Bearer {token}"} @@ -60,6 +67,7 @@ def get_executions(): ) +@menus.raw_input def _all_executions( title: str, column_names: List[str], @@ -80,7 +88,7 @@ def render( ) -> int: # DISCLAIMER : MOST OF THE MAGIC NUMBERS HERE WERE THROUGH TRIAL AND ERROR - menus.move_cursor((2, 2)) + menus.vt100.move_cursor((2, 2)) max_per_page = term_height - 5 vert_index = max(0, curr_selected - max_per_page // 2) @@ -103,8 +111,8 @@ def render( max_row_len = sum(lengths.values()) + column_spacing * (len(column_names) - 1) - menus._print(title) - menus.line_down(2) + menus.print(title) + menus.vt100.line_down(2) for i in range(vert_index, vert_index + max_per_page): if i >= len(options): @@ -126,36 +134,30 @@ def render( reset = "\x1b[0m" row_str = f"{green}{bold}{row_str}{reset}" - menus.move_cursor_right(3) - menus._print(row_str) - menus.line_down(1) + menus.vt100.move_cursor_right(3) + menus.print(row_str) + menus.vt100.line_down(1) - menus.move_cursor((2, term_height - 1)) + menus.vt100.move_cursor((2, term_height - 1)) control_str = "[ARROW-KEYS] Navigate\t[ENTER] Select\t[Q] Quit" - menus._print(control_str) - menus.draw_box((2, 3), term_height - 5, term_width - 4) + menus.print(control_str) + menus.vt100.draw_box((2, 3), term_height - 5, term_width - 4) - menus._show() + menus.show() return max_row_len - import termios - import tty - - old_settings = termios.tcgetattr(sys.stdin.fileno()) - tty.setraw(sys.stdin.fileno()) - curr_selected = hor_index = 0 - menus.remove_cursor() - menus.clear_screen() - menus.move_cursor((0, 0)) + menus.vt100.remove_cursor() + menus.vt100.clear_screen() + menus.vt100.move_cursor((0, 0)) prev = (curr_selected, hor_index, term_width, term_height) max_row_len = render(curr_selected, hor_index, term_width, term_height) try: while True: - b = menus.read_bytes(1) + b = menus.vt100.read_bytes(1) term_width, term_height = os.get_terminal_size() rerender = False if b == b"\r": @@ -171,7 +173,7 @@ def render( _execution_dashboard(selected_execution_data, resp.json()) rerender = True elif b == b"\x1b": - b = menus.read_bytes(2) + b = menus.vt100.read_bytes(2) if b == b"[A": # Up Arrow curr_selected = max(0, curr_selected - 1) elif b == b"[B": # Down Arrow @@ -183,7 +185,7 @@ def render( if max_row_len > term_width + 7: hor_index = min(max_row_len - term_width + 7, hor_index + 5) elif b == b"[1": # Start of SHIFT + arrow keys - b = menus.read_bytes(3) + b = menus.vt100.read_bytes(3) if b == b";2A": # Up Arrow curr_selected = max(0, curr_selected - 20) elif b == b";2B": # Down Arrow @@ -220,16 +222,15 @@ def render( (curr_selected, hor_index, term_width, term_height) != prev ): prev = (curr_selected, hor_index, term_width, term_height) - menus.clear_screen() + menus.vt100.clear_screen() max_row_len = render(curr_selected, hor_index, term_width, term_height) except KeyboardInterrupt: ... finally: - menus.clear_screen() - menus.reveal_cursor() - menus.move_cursor((0, 0)) - menus._show() - termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, old_settings) + menus.vt100.clear_screen() + menus.vt100.reveal_cursor() + menus.vt100.move_cursor((0, 0)) + menus.show() def _execution_dashboard(execution_data: Dict[str, str], workflow_graph: Dict): @@ -237,13 +238,13 @@ def _execution_dashboard(execution_data: Dict[str, str], workflow_graph: Dict): def render(curr_selected: int, term_width: int, term_height: int): # DISCLAIMER : MOST OF THE MAGIC NUMBERS HERE WERE THROUGH TRIAL AND ERROR - menus.move_cursor((2, 2)) - menus._print( + menus.vt100.move_cursor((2, 2)) + menus.print( f'{execution_data["display_name"]} - {execution_data["workflow_tagged"]}' ) - menus.draw_box((2, 3), term_height - 5, term_width - 4) + menus.vt100.draw_box((2, 3), term_height - 5, term_width - 4) - menus.move_cursor((2, term_height - 1)) + menus.vt100.move_cursor((2, term_height - 1)) instructions = [ "[ARROW-KEYS] Navigate", "[ENTER] View Task Logs", @@ -252,9 +253,9 @@ def render(curr_selected: int, term_width: int, term_height: int): if execution_data["status"] == "RUNNING": instructions.append("[A] Abort") instructions.append("[Q] Back to All Executions") - menus._print("\t".join(instructions)) + menus.print("\t".join(instructions)) - menus.move_cursor((4, 4)) + menus.vt100.move_cursor((4, 4)) for i, (_, task) in enumerate(fixed_workflow_graph): name, status = task["name"] or task["sub_wf_name"], task["status"] row_str = " ".join([name, status]) @@ -262,14 +263,14 @@ def render(curr_selected: int, term_width: int, term_height: int): green = "\x1b[38;5;40m" bold = "\x1b[1m" reset = "\x1b[0m" - menus._print(f"{green}{bold}{row_str}{reset}") + menus.print(f"{green}{bold}{row_str}{reset}") else: - menus._print(row_str) - menus.line_down(1) - menus.move_cursor_right(3) - menus._show() + menus.print(row_str) + menus.vt100.line_down(1) + menus.vt100.move_cursor_right(3) + menus.show() - menus.clear_screen() + menus.vt100.clear_screen() try: term_width, term_height = os.get_terminal_size() @@ -278,7 +279,7 @@ def render(curr_selected: int, term_width: int, term_height: int): render(curr_selected, term_width, term_height) prev = (curr_selected, term_width, term_height) while True: - b = menus.read_bytes(1) + b = menus.vt100.read_bytes(1) rerender = False if b == b"\r": _log_window(execution_data, fixed_workflow_graph, curr_selected) @@ -290,13 +291,13 @@ def render(curr_selected: int, term_width: int, term_height: int): _abort_modal(execution_data) rerender = True elif b == b"\x1b": - b = menus.read_bytes(2) + b = menus.vt100.read_bytes(2) if b == b"[A": # Up Arrow curr_selected = max(curr_selected - 1, 0) elif b == b"[B": # Down Arrow curr_selected = min(curr_selected + 1, len(workflow_graph) - 1) elif b == b"[1": # Start of SHIFT + arrow keys - b = menus.read_bytes(3) + b = menus.vt100.read_bytes(3) if b == b";2A": # Up Arrow curr_selected = max(0, curr_selected - 20) elif b == b";2B": # Down Arrow @@ -313,30 +314,30 @@ def render(curr_selected: int, term_width: int, term_height: int): curr_selected = max(curr_selected - 20, 0) term_width, term_height = os.get_terminal_size() if rerender or (prev != (curr_selected, term_width, term_height)): - menus.clear_screen() + menus.vt100.clear_screen() prev = (curr_selected, term_width, term_height) render(curr_selected, term_width, term_height) except KeyboardInterrupt: ... finally: - menus.clear_screen() - menus.move_cursor((0, 0)) - menus._show() + menus.vt100.clear_screen() + menus.vt100.move_cursor((0, 0)) + menus.show() def _loading_screen(text: str): # DISCLAIMER : MOST OF THE MAGIC NUMBERS HERE WERE THROUGH TRIAL AND ERROR term_width, term_height = os.get_terminal_size() - menus.clear_screen() - menus.draw_box((2, 3), term_height - 5, term_width - 4) + menus.vt100.clear_screen() + menus.vt100.draw_box((2, 3), term_height - 5, term_width - 4) x = (term_width - len(text)) // 2 y = term_height // 2 - menus.move_cursor((x, y)) - menus._print(text) - menus._show() + menus.vt100.move_cursor((x, y)) + menus.print(text) + menus.show() def _log_window(execution_data, fixed_workflow_graph: list, selected: int): @@ -367,13 +368,13 @@ def get_logs(): f.write(resp.json()["message"].replace("\t", " ")) def render(vert_index, hor_index, term_width, term_height): - menus.move_cursor((2, 2)) - menus._print( + menus.vt100.move_cursor((2, 2)) + menus.print( f'{execution_data["display_name"]} - {execution_data["workflow_tagged"]} -' f' {selected_task["name"]}' ) - menus.draw_box((2, 3), term_height - 5, term_width - 4) - menus.move_cursor((4, 4)) + menus.vt100.draw_box((2, 3), term_height - 5, term_width - 4) + menus.vt100.move_cursor((4, 4)) with open(log_file, "r") as f: for i, line in enumerate(f): if i < vert_index: @@ -381,17 +382,17 @@ def render(vert_index, hor_index, term_width, term_height): elif i > vert_index + term_height - 7: continue line = line.strip("\n\r") - menus._print(line[hor_index : hor_index + term_width - 7]) - menus.line_down(1) - menus.move_cursor_right(3) - menus.move_cursor((2, term_height - 1)) + menus.print(line[hor_index : hor_index + term_width - 7]) + menus.vt100.line_down(1) + menus.vt100.move_cursor_right(3) + menus.vt100.move_cursor((2, term_height - 1)) control_str = "[ARROW-KEYS] Navigate\t[Q] Back" - menus._print(control_str) - menus._show() + menus.print(control_str) + menus.show() try: term_width, term_height = os.get_terminal_size() - menus.clear_screen() + menus.vt100.clear_screen() get_logs() log_sched = BackgroundScheduler() @@ -406,12 +407,12 @@ def render(vert_index, hor_index, term_width, term_height): render(vert_index, hor_index, term_width, term_height) prev_term_dims = (vert_index, hor_index, term_width, term_height) while True: - b = menus.read_bytes(1) + b = menus.vt100.read_bytes(1) rerender = False if b in (b"r", b"R"): rerender = True elif b == b"\x1b": - b = menus.read_bytes(2) + b = menus.vt100.read_bytes(2) if b == b"[A": # Up Arrow vert_index = max(0, vert_index - 1) elif b == b"[B": # Down Arrow @@ -421,7 +422,7 @@ def render(vert_index, hor_index, term_width, term_height): elif b == b"[C": # Right Arrow hor_index += 5 elif b == b"[1": # Start of SHIFT + arrow keys - b = menus.read_bytes(3) + b = menus.vt100.read_bytes(3) if b == b";2A": # Up Arrow vert_index = max(0, vert_index - 20) elif b == b";2B": # Down Arrow @@ -450,7 +451,7 @@ def render(vert_index, hor_index, term_width, term_height): if rerender or ( prev_term_dims != (vert_index, hor_index, term_width, term_height) ): - menus.clear_screen() + menus.vt100.clear_screen() prev_term_dims = (vert_index, hor_index, term_width, term_height) render(vert_index, hor_index, term_width, term_height) except KeyboardInterrupt: @@ -458,9 +459,9 @@ def render(vert_index, hor_index, term_width, term_height): finally: log_sched.shutdown() log_file.unlink(missing_ok=True) - menus.clear_screen() - menus.move_cursor((0, 0)) - menus._show() + menus.vt100.clear_screen() + menus.vt100.move_cursor((0, 0)) + menus.show() # TODO(ayush): implement this @@ -471,7 +472,7 @@ def _relaunch_modal(execution_data): def _abort_modal(execution_data): def render(term_width: int, term_height: int): # DISCLAIMER : MOST OF THE MAGIC NUMBERS HERE WERE THROUGH TRIAL AND ERROR - menus.clear_screen() + menus.vt100.clear_screen() question = ( f"Are you sure you want to abort {execution_data['display_name']} " @@ -484,23 +485,23 @@ def render(term_width: int, term_height: int): x = (term_width - max_line_length) // 2 y = (term_height - len(lines)) // 2 - menus.draw_box((x - 3, y - 2), len(lines) + 4, max_line_length + 4) + menus.vt100.draw_box((x - 3, y - 2), len(lines) + 4, max_line_length + 4) for i, line in enumerate(lines): x = (term_width - len(line)) // 2 - menus.move_cursor((x, y + i)) - menus._print(f"{line}") + menus.vt100.move_cursor((x, y + i)) + menus.print(f"{line}") ctrl_str = "[Y] Yes\t[N] No" - menus.move_cursor(((term_width - len(ctrl_str)) // 2, y + len(lines) + 1)) - menus._print(ctrl_str) + menus.vt100.move_cursor(((term_width - len(ctrl_str)) // 2, y + len(lines) + 1)) + menus.print(ctrl_str) try: term_width, term_height = os.get_terminal_size() render(term_width, term_height) prev_term_dims = (term_width, term_height) while True: - b = menus.read_bytes(1) + b = menus.vt100.read_bytes(1) if b in (b"y", b"Y"): headers = {"Authorization": f"Bearer {retrieve_or_login()}"} resp = post( @@ -518,5 +519,5 @@ def render(term_width: int, term_height: int): except KeyboardInterrupt: ... finally: - menus.clear_screen() - menus.move_cursor((0, 0)) + menus.vt100.clear_screen() + menus.vt100.move_cursor((0, 0)) diff --git a/latch_cli/services/get_params.py b/latch_cli/services/get_params.py index 7c0584f3..b235234d 100644 --- a/latch_cli/services/get_params.py +++ b/latch_cli/services/get_params.py @@ -45,15 +45,8 @@ class _Unsupported: ... bool: False, } -# TODO(ayush): fix this to -# (1) support records, -# (2) support fully qualified workflow names, -# (note from kenny) - pretty sure you intend to support the opposite, -# fqn are supported by default, address when you get to this todo -# (3) show a message indicating the generated filename, -# (4) optionally specify the output filename - +# todo(ayush): delete this - instead potentially generate json from front end def get_params(wf_name: str, wf_version: Optional[str] = None): """Constructs a parameter map for a workflow given its name and an optional version. diff --git a/latch_cli/services/launch.py b/latch_cli/services/launch.py index 52d20702..5781d914 100644 --- a/latch_cli/services/launch.py +++ b/latch_cli/services/launch.py @@ -17,6 +17,7 @@ from latch.utils import current_workspace, retrieve_or_login +# todo(ayush): rewrite w/ json file as input def launch(params_file: Path, version: Optional[str] = None) -> str: """Launches a (versioned) workflow with parameters specified in python. diff --git a/latch_cli/services/local_dev_old.py b/latch_cli/services/local_dev_old.py index 0247b37e..753be01a 100644 --- a/latch_cli/services/local_dev_old.py +++ b/latch_cli/services/local_dev_old.py @@ -23,6 +23,7 @@ from latch.utils import current_workspace, retrieve_or_login from latch_cli.constants import docker_image_name_illegal_pat +from latch_cli.menus import raw_input from latch_cli.tinyrequests import post from latch_cli.utils import TemporarySSHCredentials, identifier_suffix_from_str @@ -251,14 +252,10 @@ async def _send_resize_message( ) +@raw_input async def _shell_session( ws: client.WebSocketClientProtocol, ): - import termios - import tty - - old_settings_stdin = termios.tcgetattr(sys.stdin.fileno()) - tty.setraw(sys.stdin) loop = asyncio.get_event_loop() reader = asyncio.StreamReader(loop=loop) @@ -333,7 +330,6 @@ async def output_task(): except asyncio.CancelledError: ... finally: - termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, old_settings_stdin) signal.signal(signal.SIGWINCH, old_sigwinch_handler) diff --git a/latch_cli/services/preview.py b/latch_cli/services/preview.py index 83fe4732..0211a554 100644 --- a/latch_cli/services/preview.py +++ b/latch_cli/services/preview.py @@ -1,8 +1,5 @@ -import os -import sys import webbrowser from pathlib import Path -from typing import List from flytekit.core.workflow import PythonFunctionWorkflow from google.protobuf.json_format import MessageToJson @@ -48,12 +45,15 @@ def preview(pkg_root: Path): wf = list(wfs.values())[0] if len(wfs) > 1: - wf = wfs[ - _select_workflow_tui( - title="Select which workflow to preview", - options=list(wfs.keys()), - ) - ] + wf = menus.select_tui( + title="Select which workflow to preview", + options=[ + menus.SelectOption(display_name=k, value=v) for k, v in wfs.items() + ], + ) + + if wf is None: + return resp = post( url=config.api.workflow.preview, @@ -68,119 +68,3 @@ def preview(pkg_root: Path): url = f"{config.console_url}/preview/parameters" webbrowser.open(url) - - -# TODO(ayush): abstract this logic in a unified interface that all tui commands use -def _select_workflow_tui(title: str, options: List[str], clear_terminal: bool = True): - """ - Renders a terminal UI that allows users to select one of the options - listed in `options` - - Args: - title: The title of the selection window. - options: A list of names for each of the options. - clear_terminal: Whether or not to clear the entire terminal window - before displaying - default False - """ - - if len(options) == 0: - raise ValueError("No options given") - - def render( - curr_selected: int, - start_index: int = 0, - max_per_page: int = 10, - indent: str = " ", - ) -> int: - if curr_selected < 0 or curr_selected >= len(options): - curr_selected = 0 - - menus._print(title) - menus.line_down(2) - - num_lines_rendered = 4 # 4 "extra" lines for header + footer - - for i in range(start_index, start_index + max_per_page): - if i >= len(options): - break - name = options[i] - if i == curr_selected: - color = "\x1b[38;5;40m" - bold = "\x1b[1m" - reset = "\x1b[0m" - menus._print(f"{indent}{color}{bold}{name}{reset}\x1b[1E") - else: - menus._print(f"{indent}{name}\x1b[1E") - num_lines_rendered += 1 - - menus.line_down(1) - - control_str = "[ARROW-KEYS] Navigate\t[ENTER] Select\t[Q] Quit" - menus._print(control_str) - menus.line_up(num_lines_rendered - 1) - - menus._show() - - return num_lines_rendered - - import termios - import tty - - old_settings = termios.tcgetattr(sys.stdin.fileno()) - tty.setraw(sys.stdin.fileno()) - - curr_selected = 0 - start_index = 0 - _, term_height = os.get_terminal_size() - menus.remove_cursor() - - if not clear_terminal: - _, curs_height = menus.current_cursor_position() - max_per_page = term_height - curs_height - 4 - else: - menus.clear_screen() - menus.move_cursor((0, 0)) - max_per_page = term_height - 4 - - num_lines_rendered = render( - curr_selected, - start_index=start_index, - max_per_page=max_per_page, - ) - - try: - while True: - b = menus.read_bytes(1) - if b == b"\r": - return options[curr_selected] - elif b == b"\x1b": - b = menus.read_bytes(2) - if b == b"[A": # Up Arrow - curr_selected = max(curr_selected - 1, 0) - if ( - curr_selected - start_index < max_per_page // 2 - and start_index > 0 - ): - start_index -= 1 - elif b == b"[B": # Down Arrow - curr_selected = min(curr_selected + 1, len(options) - 1) - if ( - curr_selected - start_index > max_per_page // 2 - and start_index < len(options) - max_per_page - ): - start_index += 1 - else: - continue - menus.clear(num_lines_rendered) - num_lines_rendered = render( - curr_selected, - start_index=start_index, - max_per_page=max_per_page, - ) - except KeyboardInterrupt: - ... - finally: - menus.clear(num_lines_rendered) - menus.reveal_cursor() - menus._show() - termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, old_settings) diff --git a/latch_cli/tui/__init__.py b/latch_cli/tui/__init__.py deleted file mode 100644 index e69de29b..00000000