diff --git a/docs/source/schemas/schema.json b/docs/source/schemas/schema.json index dedea1d6..34c165e1 100644 --- a/docs/source/schemas/schema.json +++ b/docs/source/schemas/schema.json @@ -453,6 +453,15 @@ "description": "path to mars client script", "default": "/usr/local/bin/mars" }, + "fifo_read_timeout": { + "type": "number", + "description": "seconds to wait for FIFO data before timing out; null disables timeout" + }, + "fifo_poll_interval": { + "type": "number", + "description": "polling interval in seconds while waiting for FIFO data", + "default": 0.1 + }, "match": { "$ref": "#/definitions/DatasourcesConfig-Datasource-MARS-Match", "description": "filters requests based on presence of specific key=value pair in user request" @@ -465,6 +474,8 @@ "preferredOrder": [ "type", "command", + "fifo_read_timeout", + "fifo_poll_interval", "match", "patch" ], diff --git a/polytope_server/common/datasource/mars.py b/polytope_server/common/datasource/mars.py index 0f60f4bb..ac24a17f 100644 --- a/polytope_server/common/datasource/mars.py +++ b/polytope_server/common/datasource/mars.py @@ -23,18 +23,23 @@ import os import tempfile from subprocess import CalledProcessError +from typing import Any, Dict, Iterator, Optional import requests import yaml from ..io.fifo import FIFO +from ..request import PolytopeRequest from ..subprocess import Subprocess from . import datasource from .datasource import convert_to_mars_request class MARSDataSource(datasource.DataSource): - def __init__(self, config): + """Datasource implementation that streams MARS retrieval output.""" + + def __init__(self, config: Dict[str, Any]) -> None: + """Initialize the MARS datasource from configuration.""" assert config["type"] == "mars" self.config = config self.type = config.get("type") @@ -47,12 +52,14 @@ def __init__(self, config): self.override_mars_email = config.get("override_email") self.override_mars_apikey = config.get("override_apikey") - self.subprocess = None - self.fifo = None - self.output_file = None + self.subprocess: Optional[Subprocess] = None + self.fifo: Optional[FIFO] = None + self.output_file: Optional[str] = None self.use_file_io = config.get("use_file_io", False) self.mars_error_filter = config.get("mars_error_filter", "mars - EROR") + self.fifo_read_timeout = config.get("fifo_read_timeout") + self.fifo_poll_interval = config.get("fifo_poll_interval", 0.1) # self.fdb_config = None self.fdb_config = config.get("fdb_config", {}) @@ -78,13 +85,16 @@ def __init__(self, config): self.mars_home = None self.mars_config = None - def get_type(self): + def get_type(self) -> str: + """Return the datasource type string.""" return self.type - def archive(self, request): + def archive(self, request: PolytopeRequest) -> None: + """Archiving is not supported for this datasource.""" raise NotImplementedError("Archiving not implemented for MARS data source") - def retrieve(self, request): + def retrieve(self, request: PolytopeRequest) -> bool: + """Launch a MARS retrieval subprocess and prepare output streaming.""" if self.use_file_io: with tempfile.NamedTemporaryFile(delete=False) as tmp: @@ -141,7 +151,8 @@ def retrieve(self, request): return True - def result(self, request): + def result(self, request: PolytopeRequest) -> Iterator[bytes]: + """Yield retrieval data from FIFO or file until MARS completes.""" if self.use_file_io: with open(self.output_file, "rb") as f: @@ -158,11 +169,22 @@ def result(self, request): raise Exception("MARS retrieval failed unexpectedly with error code {}".format(e.returncode)) return - # The FIFO will get EOF if MARS exits unexpectedly, so we will break out of this loop automatically - for x in self.fifo.data(): - # logging.debug("Yielding data from FIFO.") # this floods the logs + def on_idle(): self.subprocess.read_output(request, self.mars_error_filter) - yield x + + # The FIFO will get EOF if MARS exits unexpectedly, so we will break out of this loop automatically + try: + for x in self.fifo.data( + idle_timeout=self.fifo_read_timeout, + poll_interval=self.fifo_poll_interval, + on_idle=on_idle, + ): + # logging.debug("Yielding data from FIFO.") # this floods the logs + self.subprocess.read_output(request, self.mars_error_filter) + yield x + except TimeoutError as e: + logging.error("FIFO read timed out: %s", e) + raise Exception("MARS retrieval timed out while waiting for data.") logging.info("FIFO reached EOF.") @@ -172,7 +194,8 @@ def result(self, request): logging.exception("MARS subprocess failed: {}".format(e)) raise Exception("MARS retrieval failed unexpectedly with error code {}".format(e.returncode)) - def destroy(self, request): + def destroy(self, request: PolytopeRequest) -> None: + """Finalize the subprocess and delete temporary files/FIFO.""" try: self.subprocess.finalize(request, self.mars_error_filter) # Will raise if non-zero return except Exception as e: @@ -192,12 +215,22 @@ def destroy(self, request): pass def mime_type(self) -> str: + """Return the MIME type for MARS retrievals.""" return "application/x-grib" ####################################################### - def _build_dhs_env(self): - """Build DHS callback environment from pre-set env vars or Kubernetes service.""" + def _build_dhs_env(self) -> Dict[str, str]: + """Build DHS callback environment from pre-set environment variables or Kubernetes services. + + Returns a dictionary with + - MARS_DHS_CALLBACK_HOST: the host for DHS to call back to (Kubernetes node name) + - MARS_DHS_CALLBACK_PORT: the port for DHS to call back to (from Kubernetes service) + - MARS_DHS_LOCALPORT: the local port that DHS should listen on (from Kubernetes service) + - MARS_DHS_LOCALHOST: the local host that DHS should listen on (Kubernetes pod name) + - MARS_ENVIRON_ORIGIN: a string indicating the origin of the environment variables + (set to "polytope" if generated here, or passed through from existing env vars) + """ required_dhs_keys = [ "MARS_DHS_CALLBACK_HOST", @@ -291,8 +324,8 @@ def _build_dhs_env(self): "MARS_DHS_LOCALHOST": pod_name, } - def make_env(self, request): - """Make the environment for the MARS subprocess, primarily for setting credentials""" + def make_env(self, request: PolytopeRequest) -> Dict[str, str]: + """Build environment variables for the MARS subprocess. (FDB5_CONFIG and credentials, dhs config if needed)""" try: if self.override_mars_email: logging.info("Overriding MARS_USER_EMAIL with {}".format(self.override_mars_email)) diff --git a/polytope_server/common/io/fifo.py b/polytope_server/common/io/fifo.py index 95da8b1a..1bc8abd7 100644 --- a/polytope_server/common/io/fifo.py +++ b/polytope_server/common/io/fifo.py @@ -23,12 +23,15 @@ import os import select import tempfile +import time +from typing import Callable, Iterator, Optional class FIFO: - """Creates a named pipe (FIFO) and reads data from it""" + """Creates a named pipe (FIFO) and reads data from it.""" - def __init__(self, name, dir=None): + def __init__(self, name: str, dir: Optional[str] = None) -> None: + """Create a FIFO at the provided directory with a non-blocking reader.""" if dir is None: dir = tempfile.gettempdir() @@ -39,17 +42,34 @@ def __init__(self, name, dir=None): self.fifo = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK) logging.info("FIFO created") - def ready(self): + def ready(self) -> bool: """Wait until FIFO is ready for reading -- i.e. opened by the writing process (man select)""" return len(select.select([self.fifo], [], [], 0)[0]) == 1 - def data(self, buffer_size=2 * 1024 * 1024): + def data( + self, + buffer_size: int = 2 * 1024 * 1024, + idle_timeout: Optional[float] = 30, + poll_interval: float = 0.1, + on_idle: Optional[Callable[[], None]] = None, + ) -> Iterator[bytes]: + """Yield buffered FIFO data in chunks, with optional idle callbacks/timeouts.""" buffer = b"" + last_data = time.monotonic() while True: + ready = select.select([self.fifo], [], [], poll_interval)[0] + if not ready: + if on_idle: + on_idle() + if idle_timeout is not None and time.monotonic() - last_data > idle_timeout: + raise TimeoutError(f"FIFO read timed out after {idle_timeout} seconds") + continue + data = self.read_raw() if data is None: break + last_data = time.monotonic() buffer += data while len(buffer) >= buffer_size: output, leftover = buffer[:buffer_size], buffer[buffer_size:] @@ -59,7 +79,7 @@ def data(self, buffer_size=2 * 1024 * 1024): if buffer != b"": yield buffer - def delete(self): + def delete(self) -> None: """Close and delete FIFO""" logging.info("Deleting FIFO.") try: @@ -73,7 +93,8 @@ def delete(self): logging.info(f"Deleting FIFO had an exception {e}") pass - def read_raw(self, max_read=2 * 1024 * 1024): + def read_raw(self, max_read: int = 2 * 1024 * 1024) -> Optional[bytes]: + """Read a raw chunk from the FIFO, returning None on EOF.""" while True: try: buf = os.read(self.fifo, max_read) diff --git a/polytope_server/common/subprocess.py b/polytope_server/common/subprocess.py index 90abf244..54544162 100644 --- a/polytope_server/common/subprocess.py +++ b/polytope_server/common/subprocess.py @@ -23,13 +23,23 @@ import select import subprocess from subprocess import CalledProcessError +from typing import Any, Mapping, Optional, Sequence class Subprocess: - def __init__(self): - self.subprocess = None + """Wrapper around subprocess execution with non-blocking log draining.""" - def run(self, cmd, cwd=None, env=None): + def __init__(self) -> None: + """Initialize the subprocess wrapper.""" + self.subprocess: Optional[subprocess.Popen] = None + + def run( + self, + cmd: Sequence[str], + cwd: Optional[str] = None, + env: Optional[Mapping[str, str]] = None, + ) -> None: + """Start a subprocess with stdout/stderr pipes.""" env = {**os.environ, **(env or None)} logging.info("Calling {} in directory {} with env {}".format(cmd, cwd, env)) self.subprocess = subprocess.Popen( @@ -41,8 +51,8 @@ def run(self, cmd, cwd=None, env=None): stdout=subprocess.PIPE, ) - def read_output(self, request, err_filter=None): - """Read and log output from the subprocess without blocking""" + def read_output(self, request: Any, err_filter: Optional[str] = None) -> None: + """Read and log output from the subprocess without blocking.""" reads = [i for i in [self.subprocess.stdout, self.subprocess.stderr] if i] ret = select.select(reads, [], [], 0) while ret[0]: @@ -60,14 +70,16 @@ def read_output(self, request, err_filter=None): break ret = select.select(reads, [], [], 0) - def running(self): + def running(self) -> bool: + """Return True while the subprocess is still running.""" return self.subprocess.poll() is None - def returncode(self): + def returncode(self) -> Optional[int]: + """Return the subprocess exit code if it has terminated.""" return self.subprocess.poll() - def finalize(self, request, err_filter): - """Close subprocess and decode output""" + def finalize(self, request: Any, err_filter: Optional[str]) -> None: + """Wait for completion (60s timeout), log output, and raise on failure.""" logging.info("Finalizing subprocess") # fifo has been closed so this process should finish, but sometimes hangs so we set a timeout try: