diff --git a/README.md b/README.md index f3441d4..aa02ebe 100644 --- a/README.md +++ b/README.md @@ -40,16 +40,17 @@ pip install . ### Usage -Currently, you can use the `extract` function from the `beam` module inside your own Python code: +#### As a Python module +To extract data from a file, you can use the `extract` function from the `beam` module inside your own Python code: ```python -from beam import extract +from datatractor_beam import extract # extract(, ) data = extract("./example.mpr", "biologic-mpr") ``` -This example will install the first compatible `biologic-mpr` extractor it finds in the registry into a fresh virtualenv (under `./beam-venvs`), and then execute it on the file at `example.mpr`. +This example will install the first extractor that is compatible with the `biologic-mpr` filetype that it finds in the registry. It will be installed into a fresh virtualenv (under `./beam-venvs`), and then executed on the file at `example.mpr`. By default, the `extract` function will attempt to use the extractor's Python-based invocation (i.e. the optional `preferred_mode="python"` argument is specified). This means the extractor will be executed from within python, and the returned `data` object will be a Python object as defined (and supported) by the extractor. This may require additional packages to be installed, for examples `pandas` or `xarray`, which are both supported via the installation command `pip install .[formats]` above. If you encounter the following traceback, a missing "format" (such as `xarray` here) is the likely reason: @@ -63,16 +64,25 @@ ModuleNotFoundError: No module named 'xarray' Alternatively, if the `preferred_mode="cli"` argument is specified, the extractor will be executed using its command-line invocation. This means the output of the extractor will most likely be a file, which can be further specified using the `output_type` argument: ```python -from beam import extract +from datatractor_beam import extract ret = extract("example.mpr", "biologic-mpr", output_path="output.nc", preferred_mode = "cli") ``` In this case, the `ret` will be empty bytes, and the output of the extractor should appear in the `output.nc` file. -Finally, `beam` can also be executed from the command line, implying `preferred_mode="cli"`. The command line invocation equivalent to the above Python syntax is: +#### As a command line utility + +The `datatractor` utility supports the following subcommands: + +- `beam`: used to extract data from an input file of a known file type, +- `probe`: used to search the registry for extractors that match a known file type, +- `yard`: used to fetch the definition of an extractor from the registry, and +- `install`: used to install an extractor. + +In particular, the `extract()` functionality discussed above can also be executed from the command line, implying `preferred_mode="cli"`. The command line invocation equivalent to the above Python syntax is: ```bash -beam biologic-mpr example.mpr --outfile output.nc +datatractor beam biologic-mpr example.mpr --output-path output.nc ``` diff --git a/beam/__init__.py b/beam/__init__.py index 9016d32..133e0f8 100644 --- a/beam/__init__.py +++ b/beam/__init__.py @@ -17,6 +17,7 @@ import argparse import importlib.metadata import json +import logging import multiprocessing.managers import multiprocessing.shared_memory import pickle @@ -27,6 +28,7 @@ import urllib.request import venv from enum import Enum +from functools import wraps from pathlib import Path from types import ModuleType from typing import Any, Callable, Optional @@ -37,6 +39,8 @@ REGISTRY_BASE_URL = "https://yard.datatractor.org/api" BIN = "Scripts" if platform.system() == "Windows" else "bin" +logger = logging.getLogger(__name__) + class SupportedExecutionMethod(Enum): # TODO: would be nice to generate these directly from the LinkML schema @@ -54,54 +58,276 @@ class SupportedUsageScope(Enum): DATA = "meta+data" -def run_beam(): - argparser = argparse.ArgumentParser( - prog="beam", - description="""CLI for datatractor extractors that takes a filename and a filetype, then installs and runs an appropriate extractor, if available, from the chosen registry (default: https://registry.datatractor.org/). Filetype IDs can be found in the registry API at e.g., https://registry.datatractor.org/api/filetypes. If a matching extractor is found at https://registry.datatractor.org/api/extractors, it will be installed into a virtual environment local to the beam installation. The results of the extractor will be written out to a file at --outfile, or in the default location for that output file type.""", +def coerce_preferred(func): + """ + A decorator to coerce :class:`str` types of ``preferred_mode`` and ``preferred_scope`` + to their :class:`Enum` counterparts. + """ + + @wraps(func) + def wrapper(*args, **kwargs): + mode = kwargs.pop("preferred_mode", None) + if isinstance(mode, str): + mode = SupportedExecutionMethod(mode) + if mode is not None: + kwargs["preferred_mode"] = mode + scope = kwargs.pop("preferred_scope", None) + if isinstance(scope, str): + scope = SupportedUsageScope(scope) + if scope is not None: + kwargs["preferred_scope"] = scope + return func(*args, **kwargs) + + return wrapper + + +def run_datatractor(): + parser = argparse.ArgumentParser( + prog="datatractor", + description="CLI for the reference implementation of the Datatractor project.", ) - argparser.add_argument( + parser.add_argument( "--version", action="version", version=f"%(prog)s version {__version__}", ) - argparser.add_argument( - "filetype", + subparsers = parser.add_subparsers( + title="Available subcommands", + required=True, + ) + + beam = subparsers.add_parser( + "beam", + help="Extracts data from the provided of type .", + description=f""" + Takes an input_path and an input_type, installs and runs an appropriate + extractor, if available, from the chosen registry (default: {REGISTRY_BASE_URL}). + """, + ) + + beam.add_argument( + "input_type", help="FileType.ID of the input file", - default=None, ) - argparser.add_argument( - "infile", + beam.add_argument( + "input_path", help="Path of the input file", - default=None, ) - argparser.add_argument( - "--outfile", + beam.add_argument( + "--output-path", "-o", help="Optional path of the output file", default=None, ) + beam.add_argument( + "--preferred-mode", + help="Preferred extraction mode to use with the Extractor (default: cli).", + default="cli", + choices=["cli", "python"], + ) + beam.set_defaults(func=extract) + + probe = subparsers.add_parser( + "probe", + help="Searches the registry for Extractors available for .", + description=f""" + Searches the chosen registry (default: {REGISTRY_BASE_URL}) for Extractors + matching the provided input_type and returns their ID(s). + """, + ) - args = argparser.parse_args() + probe.add_argument( + "input_type", + help="FileType.ID for which available Extractors are to be looked up.", + ) + probe.set_defaults(func=fetch_registered_extractors) + + yard = subparsers.add_parser( + "yard", + help="Searches the registry for an Extractors matching .", + description=f""" + Searches the chosen registry (default: {REGISTRY_BASE_URL}) for an Extractor + that matches the provided extractor_id and returns its full definition. + """, + ) - extract( - input_path=args.infile, - input_type=args.filetype, - output_path=args.outfile, - preferred_mode=SupportedExecutionMethod.CLI, + yard.add_argument( + "extractor_id", + help="Extractor.ID of the requested extractor.", + ) + yard.set_defaults(func=search_extractor) + + install = subparsers.add_parser( + "install", + help="Installs the Extractor matching .", + description=f""" + Searches the chosen registry (default: {REGISTRY_BASE_URL}) for an Extractor + that matches the provided extractor_id and installs it into the current + Python env. + """, ) + install.add_argument( + "extractor_id", + help="Extractor.ID of the requested extractor.", + ) + install.add_argument( + "--no-use-venv", + action="store_false", + dest="use_venv", + help=""" + Do not install into a separate venv. Note: this will try to install the + Extractor into your current Python environment, using pip! + """, + default=True, + ) + install.set_defaults(func=install_extractor) + + for p in probe, beam, yard, install: + p.add_argument( + "--verbose", + help="Set verbosity level to DEBUG.", + action="store_true", + default=False, + ) + + p.add_argument( + "--registry-base-url", + help="Base URL of the registry to use", + default=REGISTRY_BASE_URL, + ) + + args = parser.parse_args() + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.WARNING) + delattr(args, "verbose") + logger.debug("loglevel set to DEBUG") # + + func = args.func + delattr(args, "func") + try: + ret = func(**vars(args)) + if ret is not None and len(ret) > 0: + try: + ret = json.dumps(ret) + except json.JSONDecodeError: + pass + print(ret) + except RuntimeError as e: + print(e) + +@coerce_preferred +def search_filetype( + input_type: str, + preferred_mode: SupportedExecutionMethod = SupportedExecutionMethod.PYTHON, + preferred_scope: SupportedUsageScope = SupportedUsageScope.DATA, + extractor_definition: dict | None = None, + registry_base_url: str = REGISTRY_BASE_URL, +) -> dict: + """Search a registry for an extractor definition matching preferred mode and scope. + + Parameters: + input_type: The ID of the ``FileType`` in the registry. + preferred_mode: The preferred execution method. + If the extractor supports both Python and CLI, this will be used to determine + which to use. If the extractor only supports one method, this will be ignored. + Accepts the ``SupportedExecutionMethod`` values of "cli" or "python". + preferred_scope: The preferred extraction scope. + Accepts the ``SupportedUsageScope`` values of "meta+data" (default) or "meta-only". + extractor_definition: A dictionary containing the extractor definition to use instead + of a registry lookup. + registry_base_url: The base URL of the registry to use. Defaults to the + |yardsite|_. + + Returns: + The matching extractor definition. + + """ + if extractor_definition is not None: + extractors = [extractor_definition] + else: + extractors = fetch_registered_extractors( + input_type=input_type, + registry_base_url=registry_base_url, + ).get("registered_extractors", []) + + matching_definition = None + for extractor in extractors: + if isinstance(extractor, str): + try: + request_url = f"{registry_base_url}/extractors/{extractor}" + entry = urllib.request.urlopen(request_url) + except urllib.error.HTTPError as e: + raise RuntimeError( + f"Could not find extractor {extractor!r} in the registry at {request_url!r}.\nFull error: {e}" + ) + extractor = json.loads(entry.read().decode("utf-8"))["data"] + + try: + match = find_matching_usage( + usages=extractor["usage"], + input_type=input_type, + preferred_mode=preferred_mode, + preferred_scope=preferred_scope, + strict=True, + ) + except RuntimeError: + match = None + + if match is not None: + logger.info(f"Found matching usage with extractor: {extractor['id']!r}") + matching_definition = extractor + break + + if matching_definition is None: + raise RuntimeError( + "No extractors found with the preferred execution mode and input type." + ) + + return matching_definition + + +def search_extractor( + extractor_id: str, + registry_base_url: str = REGISTRY_BASE_URL, +) -> dict: + """Search a registry for an ``Extractor`` matching the provided ``extractor_id``. + + Parameters: + extractor_id: The ID of the ``FileType`` in the registry. + registry_base_url: The base URL of the registry to use. Defaults to the + |yardsite|_. + + Returns: + The matching extractor definition. + + """ + try: + request_url = f"{registry_base_url}/extractors/{extractor_id}" + entry = urllib.request.urlopen(request_url) + except urllib.error.HTTPError as e: + raise RuntimeError( + f"Could not find extractor {extractor_id!r} in the registry at {request_url!r}.\nFull error: {e}" + ) + extractor = json.loads(entry.read().decode("utf-8"))["data"] + return extractor + + +@coerce_preferred def extract( input_path: Path | str, input_type: str, output_path: Path | str | None = None, output_type: str | None = None, - preferred_mode: SupportedExecutionMethod | str = SupportedExecutionMethod.PYTHON, - preferred_scope: SupportedUsageScope | str = SupportedUsageScope.DATA, + preferred_mode: SupportedExecutionMethod = SupportedExecutionMethod.PYTHON, + preferred_scope: SupportedUsageScope = SupportedUsageScope.DATA, install: bool = True, use_venv: bool = True, extractor_definition: dict | None = None, @@ -121,6 +347,8 @@ def extract( If the extractor supports both Python and CLI, this will be used to determine which to use. If the extractor only supports one method, this will be ignored. Accepts the ``SupportedExecutionMethod`` values of "cli" or "python". + preferred_scope: The preferred extraction scope. + Accepts the ``SupportedUsageScope`` values of "meta+data" (default) or "meta-only". install: Whether to install the extractor package before running it. Defaults to True. extractor_definition: A dictionary containing the extractor definition to use instead of a registry lookup. @@ -145,64 +373,18 @@ def extract( output_path = Path(output_path) if output_path else None - if isinstance(preferred_mode, str): - preferred_mode = SupportedExecutionMethod(preferred_mode) - if isinstance(preferred_scope, str): - preferred_scope = SupportedUsageScope(preferred_scope) - - if extractor_definition is None: - try: - request_url = f"{registry_base_url}/filetypes/{input_type}" - response = urllib.request.urlopen(request_url) - except urllib.error.HTTPError as e: - raise RuntimeError( - f"Could not find file type {input_type!r} in the registry at {request_url!r}.\nFull error: {e}" - ) - json_response = json.loads(response.read().decode("utf-8")) - extractors = json_response["data"]["registered_extractors"] - if not extractors: - raise RuntimeError( - f"No extractors found for file type {input_type!r} in the registry" - ) - elif len(extractors) > 0: - print(f"Discovered the following extractors: {extractors}.") - else: - extractors = [extractor_definition] - - matching_definition = None - for extractor in extractors: - if isinstance(extractor, str): - try: - request_url = f"{registry_base_url}/extractors/{extractor}" - entry = urllib.request.urlopen(request_url) - except urllib.error.HTTPError as e: - raise RuntimeError( - f"Could not find extractor {extractor!r} in the registry at {request_url!r}.\nFull error: {e}" - ) - extractor = json.loads(entry.read().decode("utf-8"))["data"] - - try: - match = find_matching_usage( - usages=extractor["usage"], - input_type=input_type, - preferred_mode=preferred_mode, - preferred_scope=preferred_scope, - strict=True, - ) - except RuntimeError: - match = None - if match is not None: - print(f"Found matching usage with extractor: {extractor['id']!r}") - matching_definition = extractor - break + matching_definition = search_filetype( + input_type=input_type, + preferred_mode=preferred_mode, + preferred_scope=preferred_scope, + extractor_definition=extractor_definition, + registry_base_url=registry_base_url, + ) - if matching_definition is None: - raise RuntimeError( - "No extractors found with the preferred execution mode and input type." - ) plan = ExtractorPlan( - matching_definition, + entry=matching_definition, preferred_mode=preferred_mode, + preferred_scope=preferred_scope, install=install, use_venv=use_venv, ) @@ -218,6 +400,39 @@ def extract( tmp_path.unlink() +@coerce_preferred +def install_extractor( + extractor_id: str | None = None, + use_venv: bool = True, + extractor_definition: dict | None = None, + registry_base_url: str = REGISTRY_BASE_URL, +) -> Any: + """Install an ``Extractor`` from a provided definition or by searching registry. + + Parameters: + extractor: The ID of the ``Extractor`` in the registry. + use_venv: Whether to install the package into a new venv. Defaults to True. + extractor_definition: A dictionary containing the extractor definition to use instead + of a registry lookup. + registry_base_url: The base URL of the registry to use. Defaults to the + |yardsite|_. + + Returns: + The output of the extractor, either a Python object or nothing. + + """ + + if extractor_definition is None: + if extractor_id is None: + raise RuntimeError("Must provide either Extractor ID or a full definition.") + extractor_definition = search_extractor( + extractor_id=extractor_id, + registry_base_url=registry_base_url, + ) + + ExtractorPlan(entry=extractor_definition, install=True, use_venv=use_venv) + + def find_matching_usage( usages: list[dict], input_type: str, @@ -261,10 +476,14 @@ def find_matching_usage( elif thisScope == preferred_scope: candidates["scope"] = usage if not strict and candidates["mode"] is not None: - print("Found usage with matching execution method but wrong extraction scope.") + logger.warning( + "Found usage with matching execution method but wrong extraction scope." + ) return candidates["mode"] if not strict and candidates["scope"] is not None: - print("Found usage with matching extraction scope but wrong execution method.") + logger.warning( + "Found usage with matching extraction scope but wrong execution method." + ) return candidates["scope"] raise RuntimeError( @@ -272,6 +491,29 @@ def find_matching_usage( ) +def fetch_registered_extractors( + input_type: str, + registry_base_url: str = REGISTRY_BASE_URL, +) -> dict: + try: + request_url = f"{registry_base_url}/filetypes/{input_type}" + response = urllib.request.urlopen(request_url) + except urllib.error.HTTPError as e: + raise RuntimeError( + f"Could not find file type {input_type!r} in the registry at {request_url!r}.\nFull error: {e}" + ) + json_response = json.loads(response.read().decode("utf-8")) + extractors = json_response["data"]["registered_extractors"] + + if not extractors: + raise RuntimeError( + f"No registered extractors found for file type {input_type!r} in the registry at {registry_base_url!r}." + ) + elif len(extractors) > 0: + logger.info("Discovered the following extractors: %s.", extractors) + return {"id": input_type, "registered_extractors": extractors} + + class ExtractorPlan: """A plan for parsing a file.""" @@ -311,7 +553,7 @@ def install(self): The installation proceeds inside the appropriate venv, if configured. """ - print(f"Attempting to install {self.entry.get('id', self.entry)}") + logger.info("Attempting to install '%s'", self.entry.get("id", self.entry)) if not self.entry.get("installation"): raise RuntimeError( "No installation instructions provided for {self.entry.get('id', self.entry)}" @@ -333,9 +575,19 @@ def install(self): "install", f"{p}", ] - subprocess.run(command, check=True) + logger.info("Running installation command '%s'", command) + ret = subprocess.run( + command, + text=True, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + ) + for line in ret.stdout.split("\n"): + logger.debug(line) + ret.check_returncode() break - except Exception: + except Exception as e: + logger.critical(e) continue else: raise RuntimeError( @@ -401,7 +653,7 @@ def execute( ) if method == SupportedExecutionMethod.CLI: - print(f"Executing {command}") + logger.info("Executing %s", command) if self.venv_dir: venv_bin_command = str(self.venv_dir / BIN / command) output = self._execute_cli_venv(venv_bin_command) @@ -413,7 +665,7 @@ def execute( f"Requested output file {output_path} does not exist" ) - print(f"Wrote output to {output_path}") + print(f"Wrote output to {output_path!r}.") elif method == SupportedExecutionMethod.PYTHON: if self.venv_dir: @@ -424,12 +676,12 @@ def execute( return output def _execute_cli(self, command): - print(f"Executing {command=}") + logger.info("Executing command '%s'.", command) results = subprocess.check_output(command, shell=True) return results def _execute_cli_venv(self, command): - print(f"Executing {command=} in venv") + logger.info("Executing command '%s' in venv '%s'.", command, self.venv_dir) py_cmd = f"import subprocess; subprocess.check_output(r'{command}', shell=True)" command = [str(self.venv_dir / BIN / "python"), "-c", py_cmd] results = subprocess.check_output(command) diff --git a/pyproject.toml b/pyproject.toml index a31d4f3..77342c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,7 @@ dev = [ repository = "https://github.com/datatractor/beam" [project.scripts] -beam = "beam:run_beam" +datatractor = "beam:run_datatractor" [tool.ruff] extend-exclude = [ diff --git a/tests/test_mpr.py b/tests/test_mpr.py index abed481..150123a 100644 --- a/tests/test_mpr.py +++ b/tests/test_mpr.py @@ -78,7 +78,14 @@ def test_biologic_extract_no_registry(test_mprs): install=(ind == 0), extractor_definition={ "id": "yadg", - "supported_filetypes": [{"id": "biologic-mpr"}], + "supported_filetypes": [ + { + "id": "biologic-mpr", + "template": { + "input_type": "eclab.mpr", + }, + } + ], "usage": [ { "method": "python", @@ -93,7 +100,7 @@ def test_biologic_extract_no_registry(test_mprs): "method": "pip", "requires_python": ">=3.9", "requirements": None, - "packages": ["yadg~=5.0"], + "packages": ["yadg~=6.0"], } ], }, @@ -156,7 +163,14 @@ def test_biologic_beam(tmp_path, test_mprs): for ind, test_mpr in enumerate(test_mprs): input_path = tmp_path / test_mpr output_path = tmp_path / test_mpr.name.replace(".mpr", ".nc") - task = ["beam", "biologic-mpr", str(input_path), "--outfile", str(output_path)] + task = [ + "datatractor", + "beam", + "biologic-mpr", + str(input_path), + "--output-path", + str(output_path), + ] subprocess.run(task) assert output_path.exists() break