-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathmysecond.py
74 lines (66 loc) · 2.23 KB
/
mysecond.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from lib.utils import on_failure_callback
# Defaults handed to every operator attached to the DAG below. Any task can
# override an entry by passing the same keyword to its own constructor.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # Optional settings, currently disabled:
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # The failure callback has to be set here in default_args to work
    # correctly: https://github.com/apache/airflow/issues/26760
    'on_failure_callback': on_failure_callback,
    # Other optional hooks and rules, currently disabled:
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# The DAG object that the tasks in this file attach to through ``dag=dag``.
# No schedule argument is passed here, so the library's default applies.
dag = DAG(
    dag_id='myfirst',
    description='First attempt to process a tree',
    default_args=default_args,
)
from types import SimpleNamespace
def allow_conf_testing(func):
    """Let a task callable run in test mode without a real ``dag_run``.

    When the decorated callable is invoked with a truthy ``test_mode``
    keyword argument, a stand-in ``dag_run`` keyword argument is injected
    (replacing any existing one). The stand-in is a ``SimpleNamespace``
    whose ``conf`` attribute is the ``params`` keyword argument, or ``{}``
    if ``params`` is absent. Otherwise the arguments are forwarded untouched.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func`` and returns whatever ``func`` returns.
    """
    # Imported locally so this definition does not rely on a module-level
    # ``functools`` import being present.
    import functools

    @functools.wraps(func)  # keep func's __name__/__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        if kwargs.get('test_mode', False):
            kwargs['dag_run'] = SimpleNamespace(conf=kwargs.get('params', {}))
        # Propagate the result; dropping it would hide the callable's
        # return value from the caller.
        return func(*args, **kwargs)

    return wrapper
# Shell snippet used as the bash command below: a blank line, then
# ``echo "hi"``. It contains no template placeholders despite its name.
templated_command = '\necho "hi"\n'
# t1 is a task created by instantiating an operator: a BashOperator that
# runs ``templated_command`` under the task id 'echo'.
t1 = BashOperator(
    dag=dag,
    task_id='echo',
    bash_command=templated_command,
)
@allow_conf_testing
def print_context(ds, **kwargs):
    """Print the received keyword arguments, ``ds``, and the conf message.

    Args:
        ds: Value printed as-is after the keyword arguments.
        **kwargs: Remaining keyword arguments. ``kwargs['dag_run'].conf`` is
            consulted for the ``'message'`` key.

    Returns:
        The fixed string 'Whatever you return gets printed in the logs'.
    """
    print(kwargs)
    print(ds)
    # A run may have no ``dag_run``, a ``conf`` of None, or a conf without
    # 'message' (e.g. test mode with no params). Fall back to None instead
    # of raising KeyError/TypeError.
    conf = getattr(kwargs.get('dag_run'), 'conf', None) or {}
    print("Remotely received value of {} for key=message".format(conf.get('message')))
    return 'Whatever you return gets printed in the logs'
# t2 is a PythonOperator task that calls ``print_context``.
# ``provide_context=True`` is passed so the callable receives the context
# as keyword arguments. No ordering between t1 and t2 is declared here.
t2 = PythonOperator(
    dag=dag,
    task_id='print_the_context',
    python_callable=print_context,
    provide_context=True,
)