From 962d023bf2ead1dcaf5eb97cf1a3680fb7a83e95 Mon Sep 17 00:00:00 2001 From: Miguel P Z <60221874+MiguelElGallo@users.noreply.github.com> Date: Mon, 20 May 2024 12:38:54 +0300 Subject: [PATCH 1/5] Update notifier.py to have a pydantic model --- adenotifier/notifier.py | 367 +++++++++++++++++++++++++++------------- 1 file changed, 252 insertions(+), 115 deletions(-) diff --git a/adenotifier/notifier.py b/adenotifier/notifier.py index cacf2f9..1f8c843 100644 --- a/adenotifier/notifier.py +++ b/adenotifier/notifier.py @@ -5,8 +5,17 @@ from urllib3.util import Retry from .manifest import Manifest from typing import List, Set, Dict, Tuple, Optional +from . import adetypes -def search_manifests(source_system_name: str, source_entity_name: str, base_url: str, notify_api_key: str, notify_api_key_secret: str, state: str): + +def search_manifests( + source_system_name: str, + source_entity_name: str, + base_url: str, + notify_api_key: str, + notify_api_key_secret: str, + state: str, +): """Searches manifests from ADE Notify API. Args: @@ -21,13 +30,25 @@ def search_manifests(source_system_name: str, source_entity_name: str, base_url: List [str] of manifest ids. """ - + session = requests.Session() session.auth = (notify_api_key, notify_api_key_secret) session.headers.update({"Content-Type": "application/json"}) - session.mount('https://', HTTPAdapter(max_retries=Retry(total=3, status_forcelist=[401, 404, 429, 500, 502, 503, 504], backoff_factor=2, allowed_methods=None, raise_on_status=True))) # HTTP request retry settings. - request_url = "{0}/tenants/local/installations/local/environments/local/source-systems/{1}/source-entities/{2}/manifests"\ - .format(base_url, source_system_name, source_entity_name) + session.mount( + "https://", + HTTPAdapter( + max_retries=Retry( + total=3, + status_forcelist=[401, 404, 429, 500, 502, 503, 504], + backoff_factor=2, + allowed_methods=None, + raise_on_status=True, + ) + ), + ) # HTTP request retry settings. + request_url = "{0}/tenants/local/installations/local/environments/local/source-systems/{1}/source-entities/{2}/manifests".format( + base_url, source_system_name, source_entity_name + ) if state != "": response = session.get(request_url + "?state={0}".format(state.upper())) @@ -39,10 +60,11 @@ def search_manifests(source_system_name: str, source_entity_name: str, base_url: manifests = response.json() if manifests != []: # Ordering manifests by created time - manifests = (sorted(manifests, key = lambda i: i['created'])) + manifests = sorted(manifests, key=lambda i: i["created"]) return manifests + def parse_batch(file_url: str, regexp: str): """Parses batch number from given file url with given regular expression. @@ -58,13 +80,20 @@ def parse_batch(file_url: str, regexp: str): batch = "" result = re.search(regexp, file_url) - + for group in result.groups(): batch += group - + return int(batch) -def add_to_manifest(file_url: str, source: object, base_url: str, notify_api_key: str, notify_api_key_secret: str): + +def add_to_manifest( + file_url: str, + source: object, + base_url: str, + notify_api_key: str, + notify_api_key_secret: str, +): """Utilizes Manifest class and other functions to add the given file_url to a manifest for the given configured data source. Args: @@ -76,97 +105,145 @@ def add_to_manifest(file_url: str, source: object, base_url: str, notify_api_key Returns: Manifest object. - + """ - + # Put source object into Ade_Datasource class + try: + source_validated = adetypes.Ade_Datasource(**source) + except Exception as e: + logging.error("Error validating source object: {0}".format(e)) + raise # Set single_file_manifest based on configuration - if ('single_file_manifest' in source['attributes']): - if(source['attributes']['single_file_manifest']): + if ( + "single_file_manifest" in source["attributes"] + and source["attributes"]["single_file_manifest"] is not None + ): + if source["attributes"]["single_file_manifest"]: single_file_manifest = True else: single_file_manifest = False else: single_file_manifest = False - + open_manifest_ids = [] # Search open manifests for data source if not single_file_manifest - if(not single_file_manifest): + if not single_file_manifest: open_manifests = search_manifests( - source_system_name = source['attributes']['ade_source_system'], - source_entity_name = source['attributes']['ade_source_entity'], - base_url = base_url, - notify_api_key = notify_api_key, - notify_api_key_secret = notify_api_key_secret, - state = "OPEN" + source_system_name=source_validated.attributes.ade_source_system, # source["attributes"]["ade_source_system"], + source_entity_name=source_validated.attributes.ade_source_entity, + base_url=base_url, + notify_api_key=notify_api_key, + notify_api_key_secret=notify_api_key_secret, + state="OPEN", ) - + for open_manifest_id in open_manifests: - open_manifest_ids.append(open_manifest_id['id']) + open_manifest_ids.append(open_manifest_id["id"]) - logging.info('Open manifests: {0}'.format(open_manifest_ids)) + logging.info("Open manifests: {0}".format(open_manifest_ids)) # Initialize a manifest object with mandatory attributes. manifest = Manifest( - base_url = base_url, - source_system_name = source['attributes']['ade_source_system'], - source_entity_name = source['attributes']['ade_source_entity'], - format = source['manifest_parameters']['format'], - notify_api_key = notify_api_key, - notify_api_key_secret = notify_api_key_secret + base_url=base_url, + source_system_name=source_validated.attributes.ade_source_system, + source_entity_name=source_validated.attributes.ade_source_entity, + format=source_validated.manifest_parameters.format, + notify_api_key=notify_api_key, + notify_api_key_secret=notify_api_key_secret, ) # Set optional manifest attributes if configured in data source. - if ('columns' in source['manifest_parameters']): - manifest.columns = source['manifest_parameters']['columns'] - if ('compression' in source['manifest_parameters']): - manifest.compression = source['manifest_parameters']['compression'] - if ('delim' in source['manifest_parameters']): - manifest.delim = source['manifest_parameters']['delim'] - if ('fullscanned' in source['manifest_parameters']): - manifest.fullscanned = source['manifest_parameters']['fullscanned'] - if ('skiph' in source['manifest_parameters']): - manifest.skiph = source['manifest_parameters']['skiph'] - - if (open_manifest_ids == []): + # This could be also none if not configured. + + if ( + "columns" in source["manifest_parameters"] + and source_validated.manifest_parameters.columns is not None + ): + manifest.columns = source["manifest_parameters"]["columns"] + + if ( + "compression" in source["manifest_parameters"] + and source_validated.manifest_parameters.compression is not None + ): + manifest.compression = source["manifest_parameters"]["compression"] + + if ( + "delim" in source["manifest_parameters"] + and source_validated.manifest_parameters.delim is not None + ): + manifest.delim = source["manifest_parameters"]["delim"] + + if ( + "fullscanned" in source["manifest_parameters"] + and source_validated.manifest_parameters.fullscanned is not None + ): + manifest.fullscanned = source["manifest_parameters"]["fullscanned"] + + if ( + "skiph" in source["manifest_parameters"] + and source_validated.manifest_parameters.skiph is not None + ): + manifest.skiph = source["manifest_parameters"]["skiph"] + + if open_manifest_ids == []: # Create a new manifest if open manifests are not found. manifest.create() - logging.info('Manifest created: {0}'.format(manifest.id)) + logging.info("Manifest created: {0}".format(manifest.id)) else: - if ('max_files_in_manifest' in source['attributes']): + if ( + "max_files_in_manifest" in source["attributes"] + and source_validated.attributes.max_files_in_manifest is not None + ): manifest.fetch_manifest(open_manifest_ids[-1]) manifest.fetch_manifest_entries() manifest_entries = manifest.manifest_entries - if (len(manifest_entries) >= source['attributes']['max_files_in_manifest']): - logging.info('Max files in manifest reached. Creating a new manifest') + if ( + len(manifest_entries) + >= source_validated.attributes.max_files_in_manifest + ): + logging.info("Max files in manifest reached. Creating a new manifest") # Create a new manifest if current manifest has already reached max files limit manifest.create() - logging.info('Manifest created: {0}'.format(manifest.id)) + logging.info("Manifest created: {0}".format(manifest.id)) else: # Use latest existing manifest if open manifests are found and max_files_in_manifest not reached. manifest.fetch_manifest(open_manifest_ids[-1]) - logging.info('Using open manifest: {0}'.format(manifest.id)) + logging.info("Using open manifest: {0}".format(manifest.id)) else: # Use latest existing manifest if open manifests are found. manifest.fetch_manifest(open_manifest_ids[-1]) - logging.info('Using open manifest: {0}'.format(manifest.id)) - - if ('path_replace' in source['attributes'] and 'path_replace_with' in source['attributes']): + logging.info("Using open manifest: {0}".format(manifest.id)) + + if ( + "path_replace" in source["attributes"] + and source_validated.attributes.path_replace is not None + and "path_replace_with" in source["attributes"] + and source_validated.attributes.path_replace_with is not None + ): # Modify manifest entry file url if configured. - entry_path = file_url.replace(source['attributes']['path_replace'], source['attributes']['path_replace_with']) + entry_path = file_url.replace( + source["attributes"]["path_replace"], + source["attributes"]["path_replace_with"], + ) else: entry_path = file_url - if ('batch_from_file_path_regex' in source['attributes']): + if ( + "batch_from_file_path_regex" in source["attributes"] + and source_validated.attributes.batch_from_file_path_regex is not None + ): # Parse entry specific batch number from file name if configured. try: - batch = parse_batch(file_url, source['attributes']['batch_from_file_path_regex']) - logging.info('Batch: {0}'.format(batch)) + batch = parse_batch( + file_url, source["attributes"]["batch_from_file_path_regex"] + ) + logging.info("Batch: {0}".format(batch)) except Exception as e: batch = None - logging.warning('Batch parsing failed:\n{0}'.format(e)) + logging.warning("Batch parsing failed:\n{0}".format(e)) else: batch = None @@ -175,22 +252,32 @@ def add_to_manifest(file_url: str, source: object, base_url: str, notify_api_key manifest.add_entry(entry_path, batch) except Exception as e: # Retry with a new manifest if e.g. an uncontrolled parallel execution has closed the manifest - logging.warning('Adding entry to manifest failed, retrying with a new manifest.') + logging.warning( + "Adding entry to manifest failed, retrying with a new manifest." + ) manifest.create() - logging.info('Manifest created: {0}'.format(manifest.id)) + logging.info("Manifest created: {0}".format(manifest.id)) manifest.add_entry(entry_path, batch) - logging.info('Added entry: {0}'.format(entry_path)) - + logging.info("Added entry: {0}".format(entry_path)) + # Notify manifest if single_file_manifest = true - if (single_file_manifest): - logging.info('Single_file_manifest set as true, notifying.') + if single_file_manifest: + logging.info("Single_file_manifest set as true, notifying.") manifest.notify() - logging.info('Notified manifest: {0}.'.format(manifest.id)) - + logging.info("Notified manifest: {0}.".format(manifest.id)) + return manifest -def add_multiple_entries_to_manifest(entries: List[dict], source: object, base_url: str, notify_api_key: str, notify_api_key_secret: str, batch: int = None): + +def add_multiple_entries_to_manifest( + entries: List[dict], + source: object, + base_url: str, + notify_api_key: str, + notify_api_key_secret: str, + batch: int = None, +): """Utilizes Manifest class and other functions to add the given file_url to a manifest for the given configured data source. Args: @@ -205,65 +292,105 @@ def add_multiple_entries_to_manifest(entries: List[dict], source: object, base_u Manifest object. """ - + # Put source object into Ade_Datasource class + try: + source_validated = adetypes.Ade_Datasource(**source) + except Exception as e: + logging.error("Error validating source object: {0}".format(e)) + raise + # Set single_file_manifest based on configuration # Initialize a manifest object with mandatory attributes. manifest = Manifest( - base_url = base_url, - source_system_name = source['attributes']['ade_source_system'], - source_entity_name = source['attributes']['ade_source_entity'], - format = source['manifest_parameters']['format'], - notify_api_key = notify_api_key, - notify_api_key_secret = notify_api_key_secret + base_url=base_url, + source_system_name=source_validated.attributes.ade_source_system, # source["attributes"]["ade_source_system"], + source_entity_name=source_validated.attributes.ade_source_entity, + format=source["manifest_parameters"]["format"], + notify_api_key=notify_api_key, + notify_api_key_secret=notify_api_key_secret, ) # Set optional manifest attributes if configured in data source. - if ('columns' in source['manifest_parameters']): - manifest.columns = source['manifest_parameters']['columns'] - if ('compression' in source['manifest_parameters']): - manifest.compression = source['manifest_parameters']['compression'] - if ('delim' in source['manifest_parameters']): - manifest.delim = source['manifest_parameters']['delim'] - if ('fullscanned' in source['manifest_parameters']): - manifest.fullscanned = source['manifest_parameters']['fullscanned'] - if ('skiph' in source['manifest_parameters']): - manifest.skiph = source['manifest_parameters']['skiph'] + if ( + "columns" in source["manifest_parameters"] + and source_validated.manifest_parameters.columns is not None + ): + manifest.columns = source["manifest_parameters"]["columns"] + + if ( + "compression" in source["manifest_parameters"] + and source_validated.manifest_parameters.compression is not None + ): + manifest.compression = source["manifest_parameters"]["compression"] + + if ( + "delim" in source["manifest_parameters"] + and source_validated.manifest_parameters.delim is not None + ): + manifest.delim = source["manifest_parameters"]["delim"] + + if ( + "fullscanned" in source["manifest_parameters"] + and source_validated.manifest_parameters.fullscanned is not None + ): + manifest.fullscanned = source["manifest_parameters"]["fullscanned"] + + if ( + "skiph" in source["manifest_parameters"] + and source_validated.manifest_parameters.skiph is not None + ): + manifest.skiph = source["manifest_parameters"]["skiph"] # Setting manifest-level batch if needed - if (batch is not None): + if batch is not None: manifest.batch = batch # Create a new manifest. manifest.create() - logging.info('Manifest created: {0}'.format(manifest.id)) - - if ('path_replace' in source['attributes'] and 'path_replace_with' in source['attributes']): + logging.info("Manifest created: {0}".format(manifest.id)) + + if ( + "path_replace" in source["attributes"] + and source_validated.attributes.path_replace is not None + and "path_replace_with" in source["attributes"] + and source_validated.attributes.path_replace_with is not None + ): # Modify manifest entry file url if configured. for entry in entries: - entry['sourceFile'] = entry['sourceFile'].replace( - source['attributes']['path_replace'], source['attributes']['path_replace_with']) - - if ('batch_from_file_path_regex' in source['attributes']): + entry["sourceFile"] = entry["sourceFile"].replace( + source["attributes"]["path_replace"], + source["attributes"]["path_replace_with"], + ) + + if ( + "batch_from_file_path_regex" in source["attributes"] + and source_validated.attributes.ade_source_system is not None + ): # Parse entry specific batch number from file name if configured. try: for entry_batch in entries: - entry_batch['batch'] = parse_batch(entry['sourceFile'], source['attributes']['batch_from_file_path_regex']) - logging.info('Batch: {0}'.format(batch)) + entry_batch["batch"] = parse_batch( + entry["sourceFile"], + source["attributes"]["batch_from_file_path_regex"], + ) + logging.info("Batch: {0}".format(batch)) except Exception as e: batch = None - logging.warning('Batch parsing failed:\n{0}'.format(e)) + logging.warning("Batch parsing failed:\n{0}".format(e)) else: batch = None # Add entry to manifest. manifest.add_entries(entries) - logging.info('Added entries: {0}'.format(entries)) - + logging.info("Added entries: {0}".format(entries)) + manifest.notify(manifest.id) return manifest -def notify_manifests(source: object, base_url: str, notify_api_key: str, notify_api_key_secret: str): +def notify_manifests( + source: object, base_url: str, notify_api_key: str, notify_api_key_secret: str +): """Utilizes Manifest class and other functions to notify all open manifests for the given configured data source. Args: @@ -274,44 +401,54 @@ def notify_manifests(source: object, base_url: str, notify_api_key: str, notify_ Returns: Array of Manifest objects. - + """ + # Put source object into Ade_Datasource class + try: + source_validated = adetypes.Ade_Datasource(**source) + except Exception as e: + logging.error("Error validating source object: {0}".format(e)) + raise # Search open manifests for data source. open_manifests = search_manifests( - source_system_name = source['attributes']['ade_source_system'], - source_entity_name = source['attributes']['ade_source_entity'], - base_url = base_url, - notify_api_key = notify_api_key, - notify_api_key_secret = notify_api_key_secret, - state = "OPEN" + source_system_name=source_validated.attributes.ade_source_system, + source_entity_name=source_validated.attributes.ade_source_entity, + base_url=base_url, + notify_api_key=notify_api_key, + notify_api_key_secret=notify_api_key_secret, + state="OPEN", ) open_manifest_ids = [] for open_manifest_id in open_manifests: - open_manifest_ids.append(open_manifest_id['id']) + open_manifest_ids.append(open_manifest_id["id"]) # Initialize a manifest object with mandatory attributes. manifest = Manifest( - base_url = base_url, - source_system_name = source['attributes']['ade_source_system'], - source_entity_name = source['attributes']['ade_source_entity'], - format = source['manifest_parameters']['format'], - notify_api_key = notify_api_key, - notify_api_key_secret = notify_api_key_secret + base_url=base_url, + source_system_name=source_validated.attributes.ade_source_system, + source_entity_name=source_validated.attributes.ade_source_entity, + format=source_validated.manifest_parameters.format, + notify_api_key=notify_api_key, + notify_api_key_secret=notify_api_key_secret, ) manifests = [] - if (open_manifest_ids == []): + if open_manifest_ids == []: # Warning if open manifests not found. - logging.warning('Open manifests for source {0} not found when attempting to notify.'.format(source['id'])) + logging.warning( + "Open manifests for source {0} not found when attempting to notify.".format( + source["id"] + ) + ) else: # Notify all open manifests for data source. for manifest_id in open_manifest_ids: manifest.fetch_manifest(manifest_id) manifest.notify() - logging.info('Notified manifest: {0}.'.format(manifest.id)) + logging.info("Notified manifest: {0}.".format(manifest.id)) manifests.append(manifest) - - return manifests \ No newline at end of file + + return manifests From b3f905e58ccbe811c69f88d0df3ba5e9ae83dd75 Mon Sep 17 00:00:00 2001 From: Miguel P Z <60221874+MiguelElGallo@users.noreply.github.com> Date: Mon, 20 May 2024 12:39:47 +0300 Subject: [PATCH 2/5] Create adetypes.py using Pydantic --- adenotifier/adetypes.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 adenotifier/adetypes.py diff --git a/adenotifier/adetypes.py b/adenotifier/adetypes.py new file mode 100644 index 0000000..061d54b --- /dev/null +++ b/adenotifier/adetypes.py @@ -0,0 +1,36 @@ +from pydantic import BaseModel +from typing import List, Optional + + +class Ade_manifest_attributes(BaseModel): + """ + Represents the attributes of an ADE manifest. + """ + ade_source_system: str + ade_source_entity: str + batch_from_file_path_regex: Optional[str] = None + path_replace: Optional[str] = None + path_replace_with: Optional[str] = None + single_file_manifest: Optional[str] = None + max_files_in_manifest: Optional[int] = 1000 + + +class Ade_manifest_parameters(BaseModel): + """ + Represents the parameters of an ADE manifest. + """ + columns: Optional[List[str]] + compression: Optional[str] = None + delim: Optional[str] = None + format: str + fullscanned: Optional[bool] = None + skiph: Optional[int] = None + + +class Ade_Datasource(BaseModel): + """ + Represents an ADE datasource. + """ + id: str + attributes: Ade_manifest_attributes + manifest_parameters: Ade_manifest_parameters From ea9a159d4d87b7fb63918e8c4cf07efb5b215fca Mon Sep 17 00:00:00 2001 From: Miguel P Z <60221874+MiguelElGallo@users.noreply.github.com> Date: Mon, 20 May 2024 09:50:35 +0000 Subject: [PATCH 3/5] add build --- .github/workflows/python-publish.yml | 33 ++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 .github/workflows/python-publish.yml diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml new file mode 100644 index 0000000..c559cfe --- /dev/null +++ b/.github/workflows/python-publish.yml @@ -0,0 +1,33 @@ +# This workflow will upload a Python Package using Twine when a release is created +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +name: Upload Python Package + +on: + workflow_dispatch + +permissions: + contents: read + +jobs: + deploy: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: '3.x' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install build + - name: Build package + run: python -m build \ No newline at end of file From 3d2ce493ef8cf9413e1d829dddec912caaa0cf6e Mon Sep 17 00:00:00 2001 From: Miguel P Z <60221874+MiguelElGallo@users.noreply.github.com> Date: Mon, 20 May 2024 12:54:01 +0300 Subject: [PATCH 4/5] Update setup.py with pydantic --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 327c4d0..bbaa836 100644 --- a/setup.py +++ b/setup.py @@ -16,5 +16,5 @@ license='MIT', packages=['adenotifier'], py_modules=['manifest', 'notifier'], - install_requires=['requests'] -) \ No newline at end of file + install_requires=['requests', 'pydantic'] +) From f868c47e594b18bc0446cae80722f7db09bcfd70 Mon Sep 17 00:00:00 2001 From: Miguel P Z <60221874+MiguelElGallo@users.noreply.github.com> Date: Mon, 20 May 2024 09:56:55 +0000 Subject: [PATCH 5/5] No modules --- setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.py b/setup.py index bbaa836..2d3797a 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,5 @@ }, license='MIT', packages=['adenotifier'], - py_modules=['manifest', 'notifier'], install_requires=['requests', 'pydantic'] )