Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 22b3e5a

Browse files
authored
Merge pull request #5 from PageUpPeopleOrg/feature/OSC-1137-persist-model-checksums
[OSC-1136, OSC-1137] - add 'compare' and 'complete' commands
2 parents b3b4598 + d7d1a83 commit 22b3e5a

13 files changed

+286
-101
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,6 @@ venv.bak/
107107
.vscode/
108108

109109
# JetBrains
110-
.idea/
110+
.idea/
111+
112+
test-models/

README.md

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,20 @@ py mcd.py [options] <command> [command-parameters]
1313
- `options` include:
1414
- `--help | -h`: displays help menu.
1515
- `--log-level | -l`: choose program's logging level, from CRITICAL, ERROR, WARNING, INFO, DEBUG; default is INFO.
16+
- `db-connection-string`: a [PostgreSQL Db Connection String](http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2) of the format `postgresql+psycopg2://user:password@host:port/dbname`
1617
- `command` is the function to be performed by the utility. The currently supported values are:
17-
- `start`: Marks the start of a new execution by creating a record for the same in the given database. Returns an `execution-id` which is a GUID identifier of the new execution.
18-
- `db-connection-string`: a [PostgreSQL Db Connection String](http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2) of the format `postgresql+psycopg2://user:password@host:port/dbname`
19-
- `finish`: Marks the completion of an existing execution by updating a record for the same in the given database. Returns nothing unless there's an error.
20-
- `db-connection-string`: a [PostgreSQL Db Connection String](http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2) of the format `postgresql+psycopg2://user:password@host:port/dbname`
21-
- `execution-id`: a GUID identifier of an existing data pipeline execution.
18+
- `init`: Marks the start of a new execution by creating a record for the same in the given database. Returns an `execution-id` which is a GUID identifier of the new execution.
19+
- `compare`: Compares & persists SHA256-hashed checksums of the given models against those of the last successful execution. Returns a comma-separated string of changed model names.
20+
- `execution-id`: a GUID identifier of an existing data pipeline execution as returned by the `init` command.
21+
- `model-type`: type of models being processed, e.g. `load`, `transform`, etc. The `model-type` is used to group the model checksums and to find and compare against older ones.
22+
- `base-path`: absolute or relative path to the models e.g.: `./load`, `/home/local/load`, `C:/path/to/load`
23+
- `model-patterns`: path-based patterns _(relative to `base-path`)_ to different models with extensions. Models within a model-type must be named uniquely regardless of their file extension, e.g. `*.txt`, `**/*.txt`, `./relative/path/to/some_models/**/*.csv`, `relative/path/to/some/more/related/models/**/*.sql`
24+
- `complete`: Marks the completion of an existing execution by updating a record for the same in the given database. Returns nothing unless there's an error.
25+
- `execution-id`: a GUID identifier of an existing data pipeline execution as returned by the `init` command.
2226

2327
To get help, use:
2428

25-
```
29+
```commandline
2630
py mcd.py --help
2731
py mcd.py <command> --help
2832
```
@@ -41,7 +45,10 @@ new-env\scripts\activate
4145
4246
py -m pip install -r requirements.txt
4347
44-
py mcd.py start postgresql+psycopg2://user:password@host:port/dbname
48+
py mcd.py postgresql+psycopg2://user:password@host:port/dbname init
49+
py mcd.py postgresql+psycopg2://user:password@host:port/dbname compare execution-id-as-returned-by-init-command load ./relative/path/to/load/models **/*.json
50+
py mcd.py postgresql+psycopg2://user:password@host:port/dbname compare execution-id-as-returned-by-init-command transform C:/absolute/path/to/transform/models group1/*.csv ./group2/**/*.sql
51+
py mcd.py postgresql+psycopg2://user:password@host:port/dbname complete execution-id-as-returned-by-init-command
4552
```
4653

4754
### As a package
@@ -64,7 +71,10 @@ new-env\scripts\activate
6471
6572
pip install -e git+git://github.com/PageUpPeopleOrg/model-change-detector.git#egg=mcd
6673
67-
py -m mcd start postgresql+psycopg2://user:password@host:port/dbname
74+
py -m mcd postgresql+psycopg2://user:password@host:port/dbname init
75+
py -m mcd postgresql+psycopg2://user:password@host:port/dbname compare execution-id-as-returned-by-init-command load ./relative/path/to/load/models **/*.json
76+
py -m mcd postgresql+psycopg2://user:password@host:port/dbname compare execution-id-as-returned-by-init-command transform C:/absolute/path/to/transform/models group1/*.csv ./group2/**/*.sql
77+
py -m mcd postgresql+psycopg2://user:password@host:port/dbname complete execution-id-as-returned-by-init-command
6878
```
6979

7080
## Setup
@@ -113,7 +123,7 @@ On Linux / Mac OS
113123

114124
You should see the name of your virtual environment in brackets on your terminal line, e.g.:
115125

116-
```
126+
```commandline
117127
C:\path\to\working\dir: new-env\scripts\activate
118128
(new-env) C:\path\to\working\dir: _
119129
```

