42 changes: 20 additions & 22 deletions constructor/build_outputs.py
@@ -56,7 +56,7 @@ def dump_hash(info, algorithm=None):
         invalid = algorithms.difference(set(hashlib.algorithms_available))
         raise ValueError(f"Invalid algorithm: {', '.join(invalid)}")
     BUFFER_SIZE = 65536
-    if isinstance(info["_outpath"], str):
+    if isinstance(info["_outpath"], (str, Path)):
         installers = [Path(info["_outpath"])]
     else:
         installers = [Path(outpath) for outpath in info["_outpath"]]
@@ -75,26 +75,25 @@ def dump_hash(info, algorithm=None):
     return ", ".join(outpaths)
 
 
-def dump_info(info):
-    outpath = os.path.join(info["_output_dir"], "info.json")
-    with open(outpath, "w") as f:
-        json.dump(info, f, indent=2, default=repr)
-    return os.path.abspath(outpath)
+def dump_info(info) -> Path:
+    outpath = (info["_output_dir"] / "info.json").resolve()
+    outpath.write_text(json.dumps(info, indent=2, default=repr))
+    return outpath
 
 
-def dump_packages_list(info, env="base"):
+def dump_packages_list(info, env="base") -> Path:
     if env == "base":
         dists = info["_dists"]
     elif env in info["_extra_envs_info"]:
         dists = info["_extra_envs_info"][env]["_dists"]
     else:
         raise ValueError(f"env='{env}' is not a valid env name.")
 
-    outpath = os.path.join(info["_output_dir"], f"pkg-list.{env}.txt")
+    outpath = (info["_output_dir"] / f"pkg-list.{env}.txt").resolve()
     with open(outpath, "w") as fo:
         fo.write(f"# {info['name']} {info['version']}, env={env}\n")
         fo.write("\n".join(dists))
-    return os.path.abspath(outpath)
+    return outpath
 
 
 def dump_lockfile(info, env="base"):
@@ -123,10 +122,9 @@ def dump_lockfile(info, env="base"):
         hash_value = record.get("md5")
         lines.append(url + (f"#{hash_value}" if hash_value else ""))
 
-    outpath = os.path.join(info["_output_dir"], f"lockfile.{env}.txt")
-    with open(outpath, "w") as f:
-        f.write("\n".join(lines))
-    return os.path.abspath(outpath)
+    outpath = (info["_output_dir"] / f"lockfile.{env}.txt").resolve()
+    outpath.write_text("\n".join(lines))
+    return outpath
 
 
 def dump_licenses(info, include_text=False, text_errors=None):
@@ -159,24 +157,24 @@ def dump_licenses(info, include_text=False, text_errors=None):
     licenses = defaultdict(dict)
     for pkg_record in info["_all_pkg_records"]:
         extracted_package_dir = pkg_record.extracted_package_dir
-        licenses_dir = os.path.join(extracted_package_dir, "info", "licenses")
+        licenses_dir = Path(extracted_package_dir, "info", "licenses")
         licenses[pkg_record.dist_str()]["type"] = pkg_record.license
         licenses[pkg_record.dist_str()]["files"] = license_files = []
-        if not os.path.isdir(licenses_dir):
+        if not licenses_dir.is_dir():
             continue
 
+        # FUTURE: pathlib.Path() has .walk() in Python 3.12+
         for directory, _, files in os.walk(licenses_dir):
             for filepath in files:
-                license_path = os.path.join(directory, filepath)
-                license_file = {"path": license_path, "text": None}
+                license_path = Path(directory, filepath)
+                license_file = {"path": str(license_path), "text": None}
                 if include_text:
-                    license_file["text"] = Path(license_path).read_text(errors=text_errors)
+                    license_file["text"] = license_path.read_text(errors=text_errors)
                 license_files.append(license_file)
 
-    outpath = os.path.join(info["_output_dir"], "licenses.json")
-    with open(outpath, "w") as f:
-        json.dump(licenses, f, indent=2, default=repr)
-    return os.path.abspath(outpath)
+    outpath = (info["_output_dir"] / "licenses.json").resolve()
+    outpath.write_text(json.dumps(licenses, indent=2, default=repr))
+    return outpath
 
 
 OUTPUT_HANDLERS = {
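
Note on the dump_* rewrites above: json.dump(obj, fh) and fh.write(json.dumps(obj)) are equivalent, and default=repr is what keeps the dump working once info values become Path objects, since json has no native encoder for them. A minimal, self-contained sketch (the info dict here is a hypothetical stand-in, not constructor's real one):

    import json
    import tempfile
    from pathlib import Path

    with tempfile.TemporaryDirectory() as tmp:
        info = {"_output_dir": Path(tmp), "name": "demo"}  # hypothetical info dict
        outpath = (info["_output_dir"] / "info.json").resolve()
        # default=repr turns Path values into strings instead of raising TypeError
        outpath.write_text(json.dumps(info, indent=2, default=repr))
        print(outpath.read_text())
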
8 changes: 4 additions & 4 deletions constructor/conda_interface.py
@@ -10,7 +10,7 @@
 import sys
 from copy import deepcopy
 from itertools import chain
-from os.path import join
+from pathlib import Path
 
 from conda.gateways.disk import mkdir_p_sudo_safe
 
@@ -163,7 +163,7 @@ def write_repodata(cache_dir, url, full_repodata, used_packages, info):
             raise NotImplementedError("Package type is unknown for: %s" % package)
         if original_package in full_repodata.get(original_key, {}):
             data = deepcopy(full_repodata[original_key][original_package])
-            pkg_fn = join(info["_download_dir"], package)
+            pkg_fn = info["_download_dir"] / package
             data["size"] = os.stat(pkg_fn).st_size
             data["sha256"] = hash_files([pkg_fn], algorithm="sha256")
             data["md5"] = hash_files([pkg_fn])
@@ -183,7 +183,7 @@ def write_repodata(cache_dir, url, full_repodata, used_packages, info):
         }
     )
     repodata = repodata_header[:-1] + "," + repodata[1:]
-    repodata_filepath = join(cache_dir, _cache_fn_url(repodata_url))
+    repodata_filepath = cache_dir / _cache_fn_url(repodata_url)
     with open(repodata_filepath, "w") as fh:
         fh.write(repodata)
 
@@ -196,6 +196,6 @@ def write_repodata(cache_dir, url, full_repodata, used_packages, info):
 # Maybe it's not needed anymore.
 
 def write_cache_dir():
-    cache_dir = join(PackageCacheData.first_writable().pkgs_dir, "cache")
+    cache_dir = Path(PackageCacheData.first_writable().pkgs_dir, "cache")
     mkdir_p_sudo_safe(cache_dir)
     return cache_dir
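
For context on keeping os.stat while switching pkg_fn to the / operator: pathlib.Path implements os.PathLike, so the os-level calls accept it unchanged. A small sketch under that assumption (the package file name is made up):

    import os
    import tempfile
    from pathlib import Path

    with tempfile.TemporaryDirectory() as tmp:
        pkg_fn = Path(tmp) / "pkg-1.0-0.conda"  # hypothetical package path
        pkg_fn.write_bytes(b"not a real package")
        # os.stat accepts any os.PathLike, so no str() conversion is needed
        print(os.stat(pkg_fn).st_size)
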
10 changes: 4 additions & 6 deletions constructor/construct.py
@@ -14,7 +14,6 @@
 import re
 import sys
 from functools import partial
-from os.path import dirname
 from pathlib import Path
 
 from jsonschema import Draft202012Validator, validators
@@ -108,16 +107,15 @@ def yamlize(data, directory, content_filter):
     return yaml.load(data)
 
 
-def parse(path, platform):
+def parse(path: Path, platform):
     try:
-        with open(path) as fi:
-            data = fi.read()
+        data = path.read_text()
     except OSError:
         sys.exit("Error: could not open '%s' for reading" % path)
-    directory = dirname(path)
+    directory = path.parent
     content_filter = partial(select_lines, namespace=ns_platform(platform))
     try:
-        res = yamlize(data, directory, content_filter)
+        res = yamlize(data, str(directory), content_filter)
     except YamlParsingError as e:
         sys.exit(e.error_msg())
 
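
The parse() change relies on Path.read_text() raising the same OSError family that open() does (FileNotFoundError, PermissionError, etc. are subclasses), so the existing except OSError branch still fires. A minimal sketch with a hypothetical missing file:

    import sys
    from pathlib import Path

    path = Path("no-such-construct.yaml")  # hypothetical input file
    try:
        data = path.read_text()
    except OSError:
        sys.exit("Error: could not open '%s' for reading" % path)
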
57 changes: 30 additions & 27 deletions constructor/fcp.py
@@ -7,14 +7,16 @@
 fcp (fetch conda packages) module
 """
 
+from __future__ import annotations
+
 import logging
 import os
 import shutil
 import sys
 import tempfile
 from collections import defaultdict
 from itertools import groupby
-from os.path import abspath, expanduser, isdir, join
+from pathlib import Path
 from subprocess import check_call
 from typing import TYPE_CHECKING
 
@@ -133,10 +135,10 @@ def _show(name, version, platform, download_dir, precs, more_recent_versions={}):
         logger.debug(" %s", prec.fn)
 
 
-def _fetch(download_dir, precs):
-    assert conda_context.pkgs_dirs[0] == download_dir
+def _fetch(download_dir: Path, precs):
+    assert Path(conda_context.pkgs_dirs[0]) == download_dir
     pc = PackageCacheData.first_writable()
-    assert pc.pkgs_dir == download_dir
+    assert Path(pc.pkgs_dir) == download_dir
     assert pc.is_writable, f"{download_dir} does not exist or is not writable"
 
     ProgressiveFetchExtract(precs).execute()
@@ -156,17 +158,17 @@ def check_duplicates_files(pc_recs, platform, duplicate_files="error"):
 
     for pc_rec in pc_recs:
         fn = pc_rec.fn
-        extracted_package_dir = pc_rec.extracted_package_dir
+        extracted_package_dir = Path(pc_rec.extracted_package_dir)
 
         total_tarball_size += int(pc_rec.get("size", 0))
 
         paths_data = read_paths_json(extracted_package_dir).paths
         for path_data in paths_data:
             short_path = path_data.path
             try:
-                size = path_data.size_in_bytes or getsize(join(extracted_package_dir, short_path))
+                size = path_data.size_in_bytes or getsize(extracted_package_dir / short_path)
             except AttributeError:
-                size = getsize(join(extracted_package_dir, short_path))
+                size = getsize(extracted_package_dir / short_path)
             total_extracted_pkgs_size += size
 
             map_members_scase[short_path].add(fn)
@@ -204,13 +206,14 @@ def check_duplicates_files(pc_recs, platform, duplicate_files="error"):
     return total_tarball_size, total_extracted_pkgs_size
 
 
-def _precs_from_environment(environment, input_dir):
-    if not isdir(environment) and ("/" in environment or "\\" in environment):
-        env2 = join(input_dir, environment)
-        if isdir(env2):
+def _precs_from_environment(environment: Path, input_dir: Path):
+    environment = Path(environment)
+    if not environment.is_dir() and len(environment.parts) > 1:
+        env2 = input_dir / environment
+        if env2.is_dir():
             environment = env2
-    if isdir(environment):
-        environment = abspath(join(input_dir, expanduser(environment)))
+    if environment.is_dir():
+        environment = (input_dir / environment.expanduser()).resolve()
     else:
         environment = locate_prefix_by_name(environment)
     pdata = PrefixData(environment)
@@ -267,13 +270,13 @@ def _solve_precs(
         sys.exit("CONDA_EXE env variable is empty. Need to activate a conda env.")
     # make the environment, if needed
     if environment_file:
-        environment = tempfile.mkdtemp()
+        environment = Path(tempfile.mkdtemp())
         new_env = os.environ.copy()
         new_env["CONDA_SUBDIR"] = platform
         # use conda env for yaml, and standard conda create otherwise
         subcommand = (
             ["env", "create"]
-            if environment_file.endswith((".yml", ".yaml"))
+            if environment_file.suffix in (".yml", ".yaml")
             else ["create", "--yes"]
         )
         if channel_urls:
@@ -356,14 +359,14 @@ def _fetch_precs(precs, download_dir, transmute_file_type=""):
             dist = filename_dist(dist)
             new_file_name = "%s%s" % (dist[:-8], transmute_file_type)
             new_dists.append(new_file_name)
-            new_file_name = join(download_dir, new_file_name)
-            if os.path.exists(new_file_name):
+            new_file_name = Path(download_dir, new_file_name)
+            if new_file_name.exists():
                 continue
             logger.info("transmuting %s", dist)
             conda_package_handling.api.transmute(
-                os.path.join(download_dir, dist),
+                str(download_dir / dist),
                 transmute_file_type,
-                out_folder=download_dir,
+                out_folder=str(download_dir),
             )
         else:
             new_dists.append(dist)
@@ -375,7 +378,7 @@
 def _main(
     name,
     version,
-    download_dir,
+    download_dir: Path,
     platform,
     channel_urls=(),
     channels_remap=(),
@@ -384,14 +387,14 @@
     menu_packages=None,
     ignore_duplicate_files=True,
     environment=None,
-    environment_file=None,
+    environment_file: Path | None = None,
     verbose=True,
     dry_run=False,
-    conda_exe="conda.exe",
+    conda_exe: Path = Path("conda.exe"),
     transmute_file_type="",
     extra_envs=None,
     check_path_spaces=True,
-    input_dir="",
+    input_dir: Path = Path.cwd(),
 ):
     precs = _solve_precs(
         name,
@@ -481,9 +484,9 @@
 
 def main(info, verbose=True, dry_run=False, conda_exe="conda.exe"):
     name = info["name"]
-    input_dir = info["_input_dir"]
+    input_dir: Path = info["_input_dir"]
     version = info["version"]
-    download_dir = info["_download_dir"]
+    download_dir: Path = info["_download_dir"]
     platform = info["_platform"]
     channel_urls = all_channel_urls(info.get("channels", ()), subdirs=[platform, "noarch"])
     channels_remap = info.get("channels_remap", ())
@@ -492,7 +495,7 @@ def main(info, verbose=True, dry_run=False, conda_exe="conda.exe"):
     menu_packages = info.get("menu_packages")
     ignore_duplicate_files = info.get("ignore_duplicate_files", True)
     environment = info.get("environment", None)
-    environment_file = info.get("environment_file", None)
+    environment_file: Path = info.get("environment_file", None)
     transmute_file_type = info.get("transmute_file_type", "")
     extra_envs = info.get("extra_envs", {})
     check_path_spaces = info.get("check_path_spaces", True)
@@ -517,7 +520,7 @@ def main(info, verbose=True, dry_run=False, conda_exe="conda.exe"):
     # Restoring the state for "proxy_servers" to what it was before
     conda_context.proxy_servers = proxy_servers
     assert conda_context.ssl_verify == _ssl_verify
-    assert conda_context.pkgs_dirs and conda_context.pkgs_dirs[0] == download_dir
+    assert conda_context.pkgs_dirs and Path(conda_context.pkgs_dirs[0]) == download_dir
 
     (
         pkg_records,
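
Two pathlib behaviors worth noting in the fcp.py hunks: the asserts wrap both sides in Path because a Path never compares equal to a plain string, and len(environment.parts) > 1 replaces the old separator check because a bare name has exactly one part. One caveat: pathlib drops "." components, so "./base" also has one part, which the old "/" in environment test would have treated as a path. A quick sketch of these semantics:

    from pathlib import PurePosixPath

    # Path vs str never compare equal; Path vs Path ignores redundant separators.
    print(PurePosixPath("/opt/pkgs") == "/opt/pkgs")                   # False
    print(PurePosixPath("/opt//pkgs/") == PurePosixPath("/opt/pkgs"))  # True

    # parts-based test for "is this a path or an env name?"
    print(len(PurePosixPath("base").parts))       # 1 -> bare env name
    print(len(PurePosixPath("envs/base").parts))  # 2 -> treated as a path
    print(len(PurePosixPath("./base").parts))     # 1 -> "." is dropped (caveat)
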
11 changes: 5 additions & 6 deletions constructor/imaging.py
@@ -10,14 +10,13 @@
 
 import sys
 from io import BytesIO
-from os.path import dirname, join
+from pathlib import Path
 from random import randint
 
 from PIL import Image, ImageDraw, ImageFont
 
-ttf_path = join(dirname(__file__), "ttf", "Vera.ttf")
-with open(ttf_path, "rb") as f:
-    ttf_bytes = f.read()
+ttf_path = Path(__file__).parent / "ttf" / "Vera.ttf"
+ttf_bytes = ttf_path.read_bytes()
 white = 0xFF, 0xFF, 0xFF
 # These are for Windows
 welcome_size = 164, 314
@@ -99,7 +98,7 @@ def add_color_info(info):
         sys.exit("Error: color '%s' not defined" % color_name)
 
 
-def write_images(info, dir_path, os="windows"):
+def write_images(info, dir_path: Path, os="windows"):
     if os == "windows":
         instructions = [
             ("welcome", welcome_size, mk_welcome_image, ".bmp"),
@@ -122,7 +121,7 @@ def write_images(info, dir_path: Path, os="windows"):
         add_color_info(info)
         im = function(info)
         assert im.size == size
-        im.save(join(dir_path, name + ext))
+        im.save(dir_path / f"{name}{ext}")
 
 
 if __name__ == "__main__":
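
The im.save(dir_path / f"{name}{ext}") line works because Pillow's Image.save() accepts a path-like object and picks the output format from the suffix. A minimal sketch, assuming Pillow is installed (the size mirrors the welcome_size constant above; the output name is hypothetical):

    import tempfile
    from pathlib import Path

    from PIL import Image

    with tempfile.TemporaryDirectory() as tmp:
        im = Image.new("RGB", (164, 314), (0xFF, 0xFF, 0xFF))
        out = Path(tmp) / "welcome.bmp"  # hypothetical output name
        im.save(out)                     # ".bmp" suffix selects the BMP writer
        print(out.stat().st_size > 0)
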