add a simple pipeline unit test without mara db #71

Merged (4 commits) on May 13, 2022

Changes from all commits
18 changes: 15 additions & 3 deletions mara_pipelines/execution.py
@@ -53,6 +53,13 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None,
# A queue for receiving events from forked sub processes
event_queue = multiprocessing_context.Queue()

# The historical node cost is taken from the 'mara' database. If the 'mara' database is not defined
# (e.g. in testing scenarios), the node cost is not used.
import mara_db.config
use_historical_node_cost = 'mara' in mara_db.config.databases()
if not use_historical_node_cost:
print(f"[WARNING] The 'mara' database is not defined. The historical node costs are not used, the execution path might be inefficient.", file=sys.stderr)

# The function that is run in a sub process
def run():

@@ -64,7 +71,7 @@ def run():
node_queue: [pipelines.Node] = []

# data needed for computing cost
- node_durations_and_run_times = node_cost.node_durations_and_run_times(pipeline)
+ node_durations_and_run_times = node_cost.node_durations_and_run_times(pipeline) if use_historical_node_cost else {}

# Putting nodes into the node queue
def queue(nodes: [pipelines.Node]):
@@ -207,7 +214,8 @@ def track_finished_pipelines():
next_node.add_dependency(pipeline_node, downstream)

# get cost information for children
- node_durations_and_run_times.update(node_cost.node_durations_and_run_times(next_node))
+ if use_historical_node_cost:
+     node_durations_and_run_times.update(node_cost.node_durations_and_run_times(next_node))

# queue all child nodes
queue(list(next_node.nodes.values()))
@@ -314,7 +322,11 @@ def track_finished_pipelines():
run_process = multiprocessing_context.Process(target=run, name='pipeline-' + '-'.join(pipeline.path()))
run_process.start()

- runlogger = run_log.RunLogger()
+ if 'mara' in mara_db.config.databases():
+     runlogger = run_log.RunLogger()
+ else:
+     runlogger = run_log.NullLogger()
+     print(f"[WARNING] The events of the pipeline execution are not saved in a db", file=sys.stderr)

# make sure that we close this run (if still open) as failed when we close this python process
# On SIGKILL we will still leave behind open runs...
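
Taken together, the execution changes let run_pipeline degrade gracefully: without a 'mara' database alias, historical node costs are skipped and run events go to a NullLogger instead of being persisted. A minimal sketch of the guard pattern used above, assuming only that mara_db.config.databases() returns the dict of configured database aliases (the helper name is hypothetical):

import sys

import mara_db.config


def mara_db_configured() -> bool:
    """True when the 'mara' metadata database alias is configured."""
    return 'mara' in mara_db.config.databases()


# Hypothetical caller-side check before triggering a run, mirroring the warnings above.
if not mara_db_configured():
    print("[WARNING] 'mara' is not configured; node costs and run logs are disabled.",
          file=sys.stderr)
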
22 changes: 20 additions & 2 deletions mara_pipelines/logging/run_log.py
@@ -1,10 +1,8 @@
"""Logging pipeline runs, node output and status information in mara database"""

- import psycopg2.extensions
import sqlalchemy.orm
from sqlalchemy.ext.declarative import declarative_base

- import mara_db.postgresql
from .. import config
from ..logging import pipeline_events, system_statistics
from .. import events
@@ -73,6 +71,15 @@ def close_open_run_after_error(run_id: int):
"""Closes all open run and node_run for this run_id as failed"""
if run_id is None:
return

import mara_db.config

if 'mara' not in mara_db.config.databases():
return

import psycopg2.extensions
import mara_db.postgresql

_close_run = f'''
UPDATE data_integration_run
SET end_time = now(), succeeded = FALSE
@@ -98,11 +105,22 @@ def close_open_run_after_error(run_id: int):
print(f'Cleaned up open runs/node_runs (run_id = {run_id})')


class NullLogger(events.EventHandler):
"""A run logger not handling events"""
run_id: int = None

def handle_event(self, event: events.Event):
pass


class RunLogger(events.EventHandler):
"""A run logger saving the pipeline events to the 'mara' database alias"""
run_id: int = None
node_output: {tuple: [pipeline_events.Output]} = None

def handle_event(self, event: events.Event):
import psycopg2.extensions
import mara_db.postgresql

if isinstance(event, pipeline_events.RunStarted):
with mara_db.postgresql.postgres_cursor_context('mara') as cursor: # type: psycopg2.extensions.cursor
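
NullLogger makes the handler contract explicit: any events.EventHandler with a handle_event method can stand in for RunLogger when the 'mara' database is unavailable. As a further illustration (hypothetical, not part of this PR), a handler that echoes events to stderr instead of discarding them could look like this:

import sys

from mara_pipelines import events


class ConsoleLogger(events.EventHandler):
    """Hypothetical run logger that prints pipeline events instead of persisting them."""

    def handle_event(self, event: events.Event):
        # No database involved; a repr of the event is enough for a console trace.
        print(repr(event), file=sys.stderr)
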
6 changes: 6 additions & 0 deletions setup.py
@@ -28,6 +28,12 @@ def get_long_description():
'requests>=2.19.1'
],

extras_require={
'test': [
'pytest',
'mara_app>=1.5.2'],
},

setup_requires=['setuptools_scm'],
include_package_data=True,

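
With the new 'test' extra, the test dependencies can be installed from a checkout and the suite run with pytest, for example:

pip install -e ".[test]"
pytest tests/test_execute_pipeline.py
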
26 changes: 26 additions & 0 deletions tests/test_execute_pipeline.py
@@ -0,0 +1,26 @@
import pytest

from mara_app.monkey_patch import patch

import mara_db.config
patch(mara_db.config.databases)(lambda: {})


def test_execute_without_db():
    from mara_pipelines.commands.python import RunFunction
    from mara_pipelines.pipelines import Pipeline, Task
    from mara_pipelines.ui.cli import run_pipeline

    pipeline = Pipeline(
        id='test_execute_without_db',
        description="Tests if a pipeline can be executed without database")

    def command_function() -> bool:
        return True

    pipeline.add(
        Task(id='run_python_function',
             description="Runs a sample python function",
             commands=[RunFunction(function=command_function)]))

    assert run_pipeline(pipeline)
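
The patch call at the top of the module replaces mara_db.config.databases with a function returning an empty dict before any pipeline code reads the config, so both the node-cost lookup and the run logger take their database-free paths. A companion test for the failure path could look like the sketch below (hypothetical, not part of this PR; it assumes ui.cli.run_pipeline returns False when a command fails):

def test_execute_without_db_failing_command():
    from mara_pipelines.commands.python import RunFunction
    from mara_pipelines.pipelines import Pipeline, Task
    from mara_pipelines.ui.cli import run_pipeline

    pipeline = Pipeline(
        id='test_execute_without_db_failing_command',
        description="Tests that a failing pipeline reports failure without a database")

    def command_function() -> bool:
        # Signal command failure; the run should be reported as unsuccessful.
        return False

    pipeline.add(
        Task(id='run_failing_python_function',
             description="Runs a sample python function that fails",
             commands=[RunFunction(function=command_function)]))

    assert not run_pipeline(pipeline)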