Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit b3b4598

Browse files
authored
Merge pull request #3 from PageUpPeopleOrg/feature/OSC-986-finish-execution
feature/OSC-986 add support to finish a data pipeline execution
2 parents 4cf3072 + 6f31c53 commit b3b4598

File tree

8 files changed

+81
-103
lines changed

8 files changed

+81
-103
lines changed

README.md

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,36 @@
22

33
## About
44

5-
A utility that detects changes in models.
5+
A utility that detects changes in models.
66

77
## Usage
88

99
```commandline
10-
py mcd.py <command> <db-connection-string> [--help] [--log-level]
10+
py mcd.py [options] <command> [command-parameters]
1111
```
1212

13-
- `command` is the function to be performed by the utility. The currently supported values are
14-
- `START`: Marks the start of a new execution by creating a record for the same in the given database and returns an ID of the new execution.
15-
- `db-connection-string` is a [PostgreSQL Db Connection String](http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2) of the format `postgresql+psycopg2://user:password@host:port/dbname`
13+
- `options` include:
14+
- `--help | -h`: displays help menu.
15+
- `--log-level | -l`: choose program's logging level, from CRITICAL, ERROR, WARNING, INFO, DEBUG; default is INFO.
16+
- `command` is the function to be performed by the utility. The currently supported values are:
17+
- `start`: Marks the start of a new execution by creating a record for the same in the given database. Returns an `execution-id` which is a GUID identifier of the new execution.
18+
- `db-connection-string`: a [PostgreSQL Db Connection String](http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2) of the format `postgresql+psycopg2://user:password@host:port/dbname`
19+
- `finish`: Marks the completion of an existing execution by updating a record for the same in the given database. Returns nothing unless there's an error.
20+
- `db-connection-string`: a [PostgreSQL Db Connection String](http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2) of the format `postgresql+psycopg2://user:password@host:port/dbname`
21+
- `execution-id`: a GUID identifier of an existing data pipeline execution.
22+
23+
To get help,use:
24+
25+
```
26+
py mcd.py --help
27+
py mcd.py <command> --help
28+
```
1629

1730
### As a script
1831

1932
- Use a local isolated/virtual python environment for this project
2033
- Install project dependencies
21-
- `py mcd.py <command> <db-connection-string> [--help] [--log-level]`
34+
- `py mcd.py [options] <command> [command-parameters]`
2235

2336
_Windows example:_
2437

@@ -28,7 +41,7 @@ new-env\scripts\activate
2841
2942
py -m pip install -r requirements.txt
3043
31-
py mcd.py START postgresql+psycopg2://user:password@host:port/dbname
44+
py mcd.py start postgresql+psycopg2://user:password@host:port/dbname
3245
```
3346

3447
### As a package
@@ -38,7 +51,7 @@ py mcd.py START postgresql+psycopg2://user:password@host:port/dbname
3851
- [Install](https://pip.pypa.io/en/stable/reference/pip_install/#editable-installs) this package
3952
- `pip install -e path/to/ProjectX`
4053
- `pip install -e git+git://github.com/ProjectX.git#egg=ProjectX`
41-
- `py -m mcd <command> <db-connection-string> [--help] [--log-level]`
54+
- `py -m mcd [options] <command> <command-options>`
4255

4356
_Windows example:_
4457

@@ -51,15 +64,15 @@ new-env\scripts\activate
5164
5265
pip install -e git+git://github.com/PageUpPeopleOrg/model-change-detector.git#egg=mcd
5366
54-
py -m mcd START postgresql+psycopg2://user:password@host:port/dbname
67+
py -m mcd start postgresql+psycopg2://user:password@host:port/dbname
5568
```
5669

5770
## Setup
5871

5972
1. Install pre-requisites
6073
2. Use a local isolated/virtual python environment for this project
6174
3. Install project dependencies
62-
4. Develop and test code changes
75+
4. Develop and test code changes
6376
5. Once done, deactivate the virtual environment
6477

6578
### Install pre-requisites
@@ -96,13 +109,15 @@ On Windows:
96109

97110
On Linux / Mac OS
98111

99-
`source path/to/environment/bin/activate` _e.g._ `source new-env/bin/activate`
112+
`source path/to/environment/bin/activate` _e.g._ `source new-env/bin/activate`
113+
114+
You should see the name of your virtual environment in brackets on your terminal line, e.g.:
100115

101-
You should see the name of your virtual environment in brackets on your terminal line, e.g.:
102116
```
103117
C:\path\to\working\dir: new-env\scripts\activate
104118
(new-env) C:\path\to\working\dir: _
105119
```
120+
106121
Any python commands you use will now, work within your virtual environment only.
107122

108123
### Install project dependencies
@@ -113,7 +128,7 @@ pip install -r requirements.txt
113128

114129
### Deactivate the virtual environment
115130

116-
Once done, deactivate the virtual environment with a simple `decativate` command, e.g.:
131+
Once done, deactivate the virtual environment with a simple `decativate` command, e.g.:
117132

118133
```commandline
119134
(new-env) C:\path\to\working\dir: deactivate

mcd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
from modules.ModelChangeDetector import ModelChangeDetector
22

33
if __name__ == "__main__":
4-
ModelChangeDetector().main()
4+
ModelChangeDetector()

modules/DataPipelineExecutionRepository.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,13 @@ def get_last_successful_data_load_execution(self):
2828
.order_by(desc(DataPipelineExecutionEntity.last_updated_on))\
2929
.first()
3030

