diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index dd063e4..6736dc6 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -25,7 +25,7 @@ repos:
- id: mypy
args: ["--strict"]
exclude: '^(docs|tasks|tests)|setup\.py'
- additional_dependencies: ["httpx", "packaging", "typer>=0.3.2"]
+ additional_dependencies: ["packaging", "typer>=0.3.2"]
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.0.270
diff --git a/news/107.misc b/news/107.misc
new file mode 100644
index 0000000..87cfe3a
--- /dev/null
+++ b/news/107.misc
@@ -0,0 +1,2 @@
+Use `pip-requirements-parser `_
+instead of our own copy of pip's parser.
diff --git a/pyproject.toml b/pyproject.toml
index bd57a3a..a4b744c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,8 +25,8 @@ classifiers = [
]
requires-python = ">=3.7"
dependencies=[
- "httpx",
"packaging>=23",
+ "pip-requirements-parser",
"tomli ; python_version<'3.11'",
"typer[all]>=0.3.2",
"typing-extensions ; python_version<'3.8'", # for Protocol, TypedDict
diff --git a/src/pip_deepfreeze/pip.py b/src/pip_deepfreeze/pip.py
index ffb39a0..af6c4a9 100644
--- a/src/pip_deepfreeze/pip.py
+++ b/src/pip_deepfreeze/pip.py
@@ -5,6 +5,7 @@
from packaging.utils import NormalizedName
from packaging.version import Version
+from pip_requirements_parser import RequirementsFile # type: ignore[import]
from .compat import TypedDict, shlex_join
from .installed_dist import (
@@ -17,11 +18,6 @@
list_installed_depends_by_extra,
)
from .project_name import get_project_name
-from .req_file_parser import (
- NestedRequirementsLine,
- RequirementLine,
- parse as parse_req_file,
-)
from .req_parser import get_req_name
from .sanity import get_pip_version
from .utils import (
@@ -73,14 +69,14 @@ def pip_upgrade_project(
"""
# 1. parse constraints
constraint_reqs = {}
- for req_line in parse_req_file(
- str(constraints_filename), recurse=False, reqs_only=False
- ):
- assert not isinstance(req_line, NestedRequirementsLine)
- if isinstance(req_line, RequirementLine):
- req_name = get_req_name(req_line.requirement)
- assert req_name # XXX user error instead?
- constraint_reqs[req_name] = normalize_req_line(req_line.requirement)
+ parsed_constraints_file = RequirementsFile.from_file(
+ str(constraints_filename), include_nested=False
+ )
+ for constraint_req in parsed_constraints_file.requirements:
+ assert constraint_req.name # XXX user error instead?
+ constraint_reqs[constraint_req.name] = normalize_req_line(
+ constraint_req.requirement_line.line
+ )
# 2. get installed frozen dependencies of project
installed_reqs = {
get_req_name(req_line): normalize_req_line(req_line)
diff --git a/src/pip_deepfreeze/req_file_parser.py b/src/pip_deepfreeze/req_file_parser.py
deleted file mode 100644
index 67ae7c8..0000000
--- a/src/pip_deepfreeze/req_file_parser.py
+++ /dev/null
@@ -1,506 +0,0 @@
-"""Requirements file parsing.
-
-This comes from pip.
-
-Notes about changes made compared to the pip source code, with the goal of
-moving this to a standalone library:
-
-- empty environment variables are replaced (TODO in pip, see
- https://github.com/pypa/pip/issues/8422)
-- nested constraints?
-"""
-
-# TODO better name than filename/base_filename
-
-from __future__ import absolute_import
-
-import argparse
-import codecs
-import locale
-import os
-import re
-import shlex
-import sys
-from typing import Iterable, Iterator, List, NoReturn, Optional, Text, Tuple, Union
-from urllib import parse as urllib_parse
-from urllib.request import urlopen
-
-from .compat import Protocol
-
-ReqFileLines = Iterator[Tuple[int, Text, Text]]
-
-
-__all__ = [
- "parse",
- "parse_lines",
- "RequirementsFileParserError",
- "OptionParsingError",
- "ParsedLine",
- "RequirementLine",
- "NestedRequirementsLine",
- "OptionsLine",
-]
-
-_SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
-_COMMENT_RE = re.compile(r"(^|\s+)#.*$")
-_URL_SLASH_DRIVE_RE = re.compile(r"/*([a-z])\|", re.I)
-# Matches environment variable-style values in '${MY_VARIABLE_1}' with the
-# variable name consisting of only uppercase letters, digits or the '_'
-# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1,
-# 2013 Edition.
-_ENV_VAR_RE = re.compile(r"(?P\$\{(?P[A-Z0-9_]+)\})")
-
-
-class HttpResponse(Protocol):
- @property
- def text(self) -> str:
- """The text content of the response."""
-
- def raise_for_status(self) -> None:
- """Raise if the response has an http error status."""
-
-
-class HttpClient(Protocol):
- def get(self, url: str) -> HttpResponse:
- """HTTP GET the URL."""
-
-
-class RequirementsFileParserError(Exception):
- pass
-
-
-class OptionParsingError(RequirementsFileParserError):
- def __init__(self, msg, filename, lineno):
- # type: (str, str, int) -> None
- super().__init__(
- "{msg} at {filename}:{lineno}".format(
- msg=msg, filename=filename, lineno=lineno
- )
- )
-
-
-class ParsedLine(object):
- def __init__(
- self,
- filename, # type: str
- lineno, # type: int
- raw_line, # type: str
- ):
- self.filename = filename
- self.lineno = lineno
- self.raw_line = raw_line
-
-
-class RequirementLine(ParsedLine):
- def __init__(
- self,
- filename, # type: str
- lineno, # type: int
- raw_line, # type: str
- requirement, # type: str
- is_editable, # type: bool
- is_constraint, # type: bool
- options, # type: List[str]
- ):
- super().__init__(filename, lineno, raw_line)
- self.requirement = requirement
- self.is_editable = is_editable
- self.is_constraint = is_constraint
- self.options = options
-
-
-class NestedRequirementsLine(ParsedLine):
- def __init__(
- self,
- filename, # type: str
- lineno, # type: int
- raw_line, # type: str
- requirements, # type: str
- is_constraint, # type: bool
- ):
- super().__init__(filename, lineno, raw_line)
- self.requirements = requirements
- self.is_constraint = is_constraint
-
-
-class OptionsLine(ParsedLine):
- def __init__(
- self,
- filename, # type: str
- lineno, # type: int
- raw_line, # type: str
- options, # type: List[str]
- ):
- super().__init__(filename, lineno, raw_line)
- self.options = options
-
-
-def _preprocess_lines(lines):
- # type: (Iterable[str]) -> ReqFileLines
- """Split, filter, and join lines, and return a line iterator."""
- lines_enum = _join_lines(enumerate(lines, start=1))
- lines_enum = _remove_comments(lines_enum)
- lines_enum = _expand_env_variables(lines_enum)
- return lines_enum
-
-
-def parse(
- filename, # type: str
- recurse=True, # type: bool
- reqs_only=True, # type: bool
- strict=False, # type: bool
- constraints=False, # type: bool
- session=None, # type: Optional[HttpClient]
-):
- # type: (...) -> Iterator[ParsedLine]
- return _parse(
- _get_file_lines(filename, session),
- filename,
- recurse=recurse,
- reqs_only=reqs_only,
- strict=strict,
- constraints=constraints,
- session=session,
- )
-
-
-def parse_lines(
- lines, # type: Iterable[str]
- filename, # type: str
- recurse=True, # type: bool
- reqs_only=True, # type: bool
- strict=False, # type: bool
- constraints=False, # type: bool
- session=None, # type: Optional[HttpClient]
-):
- # type: (...) -> Iterator[ParsedLine]
- return _parse(
- lines,
- filename,
- recurse=recurse,
- reqs_only=reqs_only,
- strict=strict,
- constraints=constraints,
- session=session,
- )
-
-
-def _parse(
- lines, # type: Iterable[str]
- filename, # type: str
- recurse=True, # type: bool
- reqs_only=True, # type: bool
- strict=False, # type: bool
- constraints=False, # type: bool
- session=None, # type: Optional[HttpClient]
-):
- # type: (...) -> Iterator[ParsedLine]
- """Parse a given file or URL, yielding parsed lines."""
- for line in _parse_lines(lines, filename, constraints, strict):
- if not reqs_only or isinstance(line, RequirementLine):
- yield line
- if isinstance(line, NestedRequirementsLine) and recurse:
- for inner_line in parse(
- filename=_file_or_url_join(line.requirements, line.filename),
- recurse=recurse,
- reqs_only=reqs_only,
- strict=strict,
- constraints=line.is_constraint,
- session=session,
- ):
- yield inner_line
-
-
-def _file_or_url_join(filename: str, base_filename: Optional[str]) -> str:
- if not base_filename:
- return filename
- # original file is over http
- if _SCHEME_RE.search(base_filename):
- # do a url join so relative paths work
- return urllib_parse.urljoin(base_filename, filename)
- # original file and nested file are paths
- elif not _SCHEME_RE.search(filename):
- # do a join so relative paths work
- return os.path.join(os.path.dirname(base_filename), filename)
- return filename
-
-
-def _parse_lines(
- lines, # type: Iterable[str]
- filename, # type: str
- constraints, # type: bool
- strict, # type: bool
-):
- # type: (...) -> Iterator[ParsedLine]
- for lineno, line, raw_line in _preprocess_lines(lines):
- args_str, opts, other_opts = _parse_line(line, filename, lineno, strict)
- if args_str:
- yield RequirementLine(
- filename,
- lineno,
- raw_line,
- requirement=args_str,
- is_editable=False,
- is_constraint=constraints,
- options=other_opts,
- )
- elif opts.editables:
- yield RequirementLine(
- filename,
- lineno,
- raw_line,
- requirement=opts.editables[0],
- is_editable=True,
- is_constraint=constraints,
- options=[], # XXX can't editables have options?
- )
- elif opts.requirements:
- yield NestedRequirementsLine(
- filename,
- lineno,
- raw_line,
- requirements=opts.requirements[0],
- # XXX this could be `constraints` instead of False
- # https://github.com/pypa/pip/issues/8416
- is_constraint=False,
- )
- elif opts.constraints:
- yield NestedRequirementsLine(
- filename,
- lineno,
- raw_line,
- requirements=opts.constraints[0],
- is_constraint=True,
- )
- elif other_opts:
- yield OptionsLine(
- filename,
- lineno,
- raw_line,
- options=other_opts,
- )
- else:
- yield ParsedLine(filename, lineno, raw_line)
-
-
-class ParsedOptions(argparse.Namespace):
- def __init__(self):
- # type: () -> None
- self.editables = [] # type: List[str]
- self.requirements = [] # type: List[str]
- self.constraints = [] # type: List[str]
-
-
-class ErrorCatchingArgumentParser(argparse.ArgumentParser):
- def exit(self, status=0, message=None):
- # type: (int, Optional[str]) -> NoReturn
- raise RequirementsFileParserError(message)
-
-
-_options_parser = ErrorCatchingArgumentParser()
-_options_parser.add_argument("-e", "--editable", action="append", dest="editables")
-_options_parser.add_argument("-r", "--requirements", action="append")
-_options_parser.add_argument("-c", "--constraints", action="append")
-
-
-def _parse_line(line, filename, lineno, strict):
- # type: (Text, str, int, bool) -> Tuple[str, ParsedOptions, List[str]]
- args_str, options_str = _break_args_options(line)
- try:
- opts, other_opts = _options_parser.parse_known_args(
- shlex.split(options_str), namespace=ParsedOptions()
- )
- except RequirementsFileParserError as e:
- raise OptionParsingError(str(e), filename, lineno)
- c = len(opts.editables) + len(opts.requirements) + len(opts.constraints)
- if strict and c > 1:
- raise OptionParsingError(
- "Cannot have more than one -e/-c/-r on the same line", filename, lineno
- )
- if strict and c > 0 and other_opts:
- raise OptionParsingError(
- "Cannot mix -e/-c/-r with other options on the same line",
- filename,
- lineno,
- )
- assert isinstance(opts, ParsedOptions)
- return args_str.strip(), opts, other_opts
-
-
-def _break_args_options(line):
- # type: (Text) -> Tuple[str, Text]
- """Break up the line into an args and options string.
-
- We only want to shlex (and then optparse) the options, not the args.
- args can contain markers which are corrupted by shlex.
- """
- tokens = line.split(" ")
- args = []
- options = tokens[:]
- for token in tokens:
- if token.startswith("-"):
- break
- else:
- args.append(token)
- options.pop(0)
- return " ".join(args), " ".join(options)
-
-
-def _join_lines(lines_enum):
- # type: (Iterator[Tuple[int, Text]]) -> ReqFileLines
- """Joins a line ending in '\' with the previous line (except when following
- comments).
-
- The joined line takes on the index of the first line.
- """
- primary_line_number = None
- new_lines = [] # type: List[Text]
- raw_lines = [] # type: List[Text]
- for line_number, raw_line in lines_enum:
- raw_line = raw_line.rstrip("\n") # in case lines comes from open()
- if not raw_line.endswith("\\") or _COMMENT_RE.match(raw_line):
- if _COMMENT_RE.match(raw_line):
- # this ensures comments are always matched later
- line = " " + raw_line
- else:
- line = raw_line
- if new_lines:
- new_lines.append(line)
- raw_lines.append(raw_line)
- assert primary_line_number is not None
- yield primary_line_number, "".join(new_lines), "\n".join(raw_lines)
- new_lines = []
- raw_lines = []
- else:
- yield line_number, line, raw_line
- else:
- if not new_lines:
- primary_line_number = line_number
- new_lines.append(raw_line.strip("\\"))
- raw_lines.append(raw_line)
-
- # last line contains \
- if new_lines:
- assert primary_line_number is not None
- yield primary_line_number, "".join(new_lines), "\n".join(raw_lines)
-
- # TODO (from pip codebase): handle space after '\'.
-
-
-def _remove_comments(lines_enum):
- # type: (ReqFileLines) -> ReqFileLines
- """Strips comments and filter empty lines."""
- for line_number, line, raw_line in lines_enum:
- line = _COMMENT_RE.sub("", line)
- line = line.strip()
- yield line_number, line, raw_line
-
-
-def _expand_env_variables(lines_enum):
- # type: (ReqFileLines) -> ReqFileLines
- """Replace all environment variables that can be retrieved via `os.getenv`.
-
- The only allowed format for environment variables defined in the
- requirement file is `${MY_VARIABLE_1}` to ensure two things:
-
- 1. Strings that contain a `$` aren't accidentally (partially) expanded.
- 2. Ensure consistency across platforms for requirement files.
-
- These points are the result of a discussion on the `github pull
- request #3514 `_.
-
- Valid characters in variable names follow the `POSIX standard
- `_ and are limited
- to uppercase letter, digits and the `_` (underscore).
- """
- for line_number, line, raw_line in lines_enum:
- for env_var, var_name in _ENV_VAR_RE.findall(line):
- value = os.getenv(var_name)
- if value is None:
- continue
-
- line = line.replace(env_var, value)
-
- yield line_number, line, raw_line
-
-
-_BOMS = [
- (codecs.BOM_UTF8, "utf-8"),
- (codecs.BOM_UTF16, "utf-16"),
- (codecs.BOM_UTF16_BE, "utf-16-be"),
- (codecs.BOM_UTF16_LE, "utf-16-le"),
- (codecs.BOM_UTF32, "utf-32"),
- (codecs.BOM_UTF32_BE, "utf-32-be"),
- (codecs.BOM_UTF32_LE, "utf-32-le"),
-] # type: List[Tuple[bytes, Text]]
-
-_ENCODING_RE = re.compile(rb"coding[:=]\s*([-\w.]+)")
-
-
-def _auto_decode(data):
- # type: (bytes) -> Text
- """Check a bytes string for a BOM to correctly detect the encoding.
-
- Fallback to locale.getpreferredencoding(False) like open() on
- Python3
- """
- for bom, encoding in _BOMS:
- if data.startswith(bom):
- return data[len(bom) :].decode(encoding)
- # Lets check the first two lines as in PEP263
- for line in data.split(b"\n")[:2]:
- if line[0:1] == b"#" and _ENCODING_RE.search(line):
- result = _ENCODING_RE.search(line)
- assert result is not None
- encoding = result.groups()[0].decode("ascii")
- return data.decode(encoding)
- return data.decode(locale.getpreferredencoding(False) or sys.getdefaultencoding())
-
-
-def _get_url_scheme(url):
- # type: (Union[str, Text]) -> Optional[Text]
- if ":" not in url:
- return None
- return url.split(":", 1)[0].lower()
-
-
-def _get_file_lines(url, session):
- # type: (str, Optional[HttpClient]) -> Iterable[str]
- """Gets the content of a file as unicode; it may be a filename, file: URL, or
- http: URL. Respects # -*- coding: declarations on the retrieved files.
-
- :param url: File path or url.
- :param session: HttpClient instance.
- """
- scheme = _get_url_scheme(url)
- if scheme in ["http", "https"]:
- if not session:
- # FIXME better exception
- raise RequirementsFileParserError(
- "Cannot get {url} because no http session is available.".format(url=url)
- )
- try:
- resp = session.get(url)
- resp.raise_for_status()
- content = resp.text
- except Exception as exc:
- raise RequirementsFileParserError(
- "Could not open requirements file: {}".format(exc)
- )
- elif scheme == "file":
- try:
- with urlopen(url) as f:
- content = _auto_decode(f.read())
- except Exception as exc:
- raise RequirementsFileParserError(
- "Could not open requirements file: {}".format(exc)
- )
- else:
- try:
- with open(url, "rb") as f:
- content = _auto_decode(f.read())
- except Exception as exc:
- raise RequirementsFileParserError(
- "Could not open requirements file: {}".format(exc)
- )
- return content.splitlines()
diff --git a/src/pip_deepfreeze/req_merge.py b/src/pip_deepfreeze/req_merge.py
index 71a5956..b4d7469 100644
--- a/src/pip_deepfreeze/req_merge.py
+++ b/src/pip_deepfreeze/req_merge.py
@@ -1,12 +1,9 @@
from pathlib import Path
from typing import Iterable, Iterator, Optional
-import httpx
from packaging.utils import canonicalize_name
+from pip_requirements_parser import RequirementsFile # type: ignore[import]
-from .compat import shlex_join
-from .req_file_parser import OptionsLine, RequirementLine, parse
-from .req_parser import get_req_name
from .utils import log_error
@@ -28,38 +25,34 @@ def prepare_frozen_reqs_for_upgrade(
frozen_reqs = set()
# 1. emit options from in_filename, collect in_reqs
if in_filename.is_file():
- for in_req in parse(
- str(in_filename),
- recurse=True,
- reqs_only=False,
- strict=True,
- session=httpx.Client(),
- ):
- if isinstance(in_req, OptionsLine):
- yield shlex_join(in_req.options)
- elif isinstance(in_req, RequirementLine):
- req_name = get_req_name(in_req.requirement)
- if not req_name:
- log_error(f"Ignoring unnamed constraint {in_req.raw_line!r}.")
- continue
- in_reqs.append((req_name, in_req.requirement))
+ in_req_file = RequirementsFile.from_file(str(in_filename), include_nested=True)
+ for in_req_option in in_req_file.options:
+ yield in_req_option.requirement_line.line
+ for in_req in in_req_file.requirements:
+ req_name = in_req.name
+ if not req_name:
+ log_error(
+ f"Ignoring unnamed constraint {in_req.requirement_line.line!r}."
+ )
+ continue
+ in_reqs.append((req_name, in_req.requirement_line.line))
# 2. emit frozen_reqs unless upgrade_all or it is in to_upgrade
for frozen_filename in frozen_filenames:
if frozen_filename.is_file() and not upgrade_all:
- for frozen_req in parse(
- str(frozen_filename), recurse=True, reqs_only=True, strict=True
- ):
- assert isinstance(frozen_req, RequirementLine)
- req_name = get_req_name(frozen_req.requirement)
+ for frozen_req in RequirementsFile.from_file(
+ str(frozen_filename), include_nested=True
+ ).requirements:
+ req_name = frozen_req.name
if not req_name:
log_error(
- f"Ignoring unnamed frozen requirement {frozen_req.raw_line!r}."
+ f"Ignoring unnamed frozen requirement "
+ f"{frozen_req.requirement_line.line!r}."
)
continue
if req_name in to_upgrade_set:
continue
frozen_reqs.add(req_name)
- yield frozen_req.requirement
+ yield frozen_req.requirement_line.line
# 3. emit in_reqs that have not been emitted as frozen reqs
for req_name, in_req_str in in_reqs:
if req_name not in frozen_reqs:
diff --git a/src/pip_deepfreeze/sync.py b/src/pip_deepfreeze/sync.py
index 6ce1dc0..7204513 100644
--- a/src/pip_deepfreeze/sync.py
+++ b/src/pip_deepfreeze/sync.py
@@ -3,13 +3,12 @@
from pathlib import Path
from typing import Iterator, List, Optional, Sequence
-import httpx
import typer
from packaging.utils import NormalizedName
+from pip_requirements_parser import RequirementsFile # type: ignore[import]
from .pip import pip_freeze_dependencies_by_extra, pip_uninstall, pip_upgrade_project
from .project_name import get_project_name
-from .req_file_parser import OptionsLine, parse as parse_req_file
from .req_merge import prepare_frozen_reqs_for_upgrade
from .req_parser import get_req_names
from .utils import (
@@ -84,15 +83,10 @@ def sync(
# output pip options in main requirements only
if not extra and requirements_in.exists():
# XXX can we avoid this second parse of requirements.txt.in?
- for parsed_req_line in parse_req_file(
- str(requirements_in),
- reqs_only=False,
- recurse=True,
- strict=True,
- session=httpx.Client(),
- ):
- if isinstance(parsed_req_line, OptionsLine):
- print(parsed_req_line.raw_line, file=f)
+ for option_line in RequirementsFile.from_file(
+ str(requirements_in), include_nested=True
+ ).options:
+ print(option_line.requirement_line.line, file=f)
# output frozen dependencies of project
for req_line in frozen_reqs:
print(req_line, file=f)
diff --git a/tests/test_req_file_parser.py b/tests/test_req_file_parser.py
deleted file mode 100644
index 2861287..0000000
--- a/tests/test_req_file_parser.py
+++ /dev/null
@@ -1,397 +0,0 @@
-import os
-import textwrap
-
-import pytest
-
-from pip_deepfreeze.req_file_parser import (
- OptionParsingError,
- RequirementLine,
- RequirementsFileParserError,
- _file_or_url_join,
- parse,
- parse_lines,
-)
-
-
-def test_basic(tmp_path):
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(
- textwrap.dedent(
- """\
- # comment
- req1
- req2==1.0.0
- req3 @ https://e.c/req3.tgz
- ./req4
- https://e.c/req5.tgz
- req6 @ https://e.c/req6.tgz ; python_version < 3.7 # comment
- req6 ; python_version >= 3.7 # released version is ok
- """
- )
- )
- lines = list(parse(str(reqs)))
- assert all(isinstance(line, RequirementLine) for line in lines)
- assert [line.requirement for line in lines] == [
- "req1",
- "req2==1.0.0",
- "req3 @ https://e.c/req3.tgz",
- "./req4",
- "https://e.c/req5.tgz",
- "req6 @ https://e.c/req6.tgz ; python_version < 3.7",
- "req6 ; python_version >= 3.7",
- ]
- assert lines[0].raw_line == "req1"
- assert lines[0].lineno == 2
-
-
-def test_parse_lines(tmp_path):
- """Basic test for parse_lines."""
- lines = ["req1\n", "-r subreqs.txt\n"]
- (tmp_path / "subreqs.txt").write_text("req2")
- parsed_lines = list(parse_lines(lines, filename=str(tmp_path / "req.txt")))
- assert [line.requirement for line in parsed_lines] == ["req1", "req2"]
- assert parsed_lines[0].raw_line == "req1"
- assert parsed_lines[0].lineno == 1
-
-
-def test_recurse(tmp_path):
- reqs = tmp_path / "reqs.txt"
- subreqs = tmp_path / "subreqs.txt"
- reqs.write_text(
- textwrap.dedent(
- """\
- req1
- -r subreqs.txt
- """
- )
- )
- subreqs.write_text("req2")
- lines = list(parse(str(reqs)))
- assert all(isinstance(line, RequirementLine) for line in lines)
- assert [line.requirement for line in lines] == ["req1", "req2"]
- lines = list(parse(str(reqs), recurse=False))
- assert [line.requirement for line in lines] == ["req1"]
-
-
-def test_file_url(tmp_path):
- reqs = tmp_path / "reqs.txt"
- subreqs = tmp_path / "subreqs.txt"
- subreqs_uri = subreqs.as_uri()
- assert subreqs_uri.startswith("file://")
- reqs.write_text(
- textwrap.dedent(
- """\
- req1
- -r {}
- """.format(
- subreqs_uri
- )
- )
- )
- subreqs.write_text("req2")
- lines = list(parse(str(reqs)))
- assert all(isinstance(line, RequirementLine) for line in lines)
- assert [line.requirement for line in lines] == ["req1", "req2"]
- lines = list(parse(str(reqs), recurse=False))
- assert [line.requirement for line in lines] == ["req1"]
-
-
-def test_file_url_not_found(tmp_path):
- reqs = tmp_path / "reqs.txt"
- subreqs_uri = (tmp_path / "notfound.txt").as_uri()
- assert subreqs_uri.startswith("file://")
- reqs.write_text(f"--requirements {subreqs_uri}")
- with pytest.raises(RequirementsFileParserError) as e:
- list(parse(str(reqs)))
- assert "Could not open requirements file" in str(e.value)
- assert "notfound.txt" in str(e.value)
-
-
-class MockHttpResponse:
- def __init__(self, url, text):
- self.url = url
- self.text = text
-
- def raise_for_status(self):
- if self.text is None:
- raise RuntimeError(f"mock error opening {self.url}")
-
-
-class MockHttpSession:
- def __init__(self, url, text):
- self.url = url
- self.text = text
-
- def get(self, url):
- assert url == self.url
- return MockHttpResponse(self.url, self.text)
-
-
-def test_http_url(tmp_path):
- subreqs_url = "http://e.c/subreqs.txt"
-
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(
- textwrap.dedent(
- f"""\
- req1
- -r {subreqs_url}
- """
- )
- )
- with pytest.raises(RequirementsFileParserError) as e:
- list(parse(str(reqs)))
- assert f"Cannot get {subreqs_url} because no http session is available" in str(
- e.value
- )
- lines = list(parse(str(reqs), session=MockHttpSession(subreqs_url, "req2")))
- assert all(isinstance(line, RequirementLine) for line in lines)
- assert [line.requirement for line in lines] == ["req1", "req2"]
- lines = list(parse(str(reqs), recurse=False))
- assert [line.requirement for line in lines] == ["req1"]
-
-
-def test_http_url_notfound(tmp_path):
- subreqs_url = "http://e.c/notfound.txt"
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(f"-r {subreqs_url}")
- with pytest.raises(RequirementsFileParserError) as e:
- list(parse(str(reqs), session=MockHttpSession(subreqs_url, None)))
- assert "Could not open requirements file" in str(e.value)
- assert "notfound.txt" in str(e.value)
-
-
-def test_subreq_notfound(tmp_path):
- reqs = tmp_path / "reqs.txt"
- reqs.write_text("-r notfound.txt")
- with pytest.raises(RequirementsFileParserError) as e:
- _ = list(parse(str(reqs)))
- assert "Could not open requirements file" in str(e.value)
- assert "notfound.txt" in str(e.value)
-
-
-def test_relative_file(tmp_path):
- reqs = tmp_path / "reqs.txt"
- (tmp_path / "subdir").mkdir()
- subreqs = tmp_path / "subdir" / "subreqs.txt"
- subsubreqs = tmp_path / "subdir" / "subsubreqs.txt"
- reqs.write_text(
- textwrap.dedent(
- """\
- req1
- -r subdir/subreqs.txt
- """
- )
- )
- subreqs.write_text(
- textwrap.dedent(
- """\
- req2
- -r ./subsubreqs.txt
- """
- )
- )
- subsubreqs.write_text("req3")
- lines = list(parse(str(reqs)))
- assert [line.requirement for line in lines] == ["req1", "req2", "req3"]
-
-
-def test_relative_file_uri(tmp_path):
- reqs = tmp_path / "reqs.txt"
- (tmp_path / "subdir").mkdir()
- subreqs = tmp_path / "subdir" / "subreqs.txt"
- subreqs_uri = subreqs.as_uri()
- assert subreqs_uri.startswith("file://")
- subsubreqs = tmp_path / "subdir" / "subsubreqs.txt"
- reqs.write_text(
- textwrap.dedent(
- """\
- req1
- -r {}
- """.format(
- subreqs_uri
- )
- )
- )
- subreqs.write_text(
- textwrap.dedent(
- """\
- req2
- -r ./subsubreqs.txt
- """
- )
- )
- subsubreqs.write_text("req3")
- lines = list(parse(str(reqs)))
- assert [line.requirement for line in lines] == ["req1", "req2", "req3"]
-
-
-def test_editable(tmp_path):
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(
- textwrap.dedent(
- """\
- # comment
- req1
- -e ./req2 # comment
- """
- )
- )
- lines = list(parse(str(reqs)))
- assert all(isinstance(line, RequirementLine) for line in lines)
- assert [line.requirement for line in lines] == ["req1", "./req2"]
- assert [line.is_editable for line in lines] == [False, True]
-
-
-def test_multiline_req(tmp_path):
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(
- textwrap.dedent(
- """\
- req @ \\
- ./req.tgz # comment
- """
- )
- )
- lines = list(parse(str(reqs)))
- assert [line.requirement for line in lines] == ["req @ ./req.tgz"]
-
-
-@pytest.mark.xfail(reason="hash parsing not implemented")
-def test_hashes(tmp_path):
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(
- textwrap.dedent(
- """\
- req @ ./req.tgz \\
- --hash sha1:62bd26d758...703a094285 \\
- --hash sha2:xyz
- """
- )
- )
- lines = list(parse(str(reqs)))
- assert [line.requirement for line in lines] == ["req @ ./req.tgz"]
- assert lines[0].options["hashes"] == ["sha1:62bd26d758...703a094285", "sha2:xyz"]
-
-
-def test_last_line_continuation(tmp_path):
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(
- textwrap.dedent(
- """\
- # comment
- req1
- req2\\
- """
- )
- )
- lines = list(parse(str(reqs)))
- assert all(isinstance(line, RequirementLine) for line in lines)
- assert [line.requirement for line in lines] == ["req1", "req2"]
-
-
-def test_env_var(tmp_path, monkeypatch):
- monkeypatch.setenv("X_USER", "toto")
- monkeypatch.setenv("X_PASSWORD", "lehéro")
- monkeypatch.setenv("Z_USER", "")
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(
- textwrap.dedent(
- """\
- https://${X_USER}@e.c/req.tgz
- https://${X_USER}:${X_PASSWORD}@e.c/req.tgz
- https://${Y_USER}@e.c/req.tgz
- https://${Z_USER}@e.c/req.tgz
- """
- )
- )
- lines = list(parse(str(reqs)))
- assert all(isinstance(line, RequirementLine) for line in lines)
- assert [line.requirement for line in lines] == [
- "https://toto@e.c/req.tgz",
- "https://toto:lehéro@e.c/req.tgz",
- "https://${Y_USER}@e.c/req.tgz",
- "https://@e.c/req.tgz",
- ]
-
-
-@pytest.mark.parametrize(
- "badreqs",
- [
- "-e ./req1 -e ./req2",
- "-r f1 -e ./req1",
- "-r f1 -r f2",
- "-c c1 -c c2",
- "-r f1 -c c1",
- "-r f1 --hash x",
- "-r f1 -f z",
- "-e ./req1 --hash x", # editable can't have options
- ],
-)
-def test_strict_option_errors(badreqs, tmp_path):
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(badreqs)
- _ = list(parse(str(reqs), recurse=False, strict=False))
- with pytest.raises(OptionParsingError):
- _ = list(parse(str(reqs), recurse=False, strict=True))
-
-
-@pytest.mark.parametrize(
- "badreqs", ["-r", "--requirements", "-c", "--constraints", "-e", "--editable"]
-)
-def test_option_errors(badreqs, tmp_path):
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(badreqs)
- with pytest.raises(OptionParsingError):
- _ = list(parse(str(reqs), recurse=False))
-
-
-@pytest.mark.parametrize(
- "line, expected",
- [
- ("-i https://e.c/simple", ["-i", "https://e.c/simple"]),
- (
- "--extra-index-url https://a.u/simple",
- ["--extra-index-url", "https://a.u/simple"],
- ),
- (
- "--find-links https://x.y/links -f ./links",
- ["--find-links", "https://x.y/links", "-f", "./links"],
- ),
- ("-f ./otherlinks", ["-f", "./otherlinks"]),
- ('-f "dir with space/subdir"', ["-f", "dir with space/subdir"]),
- ("--pre", ["--pre"]),
- ("pkga --hash h:v --hash x:y", ["--hash", "h:v", "--hash", "x:y"]),
- ("pkgb ; python_version<3.7 --hash h:v", ["--hash", "h:v"]),
- ("--editable=./pkg", []),
- ("-e ./pkg", []),
- ("foo==1.0", []),
- ],
-)
-def test_options(line, expected, tmp_path):
- reqs = tmp_path / "reqs.txt"
- reqs.write_text(line)
- lines = list(parse(str(reqs), reqs_only=False, recurse=False))
- assert len(lines) == 1
- assert lines[0].options == expected
-
-
-@pytest.mark.parametrize(
- "filename,base_filename,expected",
- [
- ("sr.txt", "r.txt", "sr.txt"),
- ("sr.txt", None, "sr.txt"),
- ("sr.txt", "a/r.txt", f"a{os.path.sep}sr.txt"),
- ("b/sr.txt", "a/r.txt", f"a{os.path.sep}b/sr.txt"),
- ("file:///a/sr.txt", None, "file:///a/sr.txt"),
- ("file:///a/sr.txt", "r.txt", "file:///a/sr.txt"),
- ("file:///a/sr.txt", "file:///b/r.txt", "file:///a/sr.txt"),
- ("../sr.txt", "file:///b/r.txt", "file:///sr.txt"),
- ],
-)
-def test_file_or_url_join(filename, base_filename, expected):
- assert _file_or_url_join(filename, base_filename) == expected
-
-
-# TODO test constraints and nested constraints
-# TODO test auto-decode