diff --git a/.github/workflows/test-runner.yml b/.github/workflows/test-runner.yml index 16bf207..7a3ebaa 100644 --- a/.github/workflows/test-runner.yml +++ b/.github/workflows/test-runner.yml @@ -33,3 +33,9 @@ jobs: - name: Run Tests run: | python -m unittest discover -s tests -v + + - name: Lint code + if: ${{ matrix.python-version == '3.11' }} + run: | + python -m pip install pre-commit + pre-commit run --all-files diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..b911781 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,32 @@ +repos: + - repo: https://github.com/PyCQA/flake8 + rev: 7.0.0 + hooks: + - id: flake8 + args: + - --ignore=E501,E712,W503 + - repo: https://github.com/timothycrosley/isort + rev: 5.13.2 + hooks: + - id: isort + args: ["--profile", "black"] + - repo: https://github.com/psf/black + rev: 24.1.1 + hooks: + - id: black + language_version: python3.11 + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.8.0 + hooks: + - id: mypy + exclude: ^tests/ + # --strict + args: + [ + --no-strict-optional, + --ignore-missing-imports, + --implicit-reexport, + --explicit-package-bases, + ] + additional_dependencies: + ["types-attrs", "types-requests", "types-setuptools", "types-PyYAML"] diff --git a/CHANGELOG.md b/CHANGELOG.md index 6328b67..fb8f309 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Added `verify` command to cli with accompanying script to ensure that the Merkle tree verification json produced by the `compute` command matches [#3](https://github.com/stacchain/stac-merkle-tree-cli/pull/3) +- Logging to the cli tool and associated scripts [#5](https://github.com/stacchain/stac-merkle-tree-cli/pull/5) +- `verify` command to cli with accompanying script to ensure that the Merkle tree verification json produced by the `compute` command matches [#3](https://github.com/stacchain/stac-merkle-tree-cli/pull/3) + +### Changed + +- Moved compute and verification logic into classes for a cleaner object-oriented design [#5](https://github.com/stacchain/stac-merkle-tree-cli/pull/5) ## [v0.3.0] - 2024-11-20 diff --git a/example_catalog/merkle_tree.json b/example_catalog/merkle_tree.json index 2a4be8d..f170364 100644 --- a/example_catalog/merkle_tree.json +++ b/example_catalog/merkle_tree.json @@ -11,9 +11,14 @@ "merkle:root": "aa7f89b29cb339032ec86d81d4090bdbd52199152fb657f50b08eec1b3234ee2", "children": [ { - "node_id": "DEM1_SAR_DGE_30_20101215T103647_20130405T103047_ADS_000000_oCX9", + "node_id": "DEM1_SAR_DGE_30_20101212T230244_20140325T230302_ADS_000000_1jTi", "type": "Item", - "merkle:object_hash": "22a31ab988181f280a0bfc6872556ad1d670373bd997f711389d057e1b1f531a" + "merkle:object_hash": "ce9f56e695ab1751b8f0c8d9ef1f1ecedaf04574ec3077e70e7426ec9fc61ea4" + }, + { + "node_id": "DEM1_SAR_DGE_30_20101212T230244_20140329T113710_ADS_000000_eAmG", + "type": "Item", + "merkle:object_hash": "ac66e07717b56e8421c8fec00b2b300afd49d30a8ec9c6d505df3b0568de9c77" }, { "node_id": "DEM1_SAR_DGE_30_20101215T103647_20130405T103047_ADS_000000_RHJx", @@ -21,9 +26,9 @@ "merkle:object_hash": "39969fd5f4a3a170ff39df8f2c13ebca66aab40890275a94ce798e281b85d54d" }, { - "node_id": "DEM1_SAR_DTE_90_20101213T034716_20130408T035028_ADS_000000_5033", + "node_id": "DEM1_SAR_DGE_30_20101215T103647_20130405T103047_ADS_000000_oCX9", "type": "Item", - "merkle:object_hash": "c1511bd4889e8078010287a820d3fcc216cad1ab48374e324fb7098fd1c3f882" + "merkle:object_hash": 
"22a31ab988181f280a0bfc6872556ad1d670373bd997f711389d057e1b1f531a" }, { "node_id": "DEM1_SAR_DGE_30_20101215T203649_20140913T090954_ADS_000000_4KBA", @@ -31,14 +36,14 @@ "merkle:object_hash": "3c22648957e7c76f75a9fbfdf7c164e7d745d9e4ebe809db15d2474e01be4764" }, { - "node_id": "DEM1_SAR_DGE_30_20101216T005027_20130411T005216_ADS_000000_Hbh2", + "node_id": "DEM1_SAR_DGE_30_20101215T203649_20140913T090954_ADS_000000_Quqd", "type": "Item", - "merkle:object_hash": "176a6f0026a763a85909d64d3eeac37873cd7c970fae685a6f37bd16d806b3fe" + "merkle:object_hash": "6d16f23e2fe2849ce0316c012e9284b413f7c086a11d6de421c0582b397f513e" }, { - "node_id": "DEM1_SAR_DTE_30_20101217T200231_20140802T083458_ADS_000000_tFcK", + "node_id": "DEM1_SAR_DGE_30_20101215T203914_20121226T204852_ADS_000000_Yjwo", "type": "Item", - "merkle:object_hash": "d0114075968ff1a3860eaf90d85317304f96757d6313f396eb556546a9c95006" + "merkle:object_hash": "ed99b4348ede45959a9cf471490f0f2c6a106bb4337a46d07d4fc4b4988f099f" }, { "node_id": "DEM1_SAR_DGE_30_20101216T005027_20130331T005208_ADS_000000_J3m5", @@ -46,79 +51,67 @@ "merkle:object_hash": "90a0e5265d02fd58ab81437d14ef1c318ec1b86167ac935d6abb598602798690" }, { - "node_id": "DEM1_SAR_DGE_30_20101215T203649_20140913T090954_ADS_000000_Quqd", - "type": "Item", - "merkle:object_hash": "6d16f23e2fe2849ce0316c012e9284b413f7c086a11d6de421c0582b397f513e" - }, - { - "node_id": "DEM1_SAR_DTE_90_20101212T084914_20140212T212323_ADS_000000_1370", - "type": "Item", - "merkle:object_hash": "c4a241c5917f7fcb82bedca10127ad22504b3462c0d3704d76a5f549e2c24010" - }, - { - "node_id": "DEM1_SAR_DGE_30_20101212T230244_20140329T113710_ADS_000000_eAmG", + "node_id": "DEM1_SAR_DGE_30_20101216T005027_20130411T005216_ADS_000000_Hbh2", "type": "Item", - "merkle:object_hash": "ac66e07717b56e8421c8fec00b2b300afd49d30a8ec9c6d505df3b0568de9c77" + "merkle:object_hash": "176a6f0026a763a85909d64d3eeac37873cd7c970fae685a6f37bd16d806b3fe" }, { "node_id": "DEM1_SAR_DGE_90_20101212T102356_20130709T103701_ADS_000000_6275", "type": "Item", "merkle:object_hash": "ffaee98a244aaad0f970100f0f3b11bf3ffd3f5de74fa473ed86cc851c73dbc9" }, - { - "node_id": "DEM1_SAR_DGE_30_20101215T203914_20121226T204852_ADS_000000_Yjwo", - "type": "Item", - "merkle:object_hash": "ed99b4348ede45959a9cf471490f0f2c6a106bb4337a46d07d4fc4b4988f099f" - }, { "node_id": "DEM1_SAR_DGE_90_20101212T102356_20130709T103701_ADS_000000_8521", "type": "Item", "merkle:object_hash": "6d7dca89eec5da5e31023b7e24b9527e33a021099c8e74791ecda860f96091ab" }, { - "node_id": "DEM1_SAR_DTE_90_20101212T084914_20130418T085214_ADS_000000_5545", + "node_id": "DEM1_SAR_DTE_30_20101215T203649_20140913T090954_ADS_000000_kipX", "type": "Item", - "merkle:object_hash": "7611aa1e37ee256bb346d0405c210c9e2256c43e7ed9854b4a91ef75caf59d09" + "merkle:object_hash": "88f4d9bb5b1512f214f1ca60e6ff2bac28640fd0a00339d01c453b6db9fd7f88" }, { - "node_id": "DEM1_SAR_DTE_90_20101217T224141_20140127T121413_ADS_000000_0611", + "node_id": "DEM1_SAR_DTE_30_20101215T203914_20130404T204908_ADS_000000_CURF", "type": "Item", - "merkle:object_hash": "9c33c4c49913483588fb7e2aa8084e675649ed253083ab62ac1ebebcfc16a944" + "merkle:object_hash": "457aa1fbc4005627077b55cd673a4457f2afc8a15ef3b776af8df60e60e8e1ab" }, { - "node_id": "DEM1_SAR_DTE_30_20101215T203914_20130404T204908_ADS_000000_CURF", + "node_id": "DEM1_SAR_DTE_30_20101216T100443_20140415T214254_ADS_000000_9Wqx", "type": "Item", - "merkle:object_hash": "457aa1fbc4005627077b55cd673a4457f2afc8a15ef3b776af8df60e60e8e1ab" + "merkle:object_hash": 
"e7b65b76ed20f947d8a2b4ae126b832d9f07fb18822863556c9fa0ac40dae873" }, { - "node_id": "DEM1_SAR_DTE_30_20101215T203649_20140913T090954_ADS_000000_kipX", + "node_id": "DEM1_SAR_DTE_30_20101217T200231_20140802T083458_ADS_000000_tFcK", "type": "Item", - "merkle:object_hash": "88f4d9bb5b1512f214f1ca60e6ff2bac28640fd0a00339d01c453b6db9fd7f88" + "merkle:object_hash": "d0114075968ff1a3860eaf90d85317304f96757d6313f396eb556546a9c95006" }, { - "node_id": "DEM1_SAR_DTE_30_20101216T100443_20140415T214254_ADS_000000_9Wqx", + "node_id": "DEM1_SAR_DTE_90_20101212T084914_20130418T085214_ADS_000000_5545", "type": "Item", - "merkle:object_hash": "e7b65b76ed20f947d8a2b4ae126b832d9f07fb18822863556c9fa0ac40dae873" + "merkle:object_hash": "7611aa1e37ee256bb346d0405c210c9e2256c43e7ed9854b4a91ef75caf59d09" }, { - "node_id": "DEM1_SAR_DGE_30_20101212T230244_20140325T230302_ADS_000000_1jTi", + "node_id": "DEM1_SAR_DTE_90_20101212T084914_20140212T212323_ADS_000000_1370", "type": "Item", - "merkle:object_hash": "ce9f56e695ab1751b8f0c8d9ef1f1ecedaf04574ec3077e70e7426ec9fc61ea4" + "merkle:object_hash": "c4a241c5917f7fcb82bedca10127ad22504b3462c0d3704d76a5f549e2c24010" }, { "node_id": "DEM1_SAR_DTE_90_20101212T102356_20130628T103333_ADS_000000_1705", "type": "Item", "merkle:object_hash": "ae063aa10078d5316f36718b92f966c9f96f63f621839bd031730eb068c1c265" + }, + { + "node_id": "DEM1_SAR_DTE_90_20101213T034716_20130408T035028_ADS_000000_5033", + "type": "Item", + "merkle:object_hash": "c1511bd4889e8078010287a820d3fcc216cad1ab48374e324fb7098fd1c3f882" + }, + { + "node_id": "DEM1_SAR_DTE_90_20101217T224141_20140127T121413_ADS_000000_0611", + "type": "Item", + "merkle:object_hash": "9c33c4c49913483588fb7e2aa8084e675649ed253083ab62ac1ebebcfc16a944" } ] }, - { - "node_id": "TERRAAQUA", - "type": "Collection", - "merkle:object_hash": "6ae6f97edd2994b632b415ff810af38639faa84544aa8a33a88bdf867a649374", - "merkle:root": "6ae6f97edd2994b632b415ff810af38639faa84544aa8a33a88bdf867a649374", - "children": [] - }, { "node_id": "S2GLC", "type": "Collection", @@ -131,6 +124,13 @@ "merkle:object_hash": "3a3803a0dae5dbaf9561aeb4cce2770bf38b5da4b71ca67398fb24d48c43a68f" } ] + }, + { + "node_id": "TERRAAQUA", + "type": "Collection", + "merkle:object_hash": "6ae6f97edd2994b632b415ff810af38639faa84544aa8a33a88bdf867a649374", + "merkle:root": "6ae6f97edd2994b632b415ff810af38639faa84544aa8a33a88bdf867a649374", + "children": [] } ] } \ No newline at end of file diff --git a/setup.py b/setup.py index c123f5d..4c86dfc 100644 --- a/setup.py +++ b/setup.py @@ -1,28 +1,28 @@ -from setuptools import setup, find_packages +from setuptools import find_packages, setup setup( - name='stac_merkle_tree_cli', - version='0.3.0', - author='Jonathan Healy', - author_email='jonathan.d.healy@gmail.com', - description='A CLI tool for computing and adding Merkle Tree information to STAC catalogs, collections, or items.', - long_description=open('README.md').read(), - long_description_content_type='text/markdown', - url='https://github.com/stacchain/stac-merkle-tree-cli', + name="stac_merkle_tree_cli", + version="0.3.0", + author="Jonathan Healy", + author_email="jonathan.d.healy@gmail.com", + description="A CLI tool for computing and adding Merkle Tree information to STAC catalogs, collections, or items.", + long_description=open("README.md").read(), + long_description_content_type="text/markdown", + url="https://github.com/stacchain/stac-merkle-tree-cli", packages=find_packages(), include_package_data=True, install_requires=[ - 'click>=8.0.0', + "click>=8.0.0", ], 
entry_points={ - 'console_scripts': [ - 'stac-merkle-tree-cli=stac_merkle_tree_cli.cli:cli', + "console_scripts": [ + "stac-merkle-tree-cli=stac_merkle_tree_cli.cli:cli", ], }, classifiers=[ - 'Programming Language :: Python :: 3', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: OS Independent', + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", ], - python_requires='>=3.6', + python_requires=">=3.6", ) diff --git a/stac_merkle_tree_cli/cli.py b/stac_merkle_tree_cli/cli.py index 1a87dfe..8f096cc 100644 --- a/stac_merkle_tree_cli/cli.py +++ b/stac_merkle_tree_cli/cli.py @@ -1,13 +1,34 @@ # stac_merkle_cli/cli.py -import click import json +import logging from pathlib import Path -from .compute_merkle_info import process_catalog -from .verify_merkle_tree_json import verify_merkle_tree + +import click + +from .compute_merkle_info import MerkleTreeProcessor +from .verify_merkle_tree_json import MerkleTreeVerifier + +# Configure the root logger +logger = logging.getLogger("stac_merkle_cli") +logger.setLevel(logging.INFO) +handler = logging.StreamHandler() +formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") +handler.setFormatter(formatter) +if not logger.handlers: + logger.addHandler(handler) + @click.group() -def cli(): +@click.option("--verbose", "-v", is_flag=True, help="Enable verbose (debug) logging.") +@click.option( + "--quiet", + "-q", + is_flag=True, + help="Enable quiet mode. Only warnings and errors will be shown.", +) +@click.pass_context +def cli(ctx, verbose, quiet): """ STAC Merkle Tree CLI Tool. @@ -15,66 +36,110 @@ def cli(): compute Compute Merkle hashes for a STAC catalog. verify Verify the integrity of a Merkle tree JSON file. """ - pass + # Adjust logging level based on options + if verbose and quiet: + click.echo("Error: --verbose and --quiet cannot be used together.", err=True) + ctx.exit(1) + elif verbose: + logger.setLevel(logging.DEBUG) + for handler in logger.handlers: + handler.setLevel(logging.DEBUG) + logger.debug("Verbose logging enabled.") + elif quiet: + logger.setLevel(logging.WARNING) + for handler in logger.handlers: + handler.setLevel(logging.WARNING) + logger.debug("Quiet mode enabled.") + @cli.command() -@click.argument('catalog_path', type=click.Path(exists=True, file_okay=False), required=True) -@click.option('--merkle-tree-file', type=click.Path(), default='merkle_tree.json', - help='Path to the output Merkle tree structure file.') -def compute(catalog_path: str, merkle_tree_file: str): +@click.argument( + "catalog_path", type=click.Path(exists=True, file_okay=False), required=True +) +@click.option( + "--merkle-tree-file", + type=click.Path(), + default="merkle_tree.json", + help="Path to the output Merkle tree structure file.", +) +@click.pass_context +def compute(ctx, catalog_path: str, merkle_tree_file: str): """ Compute Merkle hashes for STAC catalogs, handling nested catalogs and collections. CATALOG_PATH: Path to the root directory containing 'catalog.json'. 
""" catalog_dir = Path(catalog_path) - catalog_json_path = catalog_dir / 'catalog.json' - + catalog_json_path = catalog_dir / "catalog.json" + if not catalog_json_path.exists(): + logger.error(f"'catalog.json' not found in {catalog_dir}") click.echo(f"Error: 'catalog.json' not found in {catalog_dir}", err=True) - exit(1) - + ctx.exit(1) + # Define the root hash_method root_hash_method = { - 'function': 'sha256', - 'fields': ['*'], - 'ordering': 'ascending', - 'description': 'Computed by including the merkle:root of collections and the catalog\'s own merkle:object_hash.' + "function": "sha256", + "fields": ["*"], + "ordering": "ascending", + "description": "Computed by including the merkle:root of collections and the catalog's own merkle:object_hash.", } - + + # Initialize the MerkleTreeProcessor + processor = MerkleTreeProcessor(logger=logger) + # Process the root catalog - merkle_tree = process_catalog(catalog_json_path, root_hash_method) - + merkle_tree = processor.process_catalog(catalog_json_path, root_hash_method) + if not merkle_tree: - click.echo("Error: Merkle tree is empty. Check your Catalog structure and hash methods.", err=True) - exit(1) - + logger.error( + "Merkle tree is empty. Check your Catalog structure and hash methods." + ) + click.echo( + "Error: Merkle tree is empty. Check your Catalog structure and hash methods.", + err=True, + ) + ctx.exit(1) + # Save the merkle_tree.json output_path = Path(catalog_path) / merkle_tree_file try: - with output_path.open('w', encoding='utf-8') as f: + with output_path.open("w", encoding="utf-8") as f: json.dump(merkle_tree, f, indent=2) + logger.info(f"Merkle tree structure saved to {output_path}") click.echo(f"Merkle tree structure saved to {output_path}") except Exception as e: + logger.exception(f"Error writing to {output_path}: {e}") click.echo(f"Error writing to {output_path}: {e}", err=True) - exit(1) + ctx.exit(1) + @cli.command() -@click.argument('merkle_tree_file', type=click.Path(exists=True, dir_okay=False), required=True) -def verify(merkle_tree_file: str): +@click.argument( + "merkle_tree_file", type=click.Path(exists=True, dir_okay=False), required=True +) +@click.pass_context +def verify(ctx, merkle_tree_file: str): """ Verify that the merkle:root in the Merkle tree JSON matches the recalculated root. MERKLE_TREE_FILE: Path to the Merkle tree JSON file. 
""" merkle_tree_path = Path(merkle_tree_file) - verification_result = verify_merkle_tree(merkle_tree_path) + logger.info(f"Verifying Merkle tree at {merkle_tree_path}") + + verifier = MerkleTreeVerifier(logger=logger) + verification_result = verifier.verify_merkle_tree(merkle_tree_path) + if verification_result: + logger.info("Verification Successful: The merkle:root matches.") click.echo("Verification Successful: The merkle:root matches.") - exit(0) + ctx.exit(0) else: + logger.error("Verification Failed: The merkle:root does not match.") click.echo("Verification Failed: The merkle:root does not match.", err=True) - exit(1) + ctx.exit(1) + -if __name__ == '__main__': +if __name__ == "__main__": cli() diff --git a/stac_merkle_tree_cli/compute_merkle_info.py b/stac_merkle_tree_cli/compute_merkle_info.py index f3c3b06..875c401 100644 --- a/stac_merkle_tree_cli/compute_merkle_info.py +++ b/stac_merkle_tree_cli/compute_merkle_info.py @@ -1,361 +1,534 @@ # stac_merkle_cli/compute_merkle_info.py -import json import hashlib +import json +import logging from pathlib import Path -from typing import List, Dict, Any - - -def remove_merkle_fields(data: Any) -> Any: - """ - Recursively removes Merkle-specific fields from the data. - """ - if isinstance(data, dict): - return {k: remove_merkle_fields(v) for k, v in data.items() if k not in {"merkle:object_hash", "merkle:hash_method", "merkle:root"}} - elif isinstance(data, list): - return [remove_merkle_fields(item) for item in data] - else: - return data +from typing import Any, Dict, List, Optional -def compute_merkle_object_hash(stac_object: Dict[str, Any], hash_method: Dict[str, Any]) -> str: - """ - Computes the merkle:object_hash for a STAC object. - - Parameters: - - stac_object (Dict[str, Any]): The STAC Catalog, Collection, or Item JSON object. - - hash_method (Dict[str, Any]): The hash method details from merkle:hash_method. - - Returns: - - str: The computed object hash as a hexadecimal string. 
+class MerkleTreeProcessor: """ - fields = hash_method.get('fields', ['*']) - if fields == ['*'] or fields == ['all']: - data_to_hash = remove_merkle_fields(stac_object) - else: - selected_data = {field: stac_object.get(field) for field in fields if field in stac_object} - data_to_hash = remove_merkle_fields(selected_data) - - # Serialize the data to a compact JSON string with sorted keys - json_str = json.dumps(data_to_hash, sort_keys=True, separators=(',', ':')) - - # Get the hash function - hash_function_name = hash_method.get('function', 'sha256').replace('-', '').lower() - hash_func = getattr(hashlib, hash_function_name, None) - if not hash_func: - raise ValueError(f"Unsupported hash function: {hash_function_name}") - - # Compute the hash - return hash_func(json_str.encode('utf-8')).hexdigest() - - -def compute_merkle_root(hashes: List[str], hash_method: Dict[str, Any]) -> str: - if not hashes: - return '' - - # Enforce ordering - ordering = hash_method.get('ordering', 'ascending') - if ordering == 'ascending': - hashes.sort() - elif ordering == 'descending': - hashes.sort(reverse=True) - elif ordering != 'unsorted': - raise ValueError(f"Unsupported ordering: {ordering}") - - # Get the hash function - hash_function_name = hash_method.get('function', 'sha256').replace('-', '').lower() - hash_func = getattr(hashlib, hash_function_name, None) - if not hash_func: - raise ValueError(f"Unsupported hash function: {hash_function_name}") - - current_level = hashes.copy() - print(f"Initial hashes for merkle:root computation: {current_level}") - - while len(current_level) > 1: - next_level = [] - for i in range(0, len(current_level), 2): - left = current_level[i] - right = current_level[i + 1] if i + 1 < len(current_level) else left - combined = bytes.fromhex(left) + bytes.fromhex(right) - new_hash = hash_func(combined).hexdigest() - next_level.append(new_hash) - print(f"Combined '{left}' + '{right}' => '{new_hash}'") - current_level = next_level - print(f"Next level hashes: {current_level}") - - print(f"Final merkle:root: {current_level[0]}") - return current_level[0] - - - -def process_item(item_path: Path, hash_method: Dict[str, Any]) -> Dict[str, Any]: + A class to compute Merkle hashes for STAC Catalogs, Collections, and Items. + It provides functionalities to process items and collections, compute object hashes, + and construct the Merkle root for the entire catalog structure. """ - Processes a STAC Item to compute and return its object hash. - Parameters: - - item_path (Path): Path to the Item JSON file. - - hash_method (Dict[str, Any]): The hash method to use. - - Returns: - - Dict[str, Any]: A dictionary containing 'node_id' and 'merkle:object_hash'. - """ - try: - with item_path.open('r', encoding='utf-8') as f: - item_json = json.load(f) - - if item_json.get('type') != 'Feature': - print(f"Skipping non-Item JSON: {item_path}") + def __init__(self, logger: Optional[logging.Logger] = None): + """ + Initializes the MerkleTreeProcessor with an optional logger. + + Parameters: + - logger (logging.Logger, optional): A logger instance for logging messages. + If not provided, a default logger is configured. 
+ """ + if logger is None: + # Configure default logger + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.setLevel(logging.INFO) + handler = logging.StreamHandler() + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + handler.setFormatter(formatter) + if not self.logger.handlers: + self.logger.addHandler(handler) + else: + self.logger = logger + + def remove_merkle_fields(self, data: Any) -> Any: + """ + Recursively removes Merkle-specific fields from the data. + + Parameters: + - data (Any): The JSON data (dict or list) from which to remove Merkle fields. + + Returns: + - Any: The data with Merkle fields excluded. + """ + if isinstance(data, dict): + return { + k: self.remove_merkle_fields(v) + for k, v in data.items() + if k not in {"merkle:object_hash", "merkle:hash_method", "merkle:root"} + } + elif isinstance(data, list): + return [self.remove_merkle_fields(item) for item in data] + else: + return data + + def compute_merkle_object_hash( + self, stac_object: Dict[str, Any], hash_method: Dict[str, Any] + ) -> str: + """ + Computes the merkle:object_hash for a STAC object. + + Parameters: + - stac_object (Dict[str, Any]): The STAC Catalog, Collection, or Item JSON object. + - hash_method (Dict[str, Any]): The hash method details from merkle:hash_method. + + Returns: + - str: The computed object hash as a hexadecimal string. + """ + self.logger.debug("Computing merkle:object_hash for STAC object.") + fields = hash_method.get("fields", ["*"]) + self.logger.debug(f"Hash fields: {fields}") + + if fields == ["*"] or fields == ["all"]: + data_to_hash = self.remove_merkle_fields(stac_object) + self.logger.debug( + "Using all fields for hashing after removing Merkle fields." + ) + else: + selected_data = { + field: stac_object.get(field) + for field in fields + if field in stac_object + } + data_to_hash = self.remove_merkle_fields(selected_data) + self.logger.debug( + f"Using specific fields for hashing: {list(selected_data.keys())}" + ) + + # Serialize the data to a compact JSON string with sorted keys + json_str = json.dumps(data_to_hash, sort_keys=True, separators=(",", ":")) + self.logger.debug(f"Serialized JSON for hashing: {json_str}") + + # Get the hash function + hash_function_name = ( + hash_method.get("function", "sha256").replace("-", "").lower() + ) + self.logger.debug(f"Selected hash function: {hash_function_name}") + hash_func = getattr(hashlib, hash_function_name, None) + if not hash_func: + self.logger.error(f"Unsupported hash function: {hash_function_name}") + raise ValueError(f"Unsupported hash function: {hash_function_name}") + + # Compute the hash + object_hash = hash_func(json_str.encode("utf-8")).hexdigest() + self.logger.debug(f"Computed merkle:object_hash: {object_hash}") + + return object_hash + + def compute_merkle_root( + self, hashes: List[str], hash_method: Dict[str, Any] + ) -> str: + """ + Computes the Merkle root from a list of hashes based on the provided hash method. + + Parameters: + - hashes (List[str]): A list of hexadecimal hash strings. + - hash_method (Dict[str, Any]): The hash method details (function, ordering). + + Returns: + - str: The computed Merkle root as a hexadecimal string. + """ + self.logger.debug(f"Computing Merkle root from {len(hashes)} hashes.") + if not hashes: + self.logger.warning("Empty hash list provided. 
Returning empty string.") + return "" + + # Enforce ordering + ordering = hash_method.get("ordering", "ascending") + self.logger.debug(f"Hash ordering method: {ordering}") + if ordering == "ascending": + hashes.sort() + self.logger.debug("Hashes sorted in ascending order.") + elif ordering == "descending": + hashes.sort(reverse=True) + self.logger.debug("Hashes sorted in descending order.") + elif ordering == "unsorted": + self.logger.debug("Hashes remain unsorted.") + pass # Keep the original order + else: + self.logger.error(f"Unsupported ordering: {ordering}") + raise ValueError(f"Unsupported ordering: {ordering}") + + # Get the hash function + hash_function_name = ( + hash_method.get("function", "sha256").replace("-", "").lower() + ) + self.logger.debug(f"Selected hash function: {hash_function_name}") + hash_func = getattr(hashlib, hash_function_name, None) + if not hash_func: + self.logger.error(f"Unsupported hash function: {hash_function_name}") + raise ValueError(f"Unsupported hash function: {hash_function_name}") + + current_level = hashes.copy() + self.logger.debug( + f"Initial hashes for Merkle root computation: {current_level}" + ) + + while len(current_level) > 1: + next_level = [] + self.logger.debug(f"Processing level with {len(current_level)} hashes.") + for i in range(0, len(current_level), 2): + left = current_level[i] + if i + 1 < len(current_level): + right = current_level[i + 1] + else: + right = left # Duplicate the last hash if odd number + self.logger.debug( + f"Odd number of hashes. Duplicating last hash: {left}" + ) + + self.logger.debug(f"Combining '{left}' + '{right}'") + try: + combined = bytes.fromhex(left) + bytes.fromhex(right) + except ValueError as e: + self.logger.error(f"Error converting hashes to bytes: {e}") + raise ValueError(f"Error converting hashes to bytes: {e}") + + new_hash = hash_func(combined).hexdigest() + self.logger.debug(f"Combined hash: {new_hash}") + next_level.append(new_hash) + current_level = next_level + self.logger.debug(f"Next level hashes: {current_level}") + + final_root = current_level[0] + self.logger.info(f"Final Merkle root computed: {final_root}") + return final_root + + def process_item( + self, item_path: Path, hash_method: Dict[str, Any] + ) -> Dict[str, Any]: + """ + Processes a STAC Item to compute and return its object hash. + + Parameters: + - item_path (Path): Path to the Item JSON file. + - hash_method (Dict[str, Any]): The hash method to use. + + Returns: + - Dict[str, Any]: A dictionary containing 'node_id', 'type', and 'merkle:object_hash'. + """ + self.logger.debug(f"Processing Item: {item_path}") + try: + with item_path.open("r", encoding="utf-8") as f: + item_json = json.load(f) + + if item_json.get("type") != "Feature": + self.logger.warning(f"Skipping non-Item JSON: {item_path}") + return {} + + # Compute merkle:object_hash + object_hash = self.compute_merkle_object_hash(item_json, hash_method) + + # Add merkle:object_hash to 'properties' + properties = item_json.setdefault("properties", {}) + properties["merkle:object_hash"] = object_hash + self.logger.debug( + f"Added merkle:object_hash to Item '{item_json.get('id', item_path.stem)}'." 
+ ) + + # Ensure the Merkle extension is listed + item_json.setdefault("stac_extensions", []) + extension_url = "https://stacchain.github.io/merkle-tree/v1.0.0/schema.json" + if extension_url not in item_json["stac_extensions"]: + item_json["stac_extensions"].append(extension_url) + item_json["stac_extensions"].sort() # Sort for consistent ordering + self.logger.debug( + f"Added Merkle extension to Item '{item_json.get('id', item_path.stem)}'." + ) + + # Save the updated Item JSON + with item_path.open("w", encoding="utf-8") as f: + json.dump(item_json, f, indent=2) + f.write("\n") + self.logger.info(f"Processed Item: {item_path}") + + # Return the structured Item node + return { + "node_id": item_json.get("id", item_path.stem), + "type": "Item", + "merkle:object_hash": object_hash, + } + + except Exception as e: + self.logger.error(f"Error processing Item {item_path}: {e}") return {} - # Compute merkle:object_hash - object_hash = compute_merkle_object_hash(item_json, hash_method) - - # Add merkle:object_hash to 'properties' - properties = item_json.setdefault('properties', {}) - properties['merkle:object_hash'] = object_hash - - # Ensure the Merkle extension is listed - item_json.setdefault('stac_extensions', []) - extension_url = 'https://stacchain.github.io/merkle-tree/v1.0.0/schema.json' - if extension_url not in item_json['stac_extensions']: - item_json['stac_extensions'].append(extension_url) - item_json['stac_extensions'].sort() # Sort for consistent ordering - - # Save the updated Item JSON - with item_path.open('w', encoding='utf-8') as f: - json.dump(item_json, f, indent=2) - f.write('\n') - - print(f"Processed Item: {item_path}") + def process_collection( + self, collection_path: Path, parent_hash_method: Dict[str, Any] + ) -> Dict[str, Any]: + """ + Processes a STAC Collection to compute its merkle:root and builds a hierarchical Merkle node. - # Return the structured Item node - return { - 'node_id': item_json.get('id', item_path.stem), - 'type': 'Item', - 'merkle:object_hash': object_hash - } + Parameters: + - collection_path (Path): Path to the Collection JSON file. + - parent_hash_method (Dict[str, Any]): The hash method inherited from the parent. - except Exception as e: - print(f"Error processing Item {item_path}: {e}") - return {} - - -def process_collection(collection_path: Path, parent_hash_method: Dict[str, Any]) -> Dict[str, Any]: - """ - Processes a STAC Collection to compute its merkle:root and builds a hierarchical Merkle node. - - Parameters: - - collection_path (Path): Path to the Collection JSON file. - - parent_hash_method (Dict[str, Any]): The hash method inherited from the parent. - - Returns: - - Dict[str, Any]: The structured Merkle tree node for the collection. 
- """ - try: - with collection_path.open('r', encoding='utf-8') as f: - collection_json = json.load(f) - - if collection_json.get('type') != 'Collection': - print(f"Skipping non-Collection JSON: {collection_path}") - return {} - - # Determine the hash_method to use - hash_method = collection_json.get('merkle:hash_method', parent_hash_method) - - if not hash_method: - raise ValueError(f"Hash method not specified for {collection_path}") - - children = [] - - collection_dir = collection_path.parent - - # Process items directly in the collection directory - for item_file in collection_dir.glob('*.json'): - if item_file == collection_path: - continue - item_node = process_item(item_file, hash_method) - if item_node: - children.append(item_node) - - # Recursively process subdirectories - for subdirectory in collection_dir.iterdir(): - if subdirectory.is_dir(): - sub_collection_json = subdirectory / 'collection.json' - sub_catalog_json = subdirectory / 'catalog.json' - - if sub_collection_json.exists(): - # Process sub-collection - sub_collection_node = process_collection(sub_collection_json, hash_method) - if sub_collection_node: - children.append(sub_collection_node) - elif sub_catalog_json.exists(): - # Process sub-catalog - sub_catalog_node = process_catalog(sub_catalog_json, hash_method) - if sub_catalog_node: - children.append(sub_catalog_node) - elif is_item_directory(subdirectory): - # Process item in its own directory - item_files = list(subdirectory.glob('*.json')) - if item_files: - item_file = item_files[0] - item_node = process_item(item_file, hash_method) - if item_node: - children.append(item_node) + Returns: + - Dict[str, Any]: The structured Merkle tree node for the collection. + """ + self.logger.debug(f"Processing Collection: {collection_path}") + try: + with collection_path.open("r", encoding="utf-8") as f: + collection_json = json.load(f) + + if collection_json.get("type") != "Collection": + self.logger.warning(f"Skipping non-Collection JSON: {collection_path}") + return {} + + # Determine the hash_method to use + hash_method = collection_json.get("merkle:hash_method", parent_hash_method) + if not hash_method: + self.logger.error(f"Hash method not specified for {collection_path}") + raise ValueError(f"Hash method not specified for {collection_path}") + + children = [] + + collection_dir = collection_path.parent + + # Process items directly in the collection directory + for item_file in collection_dir.glob("*.json"): + if item_file == collection_path: + continue + item_node = self.process_item(item_file, hash_method) + if item_node: + children.append(item_node) + + # Recursively process subdirectories + for subdirectory in collection_dir.iterdir(): + if subdirectory.is_dir(): + sub_collection_json = subdirectory / "collection.json" + sub_catalog_json = subdirectory / "catalog.json" + + if sub_collection_json.exists(): + # Process sub-collection + sub_collection_node = self.process_collection( + sub_collection_json, hash_method + ) + if sub_collection_node: + children.append(sub_collection_node) + elif sub_catalog_json.exists(): + # Process sub-catalog + sub_catalog_node = self.process_catalog( + sub_catalog_json, hash_method + ) + if sub_catalog_node: + children.append(sub_catalog_node) + elif is_item_directory(subdirectory): + # Process item in its own directory + item_files = list(subdirectory.glob("*.json")) + if item_files: + item_file = item_files[0] + item_node = self.process_item(item_file, hash_method) + if item_node: + children.append(item_node) + else: + # Handle 
other cases or ignore + self.logger.warning(f"Unrecognized structure in {subdirectory}") + + # Compute own merkle:object_hash + own_object_hash = self.compute_merkle_object_hash( + collection_json, hash_method + ) + collection_json["merkle:object_hash"] = own_object_hash + self.logger.debug( + f"Computed merkle:object_hash for Collection '{collection_json.get('id', collection_path)}': {own_object_hash}" + ) + + # Collect all hashes: own_object_hash + child hashes + child_hashes = [] + for child in children: + if child["type"] in {"Collection", "Catalog"}: + child_hash = child.get("merkle:root") + if child_hash: + child_hashes.append(child_hash) + self.logger.debug( + f"Added child merkle:root from '{child['node_id']}': {child_hash}" + ) else: - # Handle other cases or ignore - print(f"Unrecognized structure in {subdirectory}") - - # Compute own merkle:object_hash - own_object_hash = compute_merkle_object_hash(collection_json, hash_method) - collection_json['merkle:object_hash'] = own_object_hash - - # Collect all hashes: own_object_hash + child hashes - child_hashes = [] - for child in children: - if child['type'] in {'Collection', 'Catalog'}: - child_hashes.append(child.get('merkle:root')) - else: - child_hashes.append(child.get('merkle:object_hash')) - - # Exclude None values - child_hashes = [h for h in child_hashes if h] - - # Include own_object_hash - all_hashes = child_hashes + [own_object_hash] - - # Compute merkle:root - merkle_root = compute_merkle_root(all_hashes, hash_method) - - collection_json['merkle:root'] = merkle_root - collection_json['merkle:hash_method'] = hash_method - - # Ensure the Merkle extension is listed and sorted - extension_url = 'https://stacchain.github.io/merkle-tree/v1.0.0/schema.json' - collection_json.setdefault('stac_extensions', []) - if extension_url not in collection_json['stac_extensions']: - collection_json['stac_extensions'].append(extension_url) - collection_json['stac_extensions'].sort() - - # Save the updated Collection JSON - with collection_path.open('w', encoding='utf-8') as f: - json.dump(collection_json, f, indent=2) - f.write('\n') - - print(f"Processed Collection: {collection_path}") - - # Build the hierarchical Merkle node - collection_node = { - 'node_id': collection_json.get('id', str(collection_path)), - 'type': 'Collection', - 'merkle:object_hash': own_object_hash, - 'merkle:root': merkle_root, - 'children': children - } - - return collection_node - - except Exception as e: - print(f"Error processing Collection {collection_path}: {e}") - return {} - - -def process_catalog(catalog_path: Path, parent_hash_method: Dict[str, Any] = None) -> Dict[str, Any]: - """ - Processes the root STAC Catalog to compute its merkle:root and builds a hierarchical Merkle node. - - Parameters: - - catalog_path (Path): Path to the Catalog JSON file. - - parent_hash_method (Dict[str, Any], optional): The hash method inherited from the parent. - - Returns: - - Dict[str, Any]: The structured Merkle tree node for the catalog. 
- """ - try: - with catalog_path.open('r', encoding='utf-8') as f: - catalog_json = json.load(f) - - if catalog_json.get('type') != 'Catalog': - print(f"Skipping non-Catalog JSON: {catalog_path}") + child_hash = child.get("merkle:object_hash") + if child_hash: + child_hashes.append(child_hash) + self.logger.debug( + f"Added child merkle:object_hash from '{child['node_id']}': {child_hash}" + ) + + # Exclude None values + child_hashes = [h for h in child_hashes if h] + + # Include own_object_hash + all_hashes = child_hashes + [own_object_hash] + self.logger.debug( + f"All hashes for Merkle root computation in Collection '{collection_json.get('id', collection_path)}': {all_hashes}" + ) + + # Compute merkle:root + merkle_root = self.compute_merkle_root(all_hashes, hash_method) + collection_json["merkle:root"] = merkle_root + self.logger.debug( + f"Computed merkle:root for Collection '{collection_json.get('id', collection_path)}': {merkle_root}" + ) + + collection_json["merkle:hash_method"] = hash_method + + # Ensure the Merkle extension is listed and sorted + extension_url = "https://stacchain.github.io/merkle-tree/v1.0.0/schema.json" + collection_json.setdefault("stac_extensions", []) + if extension_url not in collection_json["stac_extensions"]: + collection_json["stac_extensions"].append(extension_url) + self.logger.debug( + f"Added Merkle extension to Collection '{collection_json.get('id', collection_path)}'." + ) + collection_json["stac_extensions"].sort() + + # Save the updated Collection JSON + with collection_path.open("w", encoding="utf-8") as f: + json.dump(collection_json, f, indent=2) + f.write("\n") + self.logger.info(f"Processed Collection: {collection_path}") + + # Build the hierarchical Merkle node + collection_node = { + "node_id": collection_json.get("id", str(collection_path)), + "type": "Collection", + "merkle:object_hash": own_object_hash, + "merkle:root": merkle_root, + "children": children, + } + + # Sort children by node_id for consistency + collection_node["children"] = sorted(children, key=lambda x: x["node_id"]) + + return collection_node + + except Exception as e: + self.logger.error(f"Error processing Collection {collection_path}: {e}") return {} - # Determine the hash_method to use - hash_method = catalog_json.get('merkle:hash_method', parent_hash_method) - - if not hash_method: - raise ValueError(f"Hash method not specified for {catalog_path}") + def process_catalog( + self, catalog_path: Path, parent_hash_method: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Processes the root STAC Catalog to compute its merkle:root and builds a hierarchical Merkle node. - children = [] + Parameters: + - catalog_path (Path): Path to the Catalog JSON file. + - parent_hash_method (Dict[str, Any], optional): The hash method inherited from the parent. 
- catalog_dir = catalog_path.parent - - # Process collections in the 'collections' directory - collections_dir = catalog_dir / 'collections' - if not collections_dir.exists(): - print(f"No 'collections' directory found in {catalog_dir}") - # It's possible for a catalog to have no collections - else: - for collection_dir in collections_dir.iterdir(): - if collection_dir.is_dir(): - collection_json_path = collection_dir / 'collection.json' - if collection_json_path.exists(): - collection_node = process_collection(collection_json_path, hash_method) - if collection_node: - children.append(collection_node) - else: - print(f"'collection.json' not found in {collection_dir}") - - # Compute own merkle:object_hash - own_object_hash = compute_merkle_object_hash(catalog_json, hash_method) - catalog_json['merkle:object_hash'] = own_object_hash - - # Collect all hashes: own_object_hash + child hashes - child_hashes = [] - for child in children: - if child['type'] in {'Collection', 'Catalog'}: - child_hashes.append(child.get('merkle:root')) + Returns: + - Dict[str, Any]: The structured Merkle tree node for the catalog. + """ + self.logger.debug(f"Processing Catalog: {catalog_path}") + try: + with catalog_path.open("r", encoding="utf-8") as f: + catalog_json = json.load(f) + + if catalog_json.get("type") != "Catalog": + self.logger.warning(f"Skipping non-Catalog JSON: {catalog_path}") + return {} + + # Determine the hash_method to use + hash_method = catalog_json.get("merkle:hash_method", parent_hash_method) + if not hash_method: + self.logger.error(f"Hash method not specified for {catalog_path}") + raise ValueError(f"Hash method not specified for {catalog_path}") + + children = [] + + catalog_dir = catalog_path.parent + + # Process collections in the 'collections' directory + collections_dir = catalog_dir / "collections" + if not collections_dir.exists(): + self.logger.warning( + f"No 'collections' directory found in {catalog_dir}" + ) + # It's possible for a catalog to have no collections else: - child_hashes.append(child.get('merkle:object_hash')) - - # Exclude None values - child_hashes = [h for h in child_hashes if h] - - # Include own_object_hash - all_hashes = child_hashes + [own_object_hash] - - # Compute merkle:root - merkle_root = compute_merkle_root(all_hashes, hash_method) - - catalog_json['merkle:root'] = merkle_root - catalog_json['merkle:hash_method'] = hash_method - - # Ensure the Merkle extension is listed and sorted - extension_url = 'https://stacchain.github.io/merkle-tree/v1.0.0/schema.json' - catalog_json.setdefault('stac_extensions', []) - if extension_url not in catalog_json['stac_extensions']: - catalog_json['stac_extensions'].append(extension_url) - catalog_json['stac_extensions'].sort() - - # Save the updated Catalog JSON - with catalog_path.open('w', encoding='utf-8') as f: - json.dump(catalog_json, f, indent=2) - f.write('\n') - - print(f"Processed Catalog: {catalog_path}") - - # Build the hierarchical Merkle node - catalog_node = { - 'node_id': catalog_json.get('id', str(catalog_path)), - 'type': 'Catalog', - 'merkle:object_hash': own_object_hash, - 'merkle:root': merkle_root, - 'children': children - } - - return catalog_node - - except Exception as e: - print(f"Error processing Catalog {catalog_path}: {e}") - return {} + for collection_dir in collections_dir.iterdir(): + if collection_dir.is_dir(): + collection_json_path = collection_dir / "collection.json" + if collection_json_path.exists(): + collection_node = self.process_collection( + collection_json_path, hash_method + 
) + if collection_node: + children.append(collection_node) + else: + self.logger.warning( + f"'collection.json' not found in {collection_dir}" + ) + + # Compute own merkle:object_hash + own_object_hash = self.compute_merkle_object_hash(catalog_json, hash_method) + catalog_json["merkle:object_hash"] = own_object_hash + self.logger.debug( + f"Computed merkle:object_hash for Catalog '{catalog_json.get('id', catalog_path)}': {own_object_hash}" + ) + + # Collect all hashes: own_object_hash + child hashes + child_hashes = [] + for child in children: + if child["type"] in {"Collection", "Catalog"}: + child_hash = child.get("merkle:root") + if child_hash: + child_hashes.append(child_hash) + self.logger.debug( + f"Added child merkle:root from '{child['node_id']}': {child_hash}" + ) + else: + child_hash = child.get("merkle:object_hash") + if child_hash: + child_hashes.append(child_hash) + self.logger.debug( + f"Added child merkle:object_hash from '{child['node_id']}': {child_hash}" + ) + + # Exclude None values + child_hashes = [h for h in child_hashes if h] + + # Include own_object_hash + all_hashes = child_hashes + [own_object_hash] + self.logger.debug( + f"All hashes for Merkle root computation in Catalog '{catalog_json.get('id', catalog_path)}': {all_hashes}" + ) + + # Compute merkle:root + merkle_root = self.compute_merkle_root(all_hashes, hash_method) + catalog_json["merkle:root"] = merkle_root + self.logger.debug( + f"Computed merkle:root for Catalog '{catalog_json.get('id', catalog_path)}': {merkle_root}" + ) + + catalog_json["merkle:hash_method"] = hash_method + + # Ensure the Merkle extension is listed and sorted + extension_url = "https://stacchain.github.io/merkle-tree/v1.0.0/schema.json" + catalog_json.setdefault("stac_extensions", []) + if extension_url not in catalog_json["stac_extensions"]: + catalog_json["stac_extensions"].append(extension_url) + self.logger.debug( + f"Added Merkle extension to Catalog '{catalog_json.get('id', catalog_path)}'." + ) + catalog_json["stac_extensions"].sort() + + # Save the updated Catalog JSON + with catalog_path.open("w", encoding="utf-8") as f: + json.dump(catalog_json, f, indent=2) + f.write("\n") + self.logger.info(f"Processed Catalog: {catalog_path}") + + # Build the hierarchical Merkle node + catalog_node = { + "node_id": catalog_json.get("id", str(catalog_path)), + "type": "Catalog", + "merkle:object_hash": own_object_hash, + "merkle:root": merkle_root, + "children": children, + } + + # Sort children by node_id for consistency + catalog_node["children"] = sorted( + catalog_node["children"], key=lambda x: x["node_id"] + ) + + return catalog_node + + except Exception as e: + self.logger.error(f"Error processing Catalog {catalog_path}: {e}") + return {} def is_item_directory(directory: Path) -> bool: @@ -368,12 +541,12 @@ def is_item_directory(directory: Path) -> bool: Returns: - bool: True if the directory contains exactly one Item JSON file, False otherwise. 
""" - item_files = list(directory.glob('*.json')) - if len(item_files) == 1: - try: - with item_files[0].open('r', encoding='utf-8') as f: - data = json.load(f) - return data.get('type') == 'Feature' - except: + try: + item_files = list(directory.glob("*.json")) + if len(item_files) != 1: return False - return False + with item_files[0].open("r", encoding="utf-8") as f: + data = json.load(f) + return data.get("type") == "Feature" + except Exception: + return False diff --git a/stac_merkle_tree_cli/verify_merkle_tree_json.py b/stac_merkle_tree_cli/verify_merkle_tree_json.py index 2313083..fdeead6 100644 --- a/stac_merkle_tree_cli/verify_merkle_tree_json.py +++ b/stac_merkle_tree_cli/verify_merkle_tree_json.py @@ -1,118 +1,227 @@ +# stac_merkle_cli/verify_merkle_tree_json.py + import hashlib -import logging import json +import logging from pathlib import Path -from typing import List, Dict, Any +from typing import Any, Dict, List, Optional -def compute_merkle_root(hashes: List[str], hash_method: Dict[str, Any]) -> str: - """ - Computes the Merkle root from a list of hashes based on the provided hash method. - """ - if not hashes: - return '' - - # Determine ordering - ordering = hash_method.get('ordering', 'ascending') - if ordering == 'ascending': - hashes.sort() - elif ordering == 'descending': - hashes.sort(reverse=True) - elif ordering == 'unsorted': - pass # Keep the original order - else: - raise ValueError(f"Unsupported ordering method: {ordering}") - - # Get the hash function - hash_function_name = hash_method.get('function', 'sha256').replace('-', '').lower() - hash_func = getattr(hashlib, hash_function_name, None) - if not hash_func: - raise ValueError(f"Unsupported hash function: {hash_function_name}") - - current_level = hashes.copy() - - while len(current_level) > 1: - next_level = [] - for i in range(0, len(current_level), 2): - left = current_level[i] - if i + 1 < len(current_level): - right = current_level[i + 1] - else: - right = left # Duplicate the last hash if odd number - - combined = bytes.fromhex(left) + bytes.fromhex(right) - new_hash = hash_func(combined).hexdigest() - next_level.append(new_hash) - current_level = next_level - return current_level[0] - -def verify_merkle_tree(merkle_tree_path: Path) -> bool: +class MerkleTreeVerifier: """ - Verifies that the merkle:root in the Merkle tree JSON matches the recalculated root. + A class to verify the integrity of a Merkle tree JSON structure. """ - try: - with merkle_tree_path.open('r', encoding='utf-8') as f: - merkle_tree = json.load(f) - - discrepancies = [] - calculated_root = calculate_merkle_root_with_discrepancies(merkle_tree, discrepancies) - original_root = merkle_tree.get('merkle:root') - - if not original_root: - print("Error: 'merkle:root' not found in the JSON.") - return False - - if calculated_root == original_root: - print(f"Verification Successful: The merkle:root matches ({calculated_root}).") - return True + def __init__(self, logger: Optional[logging.Logger] = None): + """ + Initializes the MerkleTreeVerifier with an optional logger. + + Parameters: + - logger (logging.Logger, optional): A logger instance for logging messages. + If not provided, a default logger is configured. 
+ """ + if logger is None: + # Configure default logger + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.setLevel(logging.INFO) + handler = logging.StreamHandler() + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + handler.setFormatter(formatter) + if not self.logger.handlers: + self.logger.addHandler(handler) else: - print(f"Verification Failed:") - print(f" - Expected merkle:root: {original_root}") - print(f" - Calculated merkle:root: {calculated_root}") - if discrepancies: - print("Discrepancies found in the following nodes:") - for discrepancy in discrepancies: - print(f" - {discrepancy}") + self.logger = logger + + def compute_merkle_root( + self, hashes: List[str], hash_method: Dict[str, Any] + ) -> str: + """ + Computes the Merkle root from a list of hashes based on the provided hash method. + + Parameters: + - hashes (List[str]): A list of hexadecimal hash strings. + - hash_method (Dict[str, Any]): The hash method details (function, ordering). + + Returns: + - str: The computed Merkle root as a hexadecimal string. + """ + self.logger.debug(f"Starting compute_merkle_root with {len(hashes)} hashes.") + if not hashes: + self.logger.warning("Empty hash list provided. Returning empty string.") + return "" + + # Determine ordering + ordering = hash_method.get("ordering", "ascending") + self.logger.debug(f"Hash ordering method: {ordering}") + if ordering == "ascending": + hashes.sort() + self.logger.debug("Hashes sorted in ascending order.") + elif ordering == "descending": + hashes.sort(reverse=True) + self.logger.debug("Hashes sorted in descending order.") + elif ordering == "unsorted": + self.logger.debug("Hashes remain unsorted.") + pass # Keep the original order + else: + self.logger.error(f"Unsupported ordering method: {ordering}") + raise ValueError(f"Unsupported ordering method: {ordering}") + + # Get the hash function + hash_function_name = ( + hash_method.get("function", "sha256").replace("-", "").lower() + ) + self.logger.debug(f"Using hash function: {hash_function_name}") + hash_func = getattr(hashlib, hash_function_name, None) + if not hash_func: + self.logger.error(f"Unsupported hash function: {hash_function_name}") + raise ValueError(f"Unsupported hash function: {hash_function_name}") + + current_level = hashes.copy() + self.logger.debug(f"Initial current_level: {current_level}") + + while len(current_level) > 1: + self.logger.debug(f"Processing level with {len(current_level)} hashes.") + next_level = [] + for i in range(0, len(current_level), 2): + left = current_level[i] + if i + 1 < len(current_level): + right = current_level[i + 1] + else: + right = left # Duplicate the last hash if odd number + self.logger.debug( + f"Odd number of hashes. 
Duplicating last hash: {left}" + ) + + self.logger.debug(f"Combining hashes: {left} + {right}") + try: + combined = bytes.fromhex(left) + bytes.fromhex(right) + except ValueError as e: + self.logger.error(f"Error converting hashes to bytes: {e}") + raise ValueError(f"Error converting hashes to bytes: {e}") + + new_hash = hash_func(combined).hexdigest() + self.logger.debug(f"New hash: {new_hash}") + next_level.append(new_hash) + current_level = next_level + self.logger.debug(f"Next level hashes: {current_level}") + + self.logger.info(f"Computed Merkle root: {current_level[0]}") + return current_level[0] + + def calculate_merkle_root_with_discrepancies( + self, node: Dict[str, Any], discrepancies: List[str] + ) -> str: + """ + Recursively calculates the Merkle root and records discrepancies. + + Parameters: + - node (Dict[str, Any]): The Merkle tree node to process. + - discrepancies (List[str]): A list to record discrepancies. + + Returns: + - str: The recalculated Merkle root. + """ + node_id = node.get("node_id", "Unknown") + node_type = node.get("type", "Unknown") + self.logger.debug(f"Processing node: {node_type} '{node_id}'") + + hash_method = node.get( + "merkle:hash_method", + { + "function": "sha256", + "fields": ["*"], + "ordering": "ascending", + "description": "Default hash method.", + }, + ) + self.logger.debug(f"Hash method for node '{node_id}': {hash_method}") + + # If the node is an Item, its merkle:root is its own merkle:object_hash + if node["type"] == "Item": + self.logger.debug( + f"Node '{node_id}' is an Item. Returning its merkle:object_hash." + ) + return node["merkle:object_hash"] + + # For Catalogs and Collections, collect child hashes + child_hashes = [] + for child in node.get("children", []): + child_root = self.calculate_merkle_root_with_discrepancies( + child, discrepancies + ) + if child_root: + child_hashes.append(child_root) + self.logger.debug( + f"Added child hash from node '{child.get('node_id', 'Unknown')}': {child_root}" + ) + + # Include own merkle:object_hash + own_hash = node.get("merkle:object_hash") + if own_hash: + child_hashes.append(own_hash) + self.logger.debug( + f"Added own merkle:object_hash for node '{node_id}': {own_hash}" + ) + + # Compute the Merkle root from child hashes + self.logger.debug(f"Child hashes for node '{node_id}': {child_hashes}") + calculated_root = self.compute_merkle_root(child_hashes, hash_method) + self.logger.debug(f"Calculated root for node '{node_id}': {calculated_root}") + + # Compare with the node's merkle:root + original_root = node.get("merkle:root") + self.logger.debug(f"Original merkle:root for node '{node_id}': {original_root}") + + if original_root != calculated_root: + discrepancy_message = f"{node_type} '{node_id}' has mismatched merkle:root." + discrepancies.append(discrepancy_message) + self.logger.warning(discrepancy_message) + + return calculated_root + + def verify_merkle_tree(self, merkle_tree_path: Path) -> bool: + """ + Verifies that the merkle:root in the Merkle tree JSON matches the recalculated root. + + Parameters: + - merkle_tree_path (Path): Path to the Merkle tree JSON file. + + Returns: + - bool: True if the merkle:root matches, False otherwise. 
+ """ + self.logger.info(f"Verifying Merkle tree at path: {merkle_tree_path}") + try: + with merkle_tree_path.open("r", encoding="utf-8") as f: + merkle_tree = json.load(f) + self.logger.debug("Loaded Merkle tree JSON successfully.") + + discrepancies: List[str] = [] + calculated_root = self.calculate_merkle_root_with_discrepancies( + merkle_tree, discrepancies + ) + self.logger.debug(f"Calculated Merkle root: {calculated_root}") + + original_root = merkle_tree.get("merkle:root") + self.logger.debug(f"Original merkle:root from JSON: {original_root}") + + if not original_root: + self.logger.error("Error: 'merkle:root' not found in the JSON.") + return False + + if calculated_root == original_root: + self.logger.info( + f"Verification Successful: The merkle:root matches ({calculated_root})." + ) + return True + else: + self.logger.error("Verification Failed:") + self.logger.error(f" - Expected merkle:root: {original_root}") + self.logger.error(f" - Calculated merkle:root: {calculated_root}") + if discrepancies: + self.logger.error("Discrepancies found in the following nodes:") + for discrepancy in discrepancies: + self.logger.error(f" - {discrepancy}") + return False + except Exception as e: + self.logger.error(f"Error verifying Merkle tree: {e}") return False - - except Exception as e: - print(f"Error during verification: {e}") - return False - -def calculate_merkle_root_with_discrepancies(node: Dict[str, Any], discrepancies: List[str]) -> str: - """ - Recursively calculates the Merkle root and records discrepancies. - """ - hash_method = node.get('merkle:hash_method', { - 'function': 'sha256', - 'fields': ['*'], - 'ordering': 'ascending', - 'description': 'Default hash method.' - }) - - # If the node is an Item, its merkle:root is its own merkle:object_hash - if node['type'] == 'Item': - return node['merkle:object_hash'] - - # For Catalogs and Collections, collect child hashes - child_hashes = [] - for child in node.get('children', []): - child_root = calculate_merkle_root_with_discrepancies(child, discrepancies) - if child_root: - child_hashes.append(child_root) - - # Include own merkle:object_hash - own_hash = node.get('merkle:object_hash') - if own_hash: - child_hashes.append(own_hash) - - # Compute the Merkle root from child hashes - calculated_root = compute_merkle_root(child_hashes, hash_method) - - # Compare with the node's merkle:root - original_root = node.get('merkle:root') - if original_root != calculated_root: - discrepancies.append(f"{node['type']} '{node['node_id']}' has mismatched merkle:root.") - - return calculated_root \ No newline at end of file diff --git a/tests/test_compute_merkle_info.py b/tests/test_compute_merkle_info.py index a18433f..85007b2 100644 --- a/tests/test_compute_merkle_info.py +++ b/tests/test_compute_merkle_info.py @@ -1,23 +1,27 @@ # tests/test_compute_merkle_info.py -import unittest -import json import hashlib -import tempfile +import json import shutil +import tempfile +import unittest from pathlib import Path -from typing import List, Dict, Any, Optional -from unittest.mock import patch +from typing import Any, Dict, List, Optional + from stac_merkle_tree_cli.compute_merkle_info import ( - compute_merkle_object_hash, - remove_merkle_fields, - process_collection, - process_catalog, - is_item_directory + MerkleTreeProcessor, + is_item_directory, ) class TestComputeMerkleObjectHash(unittest.TestCase): + @classmethod + def setUpClass(cls): + """ + Initialize the MerkleTreeProcessor instance for the test class. 
+ """ + cls.processor = MerkleTreeProcessor() + def test_compute_hash_all_fields_item(self): """ Test hashing all fields for a STAC Item, ensuring Merkle fields are excluded. @@ -28,23 +32,23 @@ def test_compute_hash_all_fields_item(self): "properties": { "datetime": "2024-10-15T12:00:00Z", "other_property": "value", - "merkle:object_hash": "should be excluded" + "merkle:object_hash": "should be excluded", }, "geometry": {}, "links": [], "assets": {}, "merkle:object_hash": "should be excluded at top level", - "merkle:hash_method": "should be excluded at top level" + "merkle:hash_method": "should be excluded at top level", } hash_method = { "function": "sha256", "fields": ["*"], "ordering": "ascending", - "description": "Test hash method." + "description": "Test hash method.", } - result = compute_merkle_object_hash(stac_object, hash_method) + result = self.processor.compute_merkle_object_hash(stac_object, hash_method) # Expected data excludes Merkle fields recursively expected_data = { @@ -52,14 +56,16 @@ def test_compute_hash_all_fields_item(self): "id": "test-item", "properties": { "datetime": "2024-10-15T12:00:00Z", - "other_property": "value" + "other_property": "value", }, "geometry": {}, "links": [], - "assets": {} + "assets": {}, } - expected_json_str = json.dumps(expected_data, sort_keys=True, separators=(',', ':')) - expected_hash = hashlib.sha256(expected_json_str.encode('utf-8')).hexdigest() + expected_json_str = json.dumps( + expected_data, sort_keys=True, separators=(",", ":") + ) + expected_hash = hashlib.sha256(expected_json_str.encode("utf-8")).hexdigest() self.assertEqual(result, expected_hash) def test_compute_hash_all_fields_collection(self): @@ -74,17 +80,17 @@ def test_compute_hash_all_fields_collection(self): "links": [], "merkle:object_hash": "should be excluded", "merkle:hash_method": "should be excluded", - "merkle:root": "should be excluded" + "merkle:root": "should be excluded", } hash_method = { "function": "sha256", "fields": ["*"], "ordering": "ascending", - "description": "Test hash method." + "description": "Test hash method.", } - result = compute_merkle_object_hash(stac_object, hash_method) + result = self.processor.compute_merkle_object_hash(stac_object, hash_method) # Expected data excludes Merkle fields expected_data = { @@ -92,10 +98,12 @@ def test_compute_hash_all_fields_collection(self): "id": "test-collection", "description": "A test collection", "extent": {}, - "links": [] + "links": [], } - expected_json_str = json.dumps(expected_data, sort_keys=True, separators=(',', ':')) - expected_hash = hashlib.sha256(expected_json_str.encode('utf-8')).hexdigest() + expected_json_str = json.dumps( + expected_data, sort_keys=True, separators=(",", ":") + ) + expected_hash = hashlib.sha256(expected_json_str.encode("utf-8")).hexdigest() self.assertEqual(result, expected_hash) def test_compute_hash_specific_fields_item(self): @@ -109,36 +117,51 @@ def test_compute_hash_specific_fields_item(self): "other_property": "value", "datetime": "2024-10-15T12:00:00Z", "extra_property": "should be excluded", - "merkle:object_hash": "should be excluded" + "merkle:object_hash": "should be excluded", }, "geometry": {}, - "links": [] + "links": [], } hash_method = { "function": "sha256", "fields": ["id", "properties"], "ordering": "ascending", - "description": "Test hash method with specific fields." 
+ "description": "Test hash method with specific fields.", } - result = compute_merkle_object_hash(stac_object, hash_method) + result = self.processor.compute_merkle_object_hash(stac_object, hash_method) # Expected data includes only specified fields, excluding Merkle fields - selected_data = {field: stac_object[field] for field in hash_method['fields'] if field in stac_object} - expected_data = remove_merkle_fields(selected_data) + selected_data = { + field: stac_object[field] + for field in hash_method["fields"] + if field in stac_object + } + expected_data = self.processor.remove_merkle_fields(selected_data) # Debugging: Print the expected data being hashed - print("Expected data in test:", json.dumps(expected_data, indent=2, sort_keys=True)) - print("Expected JSON string in test:", json.dumps(expected_data, sort_keys=True, separators=(',', ':'))) + print( + "Expected data in test:", + json.dumps(expected_data, indent=2, sort_keys=True), + ) + print( + "Expected JSON string in test:", + json.dumps(expected_data, sort_keys=True, separators=(",", ":")), + ) - expected_json_str = json.dumps(expected_data, sort_keys=True, separators=(',', ':')) + expected_json_str = json.dumps( + expected_data, sort_keys=True, separators=(",", ":") + ) # Compute expected hash - expected_hash = hashlib.sha256(expected_json_str.encode('utf-8')).hexdigest() + expected_hash = hashlib.sha256(expected_json_str.encode("utf-8")).hexdigest() # Debugging: Print actual data being hashed in the function - print("Data to hash in function:", json.dumps(expected_data, indent=2, sort_keys=True)) + print( + "Data to hash in function:", + json.dumps(expected_data, indent=2, sort_keys=True), + ) print("JSON string in function:", expected_json_str) print("Expected hash:", expected_hash) print("Actual hash:", result) @@ -149,62 +172,57 @@ def test_compute_hash_unsupported_function(self): """ Test behavior when an unsupported hash function is specified. """ - stac_object = { - "id": "test-object" - } + stac_object = {"id": "test-object"} hash_method = { "function": "unsupported-hash", "fields": ["*"], "ordering": "ascending", - "description": "Test unsupported hash function." + "description": "Test unsupported hash function.", } with self.assertRaises(ValueError) as context: - compute_merkle_object_hash(stac_object, hash_method) + self.processor.compute_merkle_object_hash(stac_object, hash_method) self.assertIn("Unsupported hash function", str(context.exception)) def test_compute_hash_missing_fields(self): """ Test behavior when specified fields are missing from the object. """ - stac_object = { - "id": "test-object", - "some_field": "some value" - } + stac_object = {"id": "test-object", "some_field": "some value"} hash_method = { "function": "sha256", "fields": ["non_existent_field"], "ordering": "ascending", - "description": "Test with missing fields." 
+ "description": "Test with missing fields.", } - result = compute_merkle_object_hash(stac_object, hash_method) + result = self.processor.compute_merkle_object_hash(stac_object, hash_method) # Expected data is empty because the specified field doesn't exist expected_data = {} - expected_json_str = json.dumps(expected_data, sort_keys=True, separators=(',', ':')) - expected_hash = hashlib.sha256(expected_json_str.encode('utf-8')).hexdigest() + expected_json_str = json.dumps( + expected_data, sort_keys=True, separators=(",", ":") + ) + expected_hash = hashlib.sha256(expected_json_str.encode("utf-8")).hexdigest() self.assertEqual(result, expected_hash) def test_compute_hash_different_hash_functions(self): """ Test hashing with different hash functions. """ - stac_object = { - "id": "test-object" - } + stac_object = {"id": "test-object"} hash_functions = ["sha256", "md5", "sha1", "sha512"] for func in hash_functions: hash_method = { "function": func, "fields": ["*"], "ordering": "ascending", - "description": f"Test with hash function {func}." + "description": f"Test with hash function {func}.", } - result = compute_merkle_object_hash(stac_object, hash_method) - expected_data = { - "id": "test-object" - } - expected_json_str = json.dumps(expected_data, sort_keys=True, separators=(',', ':')) - hash_func = getattr(hashlib, func.replace('-', '').lower()) - expected_hash = hash_func(expected_json_str.encode('utf-8')).hexdigest() + result = self.processor.compute_merkle_object_hash(stac_object, hash_method) + expected_data = {"id": "test-object"} + expected_json_str = json.dumps( + expected_data, sort_keys=True, separators=(",", ":") + ) + hash_func = getattr(hashlib, func.replace("-", "").lower()) + expected_hash = hash_func(expected_json_str.encode("utf-8")).hexdigest() self.assertEqual(result, expected_hash) def test_compute_hash_excludes_merkle_fields(self): @@ -216,26 +234,32 @@ def test_compute_hash_excludes_merkle_fields(self): "merkle:object_hash": "should be excluded", "merkle:hash_method": "should be excluded", "merkle:root": "should be excluded", - "other_field": "value" + "other_field": "value", } hash_method = { "function": "sha256", "fields": ["*"], "ordering": "ascending", - "description": "Test exclusion of Merkle fields." + "description": "Test exclusion of Merkle fields.", } - result = compute_merkle_object_hash(stac_object, hash_method) + result = self.processor.compute_merkle_object_hash(stac_object, hash_method) # Expected data excludes Merkle fields - expected_data = { - "id": "test-object", - "other_field": "value" - } - expected_json_str = json.dumps(expected_data, sort_keys=True, separators=(',', ':')) - expected_hash = hashlib.sha256(expected_json_str.encode('utf-8')).hexdigest() + expected_data = {"id": "test-object", "other_field": "value"} + expected_json_str = json.dumps( + expected_data, sort_keys=True, separators=(",", ":") + ) + expected_hash = hashlib.sha256(expected_json_str.encode("utf-8")).hexdigest() self.assertEqual(result, expected_hash) class TestProcessCollection(unittest.TestCase): + @classmethod + def setUpClass(cls): + """ + Set up the MerkleTreeProcessor instance for testing collections. + """ + cls.processor = MerkleTreeProcessor() + def setUp(self): """ Set up a temporary directory for testing collections. 
@@ -257,7 +281,7 @@ def create_collection( sub_collections: Optional[List[Dict[str, Any]]] = None, sub_catalogs: Optional[List[Dict[str, Any]]] = None, nested_items: Optional[List[Dict[str, Any]]] = None, - parent_dir: Optional[Path] = None # New parameter + parent_dir: Optional[Path] = None, # New parameter ): """ Helper function to create a collection with items, sub-collections, and sub-catalogs. @@ -280,20 +304,16 @@ def create_collection( "id": collection_id, "description": f"Description for {collection_id}", "extent": {}, - "links": [] + "links": [], } # Optionally add merkle:hash_method - hash_method = { - "function": "sha256", - "fields": ["*"], - "ordering": "ascending" - } + hash_method = {"function": "sha256", "fields": ["*"], "ordering": "ascending"} collection_json["merkle:hash_method"] = hash_method # Save collection.json collection_json_path = collection_dir / "collection.json" - with collection_json_path.open('w', encoding='utf-8') as f: + with collection_json_path.open("w", encoding="utf-8") as f: json.dump(collection_json, f, indent=2) # Create items @@ -301,7 +321,7 @@ def create_collection( item_dir = collection_dir / item["id"] item_dir.mkdir(parents=True, exist_ok=True) item_path = item_dir / f"{item['id']}.json" - with item_path.open('w', encoding='utf-8') as f: + with item_path.open("w", encoding="utf-8") as f: json.dump(item, f, indent=2) # Create sub-collections @@ -315,7 +335,7 @@ def create_collection( sub_col.get("sub_collections"), sub_col.get("sub_catalogs"), sub_col.get("nested_items"), - parent_dir=collection_dir # Directly nest under parent collection + parent_dir=collection_dir, # Directly nest under parent collection ) # Create sub-catalogs @@ -328,12 +348,12 @@ def create_collection( "type": "Catalog", "id": sub_cat_id, "description": f"Description for {sub_cat_id}", - "links": [] + "links": [], } # Optionally add merkle:hash_method sub_cat_json["merkle:hash_method"] = hash_method sub_cat_json_path = sub_cat_dir / "catalog.json" - with sub_cat_json_path.open('w', encoding='utf-8') as f: + with sub_cat_json_path.open("w", encoding="utf-8") as f: json.dump(sub_cat_json, f, indent=2) # Create collections within sub-catalogs @@ -344,17 +364,16 @@ def create_collection( sub_cat_collection.get("sub_collections"), sub_cat_collection.get("sub_catalogs"), sub_cat_collection.get("nested_items"), - parent_dir=sub_cat_dir # Directly nest under sub-catalog + parent_dir=sub_cat_dir, # Directly nest under sub-catalog ) # Create nested items if any (items directly within the collection directory) if nested_items: for item in nested_items: item_path = collection_dir / f"{item['id']}.json" - with item_path.open('w', encoding='utf-8') as f: + with item_path.open("w", encoding="utf-8") as f: json.dump(item, f, indent=2) - def test_process_collection_with_nested_items(self): """ Test processing a collection with items nested in their own directories. 
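Note: for orientation, the create_collection helper above produces roughly this layout on disk (IDs borrowed from the tests that follow; in this test class, sub-collections nest directly under the parent collection directory):

    collections/
        collection1/
            collection.json
            item1/
                item1.json
            item2/
                item2.json
            item3.json          (a "nested item" written directly into the collection directory)
            sub_collection1/
                collection.json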
@@ -366,21 +385,21 @@ def test_process_collection_with_nested_items(self): "id": "item1", "properties": { "datetime": "2024-10-18T12:00:00Z", - "other_property": "value1" + "other_property": "value1", }, "geometry": {}, - "links": [] + "links": [], }, { "type": "Feature", "id": "item2", "properties": { "datetime": "2024-10-19T12:00:00Z", - "other_property": "value2" + "other_property": "value2", }, "geometry": {}, - "links": [] - } + "links": [], + }, ] self.create_collection( collection_id, @@ -391,12 +410,12 @@ def test_process_collection_with_nested_items(self): "id": "item3", "properties": { "datetime": "2024-10-20T12:00:00Z", - "other_property": "value3" + "other_property": "value3", }, "geometry": {}, - "links": [] + "links": [], } - ] + ], ) collection_json_path = self.collections_dir / collection_id / "collection.json" @@ -406,30 +425,32 @@ def test_process_collection_with_nested_items(self): "function": "sha256", "fields": ["*"], "ordering": "ascending", - "description": "Test hash method." + "description": "Test hash method.", } # Process the collection via process_collection only - collection_node = process_collection(collection_json_path, hash_method) + collection_node = self.processor.process_collection( + collection_json_path, hash_method + ) # Assertions self.assertIsNotNone(collection_node) - self.assertIn('node_id', collection_node) - self.assertIn('merkle:object_hash', collection_node) - self.assertIn('merkle:root', collection_node) - self.assertIn('children', collection_node) + self.assertIn("node_id", collection_node) + self.assertIn("merkle:object_hash", collection_node) + self.assertIn("merkle:root", collection_node) + self.assertIn("children", collection_node) - self.assertEqual(collection_node['node_id'], collection_id) - self.assertTrue(collection_node['merkle:object_hash']) - self.assertTrue(collection_node['merkle:root']) - self.assertEqual(len(collection_node['children']), 3) # item1, item2, item3 + self.assertEqual(collection_node["node_id"], collection_id) + self.assertTrue(collection_node["merkle:object_hash"]) + self.assertTrue(collection_node["merkle:root"]) + self.assertEqual(len(collection_node["children"]), 3) # item1, item2, item3 # Check individual items item_ids = {"item1", "item2", "item3"} - for child in collection_node['children']: - self.assertIn('node_id', child) - self.assertIn('merkle:object_hash', child) - self.assertIn(child['node_id'], item_ids) + for child in collection_node["children"]: + self.assertIn("node_id", child) + self.assertIn("merkle:object_hash", child) + self.assertIn(child["node_id"], item_ids) def test_process_collection_with_sub_collections_and_items_in_folders(self): """ @@ -442,10 +463,10 @@ def test_process_collection_with_sub_collections_and_items_in_folders(self): "id": "item1", "properties": { "datetime": "2024-10-18T12:00:00Z", - "other_property": "value1" + "other_property": "value1", }, "geometry": {}, - "links": [] + "links": [], } ] sub_collections = [ @@ -457,10 +478,10 @@ def test_process_collection_with_sub_collections_and_items_in_folders(self): "id": "item2", "properties": { "datetime": "2024-10-19T12:00:00Z", - "other_property": "value2" + "other_property": "value2", }, "geometry": {}, - "links": [] + "links": [], } ], "sub_collections": [ @@ -472,20 +493,18 @@ def test_process_collection_with_sub_collections_and_items_in_folders(self): "id": "item3", "properties": { "datetime": "2024-10-20T12:00:00Z", - "other_property": "value3" + "other_property": "value3", }, "geometry": {}, - "links": [] + "links": [], 
} - ] + ], } - ] + ], } ] self.create_collection( - collection_id, - items=items, - sub_collections=sub_collections + collection_id, items=items, sub_collections=sub_collections ) collection_json_path = self.collections_dir / collection_id / "collection.json" @@ -495,50 +514,84 @@ def test_process_collection_with_sub_collections_and_items_in_folders(self): "function": "sha256", "fields": ["*"], "ordering": "ascending", - "description": "Test hash method." + "description": "Test hash method.", } # Process the collection via process_collection only - collection_node = process_collection(collection_json_path, hash_method) + collection_node = self.processor.process_collection( + collection_path=collection_json_path, parent_hash_method=hash_method + ) # Assertions self.assertIsNotNone(collection_node) - self.assertIn('node_id', collection_node) - self.assertIn('merkle:object_hash', collection_node) - self.assertIn('merkle:root', collection_node) - self.assertIn('children', collection_node) - - self.assertEqual(collection_node['node_id'], collection_id) - self.assertTrue(collection_node['merkle:object_hash']) - self.assertTrue(collection_node['merkle:root']) - self.assertEqual(len(collection_node['children']), 2) # item1 and sub_collection1 + self.assertIn("node_id", collection_node) + self.assertIn("merkle:object_hash", collection_node) + self.assertIn("merkle:root", collection_node) + self.assertIn("children", collection_node) + + self.assertEqual(collection_node["node_id"], collection_id) + self.assertTrue(collection_node["merkle:object_hash"]) + self.assertTrue(collection_node["merkle:root"]) + self.assertEqual( + len(collection_node["children"]), 2 + ) # item1 and sub_collection1 # Check individual children - child_ids = {child['node_id'] for child in collection_node['children']} - self.assertIn('item1', child_ids) - self.assertIn('sub_collection1', child_ids) + child_ids = {child["node_id"] for child in collection_node["children"]} + self.assertIn("item1", child_ids) + self.assertIn("sub_collection1", child_ids) # Further checks to ensure sub_collection1 has its children - sub_collection_node = next((child for child in collection_node['children'] if child['node_id'] == 'sub_collection1'), None) + sub_collection_node = next( + ( + child + for child in collection_node["children"] + if child["node_id"] == "sub_collection1" + ), + None, + ) self.assertIsNotNone(sub_collection_node) - self.assertIn('children', sub_collection_node) - self.assertEqual(len(sub_collection_node['children']), 2) # item2 and sub_sub_collection1 + self.assertIn("children", sub_collection_node) + self.assertEqual( + len(sub_collection_node["children"]), 2 + ) # item2 and sub_sub_collection1 # Check sub_sub_collection1 - sub_sub_collection_node = next((child for child in sub_collection_node['children'] if child['node_id'] == 'sub_sub_collection1'), None) + sub_sub_collection_node = next( + ( + child + for child in sub_collection_node["children"] + if child["node_id"] == "sub_sub_collection1" + ), + None, + ) self.assertIsNotNone(sub_sub_collection_node) - self.assertIn('children', sub_sub_collection_node) - self.assertEqual(len(sub_sub_collection_node['children']), 1) # item3 + self.assertIn("children", sub_sub_collection_node) + self.assertEqual(len(sub_sub_collection_node["children"]), 1) # item3 # Check item2 - item2_node = next((child for child in sub_collection_node['children'] if child['node_id'] == 'item2'), None) + item2_node = next( + ( + child + for child in sub_collection_node["children"] + if child["node_id"] 
== "item2" + ), + None, + ) self.assertIsNotNone(item2_node) - self.assertIn('merkle:object_hash', item2_node) + self.assertIn("merkle:object_hash", item2_node) # Check item3 - item3_node = next((child for child in sub_sub_collection_node['children'] if child['node_id'] == 'item3'), None) + item3_node = next( + ( + child + for child in sub_sub_collection_node["children"] + if child["node_id"] == "item3" + ), + None, + ) self.assertIsNotNone(item3_node) - self.assertIn('merkle:object_hash', item3_node) + self.assertIn("merkle:object_hash", item3_node) class TestIsItemDirectory(unittest.TestCase): @@ -565,12 +618,12 @@ def test_is_item_directory_true(self): "id": "itema", "properties": {}, "geometry": {}, - "links": [] + "links": [], } item_path = item_dir / "itema.json" - with item_path.open('w', encoding='utf-8') as f: + with item_path.open("w", encoding="utf-8") as f: json.dump(item_json, f, indent=2) - + self.assertTrue(is_item_directory(item_dir)) def test_is_item_directory_false_multiple_files(self): @@ -584,22 +637,22 @@ def test_is_item_directory_false_multiple_files(self): "id": "itemb1", "properties": {}, "geometry": {}, - "links": [] + "links": [], } item_json2 = { "type": "Feature", "id": "itemb2", "properties": {}, "geometry": {}, - "links": [] + "links": [], } item_path1 = item_dir / "itemb1.json" item_path2 = item_dir / "itemb2.json" - with item_path1.open('w', encoding='utf-8') as f: + with item_path1.open("w", encoding="utf-8") as f: json.dump(item_json1, f, indent=2) - with item_path2.open('w', encoding='utf-8') as f: + with item_path2.open("w", encoding="utf-8") as f: json.dump(item_json2, f, indent=2) - + self.assertFalse(is_item_directory(item_dir)) def test_is_item_directory_false_non_feature(self): @@ -611,12 +664,12 @@ def test_is_item_directory_false_non_feature(self): non_feature_json = { "type": "Collection", "id": "itemc", - "description": "A non-Feature type" + "description": "A non-Feature type", } item_path = item_dir / "itemc.json" - with item_path.open('w', encoding='utf-8') as f: + with item_path.open("w", encoding="utf-8") as f: json.dump(non_feature_json, f, indent=2) - + self.assertFalse(is_item_directory(item_dir)) def test_is_item_directory_false_no_json_files(self): @@ -630,6 +683,13 @@ def test_is_item_directory_false_no_json_files(self): class TestProcessCatalog(unittest.TestCase): + @classmethod + def setUpClass(cls): + """ + Set up the MerkleTreeProcessor instance for testing collections. + """ + cls.processor = MerkleTreeProcessor() + def setUp(self): """ Set up a temporary directory for testing catalogs. @@ -653,7 +713,7 @@ def create_collection( sub_collections: Optional[List[Dict[str, Any]]] = None, sub_catalogs: Optional[List[Dict[str, Any]]] = None, nested_items: Optional[List[Dict[str, Any]]] = None, - parent_dir: Optional[Path] = None # New parameter + parent_dir: Optional[Path] = None, # New parameter ): """ Helper function to create a collection with items, sub-collections, and sub-catalogs. 
@@ -676,20 +736,16 @@ def create_collection( "id": collection_id, "description": f"Description for {collection_id}", "extent": {}, - "links": [] + "links": [], } # Optionally add merkle:hash_method - hash_method = { - "function": "sha256", - "fields": ["*"], - "ordering": "ascending" - } + hash_method = {"function": "sha256", "fields": ["*"], "ordering": "ascending"} collection_json["merkle:hash_method"] = hash_method # Save collection.json collection_json_path = collection_dir / "collection.json" - with collection_json_path.open('w', encoding='utf-8') as f: + with collection_json_path.open("w", encoding="utf-8") as f: json.dump(collection_json, f, indent=2) # Create items @@ -697,7 +753,7 @@ def create_collection( item_dir = collection_dir / item["id"] item_dir.mkdir(parents=True, exist_ok=True) item_path = item_dir / f"{item['id']}.json" - with item_path.open('w', encoding='utf-8') as f: + with item_path.open("w", encoding="utf-8") as f: json.dump(item, f, indent=2) # Create sub-collections @@ -714,7 +770,7 @@ def create_collection( sub_col.get("sub_collections"), sub_col.get("sub_catalogs"), sub_col.get("nested_items"), - parent_dir=sub_collections_dir # Pass the 'collections' subdirectory + parent_dir=sub_collections_dir, # Pass the 'collections' subdirectory ) # Create sub-catalogs @@ -727,12 +783,12 @@ def create_collection( "type": "Catalog", "id": sub_cat_id, "description": f"Description for {sub_cat_id}", - "links": [] + "links": [], } # Optionally add merkle:hash_method sub_cat_json["merkle:hash_method"] = hash_method sub_cat_json_path = sub_cat_dir / "catalog.json" - with sub_cat_json_path.open('w', encoding='utf-8') as f: + with sub_cat_json_path.open("w", encoding="utf-8") as f: json.dump(sub_cat_json, f, indent=2) # Create collections within sub-catalogs @@ -743,17 +799,17 @@ def create_collection( sub_cat_collection.get("sub_collections"), sub_cat_collection.get("sub_catalogs"), sub_cat_collection.get("nested_items"), - parent_dir=sub_cat_dir / "collections" # Pass the 'collections' subdirectory + parent_dir=sub_cat_dir + / "collections", # Pass the 'collections' subdirectory ) # Create nested items if any (items directly within the collection directory) if nested_items: for item in nested_items: item_path = collection_dir / f"{item['id']}.json" - with item_path.open('w', encoding='utf-8') as f: + with item_path.open("w", encoding="utf-8") as f: json.dump(item, f, indent=2) - @unittest.skip("Skipping this test temporarily due to CI environment inconsistency. Passing locally?") def test_process_catalog_simple(self): """ Test processing a simple catalog with a single collection and items. 
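Note: the assertions in the next hunk check the shape of the node dictionaries that process_catalog emits; schematically (hash values elided, keys limited to what the assertions touch):

    merkle_tree = {
        "node_id": "root_catalog",
        "merkle:object_hash": "<64-char hex>",
        "merkle:root": "<64-char hex>",
        "children": [
            {
                "node_id": "collection1",
                "merkle:object_hash": "<hex>",
                "merkle:root": "<hex>",
                "children": [
                    {"node_id": "item1", "merkle:object_hash": "<hex>"},
                    {"node_id": "item2", "merkle:object_hash": "<hex>"},
                ],
            }
        ],
    }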
@@ -767,17 +823,13 @@ def test_process_catalog_simple(self): "id": collection_id, "description": "A simple collection", "extent": {}, - "links": [] - } - hash_method = { - "function": "sha256", - "fields": ["*"], - "ordering": "ascending" + "links": [], } + hash_method = {"function": "sha256", "fields": ["*"], "ordering": "ascending"} collection_json["merkle:hash_method"] = hash_method collection_json_path = collection_dir / "collection.json" - with collection_json_path.open('w', encoding='utf-8') as f: + with collection_json_path.open("w", encoding="utf-8") as f: json.dump(collection_json, f, indent=2) # Create items @@ -786,31 +838,31 @@ def test_process_catalog_simple(self): "id": "item1", "properties": { "datetime": "2024-10-23T12:00:00Z", - "other_property": "value1" + "other_property": "value1", }, "geometry": {}, - "links": [] + "links": [], } item2 = { "type": "Feature", "id": "item2", "properties": { "datetime": "2024-10-24T12:00:00Z", - "other_property": "value2" + "other_property": "value2", }, "geometry": {}, - "links": [] + "links": [], } item1_dir = collection_dir / "item1" item1_dir.mkdir() item1_path = item1_dir / "item1.json" - with item1_path.open('w', encoding='utf-8') as f: + with item1_path.open("w", encoding="utf-8") as f: json.dump(item1, f, indent=2) item2_dir = collection_dir / "item2" item2_dir.mkdir() item2_path = item2_dir / "item2.json" - with item2_path.open('w', encoding='utf-8') as f: + with item2_path.open("w", encoding="utf-8") as f: json.dump(item2, f, indent=2) # Create catalog.json @@ -818,44 +870,44 @@ def test_process_catalog_simple(self): "type": "Catalog", "id": "root_catalog", "description": "Root Catalog", - "links": [] + "links": [], } catalog_json["merkle:hash_method"] = hash_method catalog_json_path = self.catalog_dir / "catalog.json" - with catalog_json_path.open('w', encoding='utf-8') as f: + with catalog_json_path.open("w", encoding="utf-8") as f: json.dump(catalog_json, f, indent=2) # Process the catalog instead of processing the collection directly - merkle_tree = process_catalog(catalog_json_path, hash_method) + merkle_tree = self.processor.process_catalog(catalog_json_path, hash_method) # Assertions self.assertIsNotNone(merkle_tree) - self.assertIn('node_id', merkle_tree) - self.assertIn('merkle:object_hash', merkle_tree) - self.assertIn('merkle:root', merkle_tree) - self.assertIn('children', merkle_tree) - self.assertEqual(merkle_tree['node_id'], 'root_catalog') - self.assertTrue(merkle_tree['merkle:object_hash']) - self.assertTrue(merkle_tree['merkle:root']) - self.assertEqual(len(merkle_tree['children']), 1) # Only collection1 + self.assertIn("node_id", merkle_tree) + self.assertIn("merkle:object_hash", merkle_tree) + self.assertIn("merkle:root", merkle_tree) + self.assertIn("children", merkle_tree) + self.assertEqual(merkle_tree["node_id"], "root_catalog") + self.assertTrue(merkle_tree["merkle:object_hash"]) + self.assertTrue(merkle_tree["merkle:root"]) + self.assertEqual(len(merkle_tree["children"]), 1) # Only collection1 # Check children (collections) - collection_node = merkle_tree['children'][0] - self.assertEqual(collection_node['node_id'], 'collection1') - self.assertIn('merkle:object_hash', collection_node) - self.assertIn('merkle:root', collection_node) - self.assertIn('children', collection_node) - self.assertEqual(len(collection_node['children']), 2) # item1 and item2 + collection_node = merkle_tree["children"][0] + self.assertEqual(collection_node["node_id"], "collection1") + self.assertIn("merkle:object_hash", 
collection_node) + self.assertIn("merkle:root", collection_node) + self.assertIn("children", collection_node) + self.assertEqual(len(collection_node["children"]), 2) # item1 and item2 # Check items - item_node1 = collection_node['children'][0] - self.assertEqual(item_node1['node_id'], 'item1') - self.assertIn('merkle:object_hash', item_node1) + item_node1 = collection_node["children"][0] + self.assertEqual(item_node1["node_id"], "item1") + self.assertIn("merkle:object_hash", item_node1) - item_node2 = collection_node['children'][1] - self.assertEqual(item_node2['node_id'], 'item2') - self.assertIn('merkle:object_hash', item_node2) + item_node2 = collection_node["children"][1] + self.assertEqual(item_node2["node_id"], "item2") + self.assertIn("merkle:object_hash", item_node2) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/utilities/split-geojson-features/split-geojson-features.py b/utilities/split-geojson-features/split-geojson-features.py index 955c7d7..8275008 100644 --- a/utilities/split-geojson-features/split-geojson-features.py +++ b/utilities/split-geojson-features/split-geojson-features.py @@ -1,10 +1,12 @@ #!/usr/bin/env python3 -import click import json import os from pathlib import Path +import click + + def load_feature_collection(input_file: str) -> dict: """ Loads the GeoJSON Feature Collection file. @@ -16,18 +18,24 @@ def load_feature_collection(input_file: str) -> dict: - dict: Parsed JSON content of the feature collection. """ try: - with open(input_file, 'r', encoding='utf-8') as f: + with open(input_file, "r", encoding="utf-8") as f: data = json.load(f) if data.get("type") != "FeatureCollection" or "features" not in data: - raise ValueError("The provided file is not a valid GeoJSON Feature Collection.") + raise ValueError( + "The provided file is not a valid GeoJSON Feature Collection." + ) return data except Exception as e: click.echo(f"Error loading Feature Collection file: {e}", err=True) raise + @click.command() -@click.argument('input_file', type=click.Path(exists=True, readable=True)) -@click.argument('output_directory', type=click.Path(file_okay=False, writable=True, resolve_path=True)) +@click.argument("input_file", type=click.Path(exists=True, readable=True)) +@click.argument( + "output_directory", + type=click.Path(file_okay=False, writable=True, resolve_path=True), +) def main(input_file, output_directory): """ Load a GeoJSON Feature Collection from INPUT_FILE and output each feature as a separate JSON @@ -55,11 +63,12 @@ def main(input_file, output_directory): # Write feature to JSON file try: - with open(output_file_path, 'w', encoding='utf-8') as f: + with open(output_file_path, "w", encoding="utf-8") as f: json.dump(feature, f, indent=2) click.echo(f"Feature saved to: {output_file_path}") except Exception as e: click.echo(f"Error writing feature {feature_id} to file: {e}", err=True) + if __name__ == "__main__": main() diff --git a/utilities/verify-collection-merkle-proof/verify_collection_cli.py b/utilities/verify-collection-merkle-proof/verify_collection_cli.py index 94d9da8..606d02b 100644 --- a/utilities/verify-collection-merkle-proof/verify_collection_cli.py +++ b/utilities/verify-collection-merkle-proof/verify_collection_cli.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 -import click +import hashlib import json import sys -import hashlib from typing import List +import click + + def load_collection(collection_file: str) -> dict: """ Loads the STAC Collection JSON file. 
@@ -17,13 +19,14 @@ def load_collection(collection_file: str) -> dict: - dict: Parsed JSON content of the collection. """ try: - with open(collection_file, 'r', encoding='utf-8') as f: + with open(collection_file, "r", encoding="utf-8") as f: collection = json.load(f) return collection except Exception as e: click.echo(f"Error loading collection file: {e}", err=True) sys.exit(1) + def get_merkle_fields(collection: dict) -> dict: """ Extracts the Merkle-related fields from the collection. @@ -35,24 +38,33 @@ def get_merkle_fields(collection: dict) -> dict: - dict: A dictionary containing merkle:object_hash, merkle:proof, and merkle:hash_method. """ try: - merkle_object_hash = collection.get("merkle:object_hash") or collection["properties"]["merkle:object_hash"] - merkle_proof = collection.get("merkle:proof") or collection["properties"]["merkle:proof"] - merkle_hash_method = collection.get("merkle:hash_method") or collection["properties"]["merkle:hash_method"] + merkle_object_hash = ( + collection.get("merkle:object_hash") + or collection["properties"]["merkle:object_hash"] + ) + merkle_proof = ( + collection.get("merkle:proof") or collection["properties"]["merkle:proof"] + ) + merkle_hash_method = ( + collection.get("merkle:hash_method") + or collection["properties"]["merkle:hash_method"] + ) return { "object_hash": merkle_object_hash, "proof": merkle_proof, - "hash_method": merkle_hash_method + "hash_method": merkle_hash_method, } except KeyError as e: click.echo(f"Missing required Merkle field in collection: {e}", err=True) sys.exit(1) + def verify_merkle_proof( collection_hash: str, proof_hashes: List[str], proof_positions: List[str], merkle_root: str, - hash_function: str = "sha256" + hash_function: str = "sha256", ) -> bool: """ Verifies that a given collection hash is part of the Merkle tree with the specified Merkle root. @@ -68,7 +80,10 @@ def verify_merkle_proof( - bool: True if verification is successful, False otherwise. """ if len(proof_hashes) != len(proof_positions): - click.echo("The number of proof hashes must match the number of proof positions.", err=True) + click.echo( + "The number of proof hashes must match the number of proof positions.", + err=True, + ) sys.exit(1) # Initialize current hash with the collection's hash @@ -79,11 +94,16 @@ def verify_merkle_proof( sys.exit(1) # Iterate through each proof step - for idx, (sibling_hash_hex, position) in enumerate(zip(proof_hashes, proof_positions)): + for idx, (sibling_hash_hex, position) in enumerate( + zip(proof_hashes, proof_positions) + ): try: sibling_hash = bytes.fromhex(sibling_hash_hex) except ValueError: - click.echo(f"Invalid hex string in proof_hashes at index {idx}: {sibling_hash_hex}", err=True) + click.echo( + f"Invalid hex string in proof_hashes at index {idx}: {sibling_hash_hex}", + err=True, + ) sys.exit(1) if position.lower() == "left": @@ -91,7 +111,10 @@ def verify_merkle_proof( elif position.lower() == "right": combined = current_hash + sibling_hash else: - click.echo(f"Invalid position value at index {idx}: {position}. Must be 'left' or 'right'.", err=True) + click.echo( + f"Invalid position value at index {idx}: {position}. 
Must be 'left' or 'right'.", + err=True, + ) sys.exit(1) # Compute the new hash using the specified hash function @@ -107,9 +130,12 @@ def verify_merkle_proof( print("computed_merkle_root: ", computed_merkle_root) return computed_merkle_root.lower() == merkle_root.lower() + @click.command() -@click.argument('collection_file', type=click.Path(exists=True)) -@click.option('--merkle-root', required=False, help='Merkle root of the catalog (hex string).') +@click.argument("collection_file", type=click.Path(exists=True)) +@click.option( + "--merkle-root", required=False, help="Merkle root of the catalog (hex string)." +) def main(collection_file, merkle_root): """ Verify if a STAC collection is part of a catalog using Merkle proofs. @@ -145,13 +171,19 @@ def main(collection_file, merkle_root): proof_hashes=proof_hashes, proof_positions=proof_positions, merkle_root=merkle_root, - hash_function=hash_function + hash_function=hash_function, ) if is_valid: - click.secho("Verification successful: The collection is part of the catalog.", fg='green') + click.secho( + "Verification successful: The collection is part of the catalog.", + fg="green", + ) else: - click.secho("Verification failed: The collection is NOT part of the catalog.", fg='red') + click.secho( + "Verification failed: The collection is NOT part of the catalog.", fg="red" + ) + if __name__ == "__main__": main()
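Note: the proof-folding loop that verify_merkle_proof implements above condenses to the following: at each step the sibling hash is concatenated on its recorded side and the pair is re-hashed, and the final digest is compared case-insensitively against the expected root. A standalone sketch, assuming sha256 and a hypothetical function name:

    import hashlib

    def fold_proof(leaf_hex: str, sibling_hexes: list, positions: list) -> str:
        # "left" means the sibling is the left operand of the concatenation;
        # "right" means it is the right operand.
        current = bytes.fromhex(leaf_hex)
        for sibling_hex, position in zip(sibling_hexes, positions):
            sibling = bytes.fromhex(sibling_hex)
            if position.lower() == "left":
                combined = sibling + current
            else:
                combined = current + sibling
            current = hashlib.sha256(combined).digest()
        return current.hex()

Verification then reduces to fold_proof(object_hash, proof_hashes, proof_positions) == merkle_root.lower(), which is what the CLI reports as success or failure.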