diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 8c914d883..8ed843398 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,7 +1,7 @@ [bumpversion] commit = True tag = True -current_version = 4.131 +current_version = 4.145 parse = (?P\d+)\.(?P\d+)(\.(?P\d+)(\-(?P[a-z]+))?)? serialize = {major}.{minor} diff --git a/.gitignore b/.gitignore index 3127051e2..a24e7d74f 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ *.pyc *.swp /venv/ +*.code-workspace \ No newline at end of file diff --git a/PKGBUILD b/PKGBUILD index a37afcbda..51c05542c 100644 --- a/PKGBUILD +++ b/PKGBUILD @@ -39,15 +39,15 @@ for _variant in "${_variants[@]}"; do pkgname+=(kvmd-platform-$_platform-$_board) done pkgbase=kvmd -pkgver=4.131 +pkgver=4.145 pkgrel=1 pkgdesc="The main PiKVM daemon" url="https://github.com/pikvm/kvmd" license=(GPL) arch=(any) depends=( - "python>=3.13" - "python<3.14" + "python>=3.14" + "python<3.15" python-yaml python-ruamel-yaml python-aiohttp @@ -97,7 +97,7 @@ depends=( certbot "raspberrypi-io-access>=0.7" raspberrypi-utils - "ustreamer>=6.41" + "ustreamer>=6.47" # Systemd UDEV bug "systemd>=248.3-2" @@ -106,9 +106,6 @@ depends=( # https://archlinuxarm.org/forum/viewtopic.php?f=15&t=15725&start=40 "zstd>=1.5.1-2.1" - # Possible hotfix for the new os update - openssl-1.1 - # Bootconfig dos2unix parted @@ -225,7 +222,7 @@ for _variant in "${_variants[@]}"; do backup=() pkgdesc=\"PiKVM platform configs - $_platform for $_board\" - depends=(kvmd=$pkgver-$pkgrel \"linux-rpi-pikvm>=6.12.56-1\" \"raspberrypi-bootloader-pikvm>=20251031-1\") + depends=(kvmd=$pkgver-$pkgrel \"linux-rpi-pikvm>=6.12.56-5\" \"raspberrypi-bootloader-pikvm>=20251031-1\") if [[ $_base == v0 ]]; then depends=(\"\${depends[@]}\" platformio-core avrdude make patch) diff --git a/configs/os/boot-config/v4plus-hdmi-rpi4.txt b/configs/os/boot-config/v4plus-hdmi-rpi4.txt index 437443259..a25c84d44 100644 --- a/configs/os/boot-config/v4plus-hdmi-rpi4.txt +++ b/configs/os/boot-config/v4plus-hdmi-rpi4.txt @@ -15,10 +15,6 @@ dtoverlay=dwc2,dr_mode=peripheral dtoverlay=tc358743,4lane=1 dtoverlay=tc358743-audio -# Passthrough -dtoverlay=vc4-kms-v3d -disable_overscan=1 - # I2C (display) dtparam=i2c_arm=on diff --git a/configs/os/modules-load/v2-hdmi.conf b/configs/os/modules-load/v2-hdmi.conf index af538dbab..76c8eeb04 100644 --- a/configs/os/modules-load/v2-hdmi.conf +++ b/configs/os/modules-load/v2-hdmi.conf @@ -1,3 +1,4 @@ dwc2 libcomposite tc358743 +nbd diff --git a/configs/os/modules-load/v2-hdmiusb.conf b/configs/os/modules-load/v2-hdmiusb.conf index 9c9626be4..8933495ea 100644 --- a/configs/os/modules-load/v2-hdmiusb.conf +++ b/configs/os/modules-load/v2-hdmiusb.conf @@ -1,2 +1,3 @@ dwc2 libcomposite +nbd diff --git a/configs/os/modules-load/v3-hdmi.conf b/configs/os/modules-load/v3-hdmi.conf index f1ede9d11..97bbb798c 100644 --- a/configs/os/modules-load/v3-hdmi.conf +++ b/configs/os/modules-load/v3-hdmi.conf @@ -2,3 +2,4 @@ dwc2 libcomposite tc358743 i2c-dev +nbd diff --git a/configs/os/modules-load/v4mini-hdmi.conf b/configs/os/modules-load/v4mini-hdmi.conf index f1ede9d11..97bbb798c 100644 --- a/configs/os/modules-load/v4mini-hdmi.conf +++ b/configs/os/modules-load/v4mini-hdmi.conf @@ -2,3 +2,4 @@ dwc2 libcomposite tc358743 i2c-dev +nbd diff --git a/configs/os/modules-load/v4plus-hdmi.conf b/configs/os/modules-load/v4plus-hdmi.conf index f1ede9d11..97bbb798c 100644 --- a/configs/os/modules-load/v4plus-hdmi.conf +++ b/configs/os/modules-load/v4plus-hdmi.conf @@ -2,3 +2,4 @@ dwc2 libcomposite tc358743 i2c-dev +nbd diff --git a/configs/os/udev/common.rules b/configs/os/udev/common.rules index 97525df52..e1d60a4bd 100644 --- a/configs/os/udev/common.rules +++ b/configs/os/udev/common.rules @@ -3,6 +3,7 @@ ACTION!="remove", KERNEL=="ttyACM[0-9]*", SUBSYSTEM=="tty", SUBSYSTEMS=="usb", ATTRS{idVendor}=="1209", ATTRS{idProduct}=="eda3", SYMLINK+="kvmd-hid-bridge" ACTION!="remove", KERNEL=="ttyACM[0-9]*", SUBSYSTEM=="tty", SUBSYSTEMS=="usb", ATTRS{idVendor}=="2e8a", ATTRS{idProduct}=="1080", SYMLINK+="kvmd-switch" +ACTION!="remove", KERNEL=="nbd15", SUBSYSTEM=="block", GROUP="kvmd", SYMLINK+="kvmd-nbd" # Disable USB autosuspend for critical devices ACTION!="remove", SUBSYSTEM=="usb", ATTR{idVendor}=="1209", ATTR{idProduct}=="eda3", GOTO="kvmd-usb" diff --git a/kvmd/__init__.py b/kvmd/__init__.py index f09ad7582..575c2c962 100644 --- a/kvmd/__init__.py +++ b/kvmd/__init__.py @@ -20,4 +20,9 @@ # ========================================================================== # -__version__ = "4.131" +__version__ = "4.145" + + +import multiprocessing + +multiprocessing.set_start_method("fork") # FIXME diff --git a/kvmd/aiogp.py b/kvmd/aiogp.py index 3f90f568f..ce8d93949 100644 --- a/kvmd/aiogp.py +++ b/kvmd/aiogp.py @@ -70,10 +70,10 @@ async def poll(self) -> None: self.__loop = asyncio.get_running_loop() self.__thread.start() try: - await aiotools.run_async(self.__thread.join) + await asyncio.to_thread(self.__thread.join) finally: self.__stop_event.set() - await aiotools.run_async(self.__thread.join) + await asyncio.to_thread(self.__thread.join) def __run(self) -> None: assert self.__values is None diff --git a/kvmd/aiomulti.py b/kvmd/aiomulti.py index a45372040..1afa09f68 100644 --- a/kvmd/aiomulti.py +++ b/kvmd/aiomulti.py @@ -20,64 +20,205 @@ # ========================================================================== # +import os +import signal +import asyncio import multiprocessing +import multiprocessing.queues +import multiprocessing.connection import queue +import logging +from typing import Callable from typing import Type from typing import TypeVar from typing import Generic +from typing import Any -from . import aiotools +import setproctitle # ===== -_QueueItemT = TypeVar("_QueueItemT") +def rename_process(suffix: str) -> None: + setproctitle.setproctitle(f"kvmd/{suffix}: {setproctitle.getproctitle()}") -async def queue_get_last( # pylint: disable=invalid-name - q: "multiprocessing.Queue[_QueueItemT]", - timeout: float, -) -> tuple[bool, (_QueueItemT | None)]: +# ===== +class AioMpProcess: + def __init__( + self, + name: str, + target: Callable[..., None], + args: tuple[Any, ...]=(), + ) -> None: + + self.__name = name + self.__target = target - return (await aiotools.run_async(queue_get_last_sync, q, timeout)) + self.__proc = multiprocessing.Process( + target=self.__target_wrapper, + args=args, + daemon=True, + name=name, + ) + def __target_wrapper(self, *args: Any, **kwargs: Any) -> None: + logger = logging.getLogger(self.__target.__module__) + logger.info("Started process kvmd/%s: pid=%s", self.__name, os.getpid()) + os.setpgrp() + rename_process(self.__name) + self.__target(*args, **kwargs) -def queue_get_last_sync( # pylint: disable=invalid-name - q: "multiprocessing.Queue[_QueueItemT]", - timeout: float, -) -> tuple[bool, (_QueueItemT | None)]: + def is_alive(self) -> bool: + return self.__proc.is_alive() - try: - item = q.get(timeout=timeout) - while not q.empty(): - item = q.get() - return (True, item) - except queue.Empty: - return (False, None) + @property + def exitcode(self) -> (int | None): + return self.__proc.exitcode + + def start(self) -> None: + self.__proc.start() + + def send_sigterm(self) -> None: + if self.__proc.pid is None: + return + try: + os.kill(self.__proc.pid, signal.SIGTERM) + except ProcessLookupError: + pass + + def sendpg_sigkill(self) -> None: + if self.__proc.pid is None: + return + try: + own = os.getpgid(os.getpid()) + target = os.getpgid(self.__proc.pid) + if own != target: + os.killpg(target, signal.SIGKILL) + else: + os.kill(self.__proc.pid, signal.SIGKILL) + except ProcessLookupError: + pass + + async def async_join(self, timeout: float=0.0) -> bool: + if self.__proc.pid is None: + return False + + prev = self.__proc.is_alive() + + loop = asyncio.get_running_loop() + fut = asyncio.Future() # type: ignore + try: + fd = os.pidfd_open(self.__proc.pid, os.PIDFD_NONBLOCK) + except ProcessLookupError: + pass + else: + try: + loop.add_reader(fd, fut.set_result, None) + fut.add_done_callback(lambda _: loop.remove_reader(fd)) + if timeout > 0: + await asyncio.wait_for(fut, timeout) + else: + await fut + except TimeoutError: + pass + finally: + try: + loop.remove_reader(fd) + finally: + os.close(fd) + + # Crank the internal MP machinery and return a status code. + # It should be non-blocking. + new = self.__proc.is_alive() + if prev != new: + self.__get_logger().info("Stopped kvmd/%s: exitcode=%s", self.__name, self.exitcode) + return new + + def __get_logger(self) -> logging.Logger: + return logging.getLogger(self.__target.__module__) # ===== -class AioProcessNotifier: +class AioMpQueue[T](multiprocessing.queues.Queue[T]): + def __init__(self, maxsize: int=0) -> None: + super().__init__(maxsize=maxsize, ctx=multiprocessing.get_context()) + + def get_reader(self) -> multiprocessing.connection.Connection: + return self._reader # type: ignore # pylint: disable=protected-access + + def get_reader_fd(self) -> int: + return self.get_reader().fileno() + + async def async_fetch(self, timeout: float=0.0) -> tuple[bool, (T | None)]: + return (await self.__async_get(timeout, False)) + + async def async_fetch_last(self, timeout: float=0.0) -> tuple[bool, (T | None)]: + return (await self.__async_get(timeout, True)) + + async def __async_get(self, timeout: float, last_only: bool) -> tuple[bool, (T | None)]: + loop = asyncio.get_running_loop() + fut = asyncio.Future() # type: ignore + fd = self.get_reader_fd() + + try: + loop.add_reader(fd, fut.set_result, None) + fut.add_done_callback(lambda _: loop.remove_reader(fd)) + if timeout > 0: + await asyncio.wait_for(fut, timeout) + else: + await fut + + if not last_only: + return (True, self.get(False)) + + got = False + item: (T | None) = None + while not self.empty(): + got = True + item = self.get(False) + await asyncio.sleep(0) # Switch task to prevent hanging in a loop + return (got, item) + except (TimeoutError, queue.Empty): + return (False, None) + finally: + loop.remove_reader(fd) + + def fetch_last(self, timeout: float=0.0) -> tuple[bool, (T | None)]: + try: + item = self.get(timeout=timeout) + while not self.empty(): + item = self.get() + return (True, item) + except queue.Empty: + return (False, None) + + def clear_current(self) -> None: + for _ in range(self.qsize()): + try: + self.get_nowait() + except queue.Empty: + break + + +# ===== +class AioMpNotifier: def __init__(self) -> None: - self.__queue: "multiprocessing.Queue[int]" = multiprocessing.Queue() + self.__queue: AioMpQueue[int] = AioMpQueue() def notify(self, mask: int=0) -> None: self.__queue.put_nowait(mask) - async def wait(self) -> int: - while True: - mask = await aiotools.run_async(self.__get) - if mask >= 0: - return mask - - def __get(self) -> int: - try: - mask = self.__queue.get(timeout=0.1) + async def wait(self, timeout: float=0) -> int: + (got, mask) = await self.__queue.async_fetch(timeout) + if not got: # Timeout + return -1 + assert mask is not None + if got: while not self.__queue.empty(): mask |= self.__queue.get() - return mask - except queue.Empty: - return -1 + await asyncio.sleep(0) + return mask # ===== @@ -88,7 +229,7 @@ class AioSharedFlags(Generic[_SharedFlagT]): def __init__( self, initial: dict[str, _SharedFlagT], - notifier: AioProcessNotifier, + notifier: AioMpNotifier, type: Type[_SharedFlagT]=bool, # pylint: disable=redefined-builtin ) -> None: @@ -114,7 +255,7 @@ def update(self, **kwargs: _SharedFlagT) -> None: self.__notifier.notify() async def get(self) -> dict[str, _SharedFlagT]: - return (await aiotools.run_async(self.__inner_get)) + return (await asyncio.to_thread(self.__inner_get)) def __inner_get(self) -> dict[str, _SharedFlagT]: with self.__lock: diff --git a/kvmd/aioproc.py b/kvmd/aioproc.py index 376df004f..aa19160d6 100644 --- a/kvmd/aioproc.py +++ b/kvmd/aioproc.py @@ -26,10 +26,6 @@ import asyncio.subprocess import logging -import setproctitle - -from .logging import get_logger - # ===== async def run_process( @@ -75,7 +71,11 @@ async def log_process( return proc -async def log_stdout_infinite(proc: asyncio.subprocess.Process, logger: logging.Logger) -> None: # pylint: disable=no-member +async def log_stdout_infinite( + proc: asyncio.subprocess.Process, # pylint: disable=no-member + logger: logging.Logger, +) -> None: + empty = 0 async for line_bytes in proc.stdout: # type: ignore line = line_bytes.decode(errors="ignore").strip() @@ -88,35 +88,31 @@ async def log_stdout_infinite(proc: asyncio.subprocess.Process, logger: logging. raise RuntimeError("Asyncio process: too many empty lines") -async def kill_process(proc: asyncio.subprocess.Process, wait: float, logger: logging.Logger) -> None: # pylint: disable=no-member +async def kill_process( + proc: asyncio.subprocess.Process, # pylint: disable=no-member + wait: float, + logger: logging.Logger, +) -> None: + if proc.returncode is None: try: - proc.terminate() - await asyncio.sleep(wait) - if proc.returncode is None: + try: + proc.terminate() try: - os.killpg(os.getpgid(proc.pid), signal.SIGKILL) - except Exception: - if proc.returncode is not None: - raise - await proc.wait() - logger.info("Process killed: retcode=%d", proc.returncode) - except asyncio.CancelledError: - pass + await asyncio.wait_for(proc.wait(), timeout=wait) + except TimeoutError: + pass + finally: + if proc.returncode is None: + try: + os.killpg(os.getpgid(proc.pid), signal.SIGKILL) + await asyncio.wait_for(proc.wait(), timeout=wait) + except Exception: + if proc.returncode is not None: + raise except Exception: if proc.returncode is None: - logger.exception("Can't kill process pid=%d", proc.pid) - else: - logger.info("Process killed: retcode=%d", proc.returncode) - - -def rename_process(suffix: str, prefix: str="kvmd") -> None: - setproctitle.setproctitle(f"{prefix}/{suffix}: {setproctitle.getproctitle()}") - - -def settle(name: str, suffix: str, prefix: str="kvmd") -> logging.Logger: - logger = get_logger(1) - logger.info("Started %s pid=%d", name, os.getpid()) - os.setpgrp() - rename_process(suffix, prefix) - return logger + logger.exception("Can't kill process pid=%s", proc.pid) + finally: + if proc.returncode is not None: + logger.info("Process killed: retcode=%s", proc.returncode) diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index 656a10e6e..20ce5b783 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -60,7 +60,9 @@ def sigint_handler() -> None: def sigterm_handler() -> None: raise SystemExit() - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.add_signal_handler(signal.SIGINT, sigint_handler) loop.add_signal_handler(signal.SIGTERM, sigterm_handler) @@ -192,15 +194,6 @@ async def stop_all_deadly_tasks() -> None: await asyncio.gather(*tasks, return_exceptions=True) -# ===== -async def run_async(func: Callable[..., _RetvalT], *args: Any) -> _RetvalT: - return (await asyncio.get_running_loop().run_in_executor(None, func, *args)) - - -def run_sync(coro: Coroutine[Any, Any, _RetvalT]) -> _RetvalT: - return asyncio.get_event_loop().run_until_complete(coro) - - # ===== async def wait_infinite() -> None: while True: @@ -212,15 +205,25 @@ async def wait_first(*aws: asyncio.Task) -> tuple[set[asyncio.Task], set[asyncio # ===== -async def spawn_and_follow(*coros: Coroutine) -> None: - tasks: list[asyncio.Task] = list(map(asyncio.create_task, coros)) +async def spawn_and_follow( + *coros: Coroutine, + wait: float=0.0, + tasks: (list[asyncio.Task] | None)=None, +) -> None: + + if tasks is None: + tasks = [] + try: - await asyncio.gather(*tasks) - except Exception: + for coro in coros: + tasks.append(asyncio.create_task(coro)) + await wait_first(*tasks) + if wait > 0: + await asyncio.sleep(wait) + finally: for task in tasks: task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) - raise + await shield_fg(asyncio.gather(*tasks, return_exceptions=True)) # ===== diff --git a/kvmd/apps/_scheme.py b/kvmd/apps/_scheme.py index f64b0b0f0..9d27f4c0a 100644 --- a/kvmd/apps/_scheme.py +++ b/kvmd/apps/_scheme.py @@ -365,9 +365,9 @@ def make_config_scheme() -> dict: }, "desired_fps": { - "default": Option(40, type=valid_stream_fps, unpack_as="desired_fps"), + "default": Option(30, type=valid_stream_fps, unpack_as="desired_fps"), "min": Option(0, type=valid_stream_fps, unpack_as="desired_fps_min"), - "max": Option(70, type=valid_stream_fps, unpack_as="desired_fps_max"), + "max": Option(90, type=valid_stream_fps, unpack_as="desired_fps_max"), }, "h264_bitrate": { diff --git a/kvmd/apps/ipmi/server.py b/kvmd/apps/ipmi/server.py index cc508126d..a613be213 100644 --- a/kvmd/apps/ipmi/server.py +++ b/kvmd/apps/ipmi/server.py @@ -24,7 +24,6 @@ import select import asyncio import threading -import multiprocessing import functools import queue @@ -40,7 +39,7 @@ from ...clients.kvmd import KvmdClient -from ... import aiotools +from ... import aiomulti from ... import network from .auth import IpmiAuthManager @@ -174,7 +173,7 @@ async def runner(): # type: ignore logger.error("[%s]: Can't perform request %s: %s", session.sockaddr[0], name, ex) raise - return aiotools.run_sync(runner()) + return asyncio.run(runner()) # ===== @@ -213,9 +212,9 @@ def __is_sol_activated(self) -> bool: def __start_sol_worker(self, session: IpmiServerSession) -> None: assert self.__sol_console is None assert self.__sol_thread is None - user_queue: "multiprocessing.Queue[bytes]" = multiprocessing.Queue() # Only for select() - self.__sol_console = IpmiConsole(session, user_queue.put_nowait) - self.__sol_thread = threading.Thread(target=self.__sol_worker, args=(user_queue,), daemon=True) + user_q: aiomulti.AioMpQueue[bytes] = aiomulti.AioMpQueue() # Only for select() + self.__sol_console = IpmiConsole(session, user_q.put_nowait) + self.__sol_thread = threading.Thread(target=self.__sol_worker, args=(user_q,), daemon=True) self.__sol_thread.start() def __stop_sol_worker(self) -> None: @@ -233,22 +232,22 @@ def __close_sol_console(self) -> None: self.__sol_console = None get_logger(0).info("SOL closed") - def __sol_worker(self, user_queue: "multiprocessing.Queue[bytes]") -> None: + def __sol_worker(self, user_q: aiomulti.AioMpQueue[bytes]) -> None: logger = get_logger(0) logger.info("Starting SOL worker ...") try: assert self.__sol_console is not None with serial.Serial(self.__sol_device_path, self.__sol_speed) as tty: logger.info("Opened SOL port %s at speed=%d", self.__sol_device_path, self.__sol_speed) - qr = user_queue._reader # type: ignore # pylint: disable=protected-access + qr = user_q.get_reader() try: while not self.__sol_stop: ready = select.select([qr, tty], [], [], self.__sol_select_timeout)[0] if qr in ready: data = b"" - for _ in range(user_queue.qsize()): # Don't hold on this with [not empty()] + for _ in range(user_q.qsize()): # Don't hold on this with [not empty()] try: - data += user_queue.get_nowait() + data += user_q.get_nowait() except queue.Empty: break if data: diff --git a/kvmd/apps/janus/stun.py b/kvmd/apps/janus/stun.py index 7026ec2b4..e93965abd 100644 --- a/kvmd/apps/janus/stun.py +++ b/kvmd/apps/janus/stun.py @@ -7,7 +7,6 @@ import enum from ... import tools -from ... import aiotools from ...logging import get_logger @@ -69,11 +68,12 @@ def __init__( self.__retries_delay = retries_delay self.__stun_ip = "" - self.__sock: (socket.socket | None) = None async def get_info(self, src_ip: str, src_port: int) -> StunInfo: + stun_ip = self.__stun_ip nat_type = StunNatType.ERROR ext_ip = "" + try: (src_fam, _, _, _, src_addr) = (await self.__retried_getaddrinfo_udp(src_ip, src_port))[0] @@ -87,24 +87,23 @@ async def get_info(self, src_ip: str, src_port: int) -> StunInfo: if not self.__stun_ip or self.__stun_ip not in stun_ips: # On new IP, changed family, etc. self.__stun_ip = stun_ips[0] + stun_ip = self.__stun_ip - with socket.socket(src_fam, socket.SOCK_DGRAM) as self.__sock: - self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.__sock.settimeout(self.__timeout) - self.__sock.bind(src_addr) - (nat_type, resp) = await self.__get_nat_type(src_ip) + with socket.socket(src_fam, socket.SOCK_DGRAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.settimeout(self.__timeout) + sock.bind(src_addr) + (nat_type, resp) = await self.__get_nat_type(sock, stun_ip, src_ip) ext_ip = (resp.ext.ip if resp.ext is not None else "") except Exception as ex: get_logger(0).error("Can't get STUN info: %s", tools.efmt(ex)) - finally: - self.__sock = None return StunInfo( nat_type=nat_type, src_ip=src_ip, ext_ip=ext_ip, stun_host=self.__host, - stun_ip=self.__stun_ip, + stun_ip=stun_ip, stun_port=self.__port, ) @@ -112,20 +111,26 @@ async def __retried_getaddrinfo_udp(self, host: str, port: int) -> list: retries = self.__retries while True: try: - return socket.getaddrinfo(host, port, type=socket.SOCK_DGRAM) + return (await asyncio.to_thread(socket.getaddrinfo, host, port, type=socket.SOCK_DGRAM)) except Exception: retries -= 1 if retries == 0: raise await asyncio.sleep(self.__retries_delay) - async def __get_nat_type(self, src_ip: str) -> tuple[StunNatType, _StunResponse]: # pylint: disable=too-many-return-statements - first = await self.__make_request("First probe", self.__stun_ip, b"") + async def __get_nat_type( # pylint: disable=too-many-return-statements + self, + sock: socket.SocketType, + stun_ip: str, + src_ip: str, + ) -> tuple[StunNatType, _StunResponse]: + + first = await self.__make_request(sock, "First probe", stun_ip, b"") if not first.ok: return (StunNatType.BLOCKED, first) req = struct.pack(">HHI", 0x0003, 0x0004, 0x00000006) # Change-Request - resp = await self.__make_request("Change request [ext_ip == src_ip]", self.__stun_ip, req) + resp = await self.__make_request(sock, "Change request [ext_ip == src_ip]", stun_ip, req) if first.ext is not None and first.ext.ip == src_ip: if resp.ok: @@ -137,20 +142,27 @@ async def __get_nat_type(self, src_ip: str) -> tuple[StunNatType, _StunResponse] if first.changed is None: raise RuntimeError(f"Changed addr is None: {first}") - resp = await self.__make_request("Change request [ext_ip != src_ip]", first.changed, b"") + resp = await self.__make_request(sock, "Change request [ext_ip != src_ip]", first.changed, b"") if not resp.ok: return (StunNatType.CHANGED_ADDR_ERROR, resp) if resp.ext == first.ext: req = struct.pack(">HHI", 0x0003, 0x0004, 0x00000002) - resp = await self.__make_request("Change port", first.changed.ip, req) + resp = await self.__make_request(sock, "Change port", first.changed.ip, req) if resp.ok: return (StunNatType.RESTRICTED_NAT, resp) return (StunNatType.RESTRICTED_PORT_NAT, resp) return (StunNatType.SYMMETRIC_NAT, resp) - async def __make_request(self, ctx: str, addr: (_StunAddress | str), req: bytes) -> _StunResponse: + async def __make_request( + self, + sock: socket.SocketType, + ctx: str, + addr: (_StunAddress | str), + req: bytes, + ) -> _StunResponse: + # TODO: Support IPv6 and RFC 5389 # The first 4 bytes of the response are the Type (2) and Length (2) # The 5th byte is Reserved @@ -169,7 +181,7 @@ async def __make_request(self, ctx: str, addr: (_StunAddress | str), req: bytes) trans_id = b"\x21\x12\xA4\x42" + secrets.token_bytes(12) (resp, error) = (b"", "") for _ in range(self.__retries): - (resp, error) = await self.__inner_make_request(trans_id, req, addr_t) + (resp, error) = await self.__inner_make_request(sock, trans_id, req, addr_t) if not error: break await asyncio.sleep(self.__retries_delay) @@ -196,17 +208,22 @@ async def __make_request(self, ctx: str, addr: (_StunAddress | str), req: bytes) remaining -= (4 + attr_len) return _StunResponse(ok=True, **parsed) - async def __inner_make_request(self, trans_id: bytes, req: bytes, addr: tuple[str, int]) -> tuple[bytes, str]: - assert self.__sock is not None + async def __inner_make_request( + self, + sock: socket.SocketType, + trans_id: bytes, + req: bytes, + addr: tuple[str, int], + ) -> tuple[bytes, str]: req = struct.pack(">HH", 0x0001, len(req)) + trans_id + req # Bind Request try: - await aiotools.run_async(self.__sock.sendto, req, addr) + await asyncio.to_thread(sock.sendto, req, addr) except Exception as ex: return (b"", f"Send error: {tools.efmt(ex)}") try: - resp = (await aiotools.run_async(self.__sock.recvfrom, 2048))[0] + resp = (await asyncio.to_thread(sock.recvfrom, 2048))[0] except Exception as ex: return (b"", f"Recv error: {tools.efmt(ex)}") diff --git a/kvmd/apps/kvmd/api/msd.py b/kvmd/apps/kvmd/api/msd.py index d7d856857..5195d4f19 100644 --- a/kvmd/apps/kvmd/api/msd.py +++ b/kvmd/apps/kvmd/api/msd.py @@ -20,6 +20,7 @@ # ========================================================================== # +import asyncio import lzma import time @@ -34,7 +35,6 @@ from ....logging import get_logger -from .... import aiotools from .... import htclient from ....htserver import exposed_http @@ -114,13 +114,13 @@ async def compressed() -> AsyncGenerator[bytes, None]: buf = b"" try: async for chunk in reader.read_chunked(): - buf += await aiotools.run_async(compressor.compress, chunk) + buf += await asyncio.to_thread(compressor.compress, chunk) if len(buf) >= limit: yield buf buf = b"" finally: # Закрыть в любом случае - buf += await aiotools.run_async(compressor.flush) + buf += await asyncio.to_thread(compressor.flush) if len(buf) > 0: yield buf diff --git a/kvmd/apps/kvmd/info/extras.py b/kvmd/apps/kvmd/info/extras.py index a855ad152..417ba245e 100644 --- a/kvmd/apps/kvmd/info/extras.py +++ b/kvmd/apps/kvmd/info/extras.py @@ -81,7 +81,7 @@ def __get_extras_path(self, *parts: str) -> str: async def __read_extra(self, sui: (sysunit.SystemdUnitInfo | None), name: str) -> dict: try: - extra = await aiotools.run_async(load_yaml_file, self.__get_extras_path(name, "manifest.yaml")) + extra = await asyncio.to_thread(load_yaml_file, self.__get_extras_path(name, "manifest.yaml")) await self.__rewrite_app_daemon(sui, extra) self.__rewrite_app_port(extra) return {re.sub(r"[^a-zA-Z0-9_]+", "_", name): extra} diff --git a/kvmd/apps/kvmd/info/meta.py b/kvmd/apps/kvmd/info/meta.py index 6267fac46..f72488d9a 100644 --- a/kvmd/apps/kvmd/info/meta.py +++ b/kvmd/apps/kvmd/info/meta.py @@ -20,6 +20,7 @@ # ========================================================================== # +import asyncio import socket from typing import AsyncGenerator @@ -49,7 +50,7 @@ async def get_server_host(self) -> str: async def get_state(self) -> (dict | None): try: - meta = ((await aiotools.run_async(load_yaml_file, self.__meta_path)) or {}) + meta = ((await asyncio.to_thread(load_yaml_file, self.__meta_path)) or {}) if meta["server"]["host"] == "@auto": meta["server"]["host"] = socket.gethostname() return meta diff --git a/kvmd/apps/kvmd/ocr.py b/kvmd/apps/kvmd/ocr.py index 367c0c805..91cd137a3 100644 --- a/kvmd/apps/kvmd/ocr.py +++ b/kvmd/apps/kvmd/ocr.py @@ -23,6 +23,7 @@ import os import stat import io +import asyncio import ctypes import ctypes.util import contextlib @@ -158,7 +159,7 @@ def get_available_langs(self) -> list[str]: async def recognize(self, data: bytes, langs: list[str], left: int, top: int, right: int, bottom: int) -> str: if not langs: langs = self.__default_langs - return (await aiotools.run_async(self.__inner_recognize, data, langs, left, top, right, bottom)) + return (await asyncio.to_thread(self.__inner_recognize, data, langs, left, top, right, bottom)) def __inner_recognize(self, data: bytes, langs: list[str], left: int, top: int, right: int, bottom: int) -> str: with _tess_api(self.__data_dir_path, langs) as api: diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 42423d24e..99373c514 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -38,7 +38,7 @@ from ...errors import OperationError from ... import aiotools -from ... import aioproc +from ... import aiomulti from ...htserver import HttpExposed from ...htserver import exposed_http @@ -106,7 +106,7 @@ def __init__(self) -> None: class _Subsystem: name: str event_type: str - sysprep: (Callable[[], None] | None) + sysprep: (Callable[[], Coroutine[Any, Any, None]] | None) systask: (Callable[[], Coroutine[Any, Any, None]] | None) cleanup: (Callable[[], Coroutine[Any, Any, dict]] | None) trigger_state: (Callable[[], Coroutine[Any, Any, None]] | None) = None @@ -267,15 +267,17 @@ async def __ws_bin_ping_handler(self, ws: WsSession, _: bytes) -> None: # ===== SYSTEM STUFF def run(self, **kwargs: Any) -> None: # type: ignore # pylint: disable=arguments-differ - for sub in self.__subsystems: - if sub.sysprep: - sub.sysprep() - aioproc.rename_process("main") + aiomulti.rename_process("main") super().run(**kwargs) async def _check_request_auth(self, exposed: HttpExposed, req: Request) -> None: await check_request_auth(self.__auth, exposed, req) + async def _before_app(self) -> None: + for sub in self.__subsystems: + if sub.sysprep: + await sub.sysprep() + async def _init_app(self) -> None: aiotools.create_deadly_task("Stream controller", self.__stream_controller()) for sub in self.__subsystems: diff --git a/kvmd/apps/kvmd/switch/__init__.py b/kvmd/apps/kvmd/switch/__init__.py index 85eca48a4..ff50e1477 100644 --- a/kvmd/apps/kvmd/switch/__init__.py +++ b/kvmd/apps/kvmd/switch/__init__.py @@ -300,18 +300,11 @@ async def poll_state(self) -> AsyncGenerator[dict, None]: # ===== async def systask(self) -> None: - tasks = [ - asyncio.create_task(self.__systask_events()), - asyncio.create_task(self.__systask_default_edid()), - asyncio.create_task(self.__systask_storage()), - ] - try: - await asyncio.gather(*tasks) - except Exception: - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) - raise + await aiotools.spawn_and_follow( + self.__systask_events(), + self.__systask_default_edid(), + self.__systask_storage(), + ) async def __systask_events(self) -> None: async for event in self.__chain.poll_events(): diff --git a/kvmd/apps/kvmd/switch/chain.py b/kvmd/apps/kvmd/switch/chain.py index d50f0f7b3..113b25f0a 100644 --- a/kvmd/apps/kvmd/switch/chain.py +++ b/kvmd/apps/kvmd/switch/chain.py @@ -21,7 +21,6 @@ import multiprocessing -import queue import select import dataclasses import time @@ -30,8 +29,7 @@ from .lib import get_logger from .lib import tools -from .lib import aiotools -from .lib import aioproc +from .lib import aiomulti from .types import Edids from .types import Dummies @@ -209,8 +207,8 @@ def __init__( self.__units: list[_UnitContext] = [] self.__active_port = -1 - self.__cmd_queue: "multiprocessing.Queue[_BaseCmd]" = multiprocessing.Queue() - self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue() + self.__cmd_q: aiomulti.AioMpQueue[_BaseCmd] = aiomulti.AioMpQueue() + self.__events_q: aiomulti.AioMpQueue[BaseEvent] = aiomulti.AioMpQueue() self.__stop_event = multiprocessing.Event() @@ -279,32 +277,29 @@ def reboot_unit(self, unit: int, bootloader: bool) -> None: # ===== async def poll_events(self) -> AsyncGenerator[BaseEvent, None]: - proc = multiprocessing.Process(target=self.__subprocess, daemon=True) + proc = aiomulti.AioMpProcess("switch", self.__subprocess) + proc.start() try: - proc.start() while True: - try: - yield (await aiotools.run_async(self.__events_queue.get, True, 0.1)) - except queue.Empty: - pass + (_, event) = await self.__events_q.async_fetch() + assert event is not None + yield event finally: - if proc.is_alive(): - self.__stop_event.set() - if proc.is_alive() or proc.exitcode is not None: - await aiotools.run_async(proc.join) + self.__stop_event.set() + await proc.async_join() # ===== def __queue_cmd(self, cmd: _BaseCmd) -> None: if not self.__stop_event.is_set(): - self.__cmd_queue.put_nowait(cmd) + self.__cmd_q.put_nowait(cmd) def __queue_event(self, event: BaseEvent) -> None: if not self.__stop_event.is_set(): - self.__events_queue.put_nowait(event) + self.__events_q.put_nowait(event) def __subprocess(self) -> None: - logger = aioproc.settle("Switch", "switch") + logger = get_logger(0) no_device_reported = False while True: try: @@ -322,7 +317,7 @@ def __subprocess(self) -> None: logger.error("%s", tools.efmt(ex)) except Exception: logger.exception("Unexpected error in the Switch loop") - tools.clear_queue(self.__cmd_queue) + self.__cmd_q.clear_current() if self.__stop_event.is_set(): break time.sleep(1) @@ -352,14 +347,14 @@ def __select(self) -> bool: try: return bool(select.select([ self.__device.get_fd(), - self.__cmd_queue._reader, # type: ignore # pylint: disable=protected-access + self.__cmd_q.get_reader_fd(), ], [], [], 1)[0]) except Exception as ex: raise DeviceError(ex) def __consume_commands(self) -> None: # pylint: disable=too-many-branches - while not self.__cmd_queue.empty(): - cmd = self.__cmd_queue.get() + while not self.__cmd_q.empty(): + cmd = self.__cmd_q.get() match cmd: case _CmdSetActual(): self.__actual = cmd.actual diff --git a/kvmd/apps/kvmd/switch/lib.py b/kvmd/apps/kvmd/switch/lib.py index 4ef2647e4..48e2ef962 100644 --- a/kvmd/apps/kvmd/switch/lib.py +++ b/kvmd/apps/kvmd/switch/lib.py @@ -27,6 +27,7 @@ from .... import tools # noqa: F401 from .... import aiotools # noqa: F401 from .... import aioproc # noqa: F401 +from .... import aiomulti # noqa: F401 from .... import bitbang # noqa: F401 from .... import htclient # noqa: F401 from ....inotify import Inotify # noqa: F401 diff --git a/kvmd/apps/kvmd/ugpio.py b/kvmd/apps/kvmd/ugpio.py index a3c453f51..8b09a0143 100644 --- a/kvmd/apps/kvmd/ugpio.py +++ b/kvmd/apps/kvmd/ugpio.py @@ -309,10 +309,10 @@ async def __get_io_state(self) -> dict: }, } - def sysprep(self) -> None: + async def sysprep(self) -> None: get_logger(0).info("Preparing User-GPIO drivers ...") for (_, driver) in tools.sorted_kvs(self.__drivers): - driver.prepare() + await driver.prepare() async def systask(self) -> None: get_logger(0).info("Running User-GPIO drivers ...") diff --git a/kvmd/apps/localhid/multi.py b/kvmd/apps/localhid/multi.py index f131bc264..18ef4722d 100644 --- a/kvmd/apps/localhid/multi.py +++ b/kvmd/apps/localhid/multi.py @@ -156,7 +156,7 @@ def is_grabbed(self) -> bool: return self.__grabbed async def set_grabbed(self, grabbed: bool) -> None: - await aiotools.run_async(self.__inner_set_grabbed, grabbed) + await asyncio.to_thread(self.__inner_set_grabbed, grabbed) def __inner_set_grabbed(self, grabbed: bool) -> None: if self.__grabbed != grabbed: @@ -168,7 +168,7 @@ def __inner_set_grabbed(self, grabbed: bool) -> None: self.__inner_set_leds(*self.__leds) async def set_leds(self, caps: bool, scroll: bool, num: bool) -> None: - await aiotools.run_async(self.__inner_set_leds, caps, scroll, num) + await asyncio.to_thread(self.__inner_set_leds, caps, scroll, num) def __inner_set_leds(self, caps: bool, scroll: bool, num: bool) -> None: self.__leds = (caps, scroll, num) diff --git a/kvmd/apps/media/server.py b/kvmd/apps/media/server.py index 8dec706a5..515459057 100644 --- a/kvmd/apps/media/server.py +++ b/kvmd/apps/media/server.py @@ -63,8 +63,27 @@ def is_diff(self) -> bool: class _Client: ws: WsSession src: _Source - queue: asyncio.Queue[dict] - sender: (asyncio.Task | None) = dataclasses.field(default=None) + sender: asyncio.Task + _queue: asyncio.Queue[dict] = dataclasses.field(default_factory=(lambda: asyncio.Queue(32))) + + async def get_frame(self) -> dict: + return (await self._queue.get()) + + async def put_frame(self, frame: dict) -> bool: # Overflow/wipe flag + try: + self._queue.put_nowait(frame) + except asyncio.QueueFull: + # Если какой-то из клиентов не справляется, очищаем ему очередь и запрашиваем кейфрейм. + # Я вижу у такой логики кучу минусов, хз как себя покажет, но лучше пока ничего не придумал. + for _ in range(self._queue.qsize()): + try: + self._queue.get_nowait() + except asyncio.QueueEmpty: + break + return True + except Exception: + pass + return False class MediaServer(HttpServer): @@ -75,8 +94,6 @@ class MediaServer(HttpServer): __F_H264 = "h264" __F_JPEG = "jpeg" - __Q_SIZE = 32 - def __init__( self, h264_streamer: (BaseStreamerClient | None), @@ -148,7 +165,7 @@ def __start_stream(self, ws: WsSession, m_type: str, m_fmt: str) -> bool: src: (_Source | None) = self.__media.get(m_type, {}).get(m_fmt) if src is None: return False - client = _Client(ws, src, asyncio.Queue(self.__Q_SIZE)) + client = _Client(ws, src, None) # type: ignore client.sender = aiotools.create_deadly_task(str(ws), self.__sender(client)) src.clients[ws] = client get_logger(0).info("Streaming %s to %s ...", src.streamer, ws) @@ -187,9 +204,10 @@ async def __sender(self, client: _Client) -> None: need_key = client.src.is_diff() if need_key: client.src.key_required = True + has_key = False while True: - frame = await client.queue.get() + frame = await client.get_frame() has_key = (not need_key or has_key or frame["key"]) if has_key: try: @@ -206,6 +224,7 @@ async def __streamer(self, src: _Source) -> None: if len(src.clients) == 0: await asyncio.sleep(1) continue + try: async with src.streamer.reading() as read_frame: while len(src.clients) > 0: @@ -213,15 +232,10 @@ async def __streamer(self, src: _Source) -> None: if frame["key"]: src.key_required = False for client in src.clients.values(): - try: - client.queue.put_nowait(frame) - except asyncio.QueueFull: - # Если какой-то из клиентов не справляется, очищаем ему очередь и запрашиваем кейфрейм. - # Я вижу у такой логики кучу минусов, хз как себя покажет, но лучше пока ничего не придумал. - tools.clear_queue(client.queue) + if (await client.put_frame(frame)): + # Overflowed and cleaned up, need a keyframe src.key_required = True - except Exception: - pass + except StreamerError as ex: if isinstance(ex, StreamerPermError): logger.exception("Streamer failed: %s", src.streamer) diff --git a/kvmd/apps/nbd/__init__.py b/kvmd/apps/nbd/__init__.py new file mode 100644 index 000000000..5b52009e9 --- /dev/null +++ b/kvmd/apps/nbd/__init__.py @@ -0,0 +1,66 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2024 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import asyncio +import signal +import argparse + +from ...logging import get_logger + +from ... import aiotools + +from ...nbd import NbdServer +from ...nbd.types import NbdStopEvent + +from .._logging import init_logging + + +# ===== +async def _async_main(device_path: str, url: str) -> None: + server = NbdServer(device_path) + + async def poller() -> None: + logger = get_logger(0) + async for event in server.poll(): + logger.info("NBD-EVENT: %s", event) + if isinstance(event, NbdStopEvent): + break + + task = asyncio.create_task(poller()) + + loop = asyncio.get_running_loop() + loop.add_signal_handler(signal.SIGINT, server.unbind) + loop.add_signal_handler(signal.SIGTERM, server.unbind) + + await server.bind(url) + await task + + +def main() -> None: + init_logging(False) + + parser = argparse.ArgumentParser() + parser.add_argument("-d", "--device", default="/dev/nbd0") + parser.add_argument("-u", "--url", required=True) + opts = parser.parse_args() + + aiotools.run(_async_main(opts.device, opts.url)) diff --git a/kvmd/apps/nbd/__main__.py b/kvmd/apps/nbd/__main__.py new file mode 100644 index 000000000..4827fc498 --- /dev/null +++ b/kvmd/apps/nbd/__main__.py @@ -0,0 +1,24 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2024 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +from . import main +main() diff --git a/kvmd/apps/oled/screen.py b/kvmd/apps/oled/screen.py index 58b938e1c..cff7edc37 100644 --- a/kvmd/apps/oled/screen.py +++ b/kvmd/apps/oled/screen.py @@ -20,6 +20,7 @@ # ========================================================================== # +import asyncio import time import async_lru @@ -30,8 +31,6 @@ from PIL import Image from PIL import ImageFont -from ... import aiotools - # ===== class Screen: # pylint: disable=too-many-instance-attributes @@ -59,16 +58,16 @@ async def set_swimming(self, interval: float, offset_x: int) -> None: @async_lru.alru_cache(maxsize=1) async def set_contrast(self, contrast: int) -> None: - await aiotools.run_async(self.__device.contrast, contrast) + await asyncio.to_thread(self.__device.contrast, contrast) async def draw_text(self, text: str) -> None: - await aiotools.run_async(self.__inner_draw_text, text) + await asyncio.to_thread(self.__inner_draw_text, text) async def draw_image(self, image_path: str) -> None: - await aiotools.run_async(self.__inner_draw_image, image_path) + await asyncio.to_thread(self.__inner_draw_image, image_path) async def draw_white(self) -> None: - await aiotools.run_async(self.__inner_draw_white) + await asyncio.to_thread(self.__inner_draw_white) def __inner_draw_text(self, text: str) -> None: with luma_canvas(self.__device) as draw: diff --git a/kvmd/apps/oled/sensors.py b/kvmd/apps/oled/sensors.py index a28c39dc0..2ff2705e9 100644 --- a/kvmd/apps/oled/sensors.py +++ b/kvmd/apps/oled/sensors.py @@ -30,7 +30,6 @@ from ...logging import get_logger from ... import tools -from ... import aiotools from ... import network from ...clients.kvmd import KvmdClient @@ -112,7 +111,7 @@ async def __fqdn_task_loop(self) -> None: async def __iface_task_loop(self) -> None: while True: try: - fi = await aiotools.run_async(network.get_first_iface) + fi = await asyncio.to_thread(network.get_first_iface) self.__s_iface = fi.name self.__s_ip = fi.ip except Exception: diff --git a/kvmd/apps/vnc/render.py b/kvmd/apps/vnc/render.py index a443e34e9..11265fcdc 100644 --- a/kvmd/apps/vnc/render.py +++ b/kvmd/apps/vnc/render.py @@ -23,18 +23,17 @@ import sys import os import io +import asyncio import functools from PIL import Image as PilImage from PIL import ImageDraw as PilImageDraw from PIL import ImageFont as PilImageFont -from ... import aiotools - # ===== async def make_text_jpeg(width: int, height: int, quality: int, text: str) -> bytes: - return (await aiotools.run_async(_inner_make_text_jpeg, width, height, quality, text)) + return (await asyncio.to_thread(_inner_make_text_jpeg, width, height, quality, text)) @functools.lru_cache(maxsize=10) diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index 7da5124fd..8fef5462f 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -120,7 +120,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-locals self.__kvmd_session: (KvmdClientSession | None) = None self.__kvmd_ws: (KvmdClientWs | None) = None - self.__fb_queue: "asyncio.Queue[dict]" = asyncio.Queue() + self.__fb_q: "asyncio.Queue[dict]" = asyncio.Queue() self.__fb_has_key = False self.__clipboard = "" @@ -260,9 +260,9 @@ def __get_default_streamer(self) -> BaseStreamerClient: async def __queue_frame(self, frame: (dict | str)) -> None: if isinstance(frame, str): frame = await self.__make_text_frame(frame) - if self.__fb_queue.qsize() > 10: - self.__fb_queue.get_nowait() - self.__fb_queue.put_nowait(frame) + if self.__fb_q.qsize() > 10: + self.__fb_q.get_nowait() + self.__fb_q.put_nowait(frame) async def __make_text_frame(self, text: str) -> dict: return { @@ -276,7 +276,7 @@ async def __fb_sender_task_loop(self) -> None: # pylint: disable=too-many-branc last: (dict | None) = None async for _ in self._send_fb_allowed(): while True: - frame = await self.__fb_queue.get() + frame = await self.__fb_q.get() if ( last is None # pylint: disable=too-many-boolean-expressions or frame["format"] == StreamerFormats.JPEG @@ -290,12 +290,12 @@ async def __fb_sender_task_loop(self) -> None: # pylint: disable=too-many-branc ): self.__fb_has_key = (frame["format"] == StreamerFormats.H264 and frame["key"]) last = frame - if self.__fb_queue.qsize() == 0: + if self.__fb_q.qsize() == 0: break continue assert frame["format"] == StreamerFormats.H264 last["data"] += frame["data"] - if self.__fb_queue.qsize() == 0: + if self.__fb_q.qsize() == 0: break if self._width != last["width"] or self._height != last["height"]: diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index e7047e3fc..9313b76f3 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -21,6 +21,7 @@ import io +import asyncio import contextlib import dataclasses import functools @@ -37,7 +38,6 @@ from PIL import Image as PilImage from .. import tools -from .. import aiotools from .. import htclient from . import BaseHttpClient @@ -102,7 +102,7 @@ async def make_preview(self, max_width: int, max_height: int, quality: int) -> b if (max_width, max_height) == (self.width, self.height): return self.data - return (await aiotools.run_async(self.__inner_make_preview, max_width, max_height, quality)) + return (await asyncio.to_thread(self.__inner_make_preview, max_width, max_height, quality)) @functools.lru_cache(maxsize=1) def __inner_make_preview(self, max_width: int, max_height: int, quality: int) -> bytes: @@ -275,7 +275,7 @@ async def read_frame(key_required: bool) -> dict: key_required = (key_required and self.__fmt == StreamerFormats.H264) with _memsink_reading_handle_errors(): while True: - frame = await aiotools.run_async(sink.wait_frame, key_required) + frame = await asyncio.to_thread(sink.wait_frame, key_required) if frame is not None: self.__check_format(frame["format"]) return frame diff --git a/kvmd/htserver.py b/kvmd/htserver.py index 3aa8929a6..8e55c214b 100644 --- a/kvmd/htserver.py +++ b/kvmd/htserver.py @@ -381,7 +381,6 @@ def run( shutdown_timeout=1, access_log_format=access_log_format, print=self.__run_app_print, - loop=asyncio.get_event_loop(), ) # ===== @@ -496,6 +495,9 @@ def __remove_ws(self, ws: WsSession) -> None: async def _check_request_auth(self, exposed: HttpExposed, req: Request) -> None: pass + async def _before_app(self) -> None: + pass + async def _init_app(self) -> None: raise NotImplementedError @@ -514,6 +516,8 @@ def _on_ws_removed(self, ws: WsSession) -> None: # ===== async def __make_app(self) -> Application: + await self._before_app() + self.__app = Application(middlewares=[normalize_path_middleware( # pylint: disable=attribute-defined-outside-init append_slash=False, remove_slash=True, diff --git a/kvmd/inotify.py b/kvmd/inotify.py index e2cc62512..8691ce38e 100644 --- a/kvmd/inotify.py +++ b/kvmd/inotify.py @@ -34,7 +34,6 @@ from .logging import get_logger -from . import aiotools from . import libc @@ -199,7 +198,7 @@ def __init__(self) -> None: self.__moved: dict[int, str] = {} - self.__events_queue: "asyncio.Queue[InotifyEvent]" = asyncio.Queue() + self.__events_q: "asyncio.Queue[InotifyEvent]" = asyncio.Queue() async def watch_all_changes(self, *paths: str) -> None: await self.watch(InotifyMask.ALL_CHANGES_EVENTS, *paths) @@ -208,9 +207,9 @@ async def watch(self, mask: int, *paths: str) -> None: for path in paths: path = os.path.normpath(path) assert path not in self.__wd_by_path, path - get_logger().info("Watching for %s", path) + get_logger(2).info("Watching for %s", path) # Асинхронно, чтобы не висло на NFS - wd = _inotify_check(await aiotools.run_async(libc.inotify_add_watch, self.__fd, _fs_encode(path), mask)) + wd = _inotify_check(await asyncio.to_thread(libc.inotify_add_watch, self.__fd, _fs_encode(path), mask)) self.__wd_by_path[path] = wd self.__path_by_wd[wd] = path @@ -224,13 +223,13 @@ async def watch(self, mask: int, *paths: str) -> None: # del self.__path_by_wd[wd] # def has_events(self) -> bool: -# return (not self.__events_queue.empty()) +# return (not self.__events_q.empty()) async def get_event(self, timeout: float) -> (InotifyEvent | None): assert timeout > 0 try: return (await asyncio.wait_for( - asyncio.ensure_future(self.__events_queue.get()), + asyncio.ensure_future(self.__events_q.get()), timeout=timeout, )) except asyncio.TimeoutError: @@ -273,7 +272,7 @@ def __read_and_queue_events(self) -> None: del self.__wd_by_path[ignored_path] continue - self.__events_queue.put_nowait(event) + self.__events_q.put_nowait(event) def __read_parsed_events(self) -> Generator[InotifyEvent, None, None]: for (wd, mask, cookie, name_bytes) in _inotify_parsed_buffer(self.__read_buffer()): diff --git a/kvmd/nbd/__init__.py b/kvmd/nbd/__init__.py new file mode 100644 index 000000000..dbd64172e --- /dev/null +++ b/kvmd/nbd/__init__.py @@ -0,0 +1,117 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import asyncio +import urllib.parse + +from typing import Final +from typing import AsyncGenerator +from typing import Type +from typing import Any + +from ..logging import get_logger + +from .. import tools +from .. import aiotools + +from ..yamlconf import make_config +from ..validators import ValidatorError + +from .errors import NbdError + +from .types import BaseNbdEvent +from .types import NbdStopEvent + +from .device import NbdDevice +from .process import NbdProcess + +from .remotes import BaseNbdRemote +from .remotes.http import NbdHttpRemote + + +# ===== +class NbdServer: + __DEVICE_BLOCK: Final[int] = 512 + __DEVICE_TIMEOUT: Final[int] = 3600 + + __REMOTES: Final[dict[str, Type[BaseNbdRemote]]] = { + scheme: cls + for cls in [NbdHttpRemote] + for scheme in cls.get_schemes() + } + + def __init__(self, path: str) -> None: + self.__device = NbdDevice(path, self.__DEVICE_BLOCK, self.__DEVICE_TIMEOUT) + self.__proc: (NbdProcess | None) = None + self.__nr = aiotools.AioNotifier() + self.__lock = asyncio.Lock() + + # ===== + + async def bind(self, url: str, **kwargs: Any) -> None: + async with self.__lock: + if self.__proc: + raise NbdError("NBD is already bound") + + scheme = urllib.parse.urlparse(url).scheme + cls = self.__REMOTES.get(scheme) + if cls is None: + raise ValidatorError("Unsupported remote URL scheme") + + try: + config = make_config({"url": url, **kwargs}, {}, cls.get_options()) + except Exception as ex: + raise ValidatorError(tools.efmt(ex)) + + remote = cls(**config._unpack()) + image = await remote.probe() + + assert self.__proc is None + self.__nr.notify() + self.__proc = NbdProcess(self.__device, remote, image) + + def unbind(self) -> None: + if self.__proc: + self.__proc.stop() + + async def poll(self) -> AsyncGenerator[BaseNbdEvent]: + while True: + await self.__nr.wait() + if self.__proc: + stop: (NbdStopEvent | None) = None + try: + async with self.__proc.running(): + async for event in self.__proc.poll(): + if isinstance(event, NbdStopEvent): + if stop is None: + stop = event + else: + yield event + except NbdError as ex: + get_logger(0).error("%s", tools.efmt(ex)) + except Exception: + get_logger(0).exception("Unexpected error in NBD poller loop") + finally: + self.__proc = None + if stop is None: + stop = NbdStopEvent("main", "Unknown stop reason", False) + yield stop diff --git a/kvmd/nbd/device.py b/kvmd/nbd/device.py new file mode 100644 index 000000000..c7c37dd47 --- /dev/null +++ b/kvmd/nbd/device.py @@ -0,0 +1,135 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import sys +import os +import fcntl +import socket +import asyncio +import contextlib +import math + +from typing import Final +from typing import Generator +from typing import AsyncGenerator + +from ..logging import get_logger + +from .. import tools + +from .errors import NbdDeviceError +from .types import NbdImage +from .link import NbdLink + + +# ===== +_NBD_CLEAR_SOCK: Final[tuple[int, str]] = (0x0000AB04, "NBD_CLEAR_SOCK") +_NBD_DO_IT: Final[tuple[int, str]] = (0x0000AB03, "NBD_DO_IT") +_NBD_DISCONNECT: Final[tuple[int, str]] = (0x0000AB08, "NBD_DISCONNECT") +_NBD_SET_BLKSIZE: Final[tuple[int, str]] = (0x0000AB01, "NBD_SET_BLKSIZE") +_NBD_SET_FLAGS: Final[tuple[int, str]] = (0x0000AB0A, "NBD_SET_FLAGS") +_NBD_SET_SIZE_BLOCKS: Final[tuple[int, str]] = (0x0000AB07, "NBD_SET_SIZE_BLOCKS") +_NBD_SET_SOCK: Final[tuple[int, str]] = (0x0000AB00, "NBD_SET_SOCK") +_NBD_SET_TIMEOUT: Final[tuple[int, str]] = (0x0000AB09, "NBD_SET_TIMEOUT") +_BLKROSET: Final[tuple[int, str]] = (0x0000125D, "BLKROSET") + + +def _ioctl(fd: int, ctl: tuple[int, str], value: (int | bytes)=0) -> None: + (req, name) = ctl + try: + fcntl.ioctl(fd, req, value) + except Exception as ex: + raise NbdDeviceError(f"Ioctl {name} error", ex) + + +@contextlib.contextmanager +def _wrap_exceptions() -> Generator[None]: + try: + yield + except NbdDeviceError: + raise + except Exception as ex: + raise NbdDeviceError(tools.efmt(ex)) + + +# ===== +class NbdDevice: + def __init__(self, path: str, block: int, timeout: float) -> None: + self.__path = path + self.__block = block + self.__timeout = timeout + + # ===== + + def get_path(self) -> str: + return self.__path + + async def open_close(self) -> None: + await asyncio.to_thread(self.__inner_open_close) + + def __inner_open_close(self) -> None: + fd = os.open(self.__path, os.O_RDONLY) + os.close(fd) + + @contextlib.asynccontextmanager + async def open_prepared(self, link: NbdLink, image: NbdImage) -> AsyncGenerator[int]: + with _wrap_exceptions(): + fd = await asyncio.to_thread(os.open, self.__path, os.O_RDWR) + try: + self.__cleanup(fd, close=False) + self.__prepare(fd, image, link.device_s) + yield fd + finally: + try: + self.__cleanup(fd, close=True) + except Exception as ex: + get_logger(0).error("Cleanup error: %s", tools.efmt(ex)) + + async def do_it(self, fd: int) -> None: + logger = get_logger(0) + logger.info("Running NBD_DO_IT ...") + await asyncio.to_thread(_ioctl, fd, _NBD_DO_IT) # Blocks here + logger.info("Stopped NBD_DO_IT") + + def __prepare(self, fd: int, image: NbdImage, sock: socket.SocketType) -> None: + logger = get_logger(0) + + blocks = (image.size + self.__block) // self.__block + flags = (0 if image.rw else 2) # NBD_FLAG_READ_ONLY + ro_bytes = int(not image.rw).to_bytes(byteorder=sys.byteorder, length=4) # Kinda ptr + + logger.info("Preparing %s: bytes=%s, bs=%s, blocks=%s, rw=%s ...", + self.__path, image.size, self.__block, blocks, image.rw) + + _ioctl(fd, _NBD_SET_BLKSIZE, self.__block) + _ioctl(fd, _NBD_SET_SIZE_BLOCKS, blocks) + _ioctl(fd, _NBD_SET_FLAGS, flags) + _ioctl(fd, _BLKROSET, ro_bytes) + _ioctl(fd, _NBD_SET_TIMEOUT, math.ceil(self.__timeout)) + _ioctl(fd, _NBD_SET_SOCK, sock.fileno()) + logger.info("Prepared") + + def __cleanup(self, fd: int, close: bool) -> None: + _ioctl(fd, _NBD_DISCONNECT) # Should be always OK .. + _ioctl(fd, _NBD_CLEAR_SOCK) # ... accordung to kernel sources + if close: + os.close(fd) diff --git a/kvmd/nbd/errors.py b/kvmd/nbd/errors.py new file mode 100644 index 000000000..ca7b4c370 --- /dev/null +++ b/kvmd/nbd/errors.py @@ -0,0 +1,54 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +from .. import tools + + +# ===== +class NbdError(Exception): + def __init__(self, msg: str, ex: (Exception | None)=None) -> None: + if ex: + msg += ": " + tools.efmt(ex) + super().__init__(msg) + + +# ===== +class NbdDeviceError(NbdError): + pass + + +# ===== +class NbdIoError(NbdError): + pass + + +class NbdIoConnectionError(NbdIoError): + pass + + +class NbdIoProtocolError(NbdIoError): + pass + + +# ===== +class NbdRemoteError(NbdError): + pass diff --git a/kvmd/nbd/link.py b/kvmd/nbd/link.py new file mode 100644 index 000000000..7f370a5f5 --- /dev/null +++ b/kvmd/nbd/link.py @@ -0,0 +1,93 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import asyncio +import socket +import contextlib +import dataclasses + +from typing import Generator +from typing import AsyncGenerator + + +# ===== +@dataclasses.dataclass(frozen=True) +class NbdLink: + device_s: socket.SocketType + remote_r: asyncio.StreamReader + remote_w: asyncio.StreamWriter + _remote_s: socket.SocketType + _stopped: bool = dataclasses.field(default=False, hash=False) + + @classmethod + @contextlib.asynccontextmanager + async def opened(cls) -> AsyncGenerator["NbdLink"]: + (device_s, remote_s) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0) + + try: + (remote_r, remote_w) = await asyncio.open_connection(sock=remote_s) + except: # noqa: E722 + for sock in [device_s, remote_s]: + try: + sock.close() + except Exception: + pass + raise + + link = NbdLink(device_s, remote_r, remote_w, remote_s) + try: + yield link + finally: + # На самом деле мы должны использовать aiotools.close_writer(remote_w), + # но для простоты обработки CancelledError этим можно пренебречь, + # особенно с учетом того, что всё это живет в подпроцессе, который + # будет отстрелян по завершении работы. + # device_s.close(); aiotools.close_writer(remote_w); + link._close() + + def is_stopped(self) -> bool: + return self._stopped + + @contextlib.contextmanager + def shutdown_at_end(self) -> Generator[None]: + try: + yield + finally: + self.shutdown() + + def shutdown(self) -> bool: + ok = True + for sock in [self.device_s, self._remote_s]: + try: + sock.shutdown(socket.SHUT_RDWR) + except Exception: + ok = False + object.__setattr__(self, "_stopped", ok) + return ok + + def _close(self) -> None: + self.shutdown() + for sock in [self.device_s, self._remote_s]: + try: + sock.close() + except Exception: + pass diff --git a/kvmd/nbd/process.py b/kvmd/nbd/process.py new file mode 100644 index 000000000..373a667f7 --- /dev/null +++ b/kvmd/nbd/process.py @@ -0,0 +1,212 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import asyncio +import signal +import contextlib +import logging + +from typing import Final +from typing import Generator +from typing import AsyncGenerator + +from ..logging import get_logger + +from .. import tools +from .. import aiotools +from .. import aiomulti + +from .errors import NbdError +from .errors import NbdIoConnectionError + +from .types import NbdImage +from .types import BaseNbdEvent +from .types import NbdStartEvent +from .types import NbdStopEvent + +from .device import NbdDevice +from .remotes import BaseNbdRemote +from .link import NbdLink + + +# ===== +class NbdProcess: + __QUEUE_SIZE: Final[int] = 128 + __REACT_TIMEOUT: Final[int] = 3 + + def __init__( + self, + device: NbdDevice, + remote: BaseNbdRemote, + image: NbdImage, + ) -> None: + + self.__device = device + self.__remote = remote + self.__image = image + + self.__events_q: aiomulti.AioMpQueue[BaseNbdEvent] = aiomulti.AioMpQueue(self.__QUEUE_SIZE) + self.__proc = aiomulti.AioMpProcess("nbd", self.__subprocess) + self.__ready_nr = aiomulti.AioMpNotifier() + + def stop(self) -> None: + self.__proc.send_sigterm() + + @contextlib.asynccontextmanager + async def running(self) -> AsyncGenerator[None]: + logger = get_logger(0) + logger.info("Starting NBD process ...") + + self.__proc.start() + try: + ready = await self.__ready_nr.wait(self.__image.timeout + self.__REACT_TIMEOUT) + if ready < 0: # pylint: disable=no-else-raise + # No events - not started + raise NbdError("NBD process did not respond in time at start") + elif ready == 0: + # Failed to start in time, but notified - wait for exiting + await self.__proc.async_join(self.__REACT_TIMEOUT) + return # FIXME: defunc + + yield + + finally: + try: + if self.__proc.is_alive(): + logger.info("Stopping NBD process with SIGTERM ...") + self.__proc.send_sigterm() + await self.__proc.async_join(self.__REACT_TIMEOUT) + finally: + if self.__proc.is_alive(): + logger.info("Killing NBD process with SIGKILL ...") + self.__proc.sendpg_sigkill() + + alive = await self.__proc.async_join(self.__REACT_TIMEOUT) + if not alive: + logger.info("NBD process stopped: retcode=%s", self.__proc.exitcode) + else: + logger.error("Can't stop NBD process") + + async def poll(self) -> AsyncGenerator[BaseNbdEvent]: + while self.__proc.is_alive(): + (got, event) = await self.__events_q.async_fetch(1) # FIXME: Wait for process too + if got: + assert event is not None + yield event + while not self.__events_q.empty(): + await asyncio.sleep(0) + yield self.__events_q.get_nowait() + + def __subprocess(self) -> None: + with self.__catch_exceptions("main", subtask=False): + aiotools.run(self.__subprocess_loop()) + + async def __subprocess_loop(self) -> None: + async with NbdLink.opened() as link: + tasks: list[asyncio.Task] = [] + + def stop() -> None: + # Прибиваем через shutdown(), чтобы всё, что держится на сокетах, прервалось. + # Если не получилось - делаем cancel() и дожидаемся SIGKILL. + if link.shutdown(): + self.__queue_event_noex(NbdStopEvent("main", "Shutdown", True)) + else: + for task in tasks: + task.cancel() + + for signum in [signal.SIGTERM, signal.SIGINT]: + asyncio.get_running_loop().add_signal_handler(signum, stop) + + prepared = aiotools.AioStage() + + await aiotools.spawn_and_follow( + self.__sub_device_server(link, prepared), + self.__sub_remote_server(link), + self.__sub_checker(link, prepared), + wait=1, # FIXME: Get rid of this + tasks=tasks, + ) + + async def __sub_device_server(self, link: NbdLink, prepared: aiotools.AioStage) -> None: + with self.__catch_exceptions("device", log=self.__device.__module__): + with link.shutdown_at_end(): + async with self.__device.open_prepared(link, self.__image) as fd: + prepared.set_passed() + await self.__device.do_it(fd) + + async def __sub_remote_server(self, link: NbdLink) -> None: + try: + with self.__catch_exceptions("remote", log=self.__remote.__module__): + with link.shutdown_at_end(): + try: + await self.__remote.serve(link, self.__events_q) + except NbdIoConnectionError: + if not link.is_stopped(): + raise + finally: + await self.__remote.cleanup() + + async def __sub_checker(self, link: NbdLink, prepared: aiotools.AioStage) -> None: + with self.__catch_exceptions("checker"): + with link.shutdown_at_end(): + try: + await prepared.wait_passed() + await asyncio.wait_for( + self.__device.open_close(), + timeout=self.__image.timeout, + ) + except BaseException as ex: + self.__ready_nr.notify(0) + if isinstance(ex, TimeoutError): + raise NbdError("Can't open+close device in time") + raise + self.__ready_nr.notify(1) + self.__events_q.put_nowait(NbdStartEvent(self.__image, self.__device.get_path())) + await aiotools.wait_infinite() + + @contextlib.contextmanager + def __catch_exceptions(self, src: str, log: str="", subtask: bool=True) -> Generator[None]: + logger = (logging.getLogger(log) if log else get_logger(0)) + if subtask: + logger.info("Starting subtask %s ...", src) + msg = "" + try: + yield + except asyncio.CancelledError: + pass # Normally we don't interested in this as a reason + except NbdError as ex: + msg = tools.efmt(ex) + logger.error("%s", msg) + except Exception as ex: + msg = tools.efmt(ex) + logger.exception("Unhandled exception") + finally: + if msg: + self.__queue_event_noex(NbdStopEvent(src, msg, False)) + if subtask: + logger.info("Subtask %s finished", src) + + def __queue_event_noex(self, event: BaseNbdEvent) -> None: + try: + self.__events_q.put_nowait(event) + except Exception as ex: + get_logger(0).error("Can't queue stop event: %s", tools.efmt(ex)) diff --git a/kvmd/nbd/remotes/__init__.py b/kvmd/nbd/remotes/__init__.py new file mode 100644 index 000000000..d353ead72 --- /dev/null +++ b/kvmd/nbd/remotes/__init__.py @@ -0,0 +1,216 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import asyncio +import struct +import errno + +from typing import Final + +from ...yamlconf import Option + +from ... import tools +from ... import aiomulti + +from ..errors import NbdRemoteError +from ..errors import NbdIoConnectionError +from ..errors import NbdIoProtocolError + +from ..types import NbdImage +from ..types import BaseNbdEvent +from ..types import NbdRemoteEvent + +from ..link import NbdLink + + +# ===== +class BaseNbdRemote: + # https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md + # https://github.com/NetworkBlockDevice/nbd/blob/master/nbd-client.c + # https://github.com/mirror/busybox/blob/master/networking/nbd-client.c + # https://elixir.bootlin.com/linux/v6.12/source/drivers/block/nbd.c + + __MAGIC_RECV: Final[int] = 0x25609513 + __MAGIC_SEND: Final[int] = 0x67446698 + + __OP_READ: Final[int] = 0 + __OP_WRITE: Final[int] = 1 + __OP_STOP: Final[int] = 2 + + def __init__(self) -> None: + self.__recv_st = struct.Struct(">IHHQQI") + self.__send_st = struct.Struct(">IIQ") + + self.__image: (NbdImage | None) = None + self.__events_q: (aiomulti.AioMpQueue[BaseNbdEvent] | None) = None + + # ===== + + @classmethod + def get_schemes(cls) -> set[str]: + raise NotImplementedError + + @classmethod + def get_options(cls) -> dict[str, Option]: + return {} + + # ===== + + async def _do_probe(self) -> NbdImage: + raise NotImplementedError + + async def _do_again(self) -> NbdImage: + raise NotImplementedError + + async def _on_read(self, offset: int, size: int) -> bytes: + raise NotImplementedError + + async def _on_write(self, offset: int, data: bytes) -> None: + raise NotImplementedError + + async def _do_cleanup(self) -> None: + raise NotImplementedError + + # ===== + + async def _send_status_ok(self) -> None: + await self.__send_remote_event(True, "Online") + + async def _send_status_error(self, msg: str) -> None: + await self.__send_remote_event(False, msg) + + async def __send_remote_event(self, online: bool, msg: str) -> None: + assert self.__events_q is not None + try: + self.__events_q.put_nowait(NbdRemoteEvent(online, msg)) + except Exception as ex: + raise NbdRemoteError(f"Can't send status event: {tools.efmt(ex)}") + + async def _probe_again(self) -> None: + assert self.__image + image = await self._do_again() + if self.__image.rw is True and not image.rw: + raise NbdRemoteError("The source permissions changed: RW -> RO") + if self.__image.size != image.size: + raise NbdRemoteError(f"The source file has a new size: {self.__image.size} -> {image.size}") + + # ===== + + async def probe(self) -> NbdImage: + assert self.__events_q is None # Not running + self.__image = await self._do_probe() + return self.__image + + async def serve( + self, + link: NbdLink, + events_q: aiomulti.AioMpQueue[BaseNbdEvent], + ) -> None: + + assert self.__image + assert self.__events_q is None + self.__events_q = events_q + + await self._probe_again() # Validate NbdImage after first probing + await self._send_status_ok() + + while True: + (op, cookie, offset, size, data) = await self.__recv_request(link.remote_r) + result: (tuple[int, bytes] | None) = None + match op: + case self.__OP_READ: + result = await self.__handle_read(offset, size) + case self.__OP_WRITE: + result = await self.__handle_write(offset, data) + case self.__OP_STOP: + raise NbdIoConnectionError("Closed by kernel") + case _: + raise NbdIoProtocolError(f"Unknown OP received: 0x{op:X}") + assert result is not None + await self.__send_response(link.remote_w, cookie, *result) + + async def cleanup(self) -> None: + try: + await self._do_cleanup() + finally: + self.__events_q = None + self.__image = None + + async def __recv_request( + self, + reader: asyncio.StreamReader, + ) -> tuple[int, int, int, int, bytes]: + + try: + header = await reader.readexactly(self.__recv_st.size) + (magic, flags, op, cookie, offset, size) = self.__recv_st.unpack(header) + data = b"" + if op == self.__OP_WRITE and size > 0: + data = await reader.readexactly(size) + except (ConnectionError, asyncio.IncompleteReadError) as ex: + raise NbdIoConnectionError("Can't receive request", ex) + + if magic != self.__MAGIC_RECV: + raise NbdIoProtocolError(f"Invalid request magic: 0x{magic:X}") + if flags: + raise NbdIoProtocolError(f"Got non-zero request flags: 0x{flags:X}") + return (op, cookie, offset, size, data) + + async def __send_response( + self, + writer: asyncio.StreamWriter, + cookie: int, error: int, data: bytes=b"", + ) -> None: + + try: + header = self.__send_st.pack(self.__MAGIC_SEND, error, cookie) + writer.write(header) + if error == 0 and len(data) > 0: + writer.write(data) + await writer.drain() + except ConnectionError as ex: + raise NbdIoConnectionError("Can't send response", ex) + + async def __handle_read(self, offset: int, size: int) -> tuple[int, bytes]: + assert self.__image + if offset >= self.__image.size: + return (errno.EINVAL, b"") + + data = await self._on_read(offset, size) + if len(data) < size: + if offset + size > self.__image.size: + data += b"\x00" * (size - len(data)) + else: + raise NbdIoProtocolError("Insufficient READ data") + elif len(data) > size: + raise NbdIoProtocolError("Too much READ data") + + return (0, data) + + async def __handle_write(self, offset: int, data: bytes) -> tuple[int, bytes]: + assert self.__image + if not self.__image.rw: + return (errno.EPERM, b"") + if offset >= self.__image.size: + return (errno.ENOSPC, b"") + await self._on_write(offset, data) + return (0, b"") diff --git a/kvmd/nbd/remotes/http.py b/kvmd/nbd/remotes/http.py new file mode 100644 index 000000000..3653fab7b --- /dev/null +++ b/kvmd/nbd/remotes/http.py @@ -0,0 +1,167 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import asyncio + +import aiohttp + +from ... import tools +from ... import htclient + +from ...yamlconf import Option + +from ...validators.basic import valid_bool +from ...validators.basic import valid_number +from ...validators.net import valid_url + +from ..errors import NbdError +from ..errors import NbdRemoteError + +from ..types import NbdImage + +from . import BaseNbdRemote + + +# ===== +class NbdHttpRemote(BaseNbdRemote): + def __init__( + self, + url: str, + verify: bool, + user: str, + passwd: str, + timeout: float, + retries_delay: float, + ) -> None: + + super().__init__() + + self.__url = url + self.__verify = verify + self.__user = user + self.__passwd = passwd + self.__timeout = timeout + self.__retries_delay = retries_delay + + self.__session: (aiohttp.ClientSession | None) = None + + # ===== + + @classmethod + def get_schemes(cls) -> set[str]: + return set(["http", "https"]) + + @classmethod + def get_options(cls) -> dict[str, Option]: + return { + "url": Option("", type=valid_url), + "verify": Option(True, type=valid_bool), + "user": Option(""), + "passwd": Option(""), + "timeout": Option(3.0, type=valid_number.mk(min=1.0, max=30.0, type=float)), + "retries_delay": Option(5.0, type=valid_number.mk(min=1.0, max=30.0, type=float)), + } + + # ===== + + async def _do_probe(self) -> NbdImage: + async with self.__make_session() as session: + return (await self.__probe(session)) + + async def _do_again(self) -> NbdImage: + session = self.__ensure_session() + return (await self.__probe(session)) + + async def __probe(self, session: aiohttp.ClientSession) -> NbdImage: + async with session.head(self.__url) as resp: + htclient.raise_not_200(resp) + cl = resp.content_length + if not isinstance(cl, int) or cl < 0: + raise NbdRemoteError(f"Invalid Content-Length: {cl}") + return NbdImage( + size=cl, + rw=False, + timeout=self.__timeout, + ) + + # ===== + + async def _on_read(self, offset: int, size: int) -> bytes: + errors = 0 + while True: + try: + if errors > 0: + await self._probe_again() + data = (await self.__read(offset, size)) + if errors > 0: + await self._send_status_ok() + errors = 0 + return data + except NbdError: + raise + except Exception as ex: + errors += 1 + msg = f"READ: {tools.efmt(ex)}; Retrying ({errors}) ..." + await self._send_status_error(msg) + await asyncio.sleep(self.__retries_delay) + + async def __read(self, offset: int, size: int) -> bytes: + session = self.__ensure_session() + async with session.get( + url=self.__url, + headers={aiohttp.hdrs.RANGE: f"bytes={offset}-{offset + size}"}, + ) as resp: + + resp.raise_for_status() # 206 partial is OK here + return (await resp.read())[:size] + + async def _on_write(self, offset: int, data: bytes) -> None: + _ = offset + _ = data + raise RuntimeError("WRITE should not be called for HTTP") + + # ===== + + async def _do_cleanup(self) -> None: + if self.__session: + try: + await self.__session.close() + finally: + self.__session = None + + # ===== + + def __ensure_session(self) -> aiohttp.ClientSession: + if self.__session is None: + self.__session = self.__make_session() + return self.__session + + def __make_session(self) -> aiohttp.ClientSession: + return aiohttp.ClientSession( + headers={aiohttp.hdrs.USER_AGENT: htclient.make_user_agent("KVMD-NBD")}, + connector=aiohttp.TCPConnector(ssl=self.__verify), + auth=(aiohttp.BasicAuth(self.__user, self.__passwd) if self.__user else None), + timeout=aiohttp.ClientTimeout(total=self.__timeout), + + # Don't ask for compression: https://github.com/aio-libs/aiohttp/issues/5513 + skip_auto_headers=frozenset([aiohttp.hdrs.ACCEPT_ENCODING]), + ) diff --git a/kvmd/nbd/types.py b/kvmd/nbd/types.py new file mode 100644 index 000000000..a2536fe6b --- /dev/null +++ b/kvmd/nbd/types.py @@ -0,0 +1,55 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import dataclasses + + +# ===== +@dataclasses.dataclass(frozen=True) +class NbdImage: + size: int + rw: bool + timeout: float + + +# ===== +class BaseNbdEvent: + pass + + +@dataclasses.dataclass(frozen=True) +class NbdRemoteEvent(BaseNbdEvent): + online: bool + msg: str + + +@dataclasses.dataclass(frozen=True) +class NbdStartEvent(BaseNbdEvent): + image: NbdImage + path: str + + +@dataclasses.dataclass(frozen=True) +class NbdStopEvent(BaseNbdEvent): + src: str + msg: str + ok: bool diff --git a/kvmd/plugins/atx/gpio.py b/kvmd/plugins/atx/gpio.py index cfa7660ff..0258de34a 100644 --- a/kvmd/plugins/atx/gpio.py +++ b/kvmd/plugins/atx/gpio.py @@ -109,7 +109,7 @@ def get_plugin_options(cls) -> dict: "long_click_delay": Option(5.5, type=valid_float_f01), } - def sysprep(self) -> None: + async def sysprep(self) -> None: assert self.__line_req is None self.__line_req = gpiod.request_lines( self.__device_path, diff --git a/kvmd/plugins/auth/ldap.py b/kvmd/plugins/auth/ldap.py index 961a47c71..994497067 100644 --- a/kvmd/plugins/auth/ldap.py +++ b/kvmd/plugins/auth/ldap.py @@ -20,6 +20,8 @@ # ========================================================================== # +import asyncio + import ldap from ...yamlconf import Option @@ -31,7 +33,6 @@ from ...logging import get_logger from ... import tools -from ... import aiotools from . import BaseAuthService @@ -67,7 +68,7 @@ def get_plugin_options(cls) -> dict: } async def authorize(self, user: str, passwd: str) -> bool: - return (await aiotools.run_async(self.__inner_authorize, user, passwd)) + return (await asyncio.to_thread(self.__inner_authorize, user, passwd)) def __inner_authorize(self, user: str, passwd: str) -> bool: if self.__user_domain: diff --git a/kvmd/plugins/auth/pam.py b/kvmd/plugins/auth/pam.py index 119951ab3..7b5fec5ad 100644 --- a/kvmd/plugins/auth/pam.py +++ b/kvmd/plugins/auth/pam.py @@ -32,8 +32,6 @@ from ...logging import get_logger -from ... import aiotools - from . import BaseAuthService @@ -67,7 +65,7 @@ async def authorize(self, user: str, passwd: str) -> bool: assert user == user.strip() assert user async with self.__lock: - return (await aiotools.run_async(self.__inner_authorize, user, passwd)) + return (await asyncio.to_thread(self.__inner_authorize, user, passwd)) def __inner_authorize(self, user: str, passwd: str) -> bool: if self.__allow_users and user not in self.__allow_users: diff --git a/kvmd/plugins/auth/radius.py b/kvmd/plugins/auth/radius.py index f048a2e6b..cca251c26 100644 --- a/kvmd/plugins/auth/radius.py +++ b/kvmd/plugins/auth/radius.py @@ -20,6 +20,7 @@ # ========================================================================== # +import asyncio import io import pyrad.client @@ -34,8 +35,6 @@ from ...logging import get_logger -from ... import aiotools - from . import BaseAuthService @@ -420,7 +419,7 @@ def get_plugin_options(cls) -> dict: } async def authorize(self, user: str, passwd: str) -> bool: - return (await aiotools.run_async(self.__inner_authorize, user, passwd)) + return (await asyncio.to_thread(self.__inner_authorize, user, passwd)) def __inner_authorize(self, user: str, passwd: str) -> bool: assert user == user.strip() diff --git a/kvmd/plugins/hid/__init__.py b/kvmd/plugins/hid/__init__.py index bfe4b98fd..29c2eef07 100644 --- a/kvmd/plugins/hid/__init__.py +++ b/kvmd/plugins/hid/__init__.py @@ -96,7 +96,7 @@ def _get_base_options(cls) -> dict[str, Any]: # ===== - def sysprep(self) -> None: + async def sysprep(self) -> None: raise NotImplementedError async def get_state(self) -> dict: diff --git a/kvmd/plugins/hid/_mcu/__init__.py b/kvmd/plugins/hid/_mcu/__init__.py index dc681b686..b638fe390 100644 --- a/kvmd/plugins/hid/_mcu/__init__.py +++ b/kvmd/plugins/hid/_mcu/__init__.py @@ -32,10 +32,7 @@ from ....logging import get_logger -from .... import tools -from .... import aiotools from .... import aiomulti -from .... import aioproc from ....yamlconf import Option @@ -104,7 +101,7 @@ def connected(self) -> Generator[BasePhyConnection, None, None]: raise NotImplementedError -class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes +class BaseMcuHid(BaseHid): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,super-init-not-called self, phy: BasePhy, @@ -123,8 +120,7 @@ def __init__( # pylint: disable=too-many-arguments,super-init-not-called **gpio_kwargs: Any, ) -> None: - BaseHid.__init__(self, ignore_keys=ignore_keys, **mouse_x_range, **mouse_y_range, **jiggler) - multiprocessing.Process.__init__(self, daemon=True) + super().__init__(ignore_keys=ignore_keys, **mouse_x_range, **mouse_y_range, **jiggler) self.__read_retries = read_retries self.__common_retries = common_retries @@ -137,10 +133,12 @@ def __init__( # pylint: disable=too-many-arguments,super-init-not-called self.__gpio = Gpio(device_path=gpio_device_path, **gpio_kwargs) self.__reset_self = reset_self + self.__proc = aiomulti.AioMpProcess("hid", self.__subprocess) + self.__reset_required_event = multiprocessing.Event() - self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue() + self.__events_q: aiomulti.AioMpQueue[BaseEvent] = aiomulti.AioMpQueue() - self.__notifier = aiomulti.AioProcessNotifier() + self.__notifier = aiomulti.AioMpNotifier() self.__state_flags = aiomulti.AioSharedFlags({ "online": 0, "busy": 0, @@ -171,9 +169,8 @@ def get_plugin_options(cls) -> dict: **cls._get_base_options(), } - def sysprep(self) -> None: - get_logger(0).info("Starting HID daemon ...") - self.start() + async def sysprep(self) -> None: + self.__proc.start() async def get_state(self) -> dict: state = await self.__state_flags.get() @@ -254,13 +251,10 @@ async def poll_state(self) -> AsyncGenerator[dict, None]: async def reset(self) -> None: self.__reset_required_event.set() - @aiotools.atomic_fg async def cleanup(self) -> None: - if self.is_alive(): - get_logger(0).info("Stopping HID daemon ...") + if self.__proc.is_alive(): self.__stop_event.set() - if self.is_alive() or self.exitcode is not None: - self.join() + await self.__proc.async_join() # ===== @@ -309,11 +303,11 @@ def __queue_event(self, event: BaseEvent, clear: bool=False) -> None: # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между # очисткой и добавлением нового события. Неприятно, но не смертельно. # Починить блокировкой после перехода на асинхронные очереди. - tools.clear_queue(self.__events_queue) - self.__events_queue.put_nowait(event) + self.__events_q.clear_current() + self.__events_q.put_nowait(event) - def run(self) -> None: # pylint: disable=too-many-branches - logger = aioproc.settle("HID", "hid") + def __subprocess(self) -> None: # pylint: disable=too-many-branches + logger = get_logger(0) while not self.__stop_event.is_set(): try: with self.__gpio: @@ -337,13 +331,13 @@ def __hid_loop(self) -> None: continue reset = True with self.__phy.connected() as conn: - while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0): + while not (self.__stop_event.is_set() and self.__events_q.qsize() == 0): if self.__reset_required_event.is_set(): self.__set_state_busy(True) self.__reset_required_event.clear() break # Проваливаемся и резетим в __hid_loop_wait_device() try: - event = self.__events_queue.get(timeout=0.1) + event = self.__events_q.get(timeout=0.1) except queue.Empty: self.__process_request(conn, REQUEST_PING) else: diff --git a/kvmd/plugins/hid/bt/__init__.py b/kvmd/plugins/hid/bt/__init__.py index 246e3a8a1..717ee8c7c 100644 --- a/kvmd/plugins/hid/bt/__init__.py +++ b/kvmd/plugins/hid/bt/__init__.py @@ -36,9 +36,7 @@ from ....validators.basic import valid_int_f1 from ....validators.basic import valid_float_f01 -from .... import aiotools from .... import aiomulti -from .... import aioproc from .. import BaseHid @@ -88,10 +86,10 @@ def __init__( # pylint: disable=too-many-arguments,too-many-locals super().__init__(ignore_keys=ignore_keys, **mouse_x_range, **mouse_y_range, **jiggler) self._set_jiggler_absolute(False) - self.__proc: (multiprocessing.Process | None) = None + self.__proc = aiomulti.AioMpProcess("hid", self.__server_worker) self.__stop_event = multiprocessing.Event() - self.__notifier = aiomulti.AioProcessNotifier() + self.__notifier = aiomulti.AioMpNotifier() self.__server = BtServer( iface=BluezIface( @@ -132,9 +130,7 @@ def get_plugin_options(cls) -> dict: **cls._get_base_options(), } - def sysprep(self) -> None: - get_logger(0).info("Starting HID daemon ...") - self.__proc = multiprocessing.Process(target=self.__server_worker, daemon=True) + async def sysprep(self) -> None: self.__proc.start() async def get_state(self) -> dict: @@ -179,14 +175,10 @@ async def reset(self) -> None: self.clear_events() self.__server.queue_event(ResetEvent()) - @aiotools.atomic_fg async def cleanup(self) -> None: - if self.__proc is not None: - if self.__proc.is_alive(): - get_logger(0).info("Stopping HID daemon ...") - self.__stop_event.set() - if self.__proc.is_alive() or self.__proc.exitcode is not None: - self.__proc.join() + if self.__proc.is_alive(): + self.__stop_event.set() + await self.__proc.async_join() # ===== @@ -221,7 +213,7 @@ def _clear_events(self) -> None: # ===== def __server_worker(self) -> None: # pylint: disable=too-many-branches - logger = aioproc.settle("HID", "hid") + logger = get_logger(0) while not self.__stop_event.is_set(): try: self.__server.run() diff --git a/kvmd/plugins/hid/bt/server.py b/kvmd/plugins/hid/bt/server.py index b95f3b80d..59bc348f9 100644 --- a/kvmd/plugins/hid/bt/server.py +++ b/kvmd/plugins/hid/bt/server.py @@ -24,6 +24,7 @@ import select import multiprocessing import multiprocessing.synchronize +import multiprocessing.connection import dataclasses import contextlib import queue @@ -85,7 +86,7 @@ def __init__( socket_timeout: float, select_timeout: float, - notifier: aiomulti.AioProcessNotifier, + notifier: aiomulti.AioMpNotifier, stop_event: multiprocessing.synchronize.Event, ) -> None: @@ -101,9 +102,9 @@ def __init__( self.__stop_event = stop_event self.__clients: dict[str, _BtClient] = {} - self.__to_read: set[socket.socket] = set() + self.__to_read: set[socket.SocketType | multiprocessing.connection.Connection] = set() - self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue() + self.__events_q: aiomulti.AioMpQueue[BaseEvent] = aiomulti.AioMpQueue() self.__state_flags = aiomulti.AioSharedFlags({ "online": False, @@ -133,13 +134,13 @@ async def get_state(self) -> dict: def queue_event(self, event: BaseEvent) -> None: if not self.__stop_event.is_set(): - self.__events_queue.put_nowait(event) + self.__events_q.put_nowait(event) def clear_events(self) -> None: # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между # очисткой и добавлением события ClearEvent. Неприятно, но не смертельно. # Починить блокировкой после перехода на асинхронные очереди. - tools.clear_queue(self.__events_queue) + self.__events_q.clear_current() self.queue_event(ClearEvent()) # ===== @@ -160,7 +161,7 @@ def __main_loop( # pylint: disable=too-many-branches server_int_sock: socket.socket, ) -> None: - qr = self.__events_queue._reader # type: ignore # pylint: disable=protected-access + qr = self.__events_q.get_reader() self.__to_read = set([qr, server_ctl_sock, server_int_sock]) self.__clients = {} @@ -213,9 +214,9 @@ def __process_leds(self, leds: int) -> None: ) def __process_events(self) -> None: # pylint: disable=too-many-branches - for _ in range(self.__events_queue.qsize()): + for _ in range(self.__events_q.qsize()): try: - event = self.__events_queue.get_nowait() + event = self.__events_q.get_nowait() except queue.Empty: break else: diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py index ed3f5dcf6..272f1a479 100644 --- a/kvmd/plugins/hid/ch9329/__init__.py +++ b/kvmd/plugins/hid/ch9329/__init__.py @@ -31,9 +31,7 @@ from ....logging import get_logger from .... import tools -from .... import aiotools from .... import aiomulti -from .... import aioproc from ....yamlconf import Option @@ -51,7 +49,7 @@ # ===== -class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes +class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,super-init-not-called self, ignore_keys: list[str], @@ -64,24 +62,25 @@ def __init__( # pylint: disable=too-many-arguments,super-init-not-called read_timeout: float, ) -> None: - BaseHid.__init__(self, ignore_keys=ignore_keys, **mouse_x_range, **mouse_y_range, **jiggler) - multiprocessing.Process.__init__(self, daemon=True) + super().__init__(ignore_keys=ignore_keys, **mouse_x_range, **mouse_y_range, **jiggler) self.__device_path = device_path self.__speed = speed self.__read_timeout = read_timeout self.__reset_required_event = multiprocessing.Event() - self.__cmd_queue: "multiprocessing.Queue[bytes]" = multiprocessing.Queue() + self.__cmd_q: aiomulti.AioMpQueue[bytes] = aiomulti.AioMpQueue() - self.__notifier = aiomulti.AioProcessNotifier() + self.__notifier = aiomulti.AioMpNotifier() self.__state_flags = aiomulti.AioSharedFlags({ "online": 0, "busy": 0, "status": 0, }, self.__notifier, type=int) + self.__proc = aiomulti.AioMpProcess("hid", self.__subprocess) self.__stop_event = multiprocessing.Event() + self.__chip = Chip(device_path, speed, read_timeout) self.__keyboard = Keyboard() self.__mouse = Mouse() @@ -95,9 +94,8 @@ def get_plugin_options(cls) -> dict: **cls._get_base_options(), } - def sysprep(self) -> None: - get_logger(0).info("Starting HID daemon ...") - self.start() + async def sysprep(self) -> None: + self.__proc.start() async def get_state(self) -> dict: state = await self.__state_flags.get() @@ -140,13 +138,10 @@ async def poll_state(self) -> AsyncGenerator[dict, None]: async def reset(self) -> None: self.__reset_required_event.set() - @aiotools.atomic_fg async def cleanup(self) -> None: - if self.is_alive(): - get_logger(0).info("Stopping HID daemon ...") + if self.__proc.is_alive(): self.__stop_event.set() - if self.is_alive() or self.exitcode is not None: - self.join() + await self.__proc.async_join() # ===== @@ -184,7 +179,7 @@ def _send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None: self.__queue_cmd(self.__mouse.process_relative(delta_x, delta_y)) def _clear_events(self) -> None: - tools.clear_queue(self.__cmd_queue) + self.__cmd_q.clear_current() def __queue_cmd(self, cmd: bytes, clear: bool=False) -> None: if not self.__stop_event.is_set(): @@ -192,11 +187,11 @@ def __queue_cmd(self, cmd: bytes, clear: bool=False) -> None: # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между # очисткой и добавлением нового события. Неприятно, но не смертельно. # Починить блокировкой после перехода на асинхронные очереди. - tools.clear_queue(self.__cmd_queue) - self.__cmd_queue.put_nowait(cmd) + self.__cmd_q.clear_current() + self.__cmd_q.put_nowait(cmd) - def run(self) -> None: # pylint: disable=too-many-branches - logger = aioproc.settle("HID", "hid") + def __subprocess(self) -> None: + logger = get_logger(0) while not self.__stop_event.is_set(): try: self.__hid_loop() @@ -208,7 +203,7 @@ def __hid_loop(self) -> None: while not self.__stop_event.is_set(): try: with self.__chip.connected() as conn: - while not (self.__stop_event.is_set() and self.__cmd_queue.qsize() == 0): + while not (self.__stop_event.is_set() and self.__cmd_q.qsize() == 0): if self.__reset_required_event.is_set(): try: self.__set_state_busy(True) @@ -216,7 +211,7 @@ def __hid_loop(self) -> None: finally: self.__reset_required_event.clear() try: - cmd = self.__cmd_queue.get(timeout=0.1) + cmd = self.__cmd_q.get(timeout=0.1) # get_logger(0).info(f"HID : cmd = {cmd}") except queue.Empty: self.__process_cmd(conn, b"") diff --git a/kvmd/plugins/hid/ch9329/keyboard.py b/kvmd/plugins/hid/ch9329/keyboard.py index 98c8749a0..b4592d798 100644 --- a/kvmd/plugins/hid/ch9329/keyboard.py +++ b/kvmd/plugins/hid/ch9329/keyboard.py @@ -32,7 +32,7 @@ def __init__(self) -> None: "num": False, "caps": False, "scroll": False, - }, aiomulti.AioProcessNotifier(), bool) + }, aiomulti.AioMpNotifier(), bool) self.__mods = 0 self.__active_keys: list[int] = [] diff --git a/kvmd/plugins/hid/otg/__init__.py b/kvmd/plugins/hid/otg/__init__.py index bf615fdf7..17a102bc8 100644 --- a/kvmd/plugins/hid/otg/__init__.py +++ b/kvmd/plugins/hid/otg/__init__.py @@ -20,6 +20,7 @@ # ========================================================================== # +import asyncio import copy from typing import AsyncGenerator @@ -64,7 +65,7 @@ def __init__( self.__udc = udc - self.__notifier = aiomulti.AioProcessNotifier() + self.__notifier = aiomulti.AioMpNotifier() win98_fix = mouse.pop("absolute_win98_fix") common = {"notifier": self.__notifier, "noop": noop} @@ -122,7 +123,7 @@ def get_plugin_options(cls) -> dict: **cls._get_base_options(), } - def sysprep(self) -> None: + async def sysprep(self) -> None: udc = usb.find_udc(self.__udc) get_logger(0).info("Using UDC %s", udc) self.__keyboard_proc.start(udc) @@ -177,14 +178,13 @@ async def reset(self) -> None: self.__mouse_alt_proc.send_reset_event() async def cleanup(self) -> None: - try: - self.__keyboard_proc.cleanup() - finally: - try: - self.__mouse_proc.cleanup() - finally: - if self.__mouse_alt_proc: - self.__mouse_alt_proc.cleanup() + coros = [ + self.__keyboard_proc.cleanup(), + self.__mouse_proc.cleanup(), + ] + if self.__mouse_alt_proc: + coros.append(self.__mouse_alt_proc.cleanup()) + await asyncio.gather(*coros) # ===== diff --git a/kvmd/plugins/hid/otg/device.py b/kvmd/plugins/hid/otg/device.py index a3bc27393..5f5a99b4b 100644 --- a/kvmd/plugins/hid/otg/device.py +++ b/kvmd/plugins/hid/otg/device.py @@ -34,20 +34,19 @@ from .... import tools from .... import aiomulti -from .... import aioproc from .... import usb from .events import BaseEvent # ===== -class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-instance-attributes +class BaseDeviceProcess: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, name: str, read_size: int, initial_state: dict, - notifier: aiomulti.AioProcessNotifier, + notifier: aiomulti.AioMpNotifier, device_path: str, select_timeout: float, @@ -56,8 +55,6 @@ def __init__( # pylint: disable=too-many-arguments noop: bool, ) -> None: - super().__init__(daemon=True) - self.__name = name self.__read_size = read_size @@ -67,21 +64,23 @@ def __init__( # pylint: disable=too-many-arguments self.__write_retries = write_retries self.__noop = noop - self.__udc_state_path = "" - self.__fd = -1 - self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue() + self.__proc = aiomulti.AioMpProcess(f"hid-{self.__name}", self.__subprocess) + self.__events_q: aiomulti.AioMpQueue[BaseEvent] = aiomulti.AioMpQueue() self.__state_flags = aiomulti.AioSharedFlags({"online": True, **initial_state}, notifier) self.__stop_event = multiprocessing.Event() + + self.__udc_state_path = "" + self.__fd = -1 self.__no_device_reported = False self.__logger: (logging.Logger | None) = None def start(self, udc: str) -> None: # type: ignore # pylint: disable=arguments-differ self.__udc_state_path = usb.get_udc_path(udc, usb.U_STATE) - super().start() + self.__proc.start() - def run(self) -> None: # pylint: disable=too-many-branches - self.__logger = aioproc.settle(f"HID-{self.__name}", f"hid-{self.__name}") + def __subprocess(self) -> None: # pylint: disable=too-many-branches + self.__logger = get_logger(0) report = b"" retries = 0 while not self.__stop_event.is_set(): @@ -91,7 +90,7 @@ def run(self) -> None: # pylint: disable=too-many-branches self.__read_all_reports() try: - event = self.__events_queue.get(timeout=self.__queue_timeout) + event = self.__events_q.get(timeout=self.__queue_timeout) except queue.Empty: # Проблема в том, что устройство может отвечать EAGAIN или ESHUTDOWN, # если оно было отключено физически. См: @@ -143,21 +142,19 @@ def _update_state(self, **kwargs: bool) -> None: # ===== - def _stop(self) -> None: - if self.is_alive(): - get_logger().info("Stopping HID-%s daemon ...", self.__name) + async def _stop(self) -> None: + if self.__proc.is_alive(): self.__stop_event.set() - if self.is_alive() or self.exitcode is not None: - self.join() + await self.__proc.async_join() def _queue_event(self, event: BaseEvent) -> None: - self.__events_queue.put_nowait(event) + self.__events_q.put_nowait(event) def _clear_queue(self) -> None: - tools.clear_queue(self.__events_queue) + self.__events_q.clear_current() def _cleanup_write(self, report: bytes) -> None: - assert not self.is_alive() + assert not self.__proc.is_alive() assert self.__fd < 0 if self.__ensure_device(): self.__write_report(report) @@ -194,7 +191,7 @@ def __write_report(self, report: bytes) -> bool: self.__name, written, len(report)) except Exception as ex: if isinstance(ex, OSError) and ( - # https://github.com/raspberrypi/linux/commit/61b7f805dc2fd364e0df682de89227e94ce88e25 + # https://github.com/raspberrypi/linux/commit/61b7f805dc2fd364e0df682de89227e94ce88e2 ex.errno == errno.EAGAIN # pylint: disable=no-member or ex.errno == errno.ESHUTDOWN # pylint: disable=no-member ): @@ -224,7 +221,10 @@ def __read_all_reports(self) -> None: try: report = os.read(self.__fd, self.__read_size) except Exception as ex: - if isinstance(ex, OSError) and ex.errno == errno.EAGAIN: # pylint: disable=no-member + if isinstance(ex, OSError) and ( + ex.errno == errno.EAGAIN # pylint: disable=no-member + or ex.errno == errno.ESHUTDOWN # pylint: disable=no-member + ): logger.debug("HID-%s busy/unplugged (read): %s", self.__name, tools.efmt(ex)) else: logger.exception("Can't read report from HID-%s", self.__name) diff --git a/kvmd/plugins/hid/otg/keyboard.py b/kvmd/plugins/hid/otg/keyboard.py index c2eefffcd..28a0e28ca 100644 --- a/kvmd/plugins/hid/otg/keyboard.py +++ b/kvmd/plugins/hid/otg/keyboard.py @@ -54,10 +54,12 @@ def __init__(self, **kwargs: Any) -> None: self.__pressed_mods: set[UsbKey] = set() self.__pressed_keys: list[UsbKey | None] = [None] * 6 - def cleanup(self) -> None: - self._stop() - get_logger().info("Clearing HID-keyboard events ...") - self._cleanup_write(b"\x00" * 8) # Release all keys and modifiers + async def cleanup(self) -> None: + try: + await self._stop() + finally: + get_logger().info("Clearing HID-keyboard events ...") + self._cleanup_write(b"\x00" * 8) # Release all keys and modifiers def send_clear_event(self) -> None: self._clear_queue() diff --git a/kvmd/plugins/hid/otg/mouse.py b/kvmd/plugins/hid/otg/mouse.py index 24dc9bf17..7219b3390 100644 --- a/kvmd/plugins/hid/otg/mouse.py +++ b/kvmd/plugins/hid/otg/mouse.py @@ -64,18 +64,20 @@ def set_win98_fix(self, enabled: bool) -> None: def get_win98_fix(self) -> bool: return self.__win98_fix - def cleanup(self) -> None: - self._stop() - get_logger().info("Clearing HID-mouse events ...") - report = make_mouse_report( - absolute=self.__absolute, - buttons=0, - move_x=(self.__x if self.__absolute else 0), - move_y=(self.__y if self.__absolute else 0), - wheel_x=(0 if self.__horizontal_wheel else None), - wheel_y=0, - ) - self._cleanup_write(report) # Release all buttons + async def cleanup(self) -> None: + try: + await self._stop() + finally: + get_logger().info("Clearing HID-mouse events ...") + report = make_mouse_report( + absolute=self.__absolute, + buttons=0, + move_x=(self.__x if self.__absolute else 0), + move_y=(self.__y if self.__absolute else 0), + wheel_x=(0 if self.__horizontal_wheel else None), + wheel_y=0, + ) + self._cleanup_write(report) # Release all buttons def send_clear_event(self) -> None: self._clear_queue() diff --git a/kvmd/plugins/msd/__init__.py b/kvmd/plugins/msd/__init__.py index b2f9d50ef..2e647839a 100644 --- a/kvmd/plugins/msd/__init__.py +++ b/kvmd/plugins/msd/__init__.py @@ -21,6 +21,7 @@ import os +import asyncio import contextlib import time @@ -283,7 +284,7 @@ async def open(self) -> "MsdFileWriter": get_logger(1).info("Writing %r image (%d bytes) to MSD ...", self.__name, self.__file_size) await aiofiles.os.makedirs(os.path.dirname(self.__path), exist_ok=True) self.__file = await aiofiles.open(self.__path, mode="w+b", buffering=0) # type: ignore - await aiotools.run_async(os.ftruncate, self.__file.fileno(), self.__file_size) # type: ignore + await asyncio.to_thread(os.ftruncate, self.__file.fileno(), self.__file_size) # type: ignore return self async def finish(self) -> bool: @@ -309,7 +310,7 @@ async def close(self) -> None: async def __sync(self) -> None: assert self.__file is not None await self.__file.flush() # type: ignore - await aiotools.run_async(os.fsync, self.__file.fileno()) # type: ignore + await asyncio.to_thread(os.fsync, self.__file.fileno()) # type: ignore # ===== diff --git a/kvmd/plugins/msd/otg/__init__.py b/kvmd/plugins/msd/otg/__init__.py index 1245c4f68..725bcf6a4 100644 --- a/kvmd/plugins/msd/otg/__init__.py +++ b/kvmd/plugins/msd/otg/__init__.py @@ -132,6 +132,8 @@ def __init__( # pylint: disable=super-init-not-called self.__initial_image: str = initial["image"] self.__initial_cdrom: bool = initial["cdrom"] + self.__gadget = gadget # Only for sysprep() + self.__drive = Drive(gadget, instance=0, lun=0) self.__storage = Storage(fstab.find_msd().root_path, remount_cmd) @@ -142,10 +144,6 @@ def __init__( # pylint: disable=super-init-not-called self.__state = _State(self.__notifier) self.__reset = False - logger = get_logger(0) - logger.info("Using OTG gadget %r as MSD", gadget) - aiotools.run_sync(self.__unsafe_reload_state()) - @classmethod def get_plugin_options(cls) -> dict: return { @@ -166,6 +164,10 @@ def get_plugin_options(cls) -> dict: # ===== + async def sysprep(self) -> None: + get_logger(0).info("Using OTG gadget %r as MSD", self.__gadget) + await self.__unsafe_reload_state() + async def get_state(self) -> dict: async with self.__state._lock: # pylint: disable=protected-access storage: (dict | None) = None @@ -436,8 +438,10 @@ async def __close_writer(self) -> None: @aiotools.atomic_fg async def cleanup(self) -> None: - await self.__close_reader() - await self.__close_writer() + try: + await self.__close_reader() + finally: + await self.__close_writer() async def systask(self) -> None: logger = get_logger(0) diff --git a/kvmd/plugins/msd/otg/storage.py b/kvmd/plugins/msd/otg/storage.py index 046f10fab..14c1e5a91 100644 --- a/kvmd/plugins/msd/otg/storage.py +++ b/kvmd/plugins/msd/otg/storage.py @@ -31,7 +31,6 @@ import aiofiles import aiofiles.os -from .... import aiotools from .... import aiohelpers from .. import MsdError @@ -59,7 +58,7 @@ def __init__(self, name: str, path: str, storage: Optional["Storage"]) -> None: async def _reload(self) -> None: # Only for Storage() and set_complete() # adopted используется в последующих проверках - self.__adopted = await aiotools.run_async(self.__is_adopted) + self.__adopted = await asyncio.to_thread(self.__is_adopted) complete = await self.__is_complete() removable = await self.__is_removable() (size, mod_ts) = await self.__get_stat() @@ -156,7 +155,7 @@ def __init__(self, name: str, path: str) -> None: self.__path = path async def _reload(self) -> None: # Only for Storage() - st = await aiotools.run_async(os.statvfs, self.__path) + st = await asyncio.to_thread(os.statvfs, self.__path) if self.name == "": writable = True else: @@ -200,7 +199,7 @@ async def reload(self) -> None: watchable_paths: list[str] = [] images: dict[str, Image] = {} parts: dict[str, _Part] = {} - for (root_path, is_part, files) in (await aiotools.run_async(self.__walk)): + for (root_path, is_part, files) in (await asyncio.to_thread(self.__walk)): watchable_paths.append(root_path) for path in files: name = self.__make_relative_name(path) diff --git a/kvmd/plugins/ugpio/__init__.py b/kvmd/plugins/ugpio/__init__.py index 6f9dfaec1..60ba67e4b 100644 --- a/kvmd/plugins/ugpio/__init__.py +++ b/kvmd/plugins/ugpio/__init__.py @@ -85,7 +85,7 @@ def register_output(self, pin: str, initial: (bool | None)) -> None: _ = pin _ = initial - def prepare(self) -> None: + async def prepare(self) -> None: pass async def run(self) -> None: diff --git a/kvmd/plugins/ugpio/anelpwr.py b/kvmd/plugins/ugpio/anelpwr.py index 3049fdd25..e13ec9318 100644 --- a/kvmd/plugins/ugpio/anelpwr.py +++ b/kvmd/plugins/ugpio/anelpwr.py @@ -98,14 +98,12 @@ def register_output(self, pin: str, initial: (bool | None)) -> None: self.__initial[pin] = initial self.__state[pin] = None - def prepare(self) -> None: - async def inner_prepare() -> None: - await asyncio.gather(*[ - self.write(pin, state) - for (pin, state) in self.__initial.items() - if state is not None - ], return_exceptions=True) - aiotools.run_sync(inner_prepare()) + async def prepare(self) -> None: + await asyncio.gather(*[ + self.write(pin, state) + for (pin, state) in self.__initial.items() + if state is not None + ], return_exceptions=True) async def run(self) -> None: prev_state: (dict | None) = None diff --git a/kvmd/plugins/ugpio/extron.py b/kvmd/plugins/ugpio/extron.py index 08a18d42a..3f43152d3 100644 --- a/kvmd/plugins/ugpio/extron.py +++ b/kvmd/plugins/ugpio/extron.py @@ -34,7 +34,6 @@ from ... import aiotools from ... import aiomulti -from ... import aioproc from ...yamlconf import Option @@ -67,11 +66,11 @@ def __init__( self.__read_timeout = read_timeout self.__protocol = protocol - self.__ctl_queue: "multiprocessing.Queue[int]" = multiprocessing.Queue() - self.__channel_queue: "multiprocessing.Queue[int | None]" = multiprocessing.Queue() + self.__ctl_q: aiomulti.AioMpQueue[int] = aiomulti.AioMpQueue() + self.__channel_q: aiomulti.AioMpQueue[int | None] = aiomulti.AioMpQueue() self.__channel: (int | None) = -1 - self.__proc: (multiprocessing.Process | None) = None + self.__proc = aiomulti.AioMpProcess(f"gpio-extron-{self._instance_name}", self.__serial_worker) self.__stop_event = multiprocessing.Event() @classmethod @@ -87,25 +86,20 @@ def get_plugin_options(cls) -> dict: def get_pin_validator(cls) -> Callable[[Any], Any]: return valid_number.mk(min=0, max=3, name="Extron USB channel") - def prepare(self) -> None: - assert self.__proc is None - self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True) + async def prepare(self) -> None: self.__proc.start() async def run(self) -> None: while True: - (got, channel) = await aiomulti.queue_get_last(self.__channel_queue, 1) + (got, channel) = await self.__channel_q.async_fetch_last(1) if got and self.__channel != channel: self.__channel = channel self._notifier.notify() async def cleanup(self) -> None: - if self.__proc is not None: - if self.__proc.is_alive(): - get_logger(0).info("Stopping %s daemon ...", self) - self.__stop_event.set() - if self.__proc.is_alive() or self.__proc.exitcode is not None: - self.__proc.join() + if self.__proc.is_alive(): + self.__stop_event.set() + await self.__proc.async_join() async def read(self, pin: str) -> bool: if not self.__is_online(): @@ -116,24 +110,23 @@ async def write(self, pin: str, state: bool) -> None: if not self.__is_online(): raise GpioDriverOfflineError(self) if state: - self.__ctl_queue.put_nowait(int(pin)) + self.__ctl_q.put_nowait(int(pin)) # ===== def __is_online(self) -> bool: return ( - self.__proc is not None - and self.__proc.is_alive() + self.__proc.is_alive() and self.__channel is not None ) def __serial_worker(self) -> None: - logger = aioproc.settle(str(self), f"gpio-extron-{self._instance_name}") + logger = get_logger(0) while not self.__stop_event.is_set(): try: with self.__get_serial() as tty: data = b"" - self.__channel_queue.put_nowait(-1) + self.__channel_q.put_nowait(-1) # Switch and then recieve the state. # FIXME: Get actual state without modifying the current. @@ -142,15 +135,15 @@ def __serial_worker(self) -> None: while not self.__stop_event.is_set(): (channel, data) = self.__recv_channel(tty, data) if channel is not None: - self.__channel_queue.put_nowait(channel) + self.__channel_q.put_nowait(channel) - (got, channel) = aiomulti.queue_get_last_sync(self.__ctl_queue, 0.1) # type: ignore + (got, channel) = self.__ctl_q.fetch_last(0.1) if got: assert channel is not None self.__send_channel(tty, channel) except Exception as ex: - self.__channel_queue.put_nowait(None) + self.__channel_q.put_nowait(None) if isinstance(ex, serial.SerialException) and ex.errno == errno.ENOENT: # pylint: disable=no-member logger.error("Missing %s serial device: %s", self, self.__device_path) else: diff --git a/kvmd/plugins/ugpio/ezcoo.py b/kvmd/plugins/ugpio/ezcoo.py index 37e0293ba..58d104f52 100644 --- a/kvmd/plugins/ugpio/ezcoo.py +++ b/kvmd/plugins/ugpio/ezcoo.py @@ -34,7 +34,6 @@ from ... import aiotools from ... import aiomulti -from ... import aioproc from ...yamlconf import Option @@ -67,11 +66,11 @@ def __init__( self.__read_timeout = read_timeout self.__protocol = protocol - self.__ctl_queue: "multiprocessing.Queue[int]" = multiprocessing.Queue() - self.__channel_queue: "multiprocessing.Queue[int | None]" = multiprocessing.Queue() - self.__channel: (int | None) = -1 + self.__ctl_q: aiomulti.AioMpQueue[int] = aiomulti.AioMpQueue() + self.__ch_q: aiomulti.AioMpQueue[int | None] = aiomulti.AioMpQueue() + self.__ch: (int | None) = -1 - self.__proc: (multiprocessing.Process | None) = None + self.__proc = aiomulti.AioMpProcess(f"gpio-ezcoo-{self._instance_name}", self.__serial_worker) self.__stop_event = multiprocessing.Event() @classmethod @@ -87,70 +86,67 @@ def get_plugin_options(cls) -> dict: def get_pin_validator(cls) -> Callable[[Any], Any]: return valid_number.mk(min=0, max=3, name="Ezcoo channel") - def prepare(self) -> None: - assert self.__proc is None - self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True) + async def prepare(self) -> None: self.__proc.start() async def run(self) -> None: while True: - (got, channel) = await aiomulti.queue_get_last(self.__channel_queue, 1) - if got and self.__channel != channel: - self.__channel = channel + (got, ch) = await self.__ch_q.async_fetch_last(1) + if got and self.__ch != ch: + self.__ch = ch self._notifier.notify() async def cleanup(self) -> None: - if self.__proc is not None: - if self.__proc.is_alive(): - get_logger(0).info("Stopping %s daemon ...", self) - self.__stop_event.set() - if self.__proc.is_alive() or self.__proc.exitcode is not None: - self.__proc.join() + if self.__proc.is_alive(): + self.__stop_event.set() + await self.__proc.async_join() async def read(self, pin: str) -> bool: if not self.__is_online(): raise GpioDriverOfflineError(self) - return (self.__channel == int(pin)) + return (self.__ch == int(pin)) async def write(self, pin: str, state: bool) -> None: if not self.__is_online(): raise GpioDriverOfflineError(self) if state: - self.__ctl_queue.put_nowait(int(pin)) + self.__ctl_q.put_nowait(int(pin)) # ===== def __is_online(self) -> bool: return ( - self.__proc is not None - and self.__proc.is_alive() - and self.__channel is not None + self.__proc.is_alive() + and self.__ch is not None ) def __serial_worker(self) -> None: - logger = aioproc.settle(str(self), f"gpio-ezcoo-{self._instance_name}") + logger = get_logger(0) while not self.__stop_event.is_set(): try: with self.__get_serial() as tty: data = b"" - self.__channel_queue.put_nowait(-1) + self.__ch_q.put_nowait(-1) - # Switch and then recieve the state. - # FIXME: Get actual state without modifying the current. - self.__send_channel(tty, 0) + # Get actual state without modifying the current + if self.__protocol <= 1: + tty.write(b"GET OUT1 VS\n" * 2) # Twice because of some bugs + else: + tty.write(b"EZG OUT1 VS\n" * 2) + tty.flush() while not self.__stop_event.is_set(): - (channel, data) = self.__recv_channel(tty, data) - if channel is not None: - self.__channel_queue.put_nowait(channel) + (ch, data) = self.__recv_channel(tty, data) + if ch is not None: + self.__ch_q.put_nowait(ch) - (got, channel) = aiomulti.queue_get_last_sync(self.__ctl_queue, 0.1) # type: ignore + (got, ch) = self.__ctl_q.fetch_last(0.1) if got: - assert channel is not None - self.__send_channel(tty, channel) + assert ch is not None + self.__send_channel(tty, ch) except Exception as ex: - self.__channel_queue.put_nowait(None) + self.__ch_q.put_nowait(None) if isinstance(ex, serial.SerialException) and ex.errno == errno.ENOENT: # pylint: disable=no-member logger.error("Missing %s serial device: %s", self, self.__device_path) else: @@ -161,25 +157,30 @@ def __get_serial(self) -> serial.Serial: return serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) def __recv_channel(self, tty: serial.Serial, data: bytes) -> tuple[(int | None), bytes]: - channel: (int | None) = None + ch: (int | None) = None if tty.in_waiting: data += tty.read_all() - found = re.findall(b"V[0-9a-fA-F]{2}S", data) + found = list(re.finditer(b"(OUT1 VS \\d+)|(V[0-9a-fA-F]{2}S)", data)) if found: - channel = { - b"V0CS": 0, + last = found[-1] + ch = { + b"V0CS": 0, # Switching retval (manual or via the TTY) b"V18S": 1, b"V5ES": 2, b"V08S": 3, - }.get(found[-1], -1) - data = data[-8:] - return (channel, data) - - def __send_channel(self, tty: serial.Serial, channel: int) -> None: - assert 0 <= channel <= 3 + b"OUT1 VS 1": 0, # "EZG OUT1 VS" return value + b"OUT1 VS 2": 1, + b"OUT1 VS 3": 2, + b"OUT1 VS 4": 3, + }.get(last[0], -1) + data = data[last.end(0):] + return (ch, data) + + def __send_channel(self, tty: serial.Serial, ch: int) -> None: + assert 0 <= ch <= 3 cmd = b"%s OUT1 VS IN%d\n" % ( (b"SET" if self.__protocol == 1 else b"EZS"), - channel + 1, + ch + 1, ) tty.write(cmd * 2) # Twice because of ezcoo bugs tty.flush() diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 6cda826b2..fa8d8dd1c 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -72,7 +72,7 @@ def register_input(self, pin: str, debounce: float) -> None: def register_output(self, pin: str, initial: (bool | None)) -> None: self.__output_pins[int(pin)] = initial - def prepare(self) -> None: + async def prepare(self) -> None: assert self.__reader is None assert self.__outputs_req is None self.__reader = aiogp.AioReader( diff --git a/kvmd/plugins/ugpio/gz_hk401x.py b/kvmd/plugins/ugpio/gz_hk401x.py new file mode 100644 index 000000000..9cf34f9c1 --- /dev/null +++ b/kvmd/plugins/ugpio/gz_hk401x.py @@ -0,0 +1,189 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2024 Maxim Devaev # +# 2021-2021 Sebastian Goscik # +# 2023-2026 Up # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import re +import multiprocessing +import errno +import time + +from typing import Callable +from typing import Any + +import serial + +from ...logging import get_logger + +from ... import aiotools +from ... import aiomulti + +from ...yamlconf import Option + +from ...validators.basic import valid_number +from ...validators.basic import valid_float_f01 +from ...validators.os import valid_abs_path +from ...validators.hw import valid_tty_speed + +from . import GpioDriverOfflineError +from . import BaseUserGpioDriver + + +# ===== +class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attributes + + MIN_CHANNEL = 0 + MAX_CHANNEL = 3 + + def __init__( + self, + instance_name: str, + notifier: aiotools.AioNotifier, + + device_path: str, + speed: int, + read_timeout: float, + ) -> None: + + super().__init__(instance_name, notifier) + + self.__device_path = device_path + self.__speed = speed + self.__read_timeout = read_timeout + + self.__ctl_q: aiomulti.AioMpQueue[int] = aiomulti.AioMpQueue() + self.__channel_q: aiomulti.AioMpQueue[int | None] = aiomulti.AioMpQueue() + self.__channel: (int | None) = -1 + + self.__proc = aiomulti.AioMpProcess(f"gpio-gz-hk401x-{self._instance_name}", self.__serial_worker) + self.__stop_event = multiprocessing.Event() + + @classmethod + def get_plugin_options(cls) -> dict: + return { + "device": Option("", type=valid_abs_path, unpack_as="device_path"), + "speed": Option(9600, type=valid_tty_speed), + "read_timeout": Option(2.0, type=valid_float_f01), + } + + @classmethod + def get_pin_validator(cls) -> Callable[[Any], Any]: + return valid_number.mk(min=0, max=3, name="GZ-HK401x channel") + + async def prepare(self) -> None: + self.__proc.start() + + async def run(self) -> None: + while True: + (got, channel) = await self.__channel_q.async_fetch_last(1) + if got and self.__channel != channel: + self.__channel = channel + self._notifier.notify() + + async def cleanup(self) -> None: + if self.__proc.is_alive(): + self.__stop_event.set() + await self.__proc.async_join() + + async def read(self, pin: str) -> bool: + if not self.__is_online(): + raise GpioDriverOfflineError(self) + return (self.__channel == int(pin)) + + async def write(self, pin: str, state: bool) -> None: + if not self.__is_online(): + raise GpioDriverOfflineError(self) + if state: + self.__ctl_q.put_nowait(int(pin)) + + # ===== + + def __is_online(self) -> bool: + return ( + self.__proc.is_alive() + and self.__channel is not None + ) + + def __serial_worker(self) -> None: + logger = get_logger(0) + while not self.__stop_event.is_set(): + try: + with self.__get_serial() as tty: + data = b"" + self.__channel_q.put_nowait(-1) + + # Wait for first port heartbeat to set correct channel (~2 sec max). + # Only for the classic switch with protocol version 1. + + while not self.__stop_event.is_set(): + (channel, data) = self.__recv_channel(tty, data) + if channel is not None: + self.__channel_q.put_nowait(channel) + + (got, channel) = self.__ctl_q.fetch_last(0.1) + if got: + assert channel is not None + self.__send_channel(tty, channel) + + + except Exception as ex: + self.__channel_q.put_nowait(None) + if isinstance(ex, serial.SerialException) and ex.errno == errno.ENOENT: # pylint: disable=no-member + logger.error("Missing %s serial device: %s", self, self.__device_path) + else: + logger.exception("Unexpected %s error", self) + time.sleep(1) + + def __get_serial(self) -> serial.Serial: + return serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) + + def __recv_channel(self, tty: serial.Serial, data: bytes) -> tuple[(int | None), bytes]: + channel: (int | None) = None + if tty.in_waiting: + data += tty.read_all() + get_logger(0).debug('Driver %s received serial data" %s', self._instance_name, data) + if len(data) != 1: + get_logger(0).warning('Driver %s received invalid data: "%s" .', self._instance_name, data) + else: + response = int.from_bytes(data, 'little', signed=False) + if response < Plugin.MIN_CHANNEL + 1 or response > Plugin.MAX_CHANNEL + 1: + get_logger(0).warning('Driver %s received invalid serial data: "%s" .', self._instance_name, data) + else: + channel = response - 1 + data = b"" + return (channel, data) + + def __send_channel(self, tty: serial.Serial, channel: int) -> None: + get_logger(0).info('Sending channel %s', channel) + assert 0 <= channel <= 3 + channel += 1 + channel_byte = 0x30 + channel + cmd = bytearray(b'\xfe\x00\x33') + cmd.append(channel_byte) + cmd.append(0xaa) + tty.write(bytes(cmd)) + tty.flush() + + def __str__(self) -> str: + return f"GZ-HK401X({self._instance_name})" + + __repr__ = __str__ diff --git a/kvmd/plugins/ugpio/hidrelay.py b/kvmd/plugins/ugpio/hidrelay.py index 1c8fe3cb5..483eac10d 100644 --- a/kvmd/plugins/ugpio/hidrelay.py +++ b/kvmd/plugins/ugpio/hidrelay.py @@ -86,7 +86,7 @@ def get_pin_validator(cls) -> Callable[[Any], Any]: def register_output(self, pin: str, initial: (bool | None)) -> None: self.__initials[int(pin)] = initial - def prepare(self) -> None: + async def prepare(self) -> None: logger = get_logger(0) logger.info("Probing driver %s on %s ...", self, self.__device_path) try: diff --git a/kvmd/plugins/ugpio/hue.py b/kvmd/plugins/ugpio/hue.py index f03009ee9..0050eac5b 100644 --- a/kvmd/plugins/ugpio/hue.py +++ b/kvmd/plugins/ugpio/hue.py @@ -97,14 +97,12 @@ def register_output(self, pin: str, initial: (bool | None)) -> None: self.__initial[pin] = initial self.__state[pin] = None - def prepare(self) -> None: - async def inner_prepare() -> None: - await asyncio.gather(*[ - self.write(pin, state) - for (pin, state) in self.__initial.items() - if state is not None - ], return_exceptions=True) - aiotools.run_sync(inner_prepare()) + async def prepare(self) -> None: + await asyncio.gather(*[ + self.write(pin, state) + for (pin, state) in self.__initial.items() + if state is not None + ], return_exceptions=True) async def run(self) -> None: prev_state: (dict | None) = None diff --git a/kvmd/plugins/ugpio/ipmi.py b/kvmd/plugins/ugpio/ipmi.py index 37a7a16f7..3e5ec8302 100644 --- a/kvmd/plugins/ugpio/ipmi.py +++ b/kvmd/plugins/ugpio/ipmi.py @@ -123,7 +123,7 @@ def register_output(self, pin: str, initial: (bool | None)) -> None: if pin not in [*_OUTPUTS, *_OUTPUTS.values()]: raise RuntimeError(f"Unsupported mode 'output' for pin={pin} on {self}") - def prepare(self) -> None: + async def prepare(self) -> None: get_logger(0).info("Probing driver %s on %s:%d ...", self, self.__host, self.__port) async def run(self) -> None: diff --git a/kvmd/plugins/ugpio/locator.py b/kvmd/plugins/ugpio/locator.py index d5cba7194..6aba9f583 100644 --- a/kvmd/plugins/ugpio/locator.py +++ b/kvmd/plugins/ugpio/locator.py @@ -73,7 +73,7 @@ def register_output(self, pin: str, initial: (bool | None)) -> None: _ = initial self.__tasks[int(pin)] = None - def prepare(self) -> None: + async def prepare(self) -> None: self.__line_req = gpiod.request_lines( self.__device_path, consumer="kvmd::locator", diff --git a/kvmd/plugins/ugpio/noyito.py b/kvmd/plugins/ugpio/noyito.py index c1cc0cfd6..074ac4ae0 100644 --- a/kvmd/plugins/ugpio/noyito.py +++ b/kvmd/plugins/ugpio/noyito.py @@ -84,7 +84,7 @@ def get_pin_validator(cls) -> Callable[[Any], Any]: def register_output(self, pin: str, initial: (bool | None)) -> None: self.__initials[int(pin)] = bool(initial) - def prepare(self) -> None: + async def prepare(self) -> None: logger = get_logger(0) logger.info("Probing driver %s on %s ...", self, self.__device_path) try: diff --git a/kvmd/plugins/ugpio/otgconf.py b/kvmd/plugins/ugpio/otgconf.py index 6624cfd42..39abef0e9 100644 --- a/kvmd/plugins/ugpio/otgconf.py +++ b/kvmd/plugins/ugpio/otgconf.py @@ -66,7 +66,7 @@ def __init__( def get_pin_validator(cls) -> Callable[[Any], Any]: return valid_stripped_string_not_empty - def prepare(self) -> None: + async def prepare(self) -> None: self.__udc = usb.find_udc(self.__udc) get_logger().info("Using UDC %s", self.__udc) diff --git a/kvmd/plugins/ugpio/pway.py b/kvmd/plugins/ugpio/pway.py index 252c44aa0..1945206db 100644 --- a/kvmd/plugins/ugpio/pway.py +++ b/kvmd/plugins/ugpio/pway.py @@ -36,7 +36,6 @@ from ... import aiotools from ... import aiomulti -from ... import aioproc from ...yamlconf import Option @@ -69,11 +68,11 @@ def __init__( self.__read_timeout = read_timeout self.__protocol = protocol - self.__ctl_queue: "multiprocessing.Queue[int]" = multiprocessing.Queue() - self.__channel_queue: "multiprocessing.Queue[int | None]" = multiprocessing.Queue() + self.__ctl_q: aiomulti.AioMpQueue[int] = aiomulti.AioMpQueue() + self.__channel_q: aiomulti.AioMpQueue[int | None] = aiomulti.AioMpQueue() self.__channel: (int | None) = -1 - self.__proc: (multiprocessing.Process | None) = None + self.__proc = aiomulti.AioMpProcess(f"gpio-pway-{self._instance_name}", self.__serial_worker) self.__stop_event = multiprocessing.Event() @classmethod @@ -89,25 +88,20 @@ def get_plugin_options(cls) -> dict: def get_pin_validator(cls) -> Callable[[Any], Any]: return valid_number.mk(min=0, max=15, name="PWAY channel") - def prepare(self) -> None: - assert self.__proc is None - self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True) + async def prepare(self) -> None: self.__proc.start() async def run(self) -> None: while True: - (got, channel) = await aiomulti.queue_get_last(self.__channel_queue, 1) + (got, channel) = await self.__channel_q.async_fetch_last(1) if got and self.__channel != channel: self.__channel = channel self._notifier.notify() async def cleanup(self) -> None: - if self.__proc is not None: - if self.__proc.is_alive(): - get_logger(0).info("Stopping %s daemon ...", self) - self.__stop_event.set() - if self.__proc.is_alive() or self.__proc.exitcode is not None: - self.__proc.join() + if self.__proc.is_alive(): + self.__stop_event.set() + await self.__proc.async_join() async def read(self, pin: str) -> bool: if not self.__is_online(): @@ -118,24 +112,23 @@ async def write(self, pin: str, state: bool) -> None: if not self.__is_online(): raise GpioDriverOfflineError(self) if state: - self.__ctl_queue.put_nowait(int(pin)) + self.__ctl_q.put_nowait(int(pin)) # ===== def __is_online(self) -> bool: return ( - self.__proc is not None - and self.__proc.is_alive() + self.__proc.is_alive() and self.__channel is not None ) def __serial_worker(self) -> None: - logger = aioproc.settle(str(self), f"gpio-pway-{self._instance_name}") + logger = get_logger(0) while not self.__stop_event.is_set(): try: with self.__get_serial() as tty: data = b"" - self.__channel_queue.put_nowait(-1) + self.__channel_q.put_nowait(-1) # Switch and then recieve the state. # FIXME: Get actual state without modifying the current. @@ -145,15 +138,15 @@ def __serial_worker(self) -> None: while not self.__stop_event.is_set(): (channel, data) = self.__recv_channel(tty, data) if channel is not None: - self.__channel_queue.put_nowait(channel) + self.__channel_q.put_nowait(channel) - (got, channel) = aiomulti.queue_get_last_sync(self.__ctl_queue, 0.1) # type: ignore + (got, channel) = self.__ctl_q.fetch_last(0.1) if got: assert channel is not None self.__send_channel(tty, channel) except Exception as ex: - self.__channel_queue.put_nowait(None) + self.__channel_q.put_nowait(None) if isinstance(ex, serial.SerialException) and ex.errno == errno.ENOENT: # pylint: disable=no-member logger.error("Missing %s serial device: %s", self, self.__device_path) else: diff --git a/kvmd/plugins/ugpio/pwm.py b/kvmd/plugins/ugpio/pwm.py index 42ed21703..4ad877aa1 100644 --- a/kvmd/plugins/ugpio/pwm.py +++ b/kvmd/plugins/ugpio/pwm.py @@ -84,7 +84,7 @@ def get_pin_validator(cls) -> Callable[[Any], Any]: def register_output(self, pin: str, initial: (bool | None)) -> None: self.__channels[int(pin)] = initial - def prepare(self) -> None: + async def prepare(self) -> None: logger = get_logger(0) for (pin, initial) in self.__channels.items(): try: diff --git a/kvmd/plugins/ugpio/xh_hk4401.py b/kvmd/plugins/ugpio/xh_hk4401.py index 1a8e40715..d2a42fd25 100644 --- a/kvmd/plugins/ugpio/xh_hk4401.py +++ b/kvmd/plugins/ugpio/xh_hk4401.py @@ -35,7 +35,6 @@ from ... import aiotools from ... import aiomulti -from ... import aioproc from ...yamlconf import Option @@ -68,11 +67,11 @@ def __init__( self.__read_timeout = read_timeout self.__protocol = protocol # https://github.com/pikvm/kvmd/pull/158 - self.__ctl_queue: "multiprocessing.Queue[int]" = multiprocessing.Queue() - self.__channel_queue: "multiprocessing.Queue[int | None]" = multiprocessing.Queue() + self.__ctl_q: aiomulti.AioMpQueue[int] = aiomulti.AioMpQueue() + self.__channel_q: aiomulti.AioMpQueue[int | None] = aiomulti.AioMpQueue() self.__channel: (int | None) = -1 - self.__proc: (multiprocessing.Process | None) = None + self.__proc = aiomulti.AioMpProcess(f"gpio-xh-hk4401-{self._instance_name}", self.__serial_worker) self.__stop_event = multiprocessing.Event() @classmethod @@ -88,25 +87,20 @@ def get_plugin_options(cls) -> dict: def get_pin_validator(cls) -> Callable[[Any], Any]: return valid_number.mk(min=0, max=3, name="XH-HK4401 channel") - def prepare(self) -> None: - assert self.__proc is None - self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True) + async def prepare(self) -> None: self.__proc.start() async def run(self) -> None: while True: - (got, channel) = await aiomulti.queue_get_last(self.__channel_queue, 1) + (got, channel) = await self.__channel_q.async_fetch_last(1) if got and self.__channel != channel: self.__channel = channel self._notifier.notify() async def cleanup(self) -> None: - if self.__proc is not None: - if self.__proc.is_alive(): - get_logger(0).info("Stopping %s daemon ...", self) - self.__stop_event.set() - if self.__proc.is_alive() or self.__proc.exitcode is not None: - self.__proc.join() + if self.__proc.is_alive(): + self.__stop_event.set() + await self.__proc.async_join() async def read(self, pin: str) -> bool: if not self.__is_online(): @@ -117,47 +111,46 @@ async def write(self, pin: str, state: bool) -> None: if not self.__is_online(): raise GpioDriverOfflineError(self) if state: - self.__ctl_queue.put_nowait(int(pin)) + self.__ctl_q.put_nowait(int(pin)) # ===== def __is_online(self) -> bool: return ( - self.__proc is not None - and self.__proc.is_alive() + self.__proc.is_alive() and self.__channel is not None ) def __serial_worker(self) -> None: - logger = aioproc.settle(str(self), f"gpio-xh-hk4401-{self._instance_name}") + logger = get_logger(0) while not self.__stop_event.is_set(): try: with self.__get_serial() as tty: data = b"" - self.__channel_queue.put_nowait(-1) + self.__channel_q.put_nowait(-1) # Wait for first port heartbeat to set correct channel (~2 sec max). # Only for the classic switch with protocol version 1. while self.__protocol == 1: (channel, data) = self.__recv_channel(tty, data) if channel is not None: - self.__channel_queue.put_nowait(channel) + self.__channel_q.put_nowait(channel) break while not self.__stop_event.is_set(): (channel, data) = self.__recv_channel(tty, data) if channel is not None: - self.__channel_queue.put_nowait(channel) + self.__channel_q.put_nowait(channel) - (got, channel) = aiomulti.queue_get_last_sync(self.__ctl_queue, 0.1) # type: ignore + (got, channel) = self.__ctl_q.fetch_last(0.1) if got: assert channel is not None self.__send_channel(tty, channel) if self.__protocol == 2: - self.__channel_queue.put_nowait(channel) + self.__channel_q.put_nowait(channel) except Exception as ex: - self.__channel_queue.put_nowait(None) + self.__channel_q.put_nowait(None) if isinstance(ex, serial.SerialException) and ex.errno == errno.ENOENT: # pylint: disable=no-member logger.error("Missing %s serial device: %s", self, self.__device_path) else: diff --git a/kvmd/tools.py b/kvmd/tools.py index 18a976bf2..90f1e2091 100644 --- a/kvmd/tools.py +++ b/kvmd/tools.py @@ -22,11 +22,8 @@ import os import tempfile -import asyncio import operator import contextlib -import multiprocessing.queues -import queue import shlex from typing import Generator @@ -88,15 +85,6 @@ def is_dict(kvs: Any, *path: str) -> bool: return True -# ===== -def clear_queue(q: (multiprocessing.queues.Queue | asyncio.Queue)) -> None: # pylint: disable=invalid-name - for _ in range(q.qsize()): - try: - q.get_nowait() - except (queue.Empty, asyncio.QueueEmpty): - break - - # ===== def build_cmd(cmd: list[str], cmd_remove: list[str], cmd_append: list[str]) -> list[str]: assert len(cmd) >= 1, cmd diff --git a/setup.py b/setup.py index c3feb46d8..b16f45f5b 100755 --- a/setup.py +++ b/setup.py @@ -56,7 +56,7 @@ def main() -> None: setup( name="kvmd", - version="4.131", + version="4.145", url="https://github.com/pikvm/kvmd", license="GPLv3", author="Maxim Devaev", @@ -69,6 +69,8 @@ def main() -> None: "kvmd.validators", "kvmd.yamlconf", "kvmd.keyboard", + "kvmd.nbd", + "kvmd.nbd.remotes", "kvmd.plugins", "kvmd.plugins.auth", "kvmd.plugins.hid", @@ -97,6 +99,7 @@ def main() -> None: "kvmd.apps.otgmsd", "kvmd.apps.otgconf", "kvmd.apps.swctl", + "kvmd.apps.nbd", "kvmd.apps.htpasswd", "kvmd.apps.totp", "kvmd.apps.edidconf", @@ -149,7 +152,7 @@ def main() -> None: classifiers=[ "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", "Development Status :: 5 - Production/Stable", - "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "Topic :: System :: Systems Administration", "Operating System :: POSIX :: Linux", "Intended Audience :: System Administrators", diff --git a/testenv/Dockerfile b/testenv/Dockerfile index cc227266e..4d455a96a 100644 --- a/testenv/Dockerfile +++ b/testenv/Dockerfile @@ -17,7 +17,6 @@ RUN \ glibc \ pacman \ openssl \ - openssl-1.1 \ && pacman-db-upgrade \ && $PACMAN -Syu \ p11-kit \ diff --git a/testenv/linters/mypy.ini b/testenv/linters/mypy.ini index f96d83076..3f9ee3cf7 100644 --- a/testenv/linters/mypy.ini +++ b/testenv/linters/mypy.ini @@ -1,5 +1,5 @@ [mypy] -python_version = 3.13 +python_version = 3.14 ignore_missing_imports = true disallow_untyped_defs = true strict_optional = true diff --git a/testenv/linters/pylint.ini b/testenv/linters/pylint.ini index 58341a47c..c5f211663 100644 --- a/testenv/linters/pylint.ini +++ b/testenv/linters/pylint.ini @@ -54,7 +54,7 @@ max-line-length = 160 [BASIC] # Good variable names which should always be accepted, separated by a comma -good-names = _, __, x, y, ws +good-names = _, __, x, y, a, b, ws # Regular expression matching correct method names method-rgx = [a-z_][a-z0-9_]{1,50}$ diff --git a/testenv/tests/test_aiomulti.py b/testenv/tests/test_aiomulti.py new file mode 100644 index 000000000..13adc9991 --- /dev/null +++ b/testenv/tests/test_aiomulti.py @@ -0,0 +1,67 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2024 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import signal +import time + +import pytest + +from kvmd.aiomulti import AioMpProcess + + +# ===== +def _target(a: int, b: str) -> None: + assert a == 1 + assert b == "foo" + while True: + time.sleep(1) + + +# ===== +@pytest.mark.asyncio +async def test_ok__sigterm_join() -> None: + proc = AioMpProcess("test", _target, (1, "foo")) + assert not proc.is_alive() + proc.start() + assert proc.is_alive() + assert (await proc.async_join(0.1)) + assert (await proc.async_join(1)) + proc.send_sigterm() + assert not (await proc.async_join(30)) + assert not (await proc.async_join(1)) + assert not (await proc.async_join()) + assert proc.exitcode == -int(signal.SIGTERM) + + +@pytest.mark.asyncio +async def test_ok__sigkill_join() -> None: + proc = AioMpProcess("test", _target, (1, "foo")) + assert not proc.is_alive() + proc.start() + assert proc.is_alive() + assert (await proc.async_join(0.1)) + assert (await proc.async_join(1)) + proc.sendpg_sigkill() + assert not (await proc.async_join(30)) + assert not (await proc.async_join(1)) + assert not (await proc.async_join()) + assert proc.exitcode == -int(signal.SIGKILL) diff --git a/testenv/tox.ini b/testenv/tox.ini index b1384932f..59f7b7f59 100644 --- a/testenv/tox.ini +++ b/testenv/tox.ini @@ -3,7 +3,7 @@ envlist = flake8, pylint, mypy, vulture, pytest, eslint, htmlhint, shellcheck skipsdist = true [testenv] -basepython = python3.13 +basepython = python3.14 sitepackages = true changedir = /src diff --git a/web/base.pug b/web/base.pug index a1423aa69..eaa442551 100644 --- a/web/base.pug +++ b/web/base.pug @@ -40,7 +40,7 @@ block _vars_dynamic css_dir = `${share_dir}/css` js_dir = `${share_dir}/js` svg_dir = `${share_dir}/svg` - png_dir = `${share_dir}/png` + pic_dir = `${share_dir}/pic` html(lang="en") @@ -59,6 +59,8 @@ html(lang="en") each name in ["vars", "main"].concat(css_list).concat(["user"]) link(rel="stylesheet" href=`${css_dir}/${name}.css`) + block head + if main_js script(type="module") | import {setRootPrefix} from "#{js_dir}/vars.js"; diff --git a/web/kvm/index.html b/web/kvm/index.html index 942182dca..107097197 100644 --- a/web/kvm/index.html +++ b/web/kvm/index.html @@ -51,6 +51,8 @@ + +