-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathpre_request_map_clusters_dag.py
67 lines (60 loc) · 2.19 KB
/
pre_request_map_clusters_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from datetime import datetime, timedelta
from textwrap import dedent
from pprint import pprint
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import psycopg2.extras
from lib.contracts_earnings_fcc import contract_earnings_fcc
from lib.pre_request import pre_request
from lib.planter_entity import planter_entity
from airflow.models import Variable
from lib.pre_request_map_clusters import pre_request_map_clusters
from lib.utils import on_failure_callback
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 15,
'retry_delay': timedelta(seconds=1),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
'on_failure_callback': on_failure_callback, # needs to be set in default_args to work correctly: https://github.com/apache/airflow/issues/26760
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
with DAG(
'pre_request_map_clusters',
default_args=default_args,
description='Rre-request the all map cluster version 2.2',
schedule_interval= '*/5 * * * *',
start_date=datetime(2021, 1, 1),
max_active_runs=1,
catchup=False,
tags=['reporting', 'map'],
) as dag:
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
def pre_request_job(ds, **kwargs):
print("do pre request job:")
pre_request_map_clusters("http://treetracker-tile-server.tile-server.svc.cluster.local")
return 1
pre_request_map_cluster_job = PythonOperator(
task_id='pre_request_map_cluster',
python_callable=pre_request_job,
)
pre_request_map_cluster_job >> t1