From 06be904273e3ca17e44ae310ef1e0b44ccc390bf Mon Sep 17 00:00:00 2001 From: Ryan Lambert Date: Thu, 23 May 2024 18:14:22 -0600 Subject: [PATCH 1/6] Move InputMode to helpers to avoid overlapping var and module names --- docker/helpers.py | 171 ++++++++++++++++++++++++++++ docker/import_mode.py | 175 ----------------------------- docker/osm2pgsql_recommendation.py | 10 +- docker/pgosm_flex.py | 13 +-- docker/tests/test_import_mode.py | 140 +++++++++++------------ 5 files changed, 252 insertions(+), 257 deletions(-) delete mode 100644 docker/import_mode.py diff --git a/docker/helpers.py b/docker/helpers.py index a8e6dcf..a04643c 100644 --- a/docker/helpers.py +++ b/docker/helpers.py @@ -1,7 +1,9 @@ """Generic functions and attributes used in multiple modules of PgOSM Flex. """ import datetime +import json import logging +from packaging.version import parse as parse_version import subprocess import os import sys @@ -223,3 +225,172 @@ def unset_env_vars(): os.environ.pop('PGOSM_CONN', None) os.environ.pop('PGOSM_CONN_PG', None) os.environ.pop('SCHEMA_NAME', None) + + +class ImportMode(): + """Determines logical variables used to control program flow. + + WARNING: The values for `append_first_run` and `replication_update` + are used to determine when to drop the local DB. Be careful with any + changes to these values. + """ + def __init__(self, replication: bool, replication_update: bool, + update: str, force: bool): + """Computes two variables, slim_no_drop and append_first_run + based on inputs. + + Parameters + -------------------------- + replication : bool + replication_update : bool + update : str or None + Valid options are 'create' or 'append', lining up with osm2pgsql's + `--create` and `--append` modes. + force : bool + """ + self.logger = logging.getLogger('pgosm-flex') + self.replication = replication + self.replication_update = replication_update + + # The input via click should enforce this, still worth checking here + valid_update_options = ['append', 'create', None] + + if update not in valid_update_options: + raise ValueError(f'Invalid option for --update. Valid options: {valid_update_options}') + + self.update = update + self.force = force + + self.set_slim_no_drop() + self.set_append_first_run() + self.set_run_post_sql() + + + def okay_to_run(self, prior_import: dict) -> bool: + """Determines if it is okay to run PgOSM Flex without fear of data loss. + + This logic was along with the `--force` option to make it + less likely to accidentally lose data with improper PgOSM Flex + options. + + Remember, this is free and open source software and there is + no warranty! + This does not imply a guarantee that you **cannot** lose data, + only that we want to make it **less likely** something bad will happen. + If you find a way bad things can happen that could be detected here, + please open an issue: + + https://github.com/rustprooflabs/pgosm-flex/issues/new?assignees=&labels=&projects=&template=bug_report.md&title=Data%20Safety%20Idea + + Parameters + ------------------- + prior_import : dict + Details about the latest import from osm.pgosm_flex table. + + An empty dictionary (len==0) indicates no prior import. 
+ Only the replication key is specifically used + + Returns + ------------------- + okay_to_run : bool + """ + self.logger.debug(f'Checking if it is okay to run...') + if self.force: + self.logger.warn(f'Using --force, kiss existing data goodbye') + return True + + # If no prior imports, do not require force + if len(prior_import) == 0: + self.logger.debug(f'No prior import found, okay to proceed.') + return True + + prior_replication = prior_import['replication'] + + # Check git version against latest. + # If current version is lower than prior version from latest import, stop. + prior_import_version = prior_import['pgosm_flex_version_no_hash'] + git_tag = helpers.get_git_info(tag_only=True) + + if git_tag == '-- (version unknown) --': + msg = 'Unable to detect PgOSM Flex version from Git.' + msg += ' Not enforcing version check against prior version.' + self.logger.warning(msg) + elif parse_version(git_tag) < parse_version(prior_import_version): + msg = f'PgOSM Flex version ({git_tag}) is lower than latest import' + msg += f' tracked in the pgosm_flex table ({prior_import_version}).' + msg += f' Use PgOSM Flex version {prior_import_version} or newer' + self.logger.error(msg) + return False + else: + self.logger.info(f'Prior import used PgOSM Flex: {prior_import_version}') + + if self.replication: + if not prior_replication: + self.logger.error('Running w/ replication but prior import did not. Requires --force to proceed.') + return False + self.logger.debug('Okay to proceed with replication') + return True + + msg = 'Prior data exists in the osm schema and --force was not used.' + self.logger.error(msg) + return False + + def set_append_first_run(self): + """Uses `replication_update` and `update` to determine value for + `self.append_first_run` + """ + if self.replication_update: + self.append_first_run = False + else: + self.append_first_run = True + + if self.update is not None: + if self.update == 'create': + self.append_first_run = True + else: + self.append_first_run = False + + def set_slim_no_drop(self): + """Uses `replication` and `update` to determine value for + `self.slim_no_drop` + """ + self.slim_no_drop = False + + if self.replication: + self.slim_no_drop = True + + if self.update is not None: + self.slim_no_drop = True + + def set_run_post_sql(self): + """Uses `update` value to determine value for + `self.run_post_sql`. This value determines if the post-processing SQL + should be executed. + + Note: Not checking replication/replication_update because subsequent + imports use osm2pgsql-replication, which does not attempt to run + the post-processing SQL scripts. + """ + self.run_post_sql = True + + if self.update is not None: + if self.update == 'append': + self.run_post_sql = False + + def as_json(self) -> str: + """Packs key details as a dictionary passed through `json.dumps()` + + Returns + ------------------------ + json_text : str + Text representation of JSON object built using class attributes. + """ + self_as_dict = {'update': self.update, + 'replication': self.replication, + 'replication_update': self.replication_update, + 'append_first_run': self.append_first_run, + 'slim_no_drop': self.slim_no_drop, + 'run_post_sql': self.run_post_sql} + return json.dumps(self_as_dict) + + diff --git a/docker/import_mode.py b/docker/import_mode.py deleted file mode 100644 index d56662c..0000000 --- a/docker/import_mode.py +++ /dev/null @@ -1,175 +0,0 @@ -"""Import Mode provides class to ease logic related to various import modes. 
-""" -import logging -import json -from packaging.version import parse as parse_version - -import helpers - - -class ImportMode(): - """Determines logical variables used to control program flow. - - WARNING: The values for `append_first_run` and `replication_update` - are used to determine when to drop the local DB. Be careful with any - changes to these values. - """ - def __init__(self, replication: bool, replication_update: bool, - update: str, force: bool): - """Computes two variables, slim_no_drop and append_first_run - based on inputs. - - Parameters - -------------------------- - replication : bool - replication_update : bool - update : str or None - Valid options are 'create' or 'append', lining up with osm2pgsql's - `--create` and `--append` modes. - force : bool - """ - self.logger = logging.getLogger('pgosm-flex') - self.replication = replication - self.replication_update = replication_update - - # The input via click should enforce this, still worth checking here - valid_update_options = ['append', 'create', None] - - if update not in valid_update_options: - raise ValueError(f'Invalid option for --update. Valid options: {valid_update_options}') - - self.update = update - self.force = force - - self.set_slim_no_drop() - self.set_append_first_run() - self.set_run_post_sql() - - - def okay_to_run(self, prior_import: dict) -> bool: - """Determines if it is okay to run PgOSM Flex without fear of data loss. - - This logic was along with the `--force` option to make it - less likely to accidentally lose data with improper PgOSM Flex - options. - - Remember, this is free and open source software and there is - no warranty! - This does not imply a guarantee that you **cannot** lose data, - only that we want to make it **less likely** something bad will happen. - If you find a way bad things can happen that could be detected here, - please open an issue: - - https://github.com/rustprooflabs/pgosm-flex/issues/new?assignees=&labels=&projects=&template=bug_report.md&title=Data%20Safety%20Idea - - Parameters - ------------------- - prior_import : dict - Details about the latest import from osm.pgosm_flex table. - - An empty dictionary (len==0) indicates no prior import. - Only the replication key is specifically used - - Returns - ------------------- - okay_to_run : bool - """ - self.logger.debug(f'Checking if it is okay to run...') - if self.force: - self.logger.warn(f'Using --force, kiss existing data goodbye') - return True - - # If no prior imports, do not require force - if len(prior_import) == 0: - self.logger.debug(f'No prior import found, okay to proceed.') - return True - - prior_replication = prior_import['replication'] - - # Check git version against latest. - # If current version is lower than prior version from latest import, stop. - prior_import_version = prior_import['pgosm_flex_version_no_hash'] - git_tag = helpers.get_git_info(tag_only=True) - - if git_tag == '-- (version unknown) --': - msg = 'Unable to detect PgOSM Flex version from Git.' - msg += ' Not enforcing version check against prior version.' - self.logger.warning(msg) - elif parse_version(git_tag) < parse_version(prior_import_version): - msg = f'PgOSM Flex version ({git_tag}) is lower than latest import' - msg += f' tracked in the pgosm_flex table ({prior_import_version}).' 
- msg += f' Use PgOSM Flex version {prior_import_version} or newer' - self.logger.error(msg) - return False - else: - self.logger.info(f'Prior import used PgOSM Flex: {prior_import_version}') - - if self.replication: - if not prior_replication: - self.logger.error('Running w/ replication but prior import did not. Requires --force to proceed.') - return False - self.logger.debug('Okay to proceed with replication') - return True - - msg = 'Prior data exists in the osm schema and --force was not used.' - self.logger.error(msg) - return False - - def set_append_first_run(self): - """Uses `replication_update` and `update` to determine value for - `self.append_first_run` - """ - if self.replication_update: - self.append_first_run = False - else: - self.append_first_run = True - - if self.update is not None: - if self.update == 'create': - self.append_first_run = True - else: - self.append_first_run = False - - def set_slim_no_drop(self): - """Uses `replication` and `update` to determine value for - `self.slim_no_drop` - """ - self.slim_no_drop = False - - if self.replication: - self.slim_no_drop = True - - if self.update is not None: - self.slim_no_drop = True - - def set_run_post_sql(self): - """Uses `update` value to determine value for - `self.run_post_sql`. This value determines if the post-processing SQL - should be executed. - - Note: Not checking replication/replication_update because subsequent - imports use osm2pgsql-replication, which does not attempt to run - the post-processing SQL scripts. - """ - self.run_post_sql = True - - if self.update is not None: - if self.update == 'append': - self.run_post_sql = False - - def as_json(self) -> str: - """Packs key details as a dictionary passed through `json.dumps()` - - Returns - ------------------------ - json_text : str - Text representation of JSON object built using class attributes. - """ - self_as_dict = {'update': self.update, - 'replication': self.replication, - 'replication_update': self.replication_update, - 'append_first_run': self.append_first_run, - 'slim_no_drop': self.slim_no_drop, - 'run_post_sql': self.run_post_sql} - return json.dumps(self_as_dict) - diff --git a/docker/osm2pgsql_recommendation.py b/docker/osm2pgsql_recommendation.py index 25d53e7..9ec23ec 100644 --- a/docker/osm2pgsql_recommendation.py +++ b/docker/osm2pgsql_recommendation.py @@ -5,13 +5,13 @@ import os import osm2pgsql_tuner as tuner -import db, import_mode +import db, helpers LOGGER = logging.getLogger('pgosm-flex') def osm2pgsql_recommendation(ram: float, pbf_filename: str, out_path: str, - import_mode: import_mode.ImportMode) -> str: + import_mode: helpers.ImportMode) -> str: """Returns recommended osm2pgsql command from the osm2pgsql-tuner Python module: https://pypi.org/project/osm2pgsql-tuner/ @@ -24,7 +24,7 @@ def osm2pgsql_recommendation(ram: float, pbf_filename: str, out_path: str, Total system RAM available in GB pbf_filename : str out_path : str - import_mode : import_mode.ImportMode + import_mode : helpers.ImportMode Returns ---------------------- @@ -49,7 +49,7 @@ def osm2pgsql_recommendation(ram: float, pbf_filename: str, out_path: str, def get_recommended_script(system_ram_gb: float, osm_pbf_gb: float, - import_mode:import_mode.ImportMode, + import_mode:helpers.ImportMode, pbf_filename: str, output_path: str) -> str: """Generates recommended osm2pgsql command from osm2pgsql-tuner. 
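For context on the type-hint changes in this file: after this patch, callers construct the class from `helpers` rather than the removed `import_mode` module. Below is a minimal sketch of the new usage; it is not part of the patch itself, assumes it is run from the `docker/` directory where `helpers.py` lives with the project's dependencies (e.g. `packaging`) installed, and the flag values in the comments follow the `set_*` logic shown in `helpers.py` above for a fresh `--replication` import.

```python
import helpers  # docker/helpers.py now owns ImportMode

# Fresh import with replication enabled, no --update override.
import_mode = helpers.ImportMode(replication=True,
                                 replication_update=False,
                                 update=None,
                                 force=False)

print(import_mode.append_first_run)  # True  - first run still uses osm2pgsql --create
print(import_mode.slim_no_drop)      # True  - keep slim tables so replication can update later
print(import_mode.run_post_sql)      # True  - post-processing SQL runs on a fresh import
print(import_mode.as_json())         # JSON summary of the derived flags
```

Moving the class into `helpers` removes the overlap between the old `import_mode` module name and the `import_mode` variable used throughout `pgosm_flex.py` and this module, which is what the updated type hints in these hunks reflect.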
@@ -58,7 +58,7 @@ def get_recommended_script(system_ram_gb: float, ------------------------------- system_ram_gb : float osm_pbf_gb : float - import_mode : import_mode.ImportMode + import_mode : helpers.ImportMode pbf_filename : str Can be filename or absolute path. output_path : str diff --git a/docker/pgosm_flex.py b/docker/pgosm_flex.py index 2df2911..401c882 100644 --- a/docker/pgosm_flex.py +++ b/docker/pgosm_flex.py @@ -18,7 +18,6 @@ import db import geofabrik import helpers -from import_mode import ImportMode @click.command() @@ -118,10 +117,10 @@ def run_pgosm_flex(ram, region, subregion, debug, force, logger.debug(f'UPDATE setting: {update}') # Warning: Reusing the module's name here as import_mode... - import_mode = ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=force) + import_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=force) db.prepare_pgosm_db(skip_qgis_style=skip_qgis_style, db_path=paths['db_path'], @@ -201,7 +200,7 @@ def run_osm2pgsql_standard(input_file, out_path, flex_path, ram, skip_nested, flex_path : str ram : float skip_nested : boolean - import_mode : import_mode.ImportMode + import_mode : helpers.helpers.ImportMode debug : boolean schema_name : str @@ -516,7 +515,7 @@ def run_post_processing(flex_path, skip_nested, import_mode, schema_name): ---------------------- flex_path : str skip_nested : bool - import_mode : import_mode.ImportMode + import_mode : helpers.helpers.ImportMode schema_name : str Returns diff --git a/docker/tests/test_import_mode.py b/docker/tests/test_import_mode.py index 13fc3a4..197ca84 100644 --- a/docker/tests/test_import_mode.py +++ b/docker/tests/test_import_mode.py @@ -2,7 +2,7 @@ import os import unittest -import import_mode +import helpers class ImportModeTests(unittest.TestCase): @@ -13,12 +13,12 @@ def test_import_mode_with_no_replication_or_update_returns_append_first_run_True update = None expected = True - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=False) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=False) - actual = im.append_first_run + actual = input_mode.append_first_run self.assertEqual(expected, actual) def test_import_mode_with_replication_update_returns_append_first_run_False(self): @@ -27,12 +27,12 @@ def test_import_mode_with_replication_update_returns_append_first_run_False(self update = None expected = False - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=False) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=False) - actual = im.append_first_run + actual = input_mode.append_first_run self.assertEqual(expected, actual) def test_import_mode_with_update_eq_create_returns_True(self): @@ -41,12 +41,12 @@ def test_import_mode_with_update_eq_create_returns_True(self): update = 'create' expected = True - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=False) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=False) - actual = im.append_first_run + actual = input_mode.append_first_run self.assertEqual(expected, actual) def 
test_import_mode_with_update_eq_append_returns_False(self): @@ -55,12 +55,12 @@ def test_import_mode_with_update_eq_append_returns_False(self): update = 'append' expected = False - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=False) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=False) - actual = im.append_first_run + actual = input_mode.append_first_run self.assertEqual(expected, actual) def test_import_mode_invalid_update_value_raises_ValueError(self): @@ -69,10 +69,10 @@ def test_import_mode_invalid_update_value_raises_ValueError(self): update = False # Boolean is invalid for this with self.assertRaises(ValueError): - import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=False) + helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=False) @@ -81,12 +81,12 @@ def test_import_mode_with_update_create_sets_value_run_post_sql_True(self): replication_update = False update = 'create' expected = True - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=False) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=False) - actual = im.run_post_sql + actual = input_mode.run_post_sql self.assertEqual(expected, actual) @@ -95,12 +95,12 @@ def test_import_mode_with_update_append_sets_value_run_post_sql_False(self): replication_update = False update = 'append' expected = False - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=False) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=False) - actual = im.run_post_sql + actual = input_mode.run_post_sql self.assertEqual(expected, actual) def test_import_mode_okay_to_run_returns_expected_type(self): @@ -109,14 +109,14 @@ def test_import_mode_okay_to_run_returns_expected_type(self): update = None force = True - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=force) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=force) expected = bool prior_import = {'replication': False} - results = im.okay_to_run(prior_import=prior_import) + results = input_mode.okay_to_run(prior_import=prior_import) actual = type(results) self.assertEqual(expected, actual) @@ -127,14 +127,14 @@ def test_import_mode_okay_to_run_returns_true_when_force(self): update = None force = True - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=force) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=force) expected = True prior_import = {'replication': False} - actual = im.okay_to_run(prior_import=prior_import) + actual = input_mode.okay_to_run(prior_import=prior_import) self.assertEqual(expected, actual) @@ -153,14 +153,14 @@ def test_import_mode_okay_to_run_returns_false_when_prior_record_not_replication update = None force = False - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=force) + 
input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=force) expected = False - actual = im.okay_to_run(prior_import=prior_import) + actual = input_mode.okay_to_run(prior_import=prior_import) self.assertEqual(expected, actual) @@ -179,14 +179,14 @@ def test_import_mode_okay_to_run_returns_true_when_replication_prior_record_repl update = None force = False - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=force) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=force) expected = True - actual = im.okay_to_run(prior_import=prior_import) + actual = input_mode.okay_to_run(prior_import=prior_import) self.assertEqual(expected, actual) @@ -204,14 +204,14 @@ def test_import_mode_okay_to_run_returns_false_when_prior_import(self): update = None force = False - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=force) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=force) expected = False - actual = im.okay_to_run(prior_import=prior_import) + actual = input_mode.okay_to_run(prior_import=prior_import) self.assertEqual(expected, actual) def test_import_mode_okay_to_run_returns_true_no_prior_import(self): @@ -226,14 +226,14 @@ def test_import_mode_okay_to_run_returns_true_no_prior_import(self): update = None force = False - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=force) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=force) expected = True - actual = im.okay_to_run(prior_import=prior_import) + actual = input_mode.okay_to_run(prior_import=prior_import) self.assertEqual(expected, actual) @@ -244,12 +244,12 @@ def test_import_mode_as_json_expected_type(self): update = None force = True - im = import_mode.ImportMode(replication=replication, - replication_update=replication_update, - update=update, - force=force) + input_mode = helpers.ImportMode(replication=replication, + replication_update=replication_update, + update=update, + force=force) expected = str - results = im.as_json() + results = input_mode.as_json() actual = type(results) self.assertEqual(expected, actual) From c821bec9e3c378c18dab0388133909e345408e06 Mon Sep 17 00:00:00 2001 From: Ryan Lambert Date: Thu, 23 May 2024 20:00:26 -0600 Subject: [PATCH 2/6] Use osmium to report more accurate time of the data from pbf metadata --- Dockerfile | 1 + db/deploy/osm_pgosm_flex.sql | 6 ++++-- docker/db.py | 10 ++++++++-- docker/geofabrik.py | 36 ++++++++++++++++++++++++++++++++++++ 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 7ec98ea..062a691 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,6 +20,7 @@ RUN apt-get update \ curl unzip \ postgresql-16-pgrouting \ nlohmann-json3-dev \ + osmium-tool \ && rm -rf /var/lib/apt/lists/* RUN wget https://luarocks.org/releases/luarocks-3.9.2.tar.gz \ diff --git a/db/deploy/osm_pgosm_flex.sql b/db/deploy/osm_pgosm_flex.sql index 4c64ff4..029e871 100644 --- a/db/deploy/osm_pgosm_flex.sql +++ b/db/deploy/osm_pgosm_flex.sql @@ -5,7 +5,7 @@ BEGIN; CREATE TABLE IF NOT EXISTS {schema_name}.pgosm_flex ( id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY, imported 
TIMESTAMPTZ NOT NULL DEFAULT NOW(), - osm_date date NOT NULL, + osm_date TIMESTAMPTZ NOT NULL, region text NOT NULL, layerset TEXT NULL, srid text NOT NULL, @@ -43,10 +43,12 @@ ALTER TABLE {schema_name}.pgosm_flex ALTER TABLE {schema_name}.pgosm_flex DROP COLUMN IF EXISTS project_url; ALTER TABLE {schema_name}.pgosm_flex DROP COLUMN IF EXISTS default_date; +ALTER TABLE {schema_name}.pgosm_flex ALTER COLUMN osm_date TYPE TIMESTAMPTZ; + COMMENT ON TABLE {schema_name}.pgosm_flex IS 'Provides meta information on the PgOSM-Flex project including version and SRID used during the import. One row per import.'; COMMENT ON COLUMN {schema_name}.pgosm_flex.imported IS 'Indicates when the import was ran.'; -COMMENT ON COLUMN {schema_name}.pgosm_flex.osm_date IS 'Indicates the date of the OpenStreetMap data loaded. Recommended to set PGOSM_DATE env var at runtime, otherwise defaults to the date PgOSM-Flex was run.'; +COMMENT ON COLUMN {schema_name}.pgosm_flex.osm_date IS 'Indicates the date of the OpenStreetMap data loaded. Uses timestamp from PBF file metadata when available. If metadata not available this represents --osm-date at runtime, or the date of today in timezone based on computer running import.'; COMMENT ON COLUMN {schema_name}.pgosm_flex.srid IS 'SRID of imported data.'; COMMENT ON COLUMN {schema_name}.pgosm_flex.pgosm_flex_version IS 'Version of PgOSM-Flex used to generate schema.'; COMMENT ON COLUMN {schema_name}.pgosm_flex.osm2pgsql_version IS 'Version of osm2pgsql used to load data.'; diff --git a/docker/db.py b/docker/db.py index 0bf69a7..36d1e20 100644 --- a/docker/db.py +++ b/docker/db.py @@ -626,21 +626,27 @@ def fix_pg_dump_create_public(export_path): def log_import_message(import_id, msg, schema_name): """Logs msg to database in osm.pgosm_flex for import_uuid. + Overwrites `osm_date` IF `pbf_timestamp` is set. + Parameters ------------------------------- import_id : int msg : str schema_name: str """ + pbf_timestamp = os.environ['PBF_TIMESTAMP'] sql_raw = """ UPDATE {schema_name}.pgosm_flex - SET import_status = %(msg)s + SET import_status = %(msg)s , + osm_date = COALESCE( %(pbf_timestamp)s , osm_date) WHERE id = %(import_id)s ; """ sql_raw = sql_raw.format(schema_name=schema_name) with get_db_conn(conn_string=os.environ['PGOSM_CONN']) as conn: - params = {'import_id': import_id, 'msg': msg} + params = {'import_id': import_id, + 'msg': msg, + 'pbf_timestamp': pbf_timestamp} cur = conn.cursor() cur.execute(sql_raw, params=params) diff --git a/docker/geofabrik.py b/docker/geofabrik.py index b7ecf02..957a26e 100644 --- a/docker/geofabrik.py +++ b/docker/geofabrik.py @@ -1,6 +1,7 @@ """This module handles the auto-file handling using Geofabrik's download service. """ import logging +import json import os import shutil import subprocess @@ -67,10 +68,45 @@ def prepare_data(out_path: str) -> str: md5_file_with_date) helpers.verify_checksum(md5_file, out_path) + set_date_from_metadata(pbf_file=pbf_file) return pbf_file +def set_date_from_metadata(pbf_file: str): + """Use `osmium fileinfo` to set a more accurate date to represent when it was + extracted from OpenStreetMap. + + Parameters + --------------------- + pbf_file : str + Full path to the `.osm.pbf` file. + """ + logger = logging.getLogger('pgosm-flex') + osmium_cmd = f'osmium fileinfo {pbf_file} --json' + output = [] + returncode = helpers.run_command_via_subprocess(cmd=osmium_cmd.split(), + cwd=None, + output_lines=output, + print=False) + if returncode != 0: + logger.error(f'osmium fileinfo failed. 
Output: {output}') + + output_joined = json.loads(''.join(output)) + meta_options = output_joined['header']['option'] + + try: + meta_timestamp = meta_options['timestamp'] + except KeyError: + try: + meta_timestamp = meta_options['osmosis_replication_timestamp'] + except KeyError: + meta_timestamp = None + + logger.info(f'PBF Meta timestamp: {meta_timestamp}') + os.environ['PBF_TIMESTAMP'] = meta_timestamp + + def pbf_download_needed(pbf_file_with_date: str, md5_file_with_date: str, pgosm_date: str) -> bool: """Decides if the PBF/MD5 files need to be downloaded. From 75a1529662f2418f242461c58b0190a29d6f7889 Mon Sep 17 00:00:00 2001 From: Ryan Lambert Date: Thu, 23 May 2024 20:32:03 -0600 Subject: [PATCH 3/6] Fix make --- docker/db.py | 5 ++++- docker/helpers.py | 2 +- docker/tests/test_osm2pgsql_recommendation.py | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/docker/db.py b/docker/db.py index 36d1e20..f0956bf 100644 --- a/docker/db.py +++ b/docker/db.py @@ -634,7 +634,10 @@ def log_import_message(import_id, msg, schema_name): msg : str schema_name: str """ - pbf_timestamp = os.environ['PBF_TIMESTAMP'] + try: + pbf_timestamp = os.environ['PBF_TIMESTAMP'] + except KeyError: + pbf_timestamp = os.environ['PGOSM_DATE'] sql_raw = """ UPDATE {schema_name}.pgosm_flex SET import_status = %(msg)s , diff --git a/docker/helpers.py b/docker/helpers.py index a04643c..0400ae7 100644 --- a/docker/helpers.py +++ b/docker/helpers.py @@ -309,7 +309,7 @@ def okay_to_run(self, prior_import: dict) -> bool: # Check git version against latest. # If current version is lower than prior version from latest import, stop. prior_import_version = prior_import['pgosm_flex_version_no_hash'] - git_tag = helpers.get_git_info(tag_only=True) + git_tag = get_git_info(tag_only=True) if git_tag == '-- (version unknown) --': msg = 'Unable to detect PgOSM Flex version from Git.' diff --git a/docker/tests/test_osm2pgsql_recommendation.py b/docker/tests/test_osm2pgsql_recommendation.py index 9f41569..06f6607 100644 --- a/docker/tests/test_osm2pgsql_recommendation.py +++ b/docker/tests/test_osm2pgsql_recommendation.py @@ -3,7 +3,7 @@ import unittest import osm2pgsql_recommendation -from import_mode import ImportMode +from helpers import ImportMode class Osm2pgsqlRecommendationTests(unittest.TestCase): From 22e8aa4f6564b50e638e8fe2717061ea6a9c3617 Mon Sep 17 00:00:00 2001 From: Ryan Lambert Date: Sat, 6 Jul 2024 08:18:56 -0600 Subject: [PATCH 4/6] Update to handle error when data is too stale to update via replication. Cleanup bad var name --- docker/geofabrik.py | 2 +- docker/helpers.py | 22 ++++++++++++++++++---- docker/pgosm_flex.py | 8 ++++---- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/docker/geofabrik.py b/docker/geofabrik.py index 957a26e..0b02d4c 100644 --- a/docker/geofabrik.py +++ b/docker/geofabrik.py @@ -88,7 +88,7 @@ def set_date_from_metadata(pbf_file: str): returncode = helpers.run_command_via_subprocess(cmd=osmium_cmd.split(), cwd=None, output_lines=output, - print=False) + print_to_log=False) if returncode != 0: logger.error(f'osmium fileinfo failed. 
Output: {output}') diff --git a/docker/helpers.py b/docker/helpers.py index 0400ae7..aa4aae5 100644 --- a/docker/helpers.py +++ b/docker/helpers.py @@ -27,8 +27,11 @@ def get_today() -> str: return today -def run_command_via_subprocess(cmd: list, cwd: str, output_lines: list=[], - print: bool=False) -> int: +def run_command_via_subprocess(cmd: list, + cwd: str, + output_lines: list=[], + print_to_log: bool=False + ) -> int: """Wraps around subprocess.Popen() to run commands outside of Python. Prints output as it goes, returns the status code from the command. @@ -40,7 +43,7 @@ def run_command_via_subprocess(cmd: list, cwd: str, output_lines: list=[], Set the working directory, or to None. output_lines : list Pass in a list to return the output details. - print : bool + print_to_log : bool Default False. Set to true to also print to logger Returns @@ -60,12 +63,23 @@ def run_command_via_subprocess(cmd: list, cwd: str, output_lines: list=[], if output: ln = output.strip().decode('utf-8') output_lines.append(ln) - if print: + if print_to_log: logger.info(ln) + + # Detects issue reported in https://github.com/rustprooflabs/pgosm-flex/issues/391 + # Status code is incorrectly returned is 0, cannot detect + # problem using that method so forcing failure with custom + # status code. + if 'Error during diff download. Bailing out.' in ln: + logger.error('Data in database is too far behind replication service.') + return 999 + else: # Only sleep when there wasn't output sleep(1) + status = process.poll() + return status diff --git a/docker/pgosm_flex.py b/docker/pgosm_flex.py index 401c882..b79eff0 100644 --- a/docker/pgosm_flex.py +++ b/docker/pgosm_flex.py @@ -273,7 +273,7 @@ def run_replication_update(skip_nested, flex_path): update_cmd = update_cmd.replace('-d $PGOSM_CONN', f'-d {conn_string}') returncode = helpers.run_command_via_subprocess(cmd=update_cmd.split(), cwd=flex_path, - print=True) + print_to_log=True) if returncode != 0: err_msg = f'Failure. Return code: {returncode}' @@ -424,7 +424,7 @@ def run_osm2pgsql(osm2pgsql_command, flex_path, debug): returncode = helpers.run_command_via_subprocess(cmd=osm2pgsql_command.split(), cwd=flex_path, - print=True) + print_to_log=True) if returncode != 0: err_msg = f'Failed to run osm2pgsql. Return code: {returncode}' @@ -586,7 +586,7 @@ def check_replication_exists(): return True -def run_osm2pgsql_replication_init(pbf_path, pbf_filename): +def run_osm2pgsql_replication_init(pbf_path: str, pbf_filename: str): """Runs osm2pgsql-replication init to support replication mode. Parameters @@ -604,7 +604,7 @@ def run_osm2pgsql_replication_init(pbf_path, pbf_filename): returncode = helpers.run_command_via_subprocess(cmd=init_cmd.split(), cwd=None, - print=True) + print_to_log=True) if returncode != 0: err_msg = f'Failed to run osm2pgsql-replication. Return code: {returncode}' From 04d4146fc3ae4236cafa3cd19521325ca934f9d8 Mon Sep 17 00:00:00 2001 From: Ryan Lambert Date: Sat, 6 Jul 2024 09:20:24 -0600 Subject: [PATCH 5/6] Catch when --replication and --force used with existing replication data. 
Add docs on how to work around for dev purposes --- docker/pgosm_flex.py | 6 ++++++ docs/src/replication.md | 23 +++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/docker/pgosm_flex.py b/docker/pgosm_flex.py index b79eff0..190490c 100644 --- a/docker/pgosm_flex.py +++ b/docker/pgosm_flex.py @@ -105,6 +105,12 @@ def run_pgosm_flex(ram, region, subregion, debug, force, if replication: replication_update = check_replication_exists() + if replication_update and force: + err_msg = 'Using --force is invalid when --replication is running an update.' + err_msg += ' See https://pgosm-flex.com/replication.html#resetting-replication' + err_msg += ' for instructions around this on a development server.' + logger.error(err_msg) + sys.exit(f'ERROR: {err_msg}') else: replication_update = False diff --git a/docs/src/replication.md b/docs/src/replication.md index d58b7db..d751ad5 100644 --- a/docs/src/replication.md +++ b/docs/src/replication.md @@ -86,3 +86,26 @@ using `--schema-name`, replication via osm2pgsql-replication only supports a single source. See [this issue](https://github.com/openstreetmap/osm2pgsql/pull/1769) for details. Possibly this ability will be supported in the future. + +## Resetting Replication + +> ⚠️ WARNING! ⚠️ This section is only suitable for DEVELOPMENT databases. +> Do NOT USE on production databases! + +Replication with PgOSM Flex `--replication` is simply a wrapper around the +`osm2pgsql-replication` tool. If you need to reload a development +database after using `--replication` you must remove the data from the +`public.osm2pgsql_properties` table. If you do not remove this data, +PgOSM Flex will detect the replication setup and attempt to update data, not +load fresh. + + +```sql +DELETE FROM public.osm2pgsql_properties; +``` + +> WARNING: This process works as an okay hack when you are using the same layerset +> in the new import as was previously used. If you use a layerset with fewer +> tables, the original tables from the original layerset will persist and can +> cause confusion about what was loaded. + From 20f91a571c706b8c8b3eaa23ae8b550bd61ab750 Mon Sep 17 00:00:00 2001 From: Ryan Lambert Date: Sat, 6 Jul 2024 09:46:01 -0600 Subject: [PATCH 6/6] Minor cleanup --- docker/db.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/docker/db.py b/docker/db.py index f0956bf..637fa6a 100644 --- a/docker/db.py +++ b/docker/db.py @@ -1,4 +1,10 @@ """Module to interact with Postgres database. + +Dynamic SQL is used in this module to allow customized schema names for storing +data. At a glance, this is vulnerable to SQLi (SQL Injection) considering the +``schema_name`` variable is technically "user input". This is not considered +a concern for this project because the user inputting the ``schema_name`` value +is considered a trusted user. """ import logging import os @@ -607,7 +613,7 @@ def run_pg_dump(export_path, skip_qgis_style): fix_pg_dump_create_public(export_path) -def fix_pg_dump_create_public(export_path): +def fix_pg_dump_create_public(export_path: str): """Using pg_dump with `--schema=public` results in a .sql script containing `CREATE SCHEMA public;`, nearly always breaks in target DB. Replaces with `CREATE SCHEMA IF NOT EXISTS public;` @@ -623,10 +629,10 @@ def fix_pg_dump_create_public(export_path): LOGGER.debug(result) -def log_import_message(import_id, msg, schema_name): +def log_import_message(import_id: int, msg: str, schema_name: str): """Logs msg to database in osm.pgosm_flex for import_uuid. 
- Overwrites `osm_date` IF `pbf_timestamp` is set. + Overwrites `osm_date` if `pbf_timestamp` is set. Parameters ------------------------------- @@ -638,6 +644,7 @@ def log_import_message(import_id, msg, schema_name): pbf_timestamp = os.environ['PBF_TIMESTAMP'] except KeyError: pbf_timestamp = os.environ['PGOSM_DATE'] + sql_raw = """ UPDATE {schema_name}.pgosm_flex SET import_status = %(msg)s ,