From be6ae23c0a3d4448bc2a6ab66beff83a5daf7ef8 Mon Sep 17 00:00:00 2001
From: ori
Date: Mon, 15 Jul 2024 19:48:21 +0300
Subject: [PATCH] fixes to ckan dataset fetcher

---
 .../generic_fetchers/ckan_dataset_fetcher.py | 29 +++++++++++++------
 .../operators/generic_fetcher.py             |  4 +--
 2 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py b/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
index 103c5ea..1ea10ad 100644
--- a/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
+++ b/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
@@ -1,6 +1,5 @@
 import os
 import json
-import uuid
 import shutil
 
 import requests
@@ -37,15 +36,24 @@ def get_filtered_tabular_resources_to_update(tmpdir, source_filter, id_, name, f
     print(f'filtering tabular data from {filename} with format {format_}...')
     resources_to_update = []
     DF.Flow(
-        DF.load(f'{tmpdir}/{id_}', name='filtered', format=format_.lower()),
+        DF.load(f'{tmpdir}/{id_}', name='filtered', format=format_.lower(), infer_strategy=DF.load.INFER_STRINGS, cast_strategy=DF.load.CAST_TO_STRINGS),
         DF.filter_rows(lambda row: all(row.get(k) == v for k, v in source_filter.items())),
         DF.printer(),
         DF.dump_to_path(f'{tmpdir}/{id_}-filtered')
     ).process()
     with open(f'{tmpdir}/{id_}-filtered/datapackage.json', 'r') as f:
-        hash_ = json.load(f)['hash']
-    shutil.copyfile(f'{tmpdir}/{id_}-filtered/filtered.csv', f'{tmpdir}/{id_}')
-    resources_to_update.append((id_, name, 'CSV', hash_, description, filename))
+        dp = json.load(f)
+        hash_ = dp['hash']
+        count_of_rows = dp['count_of_rows']
+    if count_of_rows == 0:
+        print('no rows found, skipping resource')
+    else:
+        shutil.copyfile(f'{tmpdir}/{id_}-filtered/filtered.csv', f'{tmpdir}/{id_}')
+        if not filename.lower().endswith('.csv'):
+            filename = filename.lower().replace('.xlsx', '.csv').replace('.xls', '.csv')
+            if not filename.endswith('.csv'):
+                filename = f'{filename}.csv'
+        resources_to_update.append((id_, name, 'CSV', hash_, description, filename))
     return resources_to_update
 
 
@@ -56,10 +64,13 @@ def get_filtered_geojson_resources_to_update(tmpdir, source_filter, id_, name, f
         data = json.load(f)
     features = data.get('features') or []
     features = [feature for feature in features if all(feature['properties'].get(k) == v for k, v in source_filter.items())]
-    data['features'] = features
-    with open(f'{tmpdir}/{id_}', 'w') as f:
-        json.dump(data, f)
-    resources_to_update.append((id_, name, 'GEOJSON', hash_, description, filename))
+    if not features:
+        print('no features found, skipping resource')
+    else:
+        data['features'] = features
+        with open(f'{tmpdir}/{id_}', 'w') as f:
+            json.dump(data, f)
+        resources_to_update.append((id_, name, 'GEOJSON', hash_, description, filename))
     return resources_to_update
 
 
diff --git a/datacity_ckan_dgp/operators/generic_fetcher.py b/datacity_ckan_dgp/operators/generic_fetcher.py
index 85c7993..b4021d0 100644
--- a/datacity_ckan_dgp/operators/generic_fetcher.py
+++ b/datacity_ckan_dgp/operators/generic_fetcher.py
@@ -35,9 +35,7 @@ def operator(name, params):
     tmpdir = params.get('tmpdir')
     with tempdir(tmpdir) as tmpdir:
         print('starting generic_fetcher operator')
-        print(f'source_url={source_url} target_instance_name={target_instance_name} target_package_id={target_package_id} target_organization_id={target_organization_id}')
-        print(f'source_filter={source_filter}')
-        print(f'tmpdir={tmpdir}')
+        print(json.dumps(params))
         for fetcher in FETCHERS:
             assert fetcher['match'].keys() == {'url_contains'}, 'only url_contains match is supported at the moment'
             if fetcher['match']['url_contains'] in source_url:
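
A minimal standalone sketch of the filename handling added to
get_filtered_tabular_resources_to_update: the filtered output is always
dumped as CSV, so names already ending in .csv (any case) are kept, Excel
extensions are rewritten, and .csv is appended as a fallback. The helper
name normalize_csv_filename is hypothetical, introduced here only for
illustration; it does not exist in the repository.

    # Hypothetical helper mirroring the filename logic in this patch;
    # normalize_csv_filename is illustrative, not a repository function.
    def normalize_csv_filename(filename):
        # Names that already end with .csv (case-insensitive) stay untouched.
        if not filename.lower().endswith('.csv'):
            # The filtered output is CSV, so rewrite Excel extensions...
            filename = filename.lower().replace('.xlsx', '.csv').replace('.xls', '.csv')
            # ...and fall back to appending .csv for any other extension.
            if not filename.endswith('.csv'):
                filename = f'{filename}.csv'
        return filename

    assert normalize_csv_filename('population.CSV') == 'population.CSV'
    assert normalize_csv_filename('Population.XLSX') == 'population.csv'
    assert normalize_csv_filename('licenses.json') == 'licenses.json.csv'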
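
Both code paths apply the same source_filter rule: a tabular row, or a
GeoJSON feature's properties dict, matches only if every key/value pair in
source_filter is equal, and empty results are now skipped instead of being
uploaded. A small sketch under that assumption; matches() and the sample
records below are made up for illustration:

    # Illustrative only: matches() and the sample features are not from the
    # repository, but the predicate mirrors the one used in this patch.
    def matches(record, source_filter):
        # A record matches only if every filter key/value pair is equal.
        return all(record.get(k) == v for k, v in source_filter.items())

    source_filter = {'city': 'Haifa'}
    features = [
        {'properties': {'city': 'Haifa', 'type': 'park'}},
        {'properties': {'city': 'Jerusalem', 'type': 'museum'}},
    ]
    filtered = [f for f in features if matches(f['properties'], source_filter)]
    if not filtered:
        # Mirrors the patch: empty results are skipped rather than uploaded.
        print('no features found, skipping resource')
    else:
        print(f'{len(filtered)} features matched')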