Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions conan/api/subapi/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from conan.errors import ConanException
from conan.api.model import PkgReference
from conan.api.model import RecipeReference
from conan.internal.rest.pkg_sign import PkgSignaturesPlugin
from conan.internal.util.dates import revision_timestamp_now
from conan.internal.util.files import rmdir, mkdir, remove, save

Expand Down Expand Up @@ -76,6 +77,20 @@ def check_integrity(self, package_list):
checker = IntegrityChecker(cache)
checker.check(package_list)

def sign(self, package_list):
"""Sign packages with the signing plugin"""
cache = PkgCache(self._conan_api.cache_folder, self._api_helpers.global_conf)
pkg_signer = PkgSignaturesPlugin(cache, self._conan_api.home_folder)
results = pkg_signer.sign(package_list, context="cache")
return {"results": results, "context": "cache", "action": "sign"}

def verify(self, package_list):
"""Verify packages with the signing plugin"""
cache = PkgCache(self._conan_api.cache_folder, self._api_helpers.global_conf)
pkg_signer = PkgSignaturesPlugin(cache, self._conan_api.home_folder)
results = pkg_signer.verify_pkglist(package_list, context="cache")
return {"results": results, "context": "cache", "action": "verify"}

def clean(self, package_list, source=True, build=True, download=True, temp=True,
backup_sources=False):
"""
Expand Down
81 changes: 80 additions & 1 deletion conan/cli/commands/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from conan.api.conan_api import ConanAPI
from conan.api.model import ListPattern, MultiPackagesList
from conan.api.output import cli_out_write, ConanOutput
from conan.api.output import cli_out_write, ConanOutput, Color
from conan.cli import make_abs_path
from conan.cli.command import conan_command, conan_subcommand, OnceArgument
from conan.cli.commands.list import print_list_text, print_list_json
Expand All @@ -15,6 +15,25 @@ def json_export(data):
cli_out_write(json.dumps({"cache_path": data}))


def print_cache_sign_verify_text(data):
elements = data.get("results")
if elements:
title = "Verification" if data.get("action") == "verify" else "Signing"
cli_out_write(f"[Package signing plugin] {title} results:", fg=Color.BRIGHT_BLUE)
for ref, result in elements.items():
cli_out_write(f"- {ref}", fg=Color.BRIGHT_BLUE)
if result is None:
result = "Signed" if data.get("action") == "sign" else "Signature verified"
color = Color.BRIGHT_YELLOW if "warn" in result else Color.BRIGHT_WHITE
color = Color.BRIGHT_RED if "fail" in result else color
cli_out_write(f" {result}", fg=color)


def print_cache_sign_verify_json(data):
myjson = json.dumps(data, indent=4)
cli_out_write(myjson)


@conan_command(group="Consumer")
def cache(conan_api: ConanAPI, parser, *args):
"""
Expand Down Expand Up @@ -150,6 +169,66 @@ def cache_check_integrity(conan_api: ConanAPI, parser, subparser, *args):
ConanOutput().success("Integrity check: ok")


@conan_subcommand(formatters={"text": print_cache_sign_verify_text,
"json": print_cache_sign_verify_json})
def cache_sign(conan_api: ConanAPI, parser, subparser, *args):
"""
Sign packages with the Package Singing Plugin
"""
subparser.add_argument("pattern", nargs="?",
help="Selection pattern for references to check integrity for")
subparser.add_argument("-l", "--list", action=OnceArgument,
help="Package list of packages to check integrity for")
subparser.add_argument('-p', '--package-query', action=OnceArgument,
help="Only the packages matching a specific query, e.g., "
"os=Windows AND (arch=x86 OR compiler=gcc)")
args = parser.parse_args(*args)

if args.pattern is None and args.list is None:
raise ConanException("Missing pattern or package list file")
if args.pattern and args.list:
raise ConanException("Cannot specify both pattern and list")

if args.list:
listfile = make_abs_path(args.list)
multi_package_list = MultiPackagesList.load(listfile)
package_list = multi_package_list["Local Cache"]
else:
ref_pattern = ListPattern(args.pattern, rrev="*", package_id="*", prev="*")
package_list = conan_api.list.select(ref_pattern, package_query=args.package_query)
return conan_api.cache.sign(package_list)


