diff --git a/.gitignore b/.gitignore index 30291da41..843bcef8d 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,5 @@ llvmlab/dist llvmlab/docs/_build llvmlab/llvmlab.egg-info # Visual Studio Code IDE configuration -.vscode \ No newline at end of file +.vscode +.python-version diff --git a/codesign/debugsign/dbsign/ansi.py b/codesign/debugsign/dbsign/ansi.py deleted file mode 100644 index 0ac4ecca2..000000000 --- a/codesign/debugsign/dbsign/ansi.py +++ /dev/null @@ -1,41 +0,0 @@ -# COPYRIGHT LINE: FIXME - -""" -dbsign.ansi -""" - -from __future__ import print_function - -import sys - - -def ANSI(color, msg): # type: (str, str) -> str - if sys.stdout.isatty(): - pre, post = _ANSI_CODES[color], _ANSI_CODES['clear'] - msg = "{}{}{}".format(pre, msg, post) - return msg - - -def OK(msg): # type: (str) -> str - return ANSI('green', msg) - - -def INFO(msg): # type: (str) -> str - return ANSI('blue', msg) - - -def WARN(msg): # type: (str) -> str - return ANSI('purple', msg) - - -def ERROR(msg): # type: (str) -> str - return ANSI('red', msg) - - -_ANSI_CODES = { - 'clear': '\033[0m', - 'blue': '\033[1;34m', - 'green': '\033[1;32m', - 'purple': '\033[1;35m', - 'red': '\033[1;31m', -} diff --git a/codesign/debugsign/dbsign/commands.py b/codesign/debugsign/dbsign/commands.py deleted file mode 100644 index 02f6de713..000000000 --- a/codesign/debugsign/dbsign/commands.py +++ /dev/null @@ -1,422 +0,0 @@ -# COPYRIGHT LINE: FIXME - -""" -dbsign.commands -""" - -from __future__ import print_function - -import os -import sys - -from dbsign.ansi import ERROR, INFO, OK, WARN -import dbsign.logger as L -import dbsign.security as S -import dbsign.shell as sh - - -# -# Globals and configurables -# - -OVERVIEW_TEXT = '''\ - -{1} -OVERVIEW: - - To configure code signing on a new system, do the following (in order): - - {0} setup - {0} import P12_FILE # MUST BE DONE FROM GUI CONSOLE! - - To verify the configuration: - - {0} check # Not foolproof, but catches most issues - - To enable access to the identity for code signing (eg, from Jenkins job): - - {0} prep - - To replace the configured identity with a new one: - - {0} remove # Removes the whole keychain! - {0} import NEW_P12 # Must be done from GUI console! - - Note that this script currently assumes the following: - - * The identity's common name will be "lldb_codesign" - * The keychain will be named "lldb_codesign" - * The keychain will be locked using the password "lldb_codesign" - * The P12 archive will be encrypted with the password "lldb_codesign" - - This is intended to make it trivial to codesign utilities using the - imported certificate, without exposing any local account information - (eg, user's login keychain password). Please take these factors into - account when evaluating security. -''' - -log = L.get_logger(__name__) - -CFG = { - 'debug': False, - 'executable': os.path.basename(sys.argv[0]), - 'identity': 'lldb_codesign', - 'id_file': None, # from command line argument - 'keynick': 'lldb', - 'keydb': None, - 'keypass': 'lldb_codesign', - 'privileges': ['system.privilege.taskport'], -} - - -# -# Top-Level Commands -# - -def cmd_check(): # type: () -> int - keydb = CFG['keydb'] - keypass = CFG['keypass'] - identity = CFG['identity'] - exe = CFG['executable'] - - for priv in CFG['privileges']: - print('Verifying privilege {} ... '.format(priv), end='') - res_priv = S.verify_privilege(priv) - if res_priv: - print(OK('OK')) - else: - print(WARN('NOT SET')) - log.debug(res_priv.value) - print(WARN('WARNING'), 'Privileges have not been set.') - print(INFO('To set, run: {} --unsafe setup'.format(exe))) - - print("Unlocking keychain ... ", end='') - res_unlock = S.unlock_keychain(keydb, keypass) - if res_unlock: - print(OK('OK')) - else: - print(ERROR('FAILED')) - log.debug(res_unlock.value) - print(WARN('WARNING'), 'Keychain not configured.') - print(INFO('Please run: {} setup'.format(exe))) - return 1 - - print("Verifying keychain ... ", end='') - res_find = S.keychain_exists(keydb) - if res_find: - print(OK('OK')) - else: - print(ERROR('FAILED')) - log.debug(res_find) - print(INFO(res_find.value)) - return 2 - - print("Searching for identity in keychain ... ", end='') - res_find = S.identity_installed(identity, keydb) - if res_find: - print(OK('OK')) - else: - print(ERROR('FAILED')) - log.debug(res_find) - print(WARN('WARNING'), res_find.value) - print(INFO("Please run: {} import".format(exe))) - return 3 - - print('Verifying identity ... ', end='') - res_id = S.verify_identity(identity, keydb) - if res_id: - print(OK('OK')) - else: - print(ERROR('FAILED')) - log.debug(res_id.value) - print(WARN('WARNING'), "Unable to verify identity") - print(INFO('Please run: {} import'.format(exe))) - return 4 - - return 0 - - -def cmd_clean(): # type: () -> int - identity = CFG['identity'] - keydb = CFG['keydb'] - keypass = CFG['keypass'] - - print("Unlocking keychain ... ", end='') - res_unlock = S.unlock_keychain(keydb, keypass) - if res_unlock: - print(OK('OK')) - else: - print(WARN('FAILED')) - log.debug(res_unlock.value) - print(INFO('Failed to unlock keychain.')) - - print('Removing identity and trust settings ... ', end='') - res_id = S.delete_identity(identity, keydb) - if res_id: - print(OK('OK')) - else: - print(WARN('Failed to remove identity')) - log.debug(res_id.value) - - print('Backing up and removing keychain ... ', end='') - res_key = S.delete_keychain(keydb, backup=True) - if res_key: - print(OK('OK')) - else: - print(WARN('Failed to remove keychain')) - log.debug(res_id.value) - - return 0 - - -def cmd_help(parser): # type: (argparse.ArgumentParser) -> int - print(OVERVIEW_TEXT.format( - CFG['executable'], - parser.format_help())) - return 0 - - -def cmd_import(): # type: () -> int - exe = CFG['executable'] - identity = CFG['identity'] - keydb = CFG['keydb'] - keypass = CFG['keypass'] - id_file = CFG['id_file'] - id_pass = identity - - _auth_sudo() - - if 'SSH_CONNECTION' in os.environ or 'TERM_SESSION_ID' not in os.environ: - print(WARN('WARNING!'), "Remote console session detected!", - "This procedure must be performed from the system console.") - - print('Verifying privileges ... ', end='') - res_verify_privs = S.verify_privileges(CFG['privileges']) - if res_verify_privs: - print(OK('OK')) - else: - print(WARN("WARNING")) - log.debug(res_verify_privs) - print(WARN("Privileges have not been set. Trust may fail.")) - print(INFO("To set privileges, run: {} --unsafe setup".format(exe))) - - print("Unlocking keychain ... ", end='') - res_unlock = S.unlock_keychain(keydb, keypass) - if res_unlock: - print(OK('OK')) - else: - print(ERROR('FAILED')) - log.debug(res_unlock) - print(INFO("Failed to unlock keychain."), - "Run: {} check".format(exe)) - return 1 - - print("Importing new identity {} ... ".format(identity), end='') - res_import = S.import_identity(keydb, keypass, identity, id_file, id_pass) - if res_import: - print(OK('OK')) - log.debug(res_import.value) - else: - print(ERROR('FAILED')) - log.debug(res_import) - print(ERROR('ERROR'), res_import.value) - if 'exists' in res_import.value: - print(WARN("To remove existing identity:"), - "{} remove".format(exe)) - return 2 - - print(WARN("This will test codesigning with the configured identity")) - print(WARN("Please authenticate (if requested) and click 'Always Allow'")) - - print("Trusting identity ... ", end='') - res_trust = S.trust_identity(identity, keydb) - if res_trust: - print(OK('OK')) - else: - print(ERROR('FAILED')) - log.debug(res_trust) - print(INFO("Trust unsuccessful:"), res_trust.value) - if 'unknown error' in res_trust.value: - print(WARN("Please ensure this step is performed" - " from the system console!")) - - print("Rolling back imported identity ... ", end='') - res_remove = S.delete_identity(identity, keydb) - if res_remove: - print(OK('OK')) - else: - print(ERROR('FAILED')) - log.debug(res_remove) - print(res_trust.value) - return 4 - return 3 - - return 0 - - -def cmd_lint(): # type: () -> int - print(OK('Running linters... ')) - lint_problems = _run_linter() - if lint_problems: - print(WARN('Lint:'), len(lint_problems)) - map(log.warn, lint_problems) - - return len(lint_problems) - - -def cmd_prep(): # type: () -> int - """Deliberately terse method for use in CI""" - keydb = CFG['keydb'] - keypass = CFG['keypass'] - - res_unlock = S.unlock_keychain(keydb, keypass) - if not res_unlock: - log.debug(res_unlock.value) - print(ERROR('ERROR'), 'Unable to access signing identity') - return 1 - - return 0 - - -def cmd_remove(): # type: () -> int - keydb = CFG['keydb'] - keypass = CFG['keypass'] - identity = CFG['identity'] - - print("Unlocking keychain ... ", end='') - res_unlock = S.unlock_keychain(keydb, keypass) - if res_unlock: - print(OK('OK')) - else: - log.debug(res_unlock.value) - print(ERROR('ERROR'), 'Failed to unlock keychain') - - print("Removing identity from keychain ... ", end='') - res_rm_id = S.delete_identity(identity, keydb) - if res_rm_id: - print(OK('OK')) - else: - print(WARN('FAILED')) - log.debug(res_rm_id) - print(WARN('WARNING'), "Failed to delete identity from keychain.") - print(INFO(res_rm_id.value)) - - return 0 - - -def cmd_setup(): # type: () -> int - keydb = CFG['keydb'] - keypass = CFG['keypass'] - exe = CFG['executable'] - - print("Configuring keychain ... ", end='') - res_create = S.create_keychain(keydb, keypass) - if res_create: - print(OK('OK')) - else: - print(ERROR('FAILED')) - log.debug(res_create) - print(INFO('Keychain creation failed')) - return 1 - - print("Unlocking keychain ... ", end='') - res_unlock = S.unlock_keychain(keydb, keypass) - if res_unlock: - print(OK('OK')) - else: - print(ERROR('FAILED')) - log.debug(res_unlock) - if 'keychain could not be found' in res_unlock.value: - print(INFO("Keychain creation failed")) - else: - print(INFO("Failed to unlock keychain")) - print(INFO(res_unlock.value)) - return 2 - - print("Adding keychain to search list ... ", end='') - res_searchable = S.add_to_search_list(keydb) - if res_searchable: - print(OK('OK')) - else: - print(ERROR('FAILED')) - log.debug(res_searchable) - print(INFO("Failed to add keychain to search list")) - print(WARN("codesign will not be able to find the signing identity.")) - return 3 - - privs = CFG['privileges'] - print("Checking privileges ... ", end='') - if S.verify_privileges(privs): - print(OK('OK')) - else: - print(INFO('NOT SET')) - - _auth_sudo() - if not os.getenv(S.UNSAFE_FLAG, False): - print(INFO('NOTE'), 'Altering privileges may not be safe.') - print(INFO('NOTE'), 'Re-run with the --unsafe flag to enable.') - else: - priv_value = 'allow' - for priv in CFG['privileges']: - print('Setting privilege {} ... '.format(priv), end='') - res_priv = S.authdb_privilege_write(priv, priv_value) - if res_priv: - print(OK('OK')) - else: - print(INFO('not set')) - log.debug(res_priv.value) - print(INFO('Privileges have not been set.')) - print(INFO('Please re-run: {} setup'.format(exe))) - return 4 - - return 0 - - -def cmd_test(): # type: () -> int - _auth_sudo() - - print(OK('Running unittests... ')) - test_problems = _run_unittests() - if test_problems: - print(ERROR('Failures:'), len(test_problems)) - map(log.debug, test_problems) - - return len(test_problems) - - -def _auth_sudo(): # type: () -> Result - cmd_sudo_check = sh.sudo_run(['-n']) - if not cmd_sudo_check: - print(WARN("If prompted, authenticate with sudo ... ")) - cmd_auth = sh.sudo_run(['ls']) - if not cmd_auth: - print(WARN("WARNING"), "sudo authentication failed") - return cmd_auth - else: - return cmd_sudo_check - - -def _run_linter(): # type: () -> list(str) - report_file = 'flake8_report.pep8.txt' - fmt = 'lint: %(path)s:%(row)d:%(col)d: %(code)s %(text)s' - lint_paths = ['./debugsign', './dbsign/', './unittests/'] - - cmd_flake = sh.run(['flake8', '--tee', report_file, - '--format={}'.format(fmt)] + lint_paths) - return cmd_flake.stdout.splitlines() - - -def _run_unittests(): # type: () -> list(str) - try: - import unittest2 as unittest - except ImportError: - import unittest - - tests = unittest.TestLoader().discover('unittests') - test_result = unittest.TextTestRunner( - stream=sys.stdout, - verbosity=2, - ).run(tests) - - problems = test_result.errors + test_result.failures - return [str(problem[0]) for problem in problems] diff --git a/codesign/debugsign/dbsign/result.py b/codesign/debugsign/dbsign/result.py deleted file mode 100644 index 285568316..000000000 --- a/codesign/debugsign/dbsign/result.py +++ /dev/null @@ -1,59 +0,0 @@ -# COPYRIGHT LINE: FIXME - -""" -dbsign.result - -result classes for debugsign -""" - -from __future__ import print_function - -import dbsign.logger as logger - - -log = logger.get_logger(__name__) - - -# -# Result class -# - -class Result(object): - def __init__(self, value): # type: () -> () - self._checked = False - self._value = value - - def __del__(self): # type: () -> () - assert self._checked - - def __nonzero__(self): # type: () -> bool - raise NotImplementedError("{} does not support boolean evaluation". - format(self.__class__.__name__)) - - def __repr__(self): # type: () -> str - return "{0.__class__.__name__}({0._value!r})".format(self) - - @property - def checked(self): # type: () -> bool - return self._checked - - @property - def value(self): # type: () -> str - self._checked = True - return self._value - - def renew(self): # type: () -> Result - self._checked = False - return self - - -class Failure(Result): - def __nonzero__(self): # type: () -> bool - self._checked = True - return False - - -class Success(Result): - def __nonzero__(self): # type: () -> bool - self._checked = True - return True diff --git a/codesign/debugsign/debugsign b/codesign/debugsign/debugsign deleted file mode 100755 index e3a5a4ab7..000000000 --- a/codesign/debugsign/debugsign +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/env python -tt - -import argparse -import os -import sys - -import dbsign.commands as C -import dbsign.logger as L -import dbsign.security as S - -from dbsign.commands import CFG - - -log = L.get_logger(CFG['executable']) - - -def make_parser(): # type: () -> argparse.ArgumentParser - parser = argparse.ArgumentParser(prog=CFG['executable'], add_help=False) - parser.set_defaults(func=lambda: C.cmd_help(parser)) - parser.add_argument('--debug', '-d', '--verbose', '-v', - action='count', default=0, - help='be more verbose') - parser.add_argument('--quiet', '-q', - action='count', default=0, - help='be quieter') - parser.add_argument('--unsafe', action='store_true', - help='permit unsafe operations') - subparsers = parser.add_subparsers(help='Subcommands') - - parser_help = subparsers.add_parser('help', help='Display usage info') - parser_help.set_defaults(func=lambda: C.cmd_help(parser)) - - parser_check = subparsers.add_parser( - 'check', help='Validate current system settings') - parser_check.set_defaults(func=C.cmd_check) - - parser_clean = subparsers.add_parser( - 'clean', help='Remove identity and keychain') - parser_clean.set_defaults(func=C.cmd_clean) - - parser_import = subparsers.add_parser( - 'import', help='Import and trust new identity from .P12 file') - parser_import.set_defaults(func=C.cmd_import) - parser_import.add_argument('id_file', metavar='P12_FILE', - help='Identity in .p12 format') - - parser_setup = subparsers.add_parser( - 'setup', help='Initialize keychain and privilege settings') - parser_setup.set_defaults(func=C.cmd_setup) - - parser_unlock = subparsers.add_parser( - 'prep', help='Unlock keychain and verify identity is valid') - parser_unlock.set_defaults(func=C.cmd_prep) - - parser_remove = subparsers.add_parser( - 'remove', help='Remove the installed identity') - parser_remove.set_defaults(func=C.cmd_remove) - - parser_lint = subparsers.add_parser( - 'lint', help='Lint the program source code') - parser_lint.set_defaults(func=C.cmd_lint) - - parser_test = subparsers.add_parser( - 'test', help='Test the program') - parser_test.set_defaults(func=C.cmd_test) - - return parser - - -def main(main_args): # type: (list[str]) -> int - parser = make_parser() - args = parser.parse_args(main_args) - - if args.unsafe: - os.environ[S.UNSAFE_FLAG] = 'YES' - - # Logging setup - level = L.BASE_LOGLEVEL + (args.quiet - args.debug) * 10 - L.set_level(L.normalize(level)) - - CFG['debug'] = (args.debug > 0) - CFG['keydb'] = '{}/Library/Keychains/{}.{}'.format( - os.environ['HOME'], CFG['keynick'], S.derive_keychain_extension()) - if hasattr(args, 'id_file'): - CFG['id_file'] = args.id_file - - log.debug('Command line parameters: %s', sys.argv) - log.debug('Arguments to main: %s', main_args) - log.debug('Parsed args: %s', args) - log.debug('Configuration: %s', CFG) - - return args.func() - - -if __name__ == '__main__': - return_code = main(sys.argv[1:]) - sys.exit(return_code) diff --git a/codesign/debugsign/dbsign/__init__.py b/codesign/debugsign/debugsign/__init__.py similarity index 100% rename from codesign/debugsign/dbsign/__init__.py rename to codesign/debugsign/debugsign/__init__.py diff --git a/codesign/debugsign/debugsign/__main__.py b/codesign/debugsign/debugsign/__main__.py new file mode 100644 index 000000000..249bb7e57 --- /dev/null +++ b/codesign/debugsign/debugsign/__main__.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 + +import os +import sys + +from argparse import ArgumentParser + +import commands +import logger +import security + +from commands import CFG + + +log = logger.get_logger(CFG["executable"]) + + +def make_parser() -> ArgumentParser: + parser = ArgumentParser(prog=CFG["executable"], add_help=False) + parser.set_defaults(func=lambda: commands.cmd_help(parser)) + parser.add_argument( + "--debug", + "-d", + "--verbose", + "-v", + action="count", + default=0, + help="be more verbose", + ) + parser.add_argument("--quiet", "-q", action="count", default=0, help="be quieter") + parser.add_argument( + "--unsafe", action="store_true", help="permit unsafe operations" + ) + subparsers = parser.add_subparsers(help="Subcommands") + + parser_help = subparsers.add_parser("help", help="Display usage info") + parser_help.set_defaults(func=lambda: commands.cmd_help(parser)) + + parser_check = subparsers.add_parser( + "check", help="Validate current system settings" + ) + parser_check.set_defaults(func=commands.cmd_check) + + parser_clean = subparsers.add_parser("clean", help="Remove identity and keychain") + parser_clean.set_defaults(func=commands.cmd_clean) + + parser_import = subparsers.add_parser( + "import", help="Import and trust new identity from .P12 file" + ) + parser_import.set_defaults(func=commands.cmd_import) + parser_import.add_argument( + "id_file", metavar="P12_FILE", help="Identity in .p12 format" + ) + + parser_setup = subparsers.add_parser( + "setup", help="Initialize keychain and privilege settings" + ) + parser_setup.set_defaults(func=commands.cmd_setup) + + parser_unlock = subparsers.add_parser( + "prep", help="Unlock keychain and verify identity is valid" + ) + parser_unlock.set_defaults(func=commands.cmd_prep) + + parser_remove = subparsers.add_parser( + "remove", help="Remove the installed identity" + ) + parser_remove.set_defaults(func=commands.cmd_remove) + + parser_lint = subparsers.add_parser("lint", help="Lint the program source code") + parser_lint.set_defaults(func=commands.cmd_lint) + + parser_test = subparsers.add_parser("test", help="Test the program") + parser_test.set_defaults(func=commands.cmd_test) + + return parser + + +def main(main_args: list[str]) -> int: + parser = make_parser() + args = parser.parse_args(main_args) + + if args.unsafe: + os.environ[security.UNSAFE_FLAG] = "YES" + + # Logging setup + level = logger.BASE_LOGLEVEL + (args.quiet - args.debug) * 10 + logger.set_level(logger.normalize(level)) + + CFG["debug"] = args.debug > 0 + CFG["keydb"] = "{}/Library/Keychains/{}.{}".format( + os.environ["HOME"], CFG["keynick"], security.derive_keychain_extension() + ) + if hasattr(args, "id_file"): + CFG["id_file"] = args.id_file + + log.debug("Command line parameters: %s", sys.argv) + log.debug("Arguments to main: %s", main_args) + log.debug("Parsed args: %s", args) + log.debug("Configuration: %s", CFG) + + return args.func() + + +if __name__ == "__main__": + return_code: int = main(sys.argv[1:]) + sys.exit(return_code) diff --git a/codesign/debugsign/debugsign/ansi.py b/codesign/debugsign/debugsign/ansi.py new file mode 100644 index 000000000..1972e61bb --- /dev/null +++ b/codesign/debugsign/debugsign/ansi.py @@ -0,0 +1,41 @@ +# COPYRIGHT LINE: FIXME + +""" +dbsign.ansi +""" + +from __future__ import print_function + +import sys + + +def ANSI(color: str, msg: str) -> str: + if sys.stdout.isatty(): + pre, post = _ANSI_CODES[color], _ANSI_CODES["clear"] + msg = "{}{}{}".format(pre, msg, post) + return msg + + +def OK(msg: str) -> str: + return ANSI("green", msg) + + +def INFO(msg: str) -> str: + return ANSI("blue", msg) + + +def WARN(msg: str) -> str: + return ANSI("purple", msg) + + +def ERROR(msg: str) -> str: + return ANSI("red", msg) + + +_ANSI_CODES = { + "clear": "\033[0m", + "blue": "\033[1;34m", + "green": "\033[1;32m", + "purple": "\033[1;35m", + "red": "\033[1;31m", +} diff --git a/codesign/debugsign/debugsign/commands.py b/codesign/debugsign/debugsign/commands.py new file mode 100644 index 000000000..a9eb43345 --- /dev/null +++ b/codesign/debugsign/debugsign/commands.py @@ -0,0 +1,429 @@ +# COPYRIGHT LINE: FIXME + +""" +dbsign.commands +""" + +from __future__ import print_function + +import os +import sys + +from argparse import ArgumentParser +from logging import Logger + +from result import Result + +import logger +import security +import shell + +from ansi import ERROR, INFO, OK, WARN + +# +# Globals and configurables +# + +OVERVIEW_TEXT: str = """\ + +{1} +OVERVIEW: + + To configure code signing on a new system, do the following (in order): + + {0} setup + {0} import P12_FILE # MUST BE DONE FROM GUI CONSOLE! + + To verify the configuration: + + {0} check # Not foolproof, but catches most issues + + To enable access to the identity for code signing (eg, from Jenkins job): + + {0} prep + + To replace the configured identity with a new one: + + {0} remove # Removes the whole keychain! + {0} import NEW_P12 # Must be done from GUI console! + + Note that this script currently assumes the following: + + * The identity's common name will be "lldb_codesign" + * The keychain will be named "lldb_codesign" + * The keychain will be locked using the password "lldb_codesign" + * The P12 archive will be encrypted with the password "lldb_codesign" + + This is intended to make it trivial to codesign utilities using the + imported certificate, without exposing any local account information + (eg, user's login keychain password). Please take these factors into + account when evaluating security. +""" + +log: Logger = logger.get_logger(__name__) + +CFG = { + "debug": False, + "executable": os.path.basename(sys.argv[0]), + "identity": "lldb_codesign", + "id_file": None, # from command line argument + "keynick": "lldb", + "keydb": None, + "keypass": "lldb_codesign", + "privileges": ["system.privilege.taskport"], +} + + +# +# Top-Level Commands +# + + +def cmd_check() -> int: + keydb = CFG["keydb"] + keypass = CFG["keypass"] + identity = CFG["identity"] + exe = CFG["executable"] + + for priv in CFG["privileges"]: + print("Verifying privilege {} ... ".format(priv), end="") + res_priv = security.verify_privilege(priv) + if res_priv: + print(OK("OK")) + else: + print(WARN("NOT SET")) + log.debug(res_priv.value) + print(WARN("WARNING"), "Privileges have not been set.") + print(INFO("To set, run: {} --unsafe setup".format(exe))) + + print("Unlocking keychain ... ", end="") + res_unlock = security.unlock_keychain(keydb, keypass) + if res_unlock: + print(OK("OK")) + else: + print(ERROR("FAILED")) + log.debug(res_unlock.value) + print(WARN("WARNING"), "Keychain not configured.") + print(INFO("Please run: {} setup".format(exe))) + return 1 + + print("Verifying keychain ... ", end="") + res_find = security.keychain_exists(keydb) + if res_find: + print(OK("OK")) + else: + print(ERROR("FAILED")) + log.debug(res_find) + print(INFO(res_find.value)) + return 2 + + print("Searching for identity in keychain ... ", end="") + res_find = security.identity_installed(identity, keydb) + if res_find: + print(OK("OK")) + else: + print(ERROR("FAILED")) + log.debug(res_find) + print(WARN("WARNING"), res_find.value) + print(INFO("Please run: {} import".format(exe))) + return 3 + + print("Verifying identity ... ", end="") + res_id = security.verify_identity(identity, keydb) + if res_id: + print(OK("OK")) + else: + print(ERROR("FAILED")) + log.debug(res_id.value) + print(WARN("WARNING"), "Unable to verify identity") + print(INFO("Please run: {} import".format(exe))) + return 4 + + return 0 + + +def cmd_clean() -> int: + identity = CFG["identity"] + keydb = CFG["keydb"] + keypass = CFG["keypass"] + + print("Unlocking keychain ... ", end="") + res_unlock = security.unlock_keychain(keydb, keypass) + if res_unlock: + print(OK("OK")) + else: + print(WARN("FAILED")) + log.debug(res_unlock.value) + print(INFO("Failed to unlock keychain.")) + + print("Removing identity and trust settings ... ", end="") + res_id = security.delete_identity(identity, keydb) + if res_id: + print(OK("OK")) + else: + print(WARN("Failed to remove identity")) + log.debug(res_id.value) + + print("Backing up and removing keychain ... ", end="") + res_key = security.delete_keychain(keydb, backup=True) + if res_key: + print(OK("OK")) + else: + print(WARN("Failed to remove keychain")) + log.debug(res_id.value) + + return 0 + + +def cmd_help(parser: ArgumentParser) -> int: + print(OVERVIEW_TEXT.format(CFG["executable"], parser.format_help())) + return 0 + + +def cmd_import() -> int: + exe = CFG["executable"] + identity = CFG["identity"] + keydb = CFG["keydb"] + keypass = CFG["keypass"] + id_file = CFG["id_file"] + id_pass = identity + + _auth_sudo() + + if "SSH_CONNECTION" in os.environ or "TERM_SESSION_ID" not in os.environ: + print( + WARN("WARNING!"), + "Remote console session detected!", + "This procedure must be performed from the system console.", + ) + + print("Verifying privileges ... ", end="") + res_verify_privs = security.verify_privileges(CFG["privileges"]) + if res_verify_privs: + print(OK("OK")) + else: + print(WARN("WARNING")) + log.debug(res_verify_privs) + print(WARN("Privileges have not been set. Trust may fail.")) + print(INFO("To set privileges, run: {} --unsafe setup".format(exe))) + + print("Unlocking keychain ... ", end="") + res_unlock = security.unlock_keychain(keydb, keypass) + if res_unlock: + print(OK("OK")) + else: + print(ERROR("FAILED")) + log.debug(res_unlock) + print(INFO("Failed to unlock keychain."), "Run: {} check".format(exe)) + return 1 + + print("Importing new identity {} ... ".format(identity), end="") + res_import = security.import_identity(keydb, keypass, identity, id_file, id_pass) + if res_import: + print(OK("OK")) + log.debug(res_import.value) + else: + print(ERROR("FAILED")) + log.debug(res_import) + print(ERROR("ERROR"), res_import.value) + if "exists" in res_import.value: + print(WARN("To remove existing identity:"), "{} remove".format(exe)) + return 2 + + print(WARN("This will test codesigning with the configured identity")) + print(WARN("Please authenticate (if requested) and click 'Always Allow'")) + + print("Trusting identity ... ", end="") + res_trust = security.trust_identity(identity, keydb) + if res_trust: + print(OK("OK")) + else: + print(ERROR("FAILED")) + log.debug(res_trust) + print(INFO("Trust unsuccessful:"), res_trust.value) + if "unknown error" in res_trust.value: + print( + WARN("Please ensure this step is performed" " from the system console!") + ) + + print("Rolling back imported identity ... ", end="") + res_remove = security.delete_identity(identity, keydb) + if res_remove: + print(OK("OK")) + else: + print(ERROR("FAILED")) + log.debug(res_remove) + print(res_trust.value) + return 4 + return 3 + + return 0 + + +def cmd_lint() -> int: + print(OK("Running linters... ")) + lint_problems = _run_linter() + if lint_problems: + print(WARN("Lint:"), len(lint_problems)) + map(log.warn, lint_problems) + + return len(lint_problems) + + +def cmd_prep() -> int: + """Deliberately terse method for use in CI""" + keydb = CFG["keydb"] + keypass = CFG["keypass"] + + res_unlock = security.unlock_keychain(keydb, keypass) + if not res_unlock: + log.debug(res_unlock.value) + print(ERROR("ERROR"), "Unable to access signing identity") + return 1 + + return 0 + + +def cmd_remove() -> int: + keydb = CFG["keydb"] + keypass = CFG["keypass"] + identity = CFG["identity"] + + print("Unlocking keychain ... ", end="") + res_unlock = security.unlock_keychain(keydb, keypass) + if res_unlock: + print(OK("OK")) + else: + log.debug(res_unlock.value) + print(ERROR("ERROR"), "Failed to unlock keychain") + + print("Removing identity from keychain ... ", end="") + res_rm_id = security.delete_identity(identity, keydb) + if res_rm_id: + print(OK("OK")) + else: + print(WARN("FAILED")) + log.debug(res_rm_id) + print(WARN("WARNING"), "Failed to delete identity from keychain.") + print(INFO(res_rm_id.value)) + + return 0 + + +def cmd_setup() -> int: + keydb = CFG["keydb"] + keypass = CFG["keypass"] + exe = CFG["executable"] + + print("Configuring keychain ... ", end="") + res_create = security.create_keychain(keydb, keypass) + if res_create: + print(OK("OK")) + else: + print(ERROR("FAILED")) + log.debug(res_create) + print(INFO("Keychain creation failed")) + return 1 + + print("Unlocking keychain ... ", end="") + res_unlock = security.unlock_keychain(keydb, keypass) + if res_unlock: + print(OK("OK")) + else: + print(ERROR("FAILED")) + log.debug(res_unlock) + if "keychain could not be found" in res_unlock.value: + print(INFO("Keychain creation failed")) + else: + print(INFO("Failed to unlock keychain")) + print(INFO(res_unlock.value)) + return 2 + + print("Adding keychain to search list ... ", end="") + res_searchable = security.add_to_search_list(keydb) + if res_searchable: + print(OK("OK")) + else: + print(ERROR("FAILED")) + log.debug(res_searchable) + print(INFO("Failed to add keychain to search list")) + print(WARN("codesign will not be able to find the signing identity.")) + return 3 + + privs = CFG["privileges"] + print("Checking privileges ... ", end="") + if security.verify_privileges(privs): + print(OK("OK")) + else: + print(INFO("NOT SET")) + + _auth_sudo() + if not os.getenv(security.UNSAFE_FLAG, False): + print(INFO("NOTE"), "Altering privileges may not be safe.") + print(INFO("NOTE"), "Re-run with the --unsafe flag to enable.") + else: + priv_value = "allow" + for priv in CFG["privileges"]: + print("Setting privilege {} ... ".format(priv), end="") + res_priv = security.authdb_privilege_write(priv, priv_value) + if res_priv: + print(OK("OK")) + else: + print(INFO("not set")) + log.debug(res_priv.value) + print(INFO("Privileges have not been set.")) + print(INFO("Please re-run: {} setup".format(exe))) + return 4 + + return 0 + + +def cmd_test() -> int: + _auth_sudo() + + print(OK("Running unittests... ")) + test_problems = _run_unittests() + if test_problems: + print(ERROR("Failures:"), len(test_problems)) + map(log.debug, test_problems) + + return len(test_problems) + + +def _auth_sudo() -> Result: + cmd_sudo_check = shell.sudo_run(["-n"]) + if not cmd_sudo_check: + print(WARN("If prompted, authenticate with sudo ... ")) + cmd_auth = shell.sudo_run(["ls"]) + if not cmd_auth: + print(WARN("WARNING"), "sudo authentication failed") + return cmd_auth + else: + return cmd_sudo_check + + +def _run_linter() -> list[str]: + report_file = "flake8_report.pep8.txt" + fmt = "lint: %(path)s:%(row)d:%(col)d: %(code)s %(text)s" + lint_paths = ["./debugsign", "./dbsign/", "./unittests/"] + + cmd_flake = shell.run( + ["flake8", "--tee", report_file, "--format={}".format(fmt)] + lint_paths + ) + return cmd_flake.stdout.splitlines() + + +def _run_unittests() -> list[str]: + try: + import unittest2 as unittest + except ImportError: + import unittest + + tests = unittest.TestLoader().discover("unittests") + test_result = unittest.TextTestRunner( + stream=sys.stdout, + verbosity=2, + ).run(tests) + + problems = test_result.errors + test_result.failures + return [str(problem[0]) for problem in problems] diff --git a/codesign/debugsign/dbsign/logger.py b/codesign/debugsign/debugsign/logger.py similarity index 100% rename from codesign/debugsign/dbsign/logger.py rename to codesign/debugsign/debugsign/logger.py diff --git a/codesign/debugsign/debugsign/result.py b/codesign/debugsign/debugsign/result.py new file mode 100644 index 000000000..f6dec3c2d --- /dev/null +++ b/codesign/debugsign/debugsign/result.py @@ -0,0 +1,667 @@ +from __future__ import annotations + +import functools +import inspect +import sys +from warnings import warn +from typing import ( + Any, + AsyncGenerator, + Awaitable, + Callable, + Final, + Generator, + Generic, + Iterator, + Literal, + NoReturn, + Type, + TypeVar, + Union, +) + +from typing_extensions import TypeIs + +if sys.version_info >= (3, 10): + from typing import ParamSpec, TypeAlias +else: + from typing_extensions import ParamSpec, TypeAlias + + +T = TypeVar("T", covariant=True) # Success type +E = TypeVar("E", covariant=True) # Error type +U = TypeVar("U") +F = TypeVar("F") +P = ParamSpec("P") +R = TypeVar("R") +TBE = TypeVar("TBE", bound=BaseException) + + +class Ok(Generic[T]): + """ + A value that indicates success and which stores arbitrary data for the return value. + """ + + __match_args__ = ("ok_value",) + __slots__ = ("_value",) + + def __iter__(self) -> Iterator[T]: + yield self._value + + def __init__(self, value: T) -> None: + self._value = value + + def __repr__(self) -> str: + return "Ok({})".format(repr(self._value)) + + def __eq__(self, other: Any) -> bool: + return isinstance(other, Ok) and self._value == other._value + + def __ne__(self, other: Any) -> bool: + return not (self == other) + + def __hash__(self) -> int: + return hash((True, self._value)) + + def is_ok(self) -> Literal[True]: + return True + + def is_err(self) -> Literal[False]: + return False + + def ok(self) -> T: + """ + Return the value. + """ + return self._value + + def err(self) -> None: + """ + Return `None`. + """ + return None + + @property + def value(self) -> T: + """ + Return the inner value. + + @deprecated Use `ok_value` or `err_value` instead. This method will be + removed in a future version. + """ + warn( + "Accessing `.value` on Result type is deprecated, please use " + + "`.ok_value` or `.err_value` instead", + DeprecationWarning, + stacklevel=2, + ) + return self._value + + @property + def ok_value(self) -> T: + """ + Return the inner value. + """ + return self._value + + def expect(self, _message: str) -> T: + """ + Return the value. + """ + return self._value + + def expect_err(self, message: str) -> NoReturn: + """ + Raise an UnwrapError since this type is `Ok` + """ + raise UnwrapError(self, message) + + def unwrap(self) -> T: + """ + Return the value. + """ + return self._value + + def unwrap_err(self) -> NoReturn: + """ + Raise an UnwrapError since this type is `Ok` + """ + raise UnwrapError(self, "Called `Result.unwrap_err()` on an `Ok` value") + + def unwrap_or(self, _default: U) -> T: + """ + Return the value. + """ + return self._value + + def unwrap_or_else(self, _op: object) -> T: + """ + Return the value. + """ + return self._value + + def unwrap_or_raise(self, _e: object) -> T: + """ + Return the value. + """ + return self._value + + def map(self, op: Callable[[T], U]) -> Ok[U]: + """ + The contained result is `Ok`, so return `Ok` with original value mapped to + a new value using the passed in function. + """ + return Ok(op(self._value)) + + async def map_async(self, op: Callable[[T], Awaitable[U]]) -> Ok[U]: + """ + The contained result is `Ok`, so return the result of `op` with the + original value passed in + """ + return Ok(await op(self._value)) + + def map_or(self, _default: object, op: Callable[[T], U]) -> U: + """ + The contained result is `Ok`, so return the original value mapped to a new + value using the passed in function. + """ + return op(self._value) + + def map_or_else(self, _default_op: object, op: Callable[[T], U]) -> U: + """ + The contained result is `Ok`, so return original value mapped to + a new value using the passed in `op` function. + """ + return op(self._value) + + def map_err(self, _op: object) -> Ok[T]: + """ + The contained result is `Ok`, so return `Ok` with the original value + """ + return self + + def and_then(self, op: Callable[[T], Result[U, E]]) -> Result[U, E]: + """ + The contained result is `Ok`, so return the result of `op` with the + original value passed in + """ + return op(self._value) + + async def and_then_async( + self, op: Callable[[T], Awaitable[Result[U, E]]] + ) -> Result[U, E]: + """ + The contained result is `Ok`, so return the result of `op` with the + original value passed in + """ + return await op(self._value) + + def or_else(self, _op: object) -> Ok[T]: + """ + The contained result is `Ok`, so return `Ok` with the original value + """ + return self + + def inspect(self, op: Callable[[T], Any]) -> Result[T, E]: + """ + Calls a function with the contained value if `Ok`. Returns the original result. + """ + op(self._value) + return self + + def inspect_err(self, _op: Callable[[E], Any]) -> Result[T, E]: + """ + Calls a function with the contained value if `Err`. Returns the original result. + """ + return self + + +class DoException(Exception): + """ + This is used to signal to `do()` that the result is an `Err`, + which short-circuits the generator and returns that Err. + Using this exception for control flow in `do()` allows us + to simulate `and_then()` in the Err case: namely, we don't call `op`, + we just return `self` (the Err). + """ + + def __init__(self, err: Err[E]) -> None: + self.err = err + + +class Err(Generic[E]): + """ + A value that signifies failure and which stores arbitrary data for the error. + """ + + __match_args__ = ("err_value",) + __slots__ = ("_value",) + + def __iter__(self) -> Iterator[NoReturn]: + def _iter() -> Iterator[NoReturn]: + # Exception will be raised when the iterator is advanced, not when it's created + raise DoException(self) + yield # This yield will never be reached, but is necessary to create a generator + + return _iter() + + def __init__(self, value: E) -> None: + self._value = value + + def __repr__(self) -> str: + return "Err({})".format(repr(self._value)) + + def __eq__(self, other: Any) -> bool: + return isinstance(other, Err) and self._value == other._value + + def __ne__(self, other: Any) -> bool: + return not (self == other) + + def __hash__(self) -> int: + return hash((False, self._value)) + + def is_ok(self) -> Literal[False]: + return False + + def is_err(self) -> Literal[True]: + return True + + def ok(self) -> None: + """ + Return `None`. + """ + return None + + def err(self) -> E: + """ + Return the error. + """ + return self._value + + @property + def value(self) -> E: + """ + Return the inner value. + + @deprecated Use `ok_value` or `err_value` instead. This method will be + removed in a future version. + """ + warn( + "Accessing `.value` on Result type is deprecated, please use " + + "`.ok_value` or '.err_value' instead", + DeprecationWarning, + stacklevel=2, + ) + return self._value + + @property + def err_value(self) -> E: + """ + Return the inner value. + """ + return self._value + + def expect(self, message: str) -> NoReturn: + """ + Raises an `UnwrapError`. + """ + exc = UnwrapError( + self, + f"{message}: {self._value!r}", + ) + if isinstance(self._value, BaseException): + raise exc from self._value + raise exc + + def expect_err(self, _message: str) -> E: + """ + Return the inner value + """ + return self._value + + def unwrap(self) -> NoReturn: + """ + Raises an `UnwrapError`. + """ + exc = UnwrapError( + self, + f"Called `Result.unwrap()` on an `Err` value: {self._value!r}", + ) + if isinstance(self._value, BaseException): + raise exc from self._value + raise exc + + def unwrap_err(self) -> E: + """ + Return the inner value + """ + return self._value + + def unwrap_or(self, default: U) -> U: + """ + Return `default`. + """ + return default + + def unwrap_or_else(self, op: Callable[[E], T]) -> T: + """ + The contained result is ``Err``, so return the result of applying + ``op`` to the error value. + """ + return op(self._value) + + def unwrap_or_raise(self, e: Type[TBE]) -> NoReturn: + """ + The contained result is ``Err``, so raise the exception with the value. + """ + raise e(self._value) + + def map(self, op: object) -> Err[E]: + """ + Return `Err` with the same value + """ + return self + + async def map_async(self, op: object) -> Err[E]: + """ + The contained result is `Ok`, so return the result of `op` with the + original value passed in + """ + return self + + def map_or(self, default: U, op: object) -> U: + """ + Return the default value + """ + return default + + def map_or_else(self, default_op: Callable[[], U], op: object) -> U: + """ + Return the result of the default operation + """ + return default_op() + + def map_err(self, op: Callable[[E], F]) -> Err[F]: + """ + The contained result is `Err`, so return `Err` with original error mapped to + a new value using the passed in function. + """ + return Err(op(self._value)) + + def and_then(self, op: object) -> Err[E]: + """ + The contained result is `Err`, so return `Err` with the original value + """ + return self + + async def and_then_async(self, op: object) -> Err[E]: + """ + The contained result is `Err`, so return `Err` with the original value + """ + return self + + def or_else(self, op: Callable[[E], Result[T, F]]) -> Result[T, F]: + """ + The contained result is `Err`, so return the result of `op` with the + original value passed in + """ + return op(self._value) + + def inspect(self, op: Callable[[T], Any]) -> Result[T, E]: + """ + Calls a function with the contained value if `Ok`. Returns the original result. + """ + return self + + def inspect_err(self, op: Callable[[E], Any]) -> Result[T, E]: + """ + Calls a function with the contained value if `Err`. Returns the original result. + """ + op(self._value) + return self + + +# define Result as a generic type alias for use +# in type annotations +""" +A simple `Result` type inspired by Rust. +Not all methods (https://doc.rust-lang.org/std/result/enum.Result.html) +have been implemented, only the ones that make sense in the Python context. +""" +Result: TypeAlias = Union[Ok[T], Err[E]] + +""" +A type to use in `isinstance` checks. +This is purely for convenience sake, as you could also just write `isinstance(res, (Ok, Err)) +""" +OkErr: Final = (Ok, Err) + + +class UnwrapError(Exception): + """ + Exception raised from ``.unwrap_<...>`` and ``.expect_<...>`` calls. + + The original ``Result`` can be accessed via the ``.result`` attribute, but + this is not intended for regular use, as type information is lost: + ``UnwrapError`` doesn't know about both ``T`` and ``E``, since it's raised + from ``Ok()`` or ``Err()`` which only knows about either ``T`` or ``E``, + not both. + """ + + _result: Result[object, object] + + def __init__(self, result: Result[object, object], message: str) -> None: + self._result = result + super().__init__(message) + + @property + def result(self) -> Result[Any, Any]: + """ + Returns the original result. + """ + return self._result + + +def as_result( + *exceptions: Type[TBE], +) -> Callable[[Callable[P, R]], Callable[P, Result[R, TBE]]]: + """ + Make a decorator to turn a function into one that returns a ``Result``. + + Regular return values are turned into ``Ok(return_value)``. Raised + exceptions of the specified exception type(s) are turned into ``Err(exc)``. + """ + if not exceptions or not all( + inspect.isclass(exception) and issubclass(exception, BaseException) + for exception in exceptions + ): + raise TypeError("as_result() requires one or more exception types") + + def decorator(f: Callable[P, R]) -> Callable[P, Result[R, TBE]]: + """ + Decorator to turn a function into one that returns a ``Result``. + """ + + @functools.wraps(f) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> Result[R, TBE]: + try: + return Ok(f(*args, **kwargs)) + except exceptions as exc: + return Err(exc) + + return wrapper + + return decorator + + +def as_async_result( + *exceptions: Type[TBE], +) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[Result[R, TBE]]]]: + """ + Make a decorator to turn an async function into one that returns a ``Result``. + Regular return values are turned into ``Ok(return_value)``. Raised + exceptions of the specified exception type(s) are turned into ``Err(exc)``. + """ + if not exceptions or not all( + inspect.isclass(exception) and issubclass(exception, BaseException) + for exception in exceptions + ): + raise TypeError("as_result() requires one or more exception types") + + def decorator( + f: Callable[P, Awaitable[R]], + ) -> Callable[P, Awaitable[Result[R, TBE]]]: + """ + Decorator to turn a function into one that returns a ``Result``. + """ + + @functools.wraps(f) + async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> Result[R, TBE]: + try: + return Ok(await f(*args, **kwargs)) + except exceptions as exc: + return Err(exc) + + return async_wrapper + + return decorator + + +def is_ok(result: Result[T, E]) -> TypeIs[Ok[T]]: + """A type guard to check if a result is an Ok + + Usage: + + ``` python + r: Result[int, str] = get_a_result() + if is_ok(r): + r # r is of type Ok[int] + elif is_err(r): + r # r is of type Err[str] + ``` + + """ + return result.is_ok() + + +def is_err(result: Result[T, E]) -> TypeIs[Err[E]]: + """A type guard to check if a result is an Err + + Usage: + + ``` python + r: Result[int, str] = get_a_result() + if is_ok(r): + r # r is of type Ok[int] + elif is_err(r): + r # r is of type Err[str] + ``` + + """ + return result.is_err() + + +def do(gen: Generator[Result[T, E], None, None]) -> Result[T, E]: + """Do notation for Result (syntactic sugar for sequence of `and_then()` calls). + + + Usage: + + ``` rust + // This is similar to + use do_notation::m; + let final_result = m! { + x <- Ok("hello"); + y <- Ok(True); + Ok(len(x) + int(y) + 0.5) + }; + ``` + + ``` rust + final_result: Result[float, int] = do( + Ok(len(x) + int(y) + 0.5) + for x in Ok("hello") + for y in Ok(True) + ) + ``` + + NOTE: If you exclude the type annotation e.g. `Result[float, int]` + your type checker might be unable to infer the return type. + To avoid an error, you might need to help it with the type hint. + """ + try: + return next(gen) + except DoException as e: + out: Err[E] = e.err # type: ignore + return out + except TypeError as te: + # Turn this into a more helpful error message. + # Python has strange rules involving turning generators involving `await` + # into async generators, so we want to make sure to help the user clearly. + if "'async_generator' object is not an iterator" in str(te): + raise TypeError( + "Got async_generator but expected generator." + "See the section on do notation in the README." + ) + raise te + + +async def do_async( + gen: Union[Generator[Result[T, E], None, None], AsyncGenerator[Result[T, E], None]], +) -> Result[T, E]: + """Async version of do. Example: + + ``` python + final_result: Result[float, int] = await do_async( + Ok(len(x) + int(y) + z) + for x in await get_async_result_1() + for y in await get_async_result_2() + for z in get_sync_result_3() + ) + ``` + + NOTE: Python makes generators async in a counter-intuitive way. + + ``` python + # This is a regular generator: + async def foo(): ... + do(Ok(1) for x in await foo()) + ``` + + ``` python + # But this is an async generator: + async def foo(): ... + async def bar(): ... + do( + Ok(1) + for x in await foo() + for y in await bar() + ) + ``` + + We let users try to use regular `do()`, which works in some cases + of awaiting async values. If we hit a case like above, we raise + an exception telling the user to use `do_async()` instead. + See `do()`. + + However, for better usability, it's better for `do_async()` to also accept + regular generators, as you get in the first case: + + ``` python + async def foo(): ... + do(Ok(1) for x in await foo()) + ``` + + Furthermore, neither mypy nor pyright can infer that the second case is + actually an async generator, so we cannot annotate `do_async()` + as accepting only an async generator. This is additional motivation + to accept either. + """ + try: + if isinstance(gen, AsyncGenerator): + return await gen.__anext__() + else: + return next(gen) + except DoException as e: + out: Err[E] = e.err # type: ignore + return out diff --git a/codesign/debugsign/dbsign/security.py b/codesign/debugsign/debugsign/security.py similarity index 100% rename from codesign/debugsign/dbsign/security.py rename to codesign/debugsign/debugsign/security.py diff --git a/codesign/debugsign/dbsign/shell.py b/codesign/debugsign/debugsign/shell.py similarity index 100% rename from codesign/debugsign/dbsign/shell.py rename to codesign/debugsign/debugsign/shell.py diff --git a/codesign/debugsign/requirements.txt b/codesign/debugsign/requirements.txt index b0ef78958..09739b0d2 100644 --- a/codesign/debugsign/requirements.txt +++ b/codesign/debugsign/requirements.txt @@ -1,4 +1,6 @@ coverage flake8 mock +result +typing-extensions unittest2