Dev to Main #394

Merged · 10 commits · Jul 6, 2024
1 change: 1 addition & 0 deletions Dockerfile
@@ -20,6 +20,7 @@ RUN apt-get update \
curl unzip \
postgresql-16-pgrouting \
nlohmann-json3-dev \
osmium-tool \
&& rm -rf /var/lib/apt/lists/*

RUN wget https://luarocks.org/releases/luarocks-3.9.2.tar.gz \
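The `osmium-tool` package added here is what `docker/geofabrik.py` shells out to later in this PR. A minimal sanity-check sketch, assuming only that `osmium` is on PATH inside the image:

# Minimal sketch: verify osmium-tool is available in the image.
import shutil
import subprocess

def osmium_version() -> str:
    """Return the osmium version line, raising if the binary is missing."""
    if shutil.which('osmium') is None:
        raise RuntimeError('osmium-tool is not installed')
    result = subprocess.run(['osmium', '--version'],
                            capture_output=True, text=True, check=True)
    return result.stdout.splitlines()[0]

print(osmium_version())  # e.g. "osmium version 1.16.0"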
6 changes: 4 additions & 2 deletions db/deploy/osm_pgosm_flex.sql
@@ -5,7 +5,7 @@ BEGIN;
CREATE TABLE IF NOT EXISTS {schema_name}.pgosm_flex (
id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
imported TIMESTAMPTZ NOT NULL DEFAULT NOW(),
- osm_date date NOT NULL,
+ osm_date TIMESTAMPTZ NOT NULL,
region text NOT NULL,
layerset TEXT NULL,
srid text NOT NULL,
@@ -43,10 +43,12 @@ ALTER TABLE {schema_name}.pgosm_flex
ALTER TABLE {schema_name}.pgosm_flex DROP COLUMN IF EXISTS project_url;
ALTER TABLE {schema_name}.pgosm_flex DROP COLUMN IF EXISTS default_date;

ALTER TABLE {schema_name}.pgosm_flex ALTER COLUMN osm_date TYPE TIMESTAMPTZ;

COMMENT ON TABLE {schema_name}.pgosm_flex IS 'Provides meta information on the PgOSM-Flex project including version and SRID used during the import. One row per import.';

COMMENT ON COLUMN {schema_name}.pgosm_flex.imported IS 'Indicates when the import was run.';
- COMMENT ON COLUMN {schema_name}.pgosm_flex.osm_date IS 'Indicates the date of the OpenStreetMap data loaded. Recommended to set PGOSM_DATE env var at runtime, otherwise defaults to the date PgOSM-Flex was run.';
+ COMMENT ON COLUMN {schema_name}.pgosm_flex.osm_date IS 'Indicates the date of the OpenStreetMap data loaded. Uses the timestamp from the PBF file metadata when available. If metadata is not available, this represents --osm-date at runtime, or the current date in the timezone of the computer running the import.';
COMMENT ON COLUMN {schema_name}.pgosm_flex.srid IS 'SRID of imported data.';
COMMENT ON COLUMN {schema_name}.pgosm_flex.pgosm_flex_version IS 'Version of PgOSM-Flex used to generate schema.';
COMMENT ON COLUMN {schema_name}.pgosm_flex.osm2pgsql_version IS 'Version of osm2pgsql used to load data.';
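With `osm_date` widened from `date` to `TIMESTAMPTZ`, the metadata row now carries a full timestamp. A minimal sketch for inspecting the latest import, assuming psycopg 3, the default `osm` schema, and a connection string in the `PGOSM_CONN` env var as used elsewhere in the project:

# Minimal sketch: read the most recent import's metadata, including the
# TIMESTAMPTZ osm_date introduced by this change.
import os
import psycopg

SQL = """
SELECT osm_date, region, srid, pgosm_flex_version
    FROM osm.pgosm_flex
    ORDER BY imported DESC
    LIMIT 1
;
"""

with psycopg.connect(os.environ['PGOSM_CONN']) as conn:
    print(conn.execute(SQL).fetchone())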
24 changes: 20 additions & 4 deletions docker/db.py
@@ -1,4 +1,10 @@
"""Module to interact with Postgres database.

Dynamic SQL is used in this module to allow customized schema names for storing
data. At a glance, this is vulnerable to SQLi (SQL Injection) considering the
``schema_name`` variable is technically "user input". This is not considered
a concern for this project because the user inputting the ``schema_name`` value
is considered a trusted user.
"""
import logging
import os
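The pattern this docstring describes is worth spelling out: the trusted identifier (`schema_name`) is spliced in with `str.format()` because identifiers cannot be bound as query parameters, while every untrusted value still goes through psycopg's parameter binding. A minimal sketch of the pattern (the helper name is hypothetical, assuming psycopg 3):

# Hypothetical sketch of the two-layer pattern used throughout this module:
# schema_name (trusted) is spliced via format(); msg and import_id
# (values) are bound server-side, so they cannot inject SQL.
import psycopg

def update_status(conn: psycopg.Connection, schema_name: str,
                  import_id: int, msg: str):
    sql_raw = """
    UPDATE {schema_name}.pgosm_flex
        SET import_status = %(msg)s
        WHERE id = %(import_id)s
    ;
    """.format(schema_name=schema_name)
    conn.execute(sql_raw, {'msg': msg, 'import_id': import_id})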
@@ -607,7 +613,7 @@ def run_pg_dump(export_path, skip_qgis_style):
fix_pg_dump_create_public(export_path)


- def fix_pg_dump_create_public(export_path):
+ def fix_pg_dump_create_public(export_path: str):
"""Using pg_dump with `--schema=public` results in
a .sql script containing `CREATE SCHEMA public;`, nearly always breaks
in target DB. Replaces with `CREATE SCHEMA IF NOT EXISTS public;`
@@ -623,24 +629,34 @@ def fix_pg_dump_create_public(export_path):
LOGGER.debug(result)
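The replacement logic itself falls outside this hunk; a minimal sketch of the behavior the docstring describes (the function name here is hypothetical, not the project's implementation):

# Hypothetical sketch of the edit described above: rewrite pg_dump output
# so restoring into a DB that already has a public schema does not fail.
from pathlib import Path

def make_create_public_idempotent(export_path: str):
    path = Path(export_path)
    sql = path.read_text()
    sql = sql.replace('CREATE SCHEMA public;',
                      'CREATE SCHEMA IF NOT EXISTS public;')
    path.write_text(sql)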


- def log_import_message(import_id, msg, schema_name):
+ def log_import_message(import_id: int, msg: str, schema_name: str):
"""Logs msg to database in osm.pgosm_flex for import_uuid.

Overwrites `osm_date` if `pbf_timestamp` is set.

Parameters
-------------------------------
import_id : int
msg : str
schema_name: str
"""
try:
pbf_timestamp = os.environ['PBF_TIMESTAMP']
except KeyError:
pbf_timestamp = os.environ['PGOSM_DATE']

sql_raw = """
UPDATE {schema_name}.pgosm_flex
- SET import_status = %(msg)s
+ SET import_status = %(msg)s,
+     osm_date = COALESCE(%(pbf_timestamp)s, osm_date)
WHERE id = %(import_id)s
;
"""
sql_raw = sql_raw.format(schema_name=schema_name)
with get_db_conn(conn_string=os.environ['PGOSM_CONN']) as conn:
- params = {'import_id': import_id, 'msg': msg}
+ params = {'import_id': import_id,
+           'msg': msg,
+           'pbf_timestamp': pbf_timestamp}
cur = conn.cursor()
cur.execute(sql_raw, params=params)

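Note that `COALESCE(%(pbf_timestamp)s, osm_date)` only falls back to the existing `osm_date` when the bound parameter is SQL NULL, i.e. when the Python value is None. As written, `pbf_timestamp` always comes from an environment variable and is therefore a string, so in practice the update overwrites. A minimal sketch of both cases, assuming psycopg 3 (the DSN is illustrative):

# Minimal sketch: how COALESCE treats the bound parameter.
# None binds as SQL NULL, so COALESCE keeps the fallback; any string
# (even an empty one) binds as a value and wins.
import psycopg

with psycopg.connect('postgresql://localhost/pgosm') as conn:  # assumed DSN
    kept = conn.execute("SELECT COALESCE(%(ts)s, 'existing')",
                        {'ts': None}).fetchone()[0]
    overwritten = conn.execute("SELECT COALESCE(%(ts)s, 'existing')",
                               {'ts': '2024-07-04T20:21:45Z'}).fetchone()[0]
    print(kept, overwritten)  # existing 2024-07-04T20:21:45Z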
36 changes: 36 additions & 0 deletions docker/geofabrik.py
@@ -1,6 +1,7 @@
"""This module handles the auto-file handling using Geofabrik's download service.
"""
import logging
import json
import os
import shutil
import subprocess
@@ -67,10 +68,45 @@ def prepare_data(out_path: str) -> str:
md5_file_with_date)

helpers.verify_checksum(md5_file, out_path)
set_date_from_metadata(pbf_file=pbf_file)

return pbf_file


def set_date_from_metadata(pbf_file: str):
"""Use `osmium fileinfo` to set a more accurate date to represent when it was
extracted from OpenStreetMap.

Parameters
---------------------
pbf_file : str
Full path to the `.osm.pbf` file.
"""
logger = logging.getLogger('pgosm-flex')
osmium_cmd = f'osmium fileinfo {pbf_file} --json'
output = []
returncode = helpers.run_command_via_subprocess(cmd=osmium_cmd.split(),
cwd=None,
output_lines=output,
print_to_log=False)
if returncode != 0:
    logger.error(f'osmium fileinfo failed. Output: {output}')
    return

output_joined = json.loads(''.join(output))
meta_options = output_joined['header']['option']

try:
meta_timestamp = meta_options['timestamp']
except KeyError:
try:
meta_timestamp = meta_options['osmosis_replication_timestamp']
except KeyError:
meta_timestamp = None

logger.info(f'PBF Meta timestamp: {meta_timestamp}')
# Only set the env var when metadata provided a timestamp; downstream
# falls back to PGOSM_DATE when PBF_TIMESTAMP is unset.
if meta_timestamp is not None:
    os.environ['PBF_TIMESTAMP'] = meta_timestamp
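For reference, `osmium fileinfo --json` nests these values under `header.option`; the lookup above prefers `timestamp` and falls back to `osmosis_replication_timestamp`. A standalone sketch against a trimmed sample payload (sample values are illustrative):

# Minimal sketch of the lookup order above, using a trimmed example of
# `osmium fileinfo --json` output. Values are illustrative.
import json

sample = json.loads("""
{"header": {"option": {
    "generator": "osmium/1.16.0",
    "osmosis_replication_timestamp": "2024-07-04T20:21:45Z"
}}}
""")

meta_options = sample['header']['option']
meta_timestamp = meta_options.get(
    'timestamp',
    meta_options.get('osmosis_replication_timestamp'))
print(meta_timestamp)  # 2024-07-04T20:21:45Z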


def pbf_download_needed(pbf_file_with_date: str, md5_file_with_date: str,
pgosm_date: str) -> bool:
"""Decides if the PBF/MD5 files need to be downloaded.
193 changes: 189 additions & 4 deletions docker/helpers.py
@@ -1,7 +1,9 @@
"""Generic functions and attributes used in multiple modules of PgOSM Flex.
"""
import datetime
import json
import logging
from packaging.version import parse as parse_version
import subprocess
import os
import sys
@@ -25,8 +27,11 @@ def get_today() -> str:
return today


- def run_command_via_subprocess(cmd: list, cwd: str, output_lines: list=[],
-                                print: bool=False) -> int:
+ def run_command_via_subprocess(cmd: list,
+                                cwd: str,
+                                output_lines: list=[],
+                                print_to_log: bool=False
+                                ) -> int:
"""Wraps around subprocess.Popen() to run commands outside of Python. Prints
output as it goes, returns the status code from the command.

@@ -38,7 +43,7 @@ def run_command_via_subprocess(cmd: list, cwd: str, output_lines: list=[],
Set the working directory, or to None.
output_lines : list
Pass in a list to return the output details.
- print : bool
+ print_to_log : bool
Default False. Set to true to also print to logger

Returns
@@ -58,12 +63,23 @@ def run_command_via_subprocess(cmd: list, cwd: str, output_lines: list=[],
if output:
ln = output.strip().decode('utf-8')
output_lines.append(ln)
- if print:
+ if print_to_log:
logger.info(ln)

# Detects issue reported in https://github.com/rustprooflabs/pgosm-flex/issues/391
# The status code is incorrectly returned as 0, so the problem cannot be
# detected that way; force failure with a custom status code instead.
if 'Error during diff download. Bailing out.' in ln:
    logger.error('Data in database is too far behind replication service.')
    return 999

else:
# Only sleep when there wasn't output
sleep(1)

status = process.poll()

return status
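Call sites capture output by passing a fresh list, as `set_date_from_metadata()` does above; note that `output_lines` has a mutable default, so always passing a new list avoids output bleeding across calls. A usage sketch (the command shown is illustrative, not from this diff):

# Usage sketch for the updated helper; 999 is the custom status code
# forced by the log-scan above.
output = []
returncode = run_command_via_subprocess(cmd=['osm2pgsql-replication', 'update'],
                                        cwd=None,
                                        output_lines=output,
                                        print_to_log=True)
if returncode == 999:
    print('Replication is too far behind; a fresh import is required.')
elif returncode != 0:
    print(f'Command failed ({returncode}). Last output: {output[-5:]}')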


@@ -223,3 +239,172 @@ def unset_env_vars():
os.environ.pop('PGOSM_CONN', None)
os.environ.pop('PGOSM_CONN_PG', None)
os.environ.pop('SCHEMA_NAME', None)


class ImportMode():
"""Determines logical variables used to control program flow.

WARNING: The values for `append_first_run` and `replication_update`
are used to determine when to drop the local DB. Be careful with any
changes to these values.
"""
def __init__(self, replication: bool, replication_update: bool,
update: str, force: bool):
"""Computes two variables, slim_no_drop and append_first_run
based on inputs.

Parameters
--------------------------
replication : bool
replication_update : bool
update : str or None
Valid options are 'create' or 'append', lining up with osm2pgsql's
`--create` and `--append` modes.
force : bool
"""
self.logger = logging.getLogger('pgosm-flex')
self.replication = replication
self.replication_update = replication_update

# The input via click should enforce this; still worth checking here
valid_update_options = ['append', 'create', None]

if update not in valid_update_options:
raise ValueError(f'Invalid option for --update. Valid options: {valid_update_options}')

self.update = update
self.force = force

self.set_slim_no_drop()
self.set_append_first_run()
self.set_run_post_sql()


def okay_to_run(self, prior_import: dict) -> bool:
"""Determines if it is okay to run PgOSM Flex without fear of data loss.

This logic was added along with the `--force` option to make it
less likely to accidentally lose data with improper PgOSM Flex
options.

Remember, this is free and open source software and there is
no warranty!
This does not imply a guarantee that you **cannot** lose data,
only that we want to make it **less likely** something bad will happen.
If you find a way bad things can happen that could be detected here,
please open an issue:

https://github.com/rustprooflabs/pgosm-flex/issues/new?assignees=&labels=&projects=&template=bug_report.md&title=Data%20Safety%20Idea

Parameters
-------------------
prior_import : dict
Details about the latest import from osm.pgosm_flex table.

An empty dictionary (len==0) indicates no prior import.
The `replication` and `pgosm_flex_version_no_hash` keys are specifically used.

Returns
-------------------
okay_to_run : bool
"""
self.logger.debug('Checking if it is okay to run...')
if self.force:
    self.logger.warning('Using --force, kiss existing data goodbye')
    return True

# If no prior imports, do not require force
if len(prior_import) == 0:
self.logger.debug('No prior import found, okay to proceed.')
return True

prior_replication = prior_import['replication']

# Check git version against latest.
# If current version is lower than prior version from latest import, stop.
prior_import_version = prior_import['pgosm_flex_version_no_hash']
git_tag = get_git_info(tag_only=True)

if git_tag == '-- (version unknown) --':
msg = 'Unable to detect PgOSM Flex version from Git.'
msg += ' Not enforcing version check against prior version.'
self.logger.warning(msg)
elif parse_version(git_tag) < parse_version(prior_import_version):
msg = f'PgOSM Flex version ({git_tag}) is lower than latest import'
msg += f' tracked in the pgosm_flex table ({prior_import_version}).'
msg += f' Use PgOSM Flex version {prior_import_version} or newer'
self.logger.error(msg)
return False
else:
self.logger.info(f'Prior import used PgOSM Flex: {prior_import_version}')

if self.replication:
if not prior_replication:
self.logger.error('Running with replication but the prior import did not use it. Requires --force to proceed.')
return False
self.logger.debug('Okay to proceed with replication')
return True

msg = 'Prior data exists in the osm schema and --force was not used.'
self.logger.error(msg)
return False
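Comparing via `packaging.version.parse` matters here because tag strings stop ordering correctly once a version component reaches two digits; a quick illustration:

# Why parse_version instead of comparing tag strings directly:
from packaging.version import parse as parse_version

assert '0.10.0' < '0.9.0'                                 # string order: wrong
assert parse_version('0.10.0') > parse_version('0.9.0')   # semantic: right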

def set_append_first_run(self):
"""Uses `replication_update` and `update` to determine value for
`self.append_first_run`
"""
if self.replication_update:
self.append_first_run = False
else:
self.append_first_run = True

if self.update is not None:
if self.update == 'create':
self.append_first_run = True
else:
self.append_first_run = False

def set_slim_no_drop(self):
"""Uses `replication` and `update` to determine value for
`self.slim_no_drop`
"""
self.slim_no_drop = False

if self.replication:
self.slim_no_drop = True

if self.update is not None:
self.slim_no_drop = True

def set_run_post_sql(self):
"""Uses `update` value to determine value for
`self.run_post_sql`. This value determines if the post-processing SQL
should be executed.

Note: Not checking replication/replication_update because subsequent
imports use osm2pgsql-replication, which does not attempt to run
the post-processing SQL scripts.
"""
self.run_post_sql = True

if self.update is not None:
if self.update == 'append':
self.run_post_sql = False

def as_json(self) -> str:
"""Packs key details as a dictionary passed through `json.dumps()`

Returns
------------------------
json_text : str
Text representation of JSON object built using class attributes.
"""
self_as_dict = {'update': self.update,
'replication': self.replication,
'replication_update': self.replication_update,
'append_first_run': self.append_first_run,
'slim_no_drop': self.slim_no_drop,
'run_post_sql': self.run_post_sql}
return json.dumps(self_as_dict)
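A usage sketch tying the flags together: construct `ImportMode` the way a fresh replication-enabled import would and inspect the computed flow-control values:

# Usage sketch: fresh import with replication enabled.
mode = ImportMode(replication=True, replication_update=False,
                  update=None, force=False)
print(mode.as_json())
# {"update": null, "replication": true, "replication_update": false,
#  "append_first_run": true, "slim_no_drop": true, "run_post_sql": true}

# With no prior import recorded, okay_to_run() allows proceeding
# without --force.
assert mode.okay_to_run(prior_import={}) is True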