31+
def finish_existing(self, execution_id):
32+
session = self.session_maker()
33+
data_pipeline_execution = session.query(DataPipelineExecutionEntity)\
34+
.filter_by(id=execution_id)\
35+
.first()
36+
data_pipeline_execution.status = Constants.DataPipelineExecutionStatus.COMPLETED_SUCCESSFULLY
37+
session.commit()
38+
return data_pipeline_execution
39+
40+

modules/ModelChangeDetector.py

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,52 +2,58 @@
22
import logging
33
from modules import Shared
44
from modules.Shared import Constants
5-
from modules.commands import Commands
6-
from modules.commands.CommandFactory import CommandFactory
75
from modules.BaseObject import BaseObject
6+
from modules.commands.StartCommand import StartCommand
7+
from modules.commands.FinishCommand import FinishCommand
88

99

1010
class ModelChangeDetector(BaseObject):
11-
12-
_commandNames = [Commands.get_name(Commands.START)]
13-
1411
def __init__(self, logger=None):
15-
self.args = self.get_arguments()
12+
self.args = self.__get_arguments()
1613
Shared.configure_root_logger(self.args.log_level)
1714

1815
super().__init__(logger)
1916

2017
self.logger.debug(self.args)
2118
self.logger.debug(f'args.log_level = {self.args.log_level} = {logging.getLevelName(self.args.log_level)}')
2219

23-
self.command_factory = CommandFactory()
20+
self.args.func()
21+
22+
def __process_start_command(self):
23+
StartCommand(self.args.db_connection_string).execute()
2424

25-
def main(self):
26-
command_executor = self.command_factory.create_command(self.args.command, self.args.db_connection_string)
27-
command_executor.execute()
25+
def __process_finish_command(self):
26+
FinishCommand(self.args.db_connection_string, self.args.execution_id).execute()
2827

29-
def get_arguments(self):
28+
def __get_arguments(self):
3029
parser = argparse.ArgumentParser(description=Constants.APP_NAME,
30+
usage='mcd [options] <command> [command-parameters]\n\n'
31+
'To see help text, you can run\n'
32+
' mcd --help\n'
33+
' mcd <command> --help',
3134
parents=[Shared.get_default_arguments()])
3235

33-
parser.add_argument('command',
34-
type=self.get_command_value_from_name,
35-
help=f'choose from {", ".join(self._commandNames)}, more coming soon..')
36+
subparsers = parser.add_subparsers(title='commands', metavar='', dest='command')
3637

37-
parser.add_argument('db_connection_string',
38-
metavar='db-connection-string',
39-
help='provide in PostgreSQL & Psycopg format, '
40-
'postgresql+psycopg2://username:password@host:port/dbname')
38+
start_command_parser = subparsers.add_parser('start', help='help text for \'start\' command')
39+
start_command_parser.set_defaults(func=self.__process_start_command)
40+
self.__get_default_command_arguments(start_command_parser)
41+
42+
finish_command_parser = subparsers.add_parser('finish', help='help text for \'finish\' command')
43+
finish_command_parser.set_defaults(func=self.__process_finish_command)
44+
self.__get_default_command_arguments(finish_command_parser)
45+
finish_command_parser.add_argument('execution_id',
46+
metavar='execution_id',
47+
help='data pipeline execution id as received using \'start\' command')
4148

4249
args = parser.parse_args()
4350

4451
return args
4552

46-
def get_command_value_from_name(self, command_name):
47-
if command_name not in self._commandNames:
48-
message = f'invalid choice: {command_name} (choose from {", ".join(self._commandNames)})'
49-
raise argparse.ArgumentTypeError(message)
50-
51-
command_value = getattr(Commands, command_name, Commands.UNKNOWN)
53+
@staticmethod
54+
def __get_default_command_arguments(command_parser):
55+
command_parser.add_argument('db_connection_string',
56+
metavar='db-connection-string',
57+
help='provide in PostgreSQL & Psycopg format, '
58+
'postgresql+psycopg2://username:password@host:port/dbname')
5259

53-
return command_value

modules/commands/CommandFactory.py

Lines changed: 0 additions & 16 deletions
This file was deleted.

modules/commands/Commands.py

Lines changed: 0 additions & 43 deletions
This file was deleted.

modules/commands/FinishCommand.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from modules.commands.BaseCommand import BaseCommand
2+
3+
4+
class FinishCommand(BaseCommand):
5+
def __init__(self, db_connection_string, execution_id, logger=None):
6+
super().__init__(db_connection_string, logger)
7+
self._executionId = execution_id
8+
9+
def execute(self):
10+
data_pipeline_execution = self.repository.finish_existing(self._executionId)
11+
self.logger.debug('Finised data_pipeline_execution = ' + str(data_pipeline_execution))

modules/commands/StartCommand.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
1-
from modules.commands import Commands
21
from modules.commands.BaseCommand import BaseCommand
32

43

54
class StartCommand(BaseCommand):
65
def __init__(self, db_connection_string, logger=None):
76
super().__init__(db_connection_string, logger)
87

9-
@staticmethod
10-
def can_execute_command(command):
11-
return command == Commands.START
12-
138
def execute(self):
149
data_pipeline_execution = self.repository.start_new()
1510
self.logger.debug('Started new data_pipeline_execution = ' + str(data_pipeline_execution))

0 commit comments

Comments
 (0)