modules/DataPipelineExecutionRepository.py

Lines changed: 0 additions & 40 deletions
This file was deleted.

modules/DataRepository.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
from sqlalchemy import desc
2+
3+
from modules import Shared
4+
from modules.BaseObject import BaseObject
5+
from modules.Shared import Constants
6+
from modules.entities.DataPipelineExecutionEntity import DataPipelineExecutionEntity
7+
from modules.entities.ModelChecksumEntity import ModelChecksumEntity
8+
9+
10+
class DataRepository(BaseObject):
    """Database access for data pipeline executions and their model checksums.

    Every method opens its own session from the supplied session factory.
    """

    def __init__(self, session_maker, logger=None):
        """Store the session factory used by every repository operation."""
        super().__init__(logger)
        self.session_maker = session_maker

    def ensure_schema_exists(self, engine):
        """Create the pipeline schema if it is missing, then all mapped tables."""
        schema_name = Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME
        engine.execute(f'CREATE SCHEMA IF NOT EXISTS {schema_name}')
        Shared.BaseEntity.metadata.create_all(engine)

    def initialise_execution(self):
        """Insert a new execution record, commit it and return the entity."""
        db_session = self.session_maker()
        new_execution = DataPipelineExecutionEntity()
        db_session.add(new_execution)
        db_session.commit()
        return new_execution

    def get_last_successful_models(self, model_type):
        """Return ``{model name: checksum}`` for the latest COMPLETED execution.

        The latest execution is the first one when ordering by
        ``last_updated_on`` and then ``created_on``, both descending. Only
        checksums recorded with the given ``model_type`` are included. An
        empty dict is returned when there is no COMPLETED execution.
        """
        db_session = self.session_maker()
        completed_status = Constants.DataPipelineExecutionStatus.COMPLETED

        latest_completed = (
            db_session.query(DataPipelineExecutionEntity)
            .filter_by(status=completed_status)
            .order_by(desc(DataPipelineExecutionEntity.last_updated_on))
            .order_by(desc(DataPipelineExecutionEntity.created_on))
            .first()
        )
        if latest_completed is None:
            return {}

        checksum_rows = db_session.query(ModelChecksumEntity).filter_by(
            execution_id=latest_completed.id,
            type=model_type,
        )
        return {row.name: row.checksum for row in checksum_rows}

    def save_execution_progress(self, execution_id, model_type, model_checksums):
        """Persist the given checksums against an existing execution.

        ``model_checksums`` maps model name to checksum; entries are added in
        sorted order. The lookup uses ``.one()``, so it raises unless exactly
        one execution matches ``execution_id``. Returns the execution entity.
        """
        db_session = self.session_maker()
        execution = (
            db_session.query(DataPipelineExecutionEntity)
            .filter_by(id=execution_id)
            .one()
        )

        owner_id = execution.id
        db_session.add_all([
            ModelChecksumEntity(execution_id=owner_id,
                                type=model_type,
                                name=model_name,
                                checksum=model_checksum)
            for model_name, model_checksum in sorted(model_checksums.items())
        ])

        db_session.commit()
        return execution

    def complete_execution(self, execution_id):
        """Set the execution's status to COMPLETED, commit and return the entity.

        The lookup uses ``.one()``, so it raises unless exactly one execution
        matches ``execution_id``.
        """
        db_session = self.session_maker()
        execution = (
            db_session.query(DataPipelineExecutionEntity)
            .filter_by(id=execution_id)
            .one()
        )
        execution.status = Constants.DataPipelineExecutionStatus.COMPLETED
        db_session.commit()
        return execution

modules/ModelChangeDetector.py

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import argparse
22
import logging
3+
34
from modules import Shared
4-
from modules.Shared import Constants
55
from modules.BaseObject import BaseObject
6-
from modules.commands.StartCommand import StartCommand
7-
from modules.commands.FinishCommand import FinishCommand
6+
from modules.Shared import Constants
7+
from modules.commands.CompareCommand import CompareCommand
8+
from modules.commands.CompleteCommand import CompleteCommand
9+
from modules.commands.InitialiseCommand import InitialiseCommand
810

911

1012
class ModelChangeDetector(BaseObject):
@@ -19,41 +21,63 @@ def __init__(self, logger=None):
1921