@conan_subcommand(formatters={"text": print_cache_sign_verify_text,
"json": print_cache_sign_verify_json})
def cache_verify(conan_api: ConanAPI, parser, subparser, *args):
"""
Check the signature of packages with the Package Singing Plugin
"""
subparser.add_argument("pattern", nargs="?",
help="Selection pattern for references to check integrity for")
subparser.add_argument("-l", "--list", action=OnceArgument,
help="Package list of packages to check integrity for")
subparser.add_argument('-p', '--package-query', action=OnceArgument,
help="Only the packages matching a specific query, e.g., "
"os=Windows AND (arch=x86 OR compiler=gcc)")
args = parser.parse_args(*args)

if args.pattern is None and args.list is None:
raise ConanException("Missing pattern or package list file")
if args.pattern and args.list:
raise ConanException("Cannot specify both pattern and list")

if args.list:
listfile = make_abs_path(args.list)
multi_package_list = MultiPackagesList.load(listfile)
package_list = multi_package_list["Local Cache"]
else:
ref_pattern = ListPattern(args.pattern, rrev="*", package_id="*", prev="*")
package_list = conan_api.list.select(ref_pattern, package_query=args.package_query)
return conan_api.cache.verify(package_list)


@conan_subcommand(formatters={"text": print_list_text,
"json": print_list_json})
def cache_save(conan_api: ConanAPI, parser, subparser, *args):
Expand Down
178 changes: 155 additions & 23 deletions conan/internal/rest/pkg_sign.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,176 @@
import copy
import json
import os

from conan.api.output import ConanOutput
from conan.errors import ConanException
from conan.internal.cache.conan_reference_layout import METADATA
from conan.internal.cache.home_paths import HomePaths
from conan.internal.loader import load_python_file
from conan.internal.util.files import mkdir
from conan.internal.util.files import load, mkdir, save, sha256sum


class PkgSignaturesTools:

SIGN_SUMMARY_CONTENT = {
"provider": None,
"method": None,
"files": {}
}
SIGN_SUMMARY_FILENAME = "sign-summary.json"

def __init__(self, artifacts_folder, signature_folder):
self._artifacts_folder = artifacts_folder
self._signature_folder = signature_folder

def get_summary_file_path(self):
return os.path.join(self._signature_folder, self.SIGN_SUMMARY_FILENAME)

def is_pkg_signed(self):
try:
c = self.load_summary()
except FileNotFoundError:
return False
return bool(c.get("provider") and c.get("method"))

def create_summary_content(self):
"""
Creates the summary content as a dictionary for manipulation
@return: Dictionary with the summary content
"""
checksums = {}
for fname in os.listdir(self._artifacts_folder):
file_path = os.path.join(self._artifacts_folder, fname)
if os.path.isfile(file_path):
sha256 = sha256sum(file_path)
checksums[fname] = sha256
sorted_checksums = dict(sorted(checksums.items()))
content = copy.deepcopy(self.SIGN_SUMMARY_CONTENT)
content["files"] = sorted_checksums
return content

def load_summary(self):
""""
Loads the summary file from the signature folder
"""
return json.loads(load(self.get_summary_file_path()))

def save_summary(self, content):
"""
Saves the content of the summary to the signature folder using SIGN_SUMMARY_FILENAME as the
file name
@param content: Content of the summary file
@return:
"""
assert content.get("provider")
assert content.get("method")
save(self.get_summary_file_path(), json.dumps(content))


class PkgSignaturesPlugin:
def __init__(self, cache, home_folder):
self._cache = cache
signer = HomePaths(home_folder).sign_plugin_path
if os.path.isfile(signer):
mod, _ = load_python_file(signer)
# TODO: At the moment it requires both methods sign and verify, but that might be relaxed
self._plugin_sign_function = mod.sign
self._plugin_verify_function = mod.verify
else:
self._plugin_sign_function = self._plugin_verify_function = None
self.sign_plugin_path = HomePaths(home_folder).sign_plugin_path
self._plugin_sign_function = self._plugin_verify_function = None
if os.path.isfile(self.sign_plugin_path):
mod, _ = load_python_file(self.sign_plugin_path)
try:
self._plugin_sign_function = mod.sign
except AttributeError:
pass
try:
self._plugin_verify_function = mod.verify
except AttributeError:
pass

