|
1 | 1 | # Airflow DAG to index Web Content into SolrCloud.
|
2 | 2 | from datetime import datetime, timedelta
|
| 3 | +from tulflow import tasks |
3 | 4 | import airflow
|
4 | 5 | from airflow.models import Variable
|
5 | 6 | from airflow.hooks.base_hook import BaseHook
|
6 | 7 | from airflow.operators.bash_operator import BashOperator
|
7 | 8 | from airflow.operators.python_operator import PythonOperator
|
8 | 9 | from cob_datapipeline.task_slack_posts import web_content_slackpostonsuccess
|
9 | 10 | from cob_datapipeline.task_sc_get_num_docs import task_solrgetnumdocs
|
10 |
| -from tulflow import tasks |
| 11 | +from cob_datapipeline.operators import\ |
| 12 | + PushVariable, DeleteAliasListVariable, DeleteCollectionListVariable |
11 | 13 |
|
12 | 14 | """
|
13 | 15 | INIT SYSTEMWIDE VARIABLES
|
|
22 | 24 |
|
# Solr connection plus the collection settings used for indexing; these
# lookups error out if the connection / variables have not been entered.
SOLR_CONN = BaseHook.get_connection("SOLRCLOUD")

# JSON-decoded settings, e.g.
# {"configset": "tul_cob-web-2", "replication_factor": 2}
SOLR_CONFIG = Variable.get("WEB_CONTENT_SOLR_CONFIG", deserialize_json=True)
CONFIGSET = SOLR_CONFIG.get("configset")
REPLICATION_FACTOR = SOLR_CONFIG.get("replication_factor")

# The alias is the configset name with a "-prod" suffix.
ALIAS = CONFIGSET + "-prod"

# Value of the WEB_CONTENT_PROD_BRANCH Airflow variable.
WEB_CONTENT_BRANCH = Variable.get("WEB_CONTENT_PROD_BRANCH")
|
31 | 33 |
|
32 | 34 | # Manifold website creds
|
|
109 | 111 | ALIAS
|
110 | 112 | )
|
111 | 113 |
|
# Push ALIAS onto the "WEB_CONTENT_QA_ALIASES" variable.
# Presumably PushVariable appends `value` to a list stored in that Airflow
# Variable -- confirm against cob_datapipeline.operators.
# NOTE(review): ALIAS ends in "-prod" while the variable name says QA;
# confirm this is the intended list and not a leftover from a QA DAG.
PUSH_ALIAS = PushVariable(
    dag=DAG,
    task_id="push_alias",
    name="WEB_CONTENT_QA_ALIASES",
    value=ALIAS,
)
| 119 | + |
# Clean up aliases listed in the "WEB_CONTENT_QA_ALIASES" variable via the
# SOLRCLOUD connection, never touching the current ALIAS.
# Presumably skip_from_last=2 also spares the two most recent list entries --
# confirm against DeleteAliasListVariable.
# NOTE(review): ALIAS ends in "-prod" but the list variable is named QA;
# confirm the list is not shared with a QA DAG.
DELETE_ALIAS = DeleteAliasListVariable(
    dag=DAG,
    task_id="delete_aliases",
    solr_conn_id="SOLRCLOUD",
    list_variable="WEB_CONTENT_QA_ALIASES",
    skip_included=[ALIAS],
    skip_from_last=2,
)
| 127 | + |
# Push the collection name -- CONFIGSET, a dash, then the value xcom-pulled
# from the "set_collection_name" task -- onto "WEB_CONTENT_QA_COLLECTIONS".
# The Jinja expression is left for Airflow to render; this assumes `value` is
# a templated field of PushVariable -- TODO confirm.
# NOTE(review): this DAG reads WEB_CONTENT_PROD_BRANCH yet the variable name
# says QA; confirm this is the intended list.
PUSH_COLLECTION = PushVariable(
    dag=DAG,
    task_id="push_collection",
    name="WEB_CONTENT_QA_COLLECTIONS",
    value=CONFIGSET + "-{{ ti.xcom_pull(task_ids='set_collection_name') }}",
)
| 133 | + |
# Clean up collections listed in the "WEB_CONTENT_QA_COLLECTIONS" variable via
# the SOLRCLOUD connection, excluding the collection named by this run.
# Presumably skip_from_last=2 also spares the two most recent list entries --
# confirm against DeleteCollectionListVariable.
# NOTE(review): the Jinja expression inside skip_included is only rendered if
# the operator declares skip_included as a templated field; otherwise the raw
# "{{ ... }}" text is compared and the current collection is not excluded.
DELETE_COLLECTIONS = DeleteCollectionListVariable(
    dag=DAG,
    task_id="delete_collections",
    solr_conn_id="SOLRCLOUD",
    list_variable="WEB_CONTENT_QA_COLLECTIONS",
    skip_included=[
        CONFIGSET + "-{{ ti.xcom_pull(task_ids='set_collection_name') }}",
    ],
    skip_from_last=2,
)
| 141 | + |
112 | 142 | POST_SLACK = PythonOperator(
|
113 | 143 | task_id='slack_post_succ',
|
114 | 144 | python_callable=web_content_slackpostonsuccess,
|
|
# Linear tail of the pipeline: each task runs only after the one before it.
# After the alias swap, the alias and collection names are pushed and the
# delete tasks run, and only then is the Slack post sent.
(
    CREATE_COLLECTION
    >> INDEX_WEB_CONTENT
    >> GET_NUM_SOLR_DOCS_POST
    >> SOLR_ALIAS_SWAP
    >> PUSH_ALIAS
    >> DELETE_ALIAS
    >> PUSH_COLLECTION
    >> DELETE_COLLECTIONS
    >> POST_SLACK
)
0 commit comments