2022
self.args.func()
2123

22-
def __process_start_command(self):
23-
StartCommand(self.args.db_connection_string).execute()
24+
def __process_init_command(self):
25+
InitialiseCommand(self.args.db_connection_string).execute()
26+
27+
def __process_compare_command(self):
28+
CompareCommand(self.args.db_connection_string, self.args.execution_id, self.args.model_type,
29+
self.args.base_path, self.args.model_patterns).execute()
2430

25-
def __process_finish_command(self):
26-
FinishCommand(self.args.db_connection_string, self.args.execution_id).execute()
31+
def __process_complete_command(self):
32+
CompleteCommand(self.args.db_connection_string, self.args.execution_id).execute()
2733

2834
def __get_arguments(self):
2935
parser = argparse.ArgumentParser(description=Constants.APP_NAME,
30-
usage='mcd [options] <command> [command-parameters]\n\n'
36+
usage='mcd [options] <db-connection-string> <command> [command-parameters]\n\n'
3137
'To see help text, you can run\n'
3238
' mcd --help\n'
33-
' mcd <command> --help',
39+
' mcd <db-connection-string> <command> --help\n\n',
3440
parents=[Shared.get_default_arguments()])
3541

42+
parser.add_argument('db_connection_string',
43+
metavar='db-connection-string',
44+
help='provide in PostgreSQL & Psycopg format, '
45+
'postgresql+psycopg2://username:password@host:port/dbname')
46+
3647
subparsers = parser.add_subparsers(title='commands', metavar='', dest='command')
3748

38-
start_command_parser = subparsers.add_parser('start', help='help text for \'start\' command')
39-
start_command_parser.set_defaults(func=self.__process_start_command)
40-
self.__get_default_command_arguments(start_command_parser)
49+
init_command_parser = subparsers.add_parser('init', help='initialises a new data pipeline execution')
50+
init_command_parser.set_defaults(func=self.__process_init_command)
51+
52+
compare_command_parser = subparsers.add_parser('compare', help='compares given models with those of the last '
53+
'successfully processed data pipeline '
54+
'execution. also persists given models against '
55+
'the given data pipeline execution.')
56+
compare_command_parser.set_defaults(func=self.__process_compare_command)
57+
compare_command_parser.add_argument('execution_id',
58+
metavar='execution-id',
59+
help='data pipeline execution id as received using \'init\' command')
60+
compare_command_parser.add_argument('model_type',
61+
metavar='model-type',
62+
help='a string name for the type of models to compare. used to group '
63+
'models between various calls to this command for same data pipeline '
64+
'execution. e.g. load, transform')
65+
compare_command_parser.add_argument('base_path',
66+
metavar='base-path',
67+
help='absolute or relative path to the base directory of all models')
68+
compare_command_parser.add_argument('model_patterns',
69+
metavar='model-patterns',
70+
nargs='+',
71+
help='one or more unix-style search patterns for model files. e.g.: '
72+
'*.txt, **/*.json, ./path/to/some_models/**/*.csv, '
73+
'path/to/some/more/related/models/**/*.sql')
4174

42-
finish_command_parser = subparsers.add_parser('finish', help='help text for \'finish\' command')
43-
finish_command_parser.set_defaults(func=self.__process_finish_command)
44-
self.__get_default_command_arguments(finish_command_parser)
45-
finish_command_parser.add_argument('execution_id',
46-
metavar='execution_id',
47-
help='data pipeline execution id as received using \'start\' command')
75+
complete_command_parser = subparsers.add_parser('complete', help='completees the given data pipeline execution.')
76+
complete_command_parser.set_defaults(func=self.__process_complete_command)
77+
complete_command_parser.add_argument('execution_id',
78+
metavar='execution-id',
79+
help='data pipeline execution id as received using \'init\' command')
4880

4981
args = parser.parse_args()
5082

5183
return args
52-
53-
@staticmethod
54-
def __get_default_command_arguments(command_parser):
55-
command_parser.add_argument('db_connection_string',
56-
metavar='db-connection-string',
57-
help='provide in PostgreSQL & Psycopg format, '
58-
'postgresql+psycopg2://username:password@host:port/dbname')
59-

modules/Shared.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
import logging
22
import argparse
3+
from sqlalchemy.ext.declarative import declarative_base
4+
5+
BaseEntity = declarative_base()
36

47

58
class Constants:
69
APP_NAME = 'model-change-detector'
710
DATA_PIPELINE_EXECUTION_SCHEMA_NAME = 'data_pipeline'
811