def sign(self, upload_data):
def sign(self, upload_data, context="upload"): # cache, upload,
results = {}
if self._plugin_sign_function is None:
return
ConanOutput().error(f"[Package signing plugin] sign() function not found in "
f"{self.sign_plugin_path}")
return results

def _sign(ref, files, folder):
def _sign(ref, files, folder, context="upload"):
output = ConanOutput(scope=f"{ref.repr_notime()}")
metadata_sign = os.path.join(folder, METADATA, "sign")
mkdir(metadata_sign)
self._plugin_sign_function(ref, artifacts_folder=folder, signature_folder=metadata_sign)
sign_tools = PkgSignaturesTools(folder, metadata_sign)
try:
result = self._plugin_sign_function(ref, artifacts_folder=folder,
signature_folder=metadata_sign, output=output,
sign_tools=sign_tools)
except (ConanException, AssertionError) as e:
result = _handle_failure(e, context, ref)
# Add files to the pkglist/bundle
for f in os.listdir(metadata_sign):
files[f"{METADATA}/sign/{f}"] = os.path.join(metadata_sign, f)
return {ref.repr_notime(): result}

for rref, recipe_bundle in upload_data.refs().items():
if recipe_bundle["upload"]:
_sign(rref, recipe_bundle["files"], self._cache.recipe_layout(rref).download_export())
for pref, pkg_bundle in upload_data.prefs(rref, recipe_bundle).items():
if pkg_bundle["upload"]:
_sign(pref, pkg_bundle["files"], self._cache.pkg_layout(pref).download_package())
if context == "upload":
for rref, recipe_bundle in upload_data.refs().items():
if recipe_bundle["upload"]:
result = _sign(rref, recipe_bundle["files"],
self._cache.recipe_layout(rref).download_export())
results.update(result)
for pref, pkg_bundle in upload_data.prefs(rref, recipe_bundle).items():
if pkg_bundle["upload"]:
result = _sign(pref, pkg_bundle["files"],
self._cache.pkg_layout(pref).download_package())
results.update(result)
else:
for rref, recipe_bundle in upload_data.refs().items():
if recipe_bundle:
result = _sign(rref, {}, self._cache.recipe_layout(rref).download_export(),
context)
results.update(result)
for pref, pkg_bundle in upload_data.prefs(rref, recipe_bundle).items():
if pkg_bundle:
result = _sign(pref, {}, self._cache.pkg_layout(pref).download_package(),
context)
results.update(result)
return results

def verify(self, ref, folder, files):
def verify(self, ref, folder, files, context="install"):
if self._plugin_verify_function is None:
return
ConanOutput().error(f"[Package signing plugin] verify() function not found in "
f"{self.sign_plugin_path}")
return {}
output = ConanOutput(scope=f"{ref.repr_notime()}")
metadata_sign = os.path.join(folder, METADATA, "sign")
self._plugin_verify_function(ref, artifacts_folder=folder, signature_folder=metadata_sign,
files=files)
sign_tools = PkgSignaturesTools(folder, metadata_sign)
try:
result = self._plugin_verify_function(ref, artifacts_folder=folder,
signature_folder=metadata_sign, files=files,
output=output, sign_tools=sign_tools)
except (ConanException, AssertionError) as e:
result = _handle_failure(e, context, ref)
return {ref.repr_notime(): result}

def verify_pkglist(self, pkg_list, context="cache"): # cache, install, upload
results = {}
if self._plugin_verify_function is None:
ConanOutput().error(f"[Package signing plugin] verify() function not found in "
f"{self.sign_plugin_path}")
return results

for rref, recipe_bundle in pkg_list.refs().items():
if recipe_bundle:
rref_folder = self._cache.recipe_layout(rref).download_export()
result = self.verify(rref, rref_folder, os.listdir(rref_folder), context)
results.update(result)
for pref, pkg_bundle in pkg_list.prefs(rref, recipe_bundle).items():
if pkg_bundle:
pref_folder = self._cache.pkg_layout(pref).download_package()
result = self.verify(pref, pref_folder, os.listdir(pref_folder), context)
results.update(result)
return results


def _handle_failure(exception, action, ref):
exception_msg = str(exception)
if action in ["upload", "install"]:
# TODO: Mark folder with set_dirty(artifacts_folder)
raise ConanException(f"{ref.repr_notime()}: {exception_msg}")
else:
error_msg = f"Failed: {exception_msg}" if exception_msg else "Failed"
return error_msg
Loading
Loading