Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/source/schemas/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,15 @@
"description": "path to mars client script",
"default": "/usr/local/bin/mars"
},
"fifo_read_timeout": {
"type": "number",
"description": "seconds to wait for FIFO data before timing out; null disables timeout"
},
"fifo_poll_interval": {
"type": "number",
"description": "polling interval in seconds while waiting for FIFO data",
"default": 0.1
},
"match": {
"$ref": "#/definitions/DatasourcesConfig-Datasource-MARS-Match",
"description": "filters requests based on presence of specific key=value pair in user request"
Expand All @@ -465,6 +474,8 @@
"preferredOrder": [
"type",
"command",
"fifo_read_timeout",
"fifo_poll_interval",
"match",
"patch"
],
Expand Down
67 changes: 50 additions & 17 deletions polytope_server/common/datasource/mars.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,23 @@
import os
import tempfile
from subprocess import CalledProcessError
from typing import Any, Dict, Iterator, Optional

import requests
import yaml

from ..io.fifo import FIFO
from ..request import PolytopeRequest
from ..subprocess import Subprocess
from . import datasource
from .datasource import convert_to_mars_request


class MARSDataSource(datasource.DataSource):
def __init__(self, config):
"""Datasource implementation that streams MARS retrieval output."""

def __init__(self, config: Dict[str, Any]) -> None:
"""Initialize the MARS datasource from configuration."""
assert config["type"] == "mars"
self.config = config
self.type = config.get("type")
Expand All @@ -47,12 +52,14 @@ def __init__(self, config):
self.override_mars_email = config.get("override_email")
self.override_mars_apikey = config.get("override_apikey")

self.subprocess = None
self.fifo = None
self.output_file = None
self.subprocess: Optional[Subprocess] = None
self.fifo: Optional[FIFO] = None
self.output_file: Optional[str] = None
self.use_file_io = config.get("use_file_io", False)

self.mars_error_filter = config.get("mars_error_filter", "mars - EROR")
self.fifo_read_timeout = config.get("fifo_read_timeout")
self.fifo_poll_interval = config.get("fifo_poll_interval", 0.1)

# self.fdb_config = None
self.fdb_config = config.get("fdb_config", {})
Expand All @@ -78,13 +85,16 @@ def __init__(self, config):
self.mars_home = None
self.mars_config = None

def get_type(self):
def get_type(self) -> str:
"""Return the datasource type string."""
return self.type

def archive(self, request):
def archive(self, request: PolytopeRequest) -> None:
"""Archiving is not supported for this datasource."""
raise NotImplementedError("Archiving not implemented for MARS data source")

def retrieve(self, request):
def retrieve(self, request: PolytopeRequest) -> bool:
"""Launch a MARS retrieval subprocess and prepare output streaming."""

if self.use_file_io:
with tempfile.NamedTemporaryFile(delete=False) as tmp:
Expand Down Expand Up @@ -141,7 +151,8 @@ def retrieve(self, request):

return True

def result(self, request):
def result(self, request: PolytopeRequest) -> Iterator[bytes]:
"""Yield retrieval data from FIFO or file until MARS completes."""

if self.use_file_io:
with open(self.output_file, "rb") as f:
Expand All @@ -158,11 +169,22 @@ def result(self, request):
raise Exception("MARS retrieval failed unexpectedly with error code {}".format(e.returncode))
return

# The FIFO will get EOF if MARS exits unexpectedly, so we will break out of this loop automatically
for x in self.fifo.data():
# logging.debug("Yielding data from FIFO.") # this floods the logs
def on_idle():
self.subprocess.read_output(request, self.mars_error_filter)
yield x

# The FIFO will get EOF if MARS exits unexpectedly, so we will break out of this loop automatically
try:
for x in self.fifo.data(
idle_timeout=self.fifo_read_timeout,
poll_interval=self.fifo_poll_interval,
on_idle=on_idle,
):
# logging.debug("Yielding data from FIFO.") # this floods the logs
self.subprocess.read_output(request, self.mars_error_filter)
yield x
except TimeoutError as e:
logging.error("FIFO read timed out: %s", e)
raise Exception("MARS retrieval timed out while waiting for data.")

logging.info("FIFO reached EOF.")

Expand All @@ -172,7 +194,8 @@ def result(self, request):
logging.exception("MARS subprocess failed: {}".format(e))
raise Exception("MARS retrieval failed unexpectedly with error code {}".format(e.returncode))

def destroy(self, request):
def destroy(self, request: PolytopeRequest) -> None:
"""Finalize the subprocess and delete temporary files/FIFO."""
try:
self.subprocess.finalize(request, self.mars_error_filter) # Will raise if non-zero return
except Exception as e:
Expand All @@ -192,12 +215,22 @@ def destroy(self, request):
pass

def mime_type(self) -> str:
"""Return the MIME type for MARS retrievals."""
return "application/x-grib"

#######################################################

def _build_dhs_env(self):
"""Build DHS callback environment from pre-set env vars or Kubernetes service."""
def _build_dhs_env(self) -> Dict[str, str]:
"""Build DHS callback environment from pre-set environment variables or Kubernetes services.

Returns a dictionary with
- MARS_DHS_CALLBACK_HOST: the host for DHS to call back to (Kubernetes node name)
- MARS_DHS_CALLBACK_PORT: the port for DHS to call back to (from Kubernetes service)
- MARS_DHS_LOCALPORT: the local port that DHS should listen on (from Kubernetes service)
- MARS_DHS_LOCALHOST: the local host that DHS should listen on (Kubernetes pod name)
- MARS_ENVIRON_ORIGIN: a string indicating the origin of the environment variables
(set to "polytope" if generated here, or passed through from existing env vars)
"""

required_dhs_keys = [
"MARS_DHS_CALLBACK_HOST",
Expand Down Expand Up @@ -291,8 +324,8 @@ def _build_dhs_env(self):
"MARS_DHS_LOCALHOST": pod_name,
}

def make_env(self, request):
"""Make the environment for the MARS subprocess, primarily for setting credentials"""
def make_env(self, request: PolytopeRequest) -> Dict[str, str]:
"""Build environment variables for the MARS subprocess. (FDB5_CONFIG and credentials, dhs config if needed)"""
try:
if self.override_mars_email:
logging.info("Overriding MARS_USER_EMAIL with {}".format(self.override_mars_email))
Expand Down
33 changes: 27 additions & 6 deletions polytope_server/common/io/fifo.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import os
import select
import tempfile
import time
from typing import Callable, Iterator, Optional


class FIFO:
"""Creates a named pipe (FIFO) and reads data from it"""
"""Creates a named pipe (FIFO) and reads data from it."""

def __init__(self, name, dir=None):
def __init__(self, name: str, dir: Optional[str] = None) -> None:
"""Create a FIFO at the provided directory with a non-blocking reader."""

if dir is None:
dir = tempfile.gettempdir()
Expand All @@ -39,17 +42,34 @@ def __init__(self, name, dir=None):
self.fifo = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
logging.info("FIFO created")

def ready(self):
def ready(self) -> bool:
"""Wait until FIFO is ready for reading -- i.e. opened by the writing process (man select)"""
return len(select.select([self.fifo], [], [], 0)[0]) == 1

def data(self, buffer_size=2 * 1024 * 1024):
def data(
self,
buffer_size: int = 2 * 1024 * 1024,
idle_timeout: Optional[float] = 30,
poll_interval: float = 0.1,
on_idle: Optional[Callable[[], None]] = None,
) -> Iterator[bytes]:
"""Yield buffered FIFO data in chunks, with optional idle callbacks/timeouts."""
buffer = b""
last_data = time.monotonic()

while True:
ready = select.select([self.fifo], [], [], poll_interval)[0]
if not ready:
if on_idle:
on_idle()
if idle_timeout is not None and time.monotonic() - last_data > idle_timeout:
raise TimeoutError(f"FIFO read timed out after {idle_timeout} seconds")
continue

data = self.read_raw()
if data is None:
break
last_data = time.monotonic()
buffer += data
while len(buffer) >= buffer_size:
output, leftover = buffer[:buffer_size], buffer[buffer_size:]
Expand All @@ -59,7 +79,7 @@ def data(self, buffer_size=2 * 1024 * 1024):
if buffer != b"":
yield buffer

def delete(self):
def delete(self) -> None:
"""Close and delete FIFO"""
logging.info("Deleting FIFO.")
try:
Expand All @@ -73,7 +93,8 @@ def delete(self):
logging.info(f"Deleting FIFO had an exception {e}")
pass

def read_raw(self, max_read=2 * 1024 * 1024):
def read_raw(self, max_read: int = 2 * 1024 * 1024) -> Optional[bytes]:
"""Read a raw chunk from the FIFO, returning None on EOF."""
while True:
try:
buf = os.read(self.fifo, max_read)
Expand Down
30 changes: 21 additions & 9 deletions polytope_server/common/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,23 @@
import select
import subprocess
from subprocess import CalledProcessError
from typing import Any, Mapping, Optional, Sequence


class Subprocess:
def __init__(self):
self.subprocess = None
"""Wrapper around subprocess execution with non-blocking log draining."""

def run(self, cmd, cwd=None, env=None):
def __init__(self) -> None:
"""Initialize the subprocess wrapper."""
self.subprocess: Optional[subprocess.Popen] = None

def run(
self,
cmd: Sequence[str],
cwd: Optional[str] = None,
env: Optional[Mapping[str, str]] = None,
) -> None:
"""Start a subprocess with stdout/stderr pipes."""
env = {**os.environ, **(env or None)}
logging.info("Calling {} in directory {} with env {}".format(cmd, cwd, env))
self.subprocess = subprocess.Popen(
Expand All @@ -41,8 +51,8 @@ def run(self, cmd, cwd=None, env=None):
stdout=subprocess.PIPE,
)

def read_output(self, request, err_filter=None):
"""Read and log output from the subprocess without blocking"""
def read_output(self, request: Any, err_filter: Optional[str] = None) -> None:
"""Read and log output from the subprocess without blocking."""
reads = [i for i in [self.subprocess.stdout, self.subprocess.stderr] if i]
ret = select.select(reads, [], [], 0)
while ret[0]:
Expand All @@ -60,14 +70,16 @@ def read_output(self, request, err_filter=None):
break
ret = select.select(reads, [], [], 0)

def running(self):
def running(self) -> bool:
"""Return True while the subprocess is still running."""
return self.subprocess.poll() is None

def returncode(self):
def returncode(self) -> Optional[int]:
"""Return the subprocess exit code if it has terminated."""
return self.subprocess.poll()

def finalize(self, request, err_filter):
"""Close subprocess and decode output"""
def finalize(self, request: Any, err_filter: Optional[str]) -> None:
"""Wait for completion (60s timeout), log output, and raise on failure."""
logging.info("Finalizing subprocess")
# fifo has been closed so this process should finish, but sometimes hangs so we set a timeout
try:
Expand Down