912
class DataPipelineExecutionStatus:
10-
STARTED = 'STARTED'
11-
COMPLETED_SUCCESSFULLY = 'SUCCESSFUL'
13+
INITIALISED = 'INITIALISED'
14+
COMPLETED = 'COMPLETED'
1215

1316

1417
_logLevelStrings = [logging.getLevelName(logging.CRITICAL),

modules/commands/BaseCommand.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from sqlalchemy import create_engine
22
from sqlalchemy.orm import sessionmaker
3-
from modules.DataPipelineExecutionRepository import DataPipelineExecutionRepository
3+
from modules.DataRepository import DataRepository
44
from modules.BaseObject import BaseObject
55

66

@@ -9,5 +9,5 @@ def __init__(self, db_connection_string, logger=None):
99
super().__init__(logger)
1010
self.db_engine = create_engine(db_connection_string, echo=False)
1111
self.session_maker = sessionmaker(bind=self.db_engine)
12-
self.repository = DataPipelineExecutionRepository(self.session_maker)
13-
self.repository.create_schema(engine=self.db_engine)
12+
self.repository = DataRepository(self.session_maker)
13+
self.repository.ensure_schema_exists(engine=self.db_engine)

modules/commands/CompareCommand.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import hashlib
2+
from pathlib import Path
3+
from modules.commands.BaseCommand import BaseCommand
4+
5+
6+
class CompareCommand(BaseCommand):
    """Checksum the current model files and report which ones changed.

    The checksums found under ``base_path`` are first persisted against the
    given execution and then compared with those stored for the last
    successful execution of the same ``model_type``. The result is printed
    to stdout: ``*`` when there is nothing to compare against, otherwise the
    changed model names joined by a comma.
    """

    def __init__(self, db_connection_string, execution_id, model_type, base_path, model_patterns, logger=None):
        """Remember the command parameters; database setup is done by the base class."""
        super().__init__(db_connection_string, logger)
        self._execution_id = execution_id
        self._model_type = model_type
        self._base_path = base_path
        self._model_patterns = model_patterns
        self._changed_models_separator = ','

    def execute(self):
        """Persist the current checksums and print the names of changed models.

        Raises:
            NotADirectoryError: if ``base_path`` is not an existing directory.
        """
        model_folder = Path(self._base_path)
        if not model_folder.is_dir():
            raise NotADirectoryError(self._base_path)

        # Models are keyed by file stem, so two matched files that share a
        # stem (e.g. same name, different extension) overwrite each other.
        current_model_checksums = {}
        for model_pattern in self._model_patterns:
            for model_file in model_folder.glob(model_pattern):
                if model_file.is_file():
                    current_model_checksums[model_file.stem] = self.__get_file_checksum(model_file)

        data_pipeline_execution = self.repository.save_execution_progress(
            self._execution_id, self._model_type, current_model_checksums)
        # Python f-strings interpolate with '{...}'; the former '${...}' left a
        # stray '$' in the log output.
        self.logger.debug(f'Comparing data_pipeline_execution = {data_pipeline_execution!s}')

        previous_model_checksums = self.repository.get_last_successful_models(self._model_type)

        if not previous_model_checksums:
            # Nothing to compare against: report every model as changed.
            print('*')
            self.logger.debug('Changed models: ALL')
            return

        # A model is changed when it is new or its checksum differs.
        changed_models = [
            model
            for model, current_checksum in current_model_checksums.items()
            if model not in previous_model_checksums or previous_model_checksums[model] != current_checksum
        ]

        print(self._changed_models_separator.join(changed_models))
        self.logger.debug(f'Changed models: \'{changed_models!s}\'')

    def __get_file_checksum(self, file: Path):
        """Return the SHA-256 hex digest of the file's bytes."""
        hash_function = hashlib.sha256()
        hash_function.update(file.read_bytes())
        checksum = hash_function.hexdigest()
        self.logger.debug(f'filename={file.name}, filepath=\'{file.absolute().as_posix()}\'')
        self.logger.debug(f'hash_function={hash_function.name}, checksum_len={len(checksum)}, checksum={checksum}')
        return checksum
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import hashlib
2+
from pathlib import Path
3+
from modules.commands.BaseCommand import BaseCommand
4+
5+
6+
class CompleteCommand(BaseCommand):
    """Mark an existing data pipeline execution as completed."""

    def __init__(self, db_connection_string, execution_id, logger=None):
        """Remember the execution id; database setup is done by the base class."""
        super().__init__(db_connection_string, logger)
        self._execution_id = execution_id

    def execute(self):
        """Complete the execution through the repository and log the result."""
        completed_execution = self.repository.complete_execution(self._execution_id)
        self.logger.debug(f'Completed data_pipeline_execution = {completed_execution!s}')

0 commit comments

Comments
 (0)