From d6fc235e823cc1af6cdb3bd8f78392a00dd96f42 Mon Sep 17 00:00:00 2001
From: Peter Gaultney <pgaultney@xoi.io>
Date: Mon, 18 May 2020 11:31:53 -0500
Subject: [PATCH] partial transfer of xoipy functionality to this new repository

---
 .flake8                              |  33 +++++
 .gitignore                           | 121 ++++++++++++++++++
 .pre-commit-config.yaml              |  47 +++++++
 .pylintrc                            |  14 +++
 LICENSE                              |  21 ++++
 Pipfile                              |  15 +++
 mypy.ini                             |   9 ++
 scripts/pipenv_runner.py             |  39 ++++++
 scripts/pipenv_utils.py              |  45 +++++++
 scripts/precommit_pylint.py          |  18 +++
 scripts/precommit_utils.py           |  21 ++++
 scripts/pylint_pipenv_score_limit.py |  56 +++++++
 setup.py                             |  21 ++++
 xoto3/__about__.py                   |   4 +
 xoto3/__init__.py                    |   0
 xoto3/dynamodb/__init__.py           |   0
 xoto3/lazy.py                        |  83 +++++++++++
 xoto3/paginate.py                    | 156 +++++++++++++++++++++++
 xoto3/py.typed                       |   0
 xoto3/recursive_map.py               |  97 +++++++++++++
 xoto3/ssm/__init__.py                |   0
 xoto3/ssm/parameters.py              | 179 +++++++++++++++++++++++++++
 xoto3/types.py                       |  58 +++++++
 23 files changed, 1037 insertions(+)
 create mode 100644 .flake8
 create mode 100644 .gitignore
 create mode 100644 .pre-commit-config.yaml
 create mode 100644 .pylintrc
 create mode 100644 LICENSE
 create mode 100644 Pipfile
 create mode 100644 mypy.ini
 create mode 100755 scripts/pipenv_runner.py
 create mode 100755 scripts/pipenv_utils.py
 create mode 100755 scripts/precommit_pylint.py
 create mode 100644 scripts/precommit_utils.py
 create mode 100755 scripts/pylint_pipenv_score_limit.py
 create mode 100644 setup.py
 create mode 100644 xoto3/__about__.py
 create mode 100644 xoto3/__init__.py
 create mode 100644 xoto3/dynamodb/__init__.py
 create mode 100644 xoto3/lazy.py
 create mode 100644 xoto3/paginate.py
 create mode 100644 xoto3/py.typed
 create mode 100644 xoto3/recursive_map.py
 create mode 100644 xoto3/ssm/__init__.py
 create mode 100644 xoto3/ssm/parameters.py
 create mode 100644 xoto3/types.py

diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..73898a7
--- /dev/null
+++ b/.flake8
+[flake8]
+max-line-length = 120
+ignore =
+    E203,
+    E101,
+    E111,
+    E114,
+    E115,
+    E116,
+    E117,
+    E121,
+    E122,
+    E123,
+    E124,
+    E125,
+    E126,
+    E127,
+    E128,
+    E129,
+    E131,
+    E133,
+    E2,
+    E3,
+    E5,
+    E701,
+    E702,
+    E703,
+    E704,
+    W1,
+    W2,
+    W3,
+    W503,
+    W504,
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7f9d0ab
--- /dev/null
+++ b/.gitignore
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+Pipfile.lock
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+.DS_Store
+
+# IDE artifacts
+**/.idea
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..fc0e74f
--- /dev/null
+++ b/.pre-commit-config.yaml
+---
+fail_fast: true
+
+repos:
+  - repo: https://github.com/pre-commit/pre-commit-hooks
+    rev: v2.2.3
+    hooks:
+      - id: trailing-whitespace
+      - id: end-of-file-fixer
+      - id: flake8
+      - id: check-case-conflict
+      - id: check-json
+      - id: mixed-line-ending
+      - id: check-merge-conflict
+
+  - repo: https://github.com/prettier/prettier
+    rev: 1.18.2
+    hooks:
+      - id: prettier
+        exclude: .*\.html
+
+  - repo: https://github.com/ambv/black
+    rev: stable
+    hooks:
+      - id: black
+        language_version: python3.7
+        args:
+          - --line-length=100
+
+  - repo: local
+    hooks:
+      - id: mypy-pipenv
+        name: mypy-pipenv
+        entry: scripts/pipenv_runner.py mypy
+        language: system
+        types: [python]
+        args:
+          - --show-traceback
+          - --ignore-missing-imports
+          - --check-untyped-defs
+      - id: pylint-pipenv
+        name: pylint-pipenv
+        entry: scripts/precommit_pylint.py
+        language: system
+        types: [python]
+        args:
+          - --rcfile=.pylintrc
diff --git a/.pylintrc b/.pylintrc
new file mode 100644
index 0000000..ea1245b
--- /dev/null
+++ b/.pylintrc
+[BASIC]
+max-line-length=120
+good-names=logger,_,i,j,k,__logger
+
+# Regular expression matching correct method names. Overrides the method-naming style.
+method-rgx=(([a-z_][a-z0-9_]{2,})|(_[a-z0-9_]*)|(__[a-z][a-z0-9_]+__))$
+
+[MESSAGES CONTROL]
+disable=
+    invalid-name,
+    too-few-public-methods,
+    bad-continuation,
+    logging-fstring-interpolation,
+    missing-docstring,
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..2b8fb02
--- /dev/null
+++ b/LICENSE
+MIT License
+
+Copyright (c) 2020 XOi Technologies
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/Pipfile b/Pipfile
new file mode 100644
index 0000000..aeb8f46
--- /dev/null
+++ b/Pipfile
+[[source]]
+url = "https://pypi.python.org/simple"
+verify_ssl = true
+
+[packages]
+boto3 = ">=1.9"
+typing-extensions = "*"
+
+[dev-packages]
+pytest = "~=5.0"
+mypy = ">=0.770"
+pylint = "*"
+
+[requires]
+python_version = "3.7"
diff --git a/mypy.ini b/mypy.ini
new file mode 100644
index 0000000..8362fd1
--- /dev/null
+++ b/mypy.ini
+[mypy]
+python_version = 3.7
+show_error_context = True
+show_column_numbers = True
+
+[mypy-conftest]
+ignore_errors = True
+[mypy-boto3]
+ignore_missing_imports = True
diff --git a/scripts/pipenv_runner.py b/scripts/pipenv_runner.py
new file mode 100755
index 0000000..5e85b40
--- /dev/null
+++ b/scripts/pipenv_runner.py
+#!/usr/bin/env python
+"""Runs a tool (e.g. mypy) inside the pipenv closest to each file being checked."""
+
+import typing as ty
+import subprocess
+import os
+import sys
+
+from precommit_utils import interpret_precommit_args
+from pipenv_utils import find_pipenv_dir
+
+
+def run_in_pipenv(cmd: str, fileorpath: str, cli_args: ty.Iterable = ()):
+    """Looks for the closest Pipfile/pipenv and runs the command within that venv"""
+    print(f"checking {fileorpath} with {cmd}")
+    fileorpath = os.path.abspath(fileorpath)
+    pipenv_dir = find_pipenv_dir(fileorpath)
+
+    if pipenv_dir:
+        fileorpath = fileorpath[len(pipenv_dir) + 1 :] if pipenv_dir != fileorpath else pipenv_dir
+        full_command = ["pipenv", "run", cmd, fileorpath, *cli_args]
+        return subprocess.run(
+            full_command, cwd=pipenv_dir, env={**os.environ, **dict(PYTHONPATH=pipenv_dir)}
+        )
+
+    return subprocess.run([cmd, fileorpath, *cli_args])
+
+
+if __name__ == "__main__":
+    path_args, cli_args = interpret_precommit_args()
+
+    cmd = cli_args[0]
+    cli_args = cli_args[1:]
+
+    for path_arg in path_args:
+        cp = run_in_pipenv(cmd, path_arg, cli_args)
+        if cp.returncode != 0:
+            print(f"Failed {cmd} over {path_arg} with {cp.returncode}")
+            sys.exit(cp.returncode)
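+# Illustrative only: with the pre-commit hook above configured as
+# `entry: scripts/pipenv_runner.py mypy`, a commit touching xoto3/lazy.py runs
+# roughly the equivalent of (paths and flags here are hypothetical):
+#   scripts/pipenv_runner.py mypy --show-traceback xoto3/lazy.py
+# which resolves to:
+#   cd <dir containing Pipfile> && pipenv run mypy xoto3/lazy.py --show-traceback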
diff --git a/scripts/pipenv_utils.py b/scripts/pipenv_utils.py
new file mode 100755
index 0000000..3369261
--- /dev/null
+++ b/scripts/pipenv_utils.py
+#!/usr/bin/env python
+import argparse
+import os
+import subprocess
+import glob
+
+
+def find_pipenv_dir(fileorpath: str) -> str:
+    # walk upward from the file or directory until a Pipfile is found
+    fileorpath = os.path.abspath(fileorpath)
+    pipenv_dir = os.path.dirname(fileorpath) if not os.path.isdir(fileorpath) else fileorpath
+    while len(pipenv_dir) > 1:
+        if os.path.exists(os.path.join(pipenv_dir, "Pipfile")):
+            return pipenv_dir
+        pipenv_dir = os.path.dirname(pipenv_dir)
+    return ""
+
+
+def get_pipenv_venv_site_packages(pipenv_dir: str) -> str:
+    venv_output = subprocess.check_output(["pipenv", "--venv"], text=True, cwd=pipenv_dir).rstrip(
+        "\n"
+    )
+    glob_out = glob.glob(venv_output + "/lib/python*")
+    return glob_out[0] + "/site-packages/"
+
+
+def get_pythonpath_for_pipfile_dir_and_venv(filepath: str) -> str:
+    pipenv_dir = find_pipenv_dir(filepath)
+    if pipenv_dir:
+        return ":".join([pipenv_dir, get_pipenv_venv_site_packages(pipenv_dir)])
+    return ""
+
+
+def main():
+    """Prints a PYTHONPATH including the Pipfile's own directory, if one is found"""
+    parser = argparse.ArgumentParser()
+    parser.add_argument("path")
+    args = parser.parse_args()
+
+    pythonpath = get_pythonpath_for_pipfile_dir_and_venv(args.path)
+    if pythonpath:
+        print(pythonpath)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/precommit_pylint.py b/scripts/precommit_pylint.py
new file mode 100755
index 0000000..334036c
--- /dev/null
+++ b/scripts/precommit_pylint.py
+#!/usr/bin/env python
+import subprocess
+import os
+
+from precommit_utils import interpret_precommit_args
+from pipenv_utils import get_pythonpath_for_pipfile_dir_and_venv
+
+
+if __name__ == "__main__":
+    path_args, cli_args = interpret_precommit_args()
+
+    for path in path_args:
+        pythonpath = get_pythonpath_for_pipfile_dir_and_venv(path)
+        print(f"Linting {path} with PYTHONPATH <{pythonpath}>")
+        subprocess.check_call(
+            ["scripts/pylint_pipenv_score_limit.py", path, *cli_args, "--assume-pythonpath"],
+            env={**os.environ, **dict(PYTHONPATH=pythonpath)},
+        )
diff --git a/scripts/precommit_utils.py b/scripts/precommit_utils.py
new file mode 100644
index 0000000..27ef2f3
--- /dev/null
+++ b/scripts/precommit_utils.py
+import sys
+import os
+
+
+def interpret_precommit_args():
+    """pre-commit passes file paths and CLI arguments interleaved; this sorts out which are which."""
+    path_args = list()
+    cli_args = list()
+    for arg in sys.argv[1:]:
+        if os.path.exists(arg):
+            path_args.append(arg)
+        else:
+            cli_args.append(arg)
+
+    if not path_args:
+        raise Exception(
+            "Everything passed was considered to be a CLI argument, "
+            "so there was nothing to check. " + str(cli_args)
+        )
+
+    return path_args, cli_args
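+# A small illustrative example of the split (the file names are hypothetical):
+# invoked as `precommit_pylint.py --rcfile=.pylintrc xoto3/lazy.py xoto3/types.py`,
+# this returns (["xoto3/lazy.py", "xoto3/types.py"], ["--rcfile=.pylintrc"]),
+# because only the two paths exist on disk.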
diff --git a/scripts/pylint_pipenv_score_limit.py b/scripts/pylint_pipenv_score_limit.py
new file mode 100755
index 0000000..5d42e31
--- /dev/null
+++ b/scripts/pylint_pipenv_score_limit.py
+#!/usr/bin/env python
+import argparse
+import os
+import sys
+import subprocess
+
+from pylint import lint
+
+from pipenv_utils import get_pythonpath_for_pipfile_dir_and_venv
+
+
+def module_passes_lint_score_limit(path, limit, other_args=()) -> bool:
+    run = lint.Run([path, *other_args], do_exit=False)
+    score = run.linter.stats["global_note"]
+
+    if score < limit:
+        print(f"Score for {path} was {score:.03f}; less than limit {limit}")
+        return False
+    return True
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("path_to_python_module")
+    parser.add_argument("--score-lower-limit", type=float, default=8.5)
+    parser.add_argument("--assume-pythonpath", action="store_true")
+
+    args, pylint_args = parser.parse_known_args()
+    path = args.path_to_python_module
+
+    if not args.assume_pythonpath:
+        pythonpath = get_pythonpath_for_pipfile_dir_and_venv(path)
+        if pythonpath:
+            # relaunch this same process but with the new PYTHONPATH
+            new_env = {**os.environ, **dict(PYTHONPATH=pythonpath)}
+            # os.execve(__file__, sys.argv + ['--assume-pythonpath'], new_env)
+            replacement_args = [
+                os.path.abspath(sys.argv[0]),
+                args.path_to_python_module,
+                "--score-lower-limit",
+                str(args.score_lower_limit),
+                "--assume-pythonpath",
+                *pylint_args,
+            ]
+            print(
+                f"Relaunching this process with new PYTHONPATH {pythonpath} and args {replacement_args}"
+            )
+            subprocess.check_call(replacement_args, env=new_env)
+            return
+
+    if not module_passes_lint_score_limit(path, args.score_lower_limit, pylint_args):
+        raise ValueError(f"Module {path} failed linting!")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..3b85021
--- /dev/null
+++ b/setup.py
+from setuptools import setup, find_packages
+
+PKG_NAME = "xoto3"
+about: dict = dict()
+exec(open(f"{PKG_NAME}/__about__.py").read(), about)
+
+setup(
+    name=PKG_NAME,
+    version=about["__version__"],
+    author=about["__author__"],
+    author_email=about["__author_email__"],
+    description="High level utilities for a subset of boto3 operations common for AWS serverless development in Python.",
+    packages=find_packages(),
+    package_data={"": ["py.typed"]},
+    python_requires=">=3.6",
+    install_requires=[
+        "boto3 >= 1.9",
+        "typing-extensions >= 3.7",
+    ],
+    # it is important to keep these install_requires basically in sync with the Pipfile as well.
+)
diff --git a/xoto3/__about__.py b/xoto3/__about__.py
new file mode 100644
index 0000000..35d50a1
--- /dev/null
+++ b/xoto3/__about__.py
+"""xoto3"""
+__version__ = "1.0.0"
+__author__ = "Peter Gaultney"
+__author_email__ = "pgaultney@xoi.io"
diff --git a/xoto3/__init__.py b/xoto3/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/xoto3/dynamodb/__init__.py b/xoto3/dynamodb/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/xoto3/lazy.py b/xoto3/lazy.py
new file mode 100644
index 0000000..94329ea
--- /dev/null
+++ b/xoto3/lazy.py
+"""Primarily for singleton resources that you want to lazily load on usage."""
+import typing as ty
+from typing import TypeVar, Generic
+from threading import local
+
+import boto3
+
+L = TypeVar("L")
+
+
+class Lazy(Generic[L]):
+    """A Lazy resource pattern.
+
+    Encapsulates a single instance of the resource that can be accessed
+    with (), and can also have more than one copy made of it as desired.
+    """
+
+    def __init__(self, loader_func: ty.Callable[..., L], storage=None):  # must have a __dict__
+        """The loader func can encapsulate everything necessary to create the
+        resource, or it can take additional arguments via the call.
+
+        """
+        self.loader_func = loader_func
+        self.storage = storage if storage else lambda: 0
+
+    def __call__(self, *args, **kwargs) -> L:
+        """Access to the internal instance.
+
+        The first call will create an internal instance, and
+        subsequent calls will return it.
+        """
+        try:
+            return self.storage.value
+        except AttributeError:
+            self.storage.value = self.instance(*args, **kwargs)
+            return self.storage.value
+
+    def copy(self) -> "Lazy[L]":
+        """Each Lazy object is a self-contained singleton,
+
+        but other Lazy-loading resource copies can easily be derived
+        from the first.
+
+        """
+        return Lazy(self.loader_func)
+
+    def instance(self, *args, **kwargs) -> L:
+        """If you want your own personal instance instead of the internal one,
+        you may create it.
+
+        """
+        return self.loader_func(*args, **kwargs)
+
+
+class ThreadLocalLazy(Lazy[L]):
+    def __init__(self, loader_func: ty.Callable[..., L]):
+        # local() creates a brand new instance every time it is called,
+        # so this does not cause issues with storage being shared across multiple ThreadLocalLazies
+        super().__init__(loader_func, local())
+
+
+class SessionedBoto3:
+    """Apparently there are thread-safety issues (race conditions) if you don't
+    first create a session.
+
+    https://github.com/boto/boto3/issues/1592
+    """
+
+    def __init__(self, method_name: str, *args, **kwargs):
+        self.method_name = method_name
+        assert self.method_name in {"resource", "client"}
+        self.args = args
+        self.kwargs = kwargs
+
+    def __call__(self):
+        session = boto3.session.Session()
+        method = getattr(session, self.method_name)
+        return method(*self.args, **self.kwargs)
+
+
+def tlls(method_name: str, *args, **kwargs):
+    """Thread Local Lazy SessionedBoto3"""
+    return ThreadLocalLazy(SessionedBoto3(method_name, *args, **kwargs))
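+# A minimal usage sketch; the resource type and table name are illustrative:
+#
+#     DYNAMODB_RESOURCE = tlls("resource", "dynamodb")
+#
+#     def lambda_handler(event, context):
+#         # the first call on a given thread builds a Session-backed resource;
+#         # later calls on that same thread return the same object.
+#         table = DYNAMODB_RESOURCE().Table("my-table")
+#         ...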
diff --git a/xoto3/paginate.py b/xoto3/paginate.py
new file mode 100644
index 0000000..46a6651
--- /dev/null
+++ b/xoto3/paginate.py
+import typing as ty
+from copy import deepcopy
+from functools import partial
+
+# dear goodness AWS's pagination utilities are bare-bones...
+# here's a bit of general purpose logic that can probably be reused lots of places...
+
+KeyPath = ty.Tuple[str, ...]
+LastEvaluatedCallback = ty.Optional[ty.Callable[[ty.Any], ty.Any]]
+
+
+def get_at_path(path, d):
+    for path_elem in path:
+        d = d.get(path_elem, None)
+        if d is None:
+            return d
+    return d
+
+
+def set_at_path(path, d, val):
+    for path_elem in path[:-1]:
+        d = d[path_elem]
+    d[path[-1]] = val
+
+
+def yield_pages_from_operation(
+    exclusive_start_path: KeyPath,
+    last_evaluated_path: KeyPath,
+    limit_path: ty.Tuple[str, ...],
+    items_path: ty.Tuple[str, ...],
+    # whether or not limiting _happens_ is controlled by whether you set a limit in your request dict,
+    # but if you provide limit_path you must provide items_path and vice-versa,
+    # or we won't be able to figure out how to create the new limit for each paged request.
+    operation: ty.Callable[..., dict],  # the thing that turns a request into the next page of a response
+    request: dict,  # your basic request
+    last_evaluated_callback: LastEvaluatedCallback = None,
+) -> ty.Iterable[dict]:
+    """Look, here's the deal...
+
+    boto3 (and AWS APIs in general) have a fairly consistent
+    pagination behavior for requests which can/must be paginated.
+
+    These requests are usually called 'methods', 'operations', or
+    'actions' in the boto3 documentation. Our unified term is
+    'operation'.
+
+    You perform an operation with a request that tells them that you
+    want to start 'at the beginning', effectively doing so by not
+    supplying what is usually called something like
+    ExclusiveStart[Key]. The operation gives you back a page of
+    results, and it also gives you back something that is sort of like
+    a bookmark - it says where you left off in your pagination.
+
+    If you want the next page of results, you pass your original
+    request back plus that 'bookmark' as the ExclusiveStartThingy.
+
+    Each time, they'll pass you back a new bookmark, until you finally
+    get the last page of results, at which point they'll pass you back
+    an 'empty' bookmark. When they do that, you know there are no more
+    pages.
+
+    Most of these same API operations also support a related behavior
+    called 'limiting', which allows you to request that your dataset
+    'end' after N items are found. The reason this behavior is built
+    in here (instead of having a separate abstraction) is that the
+    'bookmark' is usually an opaque token based on the very last item
+    returned and therefore cannot itself be 'adjusted' by a specific
+    number of items. Once you've received a bookmark, there's no way
+    of 'resuming' your pagination from a point partway through a
+    page. If your end client needs to receive data in concrete page
+    sizes, then the only way to support that without requiring
+    something other than the end client to maintain pagination state
+    is to pass that limit request all the way to the underlying
+    system.
+
+    This is an attempt to build a general-purpose abstraction for
+    those two API behaviors.
+
+    Note that this function does _not_ paginate the items within each
+    page for you. It returns the entire, unchanged response from each
+    call to the operation. The items themselves will be contained
+    within that page and you can process them and their metadata as
+    you please.
+
+    You _probably_ want to construct a partially-applied function
+    containing the first 4 arguments (which define the behavior for a
+    specific operation), so that you can then invoke the same
+    operation paginator repeatedly with different requests.
+
+    """
+    assert all((limit_path, items_path)) or not any((limit_path, items_path))
+    request = deepcopy(request)
+    # we make a copy of your request because we're going to modify it
+    # as we paginate, but you shouldn't have to deal with that.
+
+    get_le = partial(get_at_path, last_evaluated_path)
+    set_es = partial(set_at_path, exclusive_start_path)
+    get_limit = partial(get_at_path, limit_path)
+    set_limit = partial(set_at_path, limit_path)
+    get_items = partial(get_at_path, items_path)
+
+    # the limiting logic is an add-on and does not have to be used
+    starting_limit = 0
+    if limit_path:
+        assert items_path
+        starting_limit = get_limit(request)
+
+    limit = starting_limit
+    ExclusiveStart: ty.Any = get_le(request) or ""
+
+    while ExclusiveStart is not None:
+        assert limit is None or limit >= 0
+        if ExclusiveStart:
+            set_es(request, ExclusiveStart)
+        if limit:
+            set_limit(request, limit)
+        page_response = operation(**request)
+        yield page_response  # we yield the entire response
+        ExclusiveStart = get_le(page_response) or None
+        if starting_limit:
+            # a limit was requested
+            limit = limit - len(get_items(page_response))
+            if limit <= 0:
+                # we're done; before we leave, provide last evaluated if requested
+                if last_evaluated_callback:
+                    last_evaluated_callback(ExclusiveStart)
+                ExclusiveStart = None
+
+
+# below are some arguments that can be used to paginate existing methods, given as useful examples
+
+DYNAMODB_SCAN = (("ExclusiveStartKey",), ("LastEvaluatedKey",), ("Limit",), ("Items",))
+DYNAMODB_QUERY = DYNAMODB_SCAN  # these are the same
+# e.g. partial(yield_pages_from_operation, *DYNAMODB_QUERY)(table.query, your_query_request)
+
+DYNAMODB_STREAMS_DESCRIBE_STREAM = (
+    ("ExclusiveStartShardId",),
+    ("StreamDescription", "LastEvaluatedShardId"),
+    ("Limit",),
+    ("StreamDescription", "Shards"),
+)
+
+DYNAMODB_STREAMS_GET_RECORDS = (
+    ("ShardIterator",),
+    ("NextShardIterator",),
+    ("Limit",),
+    ("Records",),
+)
+
+S3_LIST_OBJECTS_V2 = (
+    ("ContinuationToken",),
+    ("NextContinuationToken",),
+    ("MaxKeys",),
+    ("Contents",),
+)
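+# A hedged end-to-end sketch; the table, key condition, and limit are hypothetical:
+#
+#     import boto3
+#     from boto3.dynamodb.conditions import Key
+#
+#     table = boto3.resource("dynamodb").Table("my-table")
+#     yield_query_pages = partial(yield_pages_from_operation, *DYNAMODB_QUERY)
+#     request = dict(KeyConditionExpression=Key("id").eq("abc"), Limit=250)
+#     for page in yield_query_pages(table.query, request):
+#         for item in page["Items"]:
+#             ...  # at most 250 items will be yielded across all pages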
diff --git a/xoto3/py.typed b/xoto3/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/xoto3/recursive_map.py b/xoto3/recursive_map.py
new file mode 100644
index 0000000..f2983df
--- /dev/null
+++ b/xoto3/recursive_map.py
+from typing import Tuple, Hashable, Callable, Any, Mapping, Set, Optional, Sequence
+
+
+KeyPath = Tuple[Hashable, ...]
+SimpleTransform = Callable[[Any], Any]
+PathCallback = Callable[[KeyPath, Any], bool]
+
+
+def map_recursively(
+    transform: SimpleTransform,
+    obj: Any,
+    *,
+    path: tuple = (),
+    path_callback: Optional[PathCallback] = None,
+) -> Any:
+    """Does a depth-first walk of the object, calling the transform as a
+    preorder operation before then recursing into mappings (dicts),
+    lists, tuples, and sets, rendering a new corresponding builtin
+    instance for each.
+
+    Only applies the first recursive transform that matches the type
+    of the provided object.
+
+    Does not preserve subtypes of Set or Mapping.
+
+    Does not natively support iterables that are not tuples or lists,
+    because (for instance) consuming generators is likely to lead to
+    very unexpected behavior, as it effectively is a side effect
+    rather than a pure transformation. If you wish to, for instance,
+    recurse into generators, your transform can return a reified list
+    or tuple.
+
+    If you want 'access' to the current path in your transform, you
+    can provide an additional path_callback, which will receive the
+    path immediately _prior_ to each call to your transform, and which
+    returns True if you wish the recursion to short-circuit
+    immediately _after_ the transform.
+    """
+    stop = False
+    if path_callback:
+        stop = path_callback(path, obj)
+    obj = transform(obj)
+    if stop:
+        return obj
+
+    # then apply the first builtin-type-matching recursive transform.
+    if isinstance(obj, Mapping):
+        return {
+            k: map_recursively(transform, v, path=path + (k,), path_callback=path_callback)
+            for k, v in obj.items()
+        }
+    if isinstance(obj, Set):
+        return {
+            map_recursively(transform, member, path=path, path_callback=path_callback)
+            for member in obj
+        }
+    if isinstance(obj, list):
+        return [
+            map_recursively(transform, item, path=path, path_callback=path_callback)
+            for item in obj
+        ]
+    if isinstance(obj, tuple):
+        return tuple(
+            map_recursively(transform, item, path=path, path_callback=path_callback)
+            for item in obj
+        )
+
+    return obj
+
+
+def tuple_starts_with(a: tuple, b: tuple) -> bool:
+    for i, b_i in enumerate(b):
+        try:
+            if b_i != a[i]:
+                return False
+        except IndexError:
+            return False
+    return True
+
+
+class PathedTransform:
+    """For use with map_recursively"""
+
+    def __init__(self, transform: SimpleTransform, target_path: Sequence[Hashable]):
+        self.transform = transform
+        # coerce to a tuple so the equality checks below can match the tuple paths
+        # that map_recursively builds up.
+        self.target_path = tuple(target_path)
+        self.current_path: Tuple[Hashable, ...] = ()
+
+    def path_callback(self, path: Sequence[Hashable], *_args):
+        self.current_path = tuple(path)
+        # stop if not part of target path, or if we've reached the full path
+        return not tuple_starts_with(self.target_path, self.current_path) or self.current_path == self.target_path
+
+    def __call__(self, obj: Any) -> Any:
+        if self.current_path == self.target_path:
+            return self.transform(obj)
+        return obj
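+# A small worked example; the document and target path are hypothetical:
+#
+#     doc = {"a": {"b": 1, "c": 2}}
+#     scale_b = PathedTransform(lambda x: x * 10, ("a", "b"))
+#     map_recursively(scale_b, doc, path_callback=scale_b.path_callback)
+#     # -> {"a": {"b": 10, "c": 2}} - only the value at path ("a", "b") is transformed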
diff --git a/xoto3/ssm/__init__.py b/xoto3/ssm/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/xoto3/ssm/parameters.py b/xoto3/ssm/parameters.py
new file mode 100644
index 0000000..bd38fe9
--- /dev/null
+++ b/xoto3/ssm/parameters.py
+"""A utility supporting a nicer API for basic SSM parameter store
+operations, which also transparently supports splitting large payloads
+across multiple parameters and joining them upon get using this same
+utility.
+""" +import re +import os +import os.path +import json +from datetime import datetime + +from botocore.exceptions import ClientError + +from xoto3.lazy import tlls + + +_MULTIPART_PARAM_COUNT = "__SSM_MULTIPART_COUNT" + +_SSM_NOT_ALLOWED_REGEX = re.compile("[^0-9a-zA-Z_.-]+") + +try: + import __main__ as main_func + SCRIPT_NAME = os.path.basename(main_func.__file__) +except Exception: # pylint: disable=broad-except # this is okay because getting the name is best-effort + SCRIPT_NAME = "SSM Params" + + +SSM_CLIENT = tlls("client", "ssm") + + +def delete(name: str, ssm=None) -> bool: + """Returns True if parameter was deleted, + False if it didn't exist, + and raises for other errors""" + ssm = ssm or SSM_CLIENT() + try: + ssm.delete_parameter(Name=name) + except ClientError as ce: + if _get_client_error_type(ce) == "ParameterNotFound": + return True + raise ce + return True + + +def get(name: str, default=None, error=True, ssm=None, **kwargs) -> str: + """General utility for getting SSM parameter values. + + Returns only the actual value, not the full boto3 response. + + Handles large/multipart gets transparently. + + Providing a default or setting error=False will only prevent + ParameterNotFound from raising an exception - all other boto3 + ClientErrors will always raise. + """ + ssm = ssm or SSM_CLIENT() + + try: + param = ssm.get_parameter(Name=name, **kwargs) + except ClientError as ce: + if _get_client_error_type(ce) != "ParameterNotFound": + raise ce # always raise if we've having unusual difficulties + if default is not None or not error: + return default + raise ce + param_value = param["Parameter"]["Value"] + + # check if this is a multipart param + param_part_count = _get_num_multiparts(param_value) + if param_part_count: + try: + return _get_multipart_param(name, param_part_count, ssm) + except (KeyError, ValueError, ClientError) as e: + raise RuntimeError(f"SSM multipart param {name} is not complete", e) + return param_value + + +def put(Name: str, Value: str, Type="String", Overwrite=True, ssm=None, **kwargs): + """General utility for putting SSM parameter values. + + Handles large/multipart puts transparently. + + Raises all ClientErrors to the caller. 
+ """ + ssm = ssm or SSM_CLIENT() + + if len(Value) > 4096: + _put_multipart_param(Name, Value, ssm, Type) + else: + if "Description" not in kwargs: + kwargs["Description"] = f"Set by {SCRIPT_NAME} at " + datetime.now().isoformat() + ssm.put_parameter(Name=Name, Value=Value, Type=Type, Overwrite=Overwrite, **kwargs) + + +def ssm_allowed_name(unsafe_name: str) -> str: + """Replace characters that aren't allowed as part of an SSM name""" + return _SSM_NOT_ALLOWED_REGEX.sub("_", unsafe_name) + + +def _get_client_error_type(clienterror: ClientError): + return clienterror.response.get("Error", {}).get("Code") + + +def _get_num_multiparts(param_value: str) -> int: + """Returns 0 if not multipart, or more than 1 if multipart""" + try: + param_obj = json.loads(param_value) + if isinstance(param_obj, dict) and _MULTIPART_PARAM_COUNT in param_obj: + return int(param_obj[_MULTIPART_PARAM_COUNT]) + return 0 + except ValueError: + return 0 + + +def _get_multipart_param(param_name: str, count: int, ssm) -> str: + """You must pass the count returned from _get_num_multiparts""" + param_value = "" + i = 0 + for i in range(count): + param_value += ssm.get_parameter(Name=_get_multipart_param_part_name(param_name, i))[ + "Parameter" + ]["Value"] + return param_value + + +def _get_multipart_param_part_name(name, part_num): + return f"{name}__{part_num}" + + +def div_ceil(a, b): + val = -(-a // b) + assert val * b >= a + return val + + +def _delete_multipart_param(name, ssm, count=None): + if not count: + # find out how many there are + count = _get_num_multiparts(ssm.get_parameter(Name=name)) + for i in range(count): + try: + ssm.delete_parameter(Name=_get_multipart_param_part_name(name, i)) + except ClientError as ce: + if _get_client_error_type(ce) != "ParameterNotFound": + raise ce # indicates a more serious problem than just a missing piece + + +def _put_multipart_param(name, value, ssm, Type): + num_parts = div_ceil(len(value), 4096) + thistime = datetime.now().isoformat() + + put_param_payload = dict( + Name=name, + Value=json.dumps({_MULTIPART_PARAM_COUNT: num_parts}), + Description="A multipart SSM parameter, stored at " + thistime + f", by {SCRIPT_NAME}", + Overwrite=False, + Type=Type, + ) + try: + ssm.put_parameter(**put_param_payload) + except ClientError as ce: + if _get_client_error_type(ce) != "ParameterAlreadyExists": + raise ce + existing_num_parts = _get_num_multiparts(ssm.get_parameter(Name=name)["Parameter"]["Value"]) + if existing_num_parts > num_parts: + # clean up existing parameter so we don't have 'leftovers' + _delete_multipart_param(name, ssm, count=existing_num_parts) + put_param_payload["Overwrite"] = True + ssm.put_parameter(**put_param_payload) + + for i in range(num_parts): + des = f"Part {i+1} of {num_parts} of {name}, stored at " + thistime + f", by {SCRIPT_NAME}" + ssm.put_parameter( + Name=_get_multipart_param_part_name(name, i), + Value=value[i * 4096 : (i + 1) * 4096], + Description=des, + Overwrite=True, + Type=Type, + ) diff --git a/xoto3/types.py b/xoto3/types.py new file mode 100644 index 0000000..7649c3e --- /dev/null +++ b/xoto3/types.py @@ -0,0 +1,58 @@ +"""These are partial types for parts of boto3 itself""" +import typing as ty +from decimal import Decimal +from typing_extensions import TypedDict, Literal + +KeyAttributeType = ty.Union[int, str, float, Decimal] +ItemKey = ty.Mapping[str, KeyAttributeType] + +AttrInput = ty.Mapping[str, ty.Any] +DynamoInputItem = AttrInput + + +SchemaKeyAttibute = TypedDict("SchemaKeyAttibute", {"AttributeName": str}) + + +class 
diff --git a/xoto3/types.py b/xoto3/types.py
new file mode 100644
index 0000000..7649c3e
--- /dev/null
+++ b/xoto3/types.py
+"""These are partial types for parts of boto3 itself"""
+import typing as ty
+from decimal import Decimal
+from typing_extensions import TypedDict, Literal
+
+KeyAttributeType = ty.Union[int, str, float, Decimal]
+ItemKey = ty.Mapping[str, KeyAttributeType]
+
+AttrInput = ty.Mapping[str, ty.Any]
+DynamoInputItem = AttrInput
+
+
+SchemaKeyAttribute = TypedDict("SchemaKeyAttribute", {"AttributeName": str})
+
+
+class KeyAndType(TypedDict):
+    AttributeName: str
+    KeyType: ty.Union[Literal["HASH"], Literal["RANGE"]]
+
+
+class DynamoIndex(TypedDict):
+    IndexName: str
+    KeySchema: ty.List[KeyAndType]
+
+
+class TableResource:
+    """A stub for a boto3 DynamoDB Table Resource.
+
+    This can be updated as we use more methods from the type."""
+
+    name: str
+
+    key_schema: ty.List[SchemaKeyAttribute]
+
+    global_secondary_indexes: ty.Optional[ty.List[DynamoIndex]]
+
+    local_secondary_indexes: ty.Optional[ty.List[DynamoIndex]]
+
+    def get_item(self, Key: ItemKey, **kwargs) -> dict:
+        ...
+
+    def update_item(self, Key: ItemKey, **kwargs) -> dict:
+        ...
+
+    def put_item(self, Item: DynamoInputItem, **kwargs) -> dict:
+        ...
+
+    def batch_writer(self, overwrite_by_pkeys: ty.Optional[ty.List[str]] = None) -> ty.ContextManager:
+        ...
+
+    def delete_item(self, Key: ItemKey, **kwargs) -> dict:
+        ...
+
+    def query(self, *args, **kwargs) -> dict:
+        ...
+
+    def scan(self, *args, **kwargs) -> dict:
+        ...
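+# An illustrative annotation sketch; the table name is hypothetical:
+#
+#     import boto3
+#     from xoto3.types import TableResource
+#
+#     table: TableResource = boto3.resource("dynamodb").Table("my-table")
+#     response = table.get_item(Key={"id": "abc"})
+#     item = response.get("Item")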