Skip to content

Commit 4e5e858

Browse files
authored
revert big list update (#1683)
* Revert "update ingest script to handle big lists (#1680)". This reverts commit c4bc2cb. * Update execution timeout defaults for harvest tasks.
1 parent b9d1f70 commit 4e5e858

File tree

5 files changed

+17
-25
lines changed

5 files changed

+17
-25
lines changed

cob_datapipeline/catalog_preproduction_oai_harvest_dag.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@
185185

186186
INDEX_UPDATES_OAI_MARC = BashOperator(
187187
task_id="index_updates_oai_marc",
188-
bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh",
188+
bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh ",
189189
env={**os.environ, **{
190190
"AWS_ACCESS_KEY_ID": AIRFLOW_S3.login,
191191
"AWS_SECRET_ACCESS_KEY": AIRFLOW_S3.password,
@@ -201,6 +201,7 @@
201201
"DATA": "{{ ti.xcom_pull(task_ids='list_updated_files') | tojson }}",
202202
}},
203203
trigger_rule="none_failed_min_one_success",
204+
execution_timeout=timedelta(hours=24),
204205
dag=DAG
205206
)
206207

@@ -215,7 +216,7 @@
215216

216217
INDEX_DELETES_OAI_MARC = BashOperator(
217218
task_id="index_deletes_oai_marc",
218-
bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh",
219+
bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh ",
219220
env={**os.environ, **{
220221
"AWS_ACCESS_KEY_ID": AIRFLOW_S3.login,
221222
"AWS_SECRET_ACCESS_KEY": AIRFLOW_S3.password,

cob_datapipeline/catalog_production_oai_harvest_dag.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@
8080
"start_date": pendulum.datetime(2018, 12, 13, tz="America/New_York"),
8181
"on_failure_callback": [slackpostonfail],
8282
"retries": 0,
83-
"retry_delay": timedelta(minutes=10)
83+
"retry_delay": timedelta(minutes=10),
84+
"execution_timeout": timedelta(hours=10)
8485
}
8586

8687
DAG = airflow.DAG(
@@ -176,7 +177,7 @@
176177

177178
INDEX_UPDATES_OAI_MARC = BashOperator(
178179
task_id="index_updates_oai_marc",
179-
bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh",
180+
bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh ",
180181
env={**os.environ, **{
181182
"AWS_ACCESS_KEY_ID": AIRFLOW_S3.login,
182183
"AWS_SECRET_ACCESS_KEY": AIRFLOW_S3.password,
@@ -192,6 +193,7 @@
192193
"DATA": "{{ ti.xcom_pull(task_ids='list_updated_files') | tojson }}",
193194
}},
194195
trigger_rule="none_failed_min_one_success",
196+
execution_timeout=timedelta(hours=24),
195197
dag=DAG
196198
)
197199

@@ -206,7 +208,7 @@
206208

207209
INDEX_DELETES_OAI_MARC = BashOperator(
208210
task_id="index_deletes_oai_marc",
209-
bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh",
211+
bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh ",
210212
env={**os.environ, **{
211213
"AWS_ACCESS_KEY_ID": AIRFLOW_S3.login,
212214
"AWS_SECRET_ACCESS_KEY": AIRFLOW_S3.password,

cob_datapipeline/scripts/ingest_marc.sh

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ gem install bundler
1818
bundle config set force_ruby_platform true
1919
bundle install --without=debug
2020

21+
data_in=$(echo $DATA | jq -r '.[]')
22+
2123
if [ -z "$COMMAND" ]; then
2224
COMMAND=ingest
2325
fi
2426

25-
ingest_marc_file() {
26-
set -eo pipefail # Ensure failures stop execution inside this function
27-
cd tmp/cob_index || exit 1
28-
file="$1"
27+
for file in $data_in
28+
do
2929
echo "Indexing file: "$file
3030
bundle exec cob_index $COMMAND $(aws s3 presign s3://$BUCKET/$file)
3131
processed_file=$(echo $file | sed 's/new-updated/processed-new-updated/' | sed 's/deleted/processed-deleted/')
@@ -34,15 +34,4 @@ ingest_marc_file() {
3434
if [ "$file" != "$processed_file" ]; then
3535
aws s3 mv s3://$BUCKET/$file s3://$BUCKET/$processed_file
3636
fi
37-
}
38-
39-
export -f ingest_marc_file
40-
export COMMAND
41-
export BUCKET
42-
export SOLR_AUTH_USER
43-
export SOLR_AUTH_PASSWORD
44-
export SOLR_URL
45-
export ALMAOAI_LAST_HARVEST_FROM_DATE
46-
47-
echo "$DATA" | jq -r '.[]' | xargs -n 1 -I {} bash -c 'igest_marc_file "{}"'
48-
37+
done

tests/catalog_preproduction_oai_harvest_dag_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ def setUp(self):
1212

1313
def test_index_deletes_oai_marc(self):
1414
task = DAG.get_task("index_deletes_oai_marc")
15-
expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh"
15+
expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh "
1616
self.assertEqual(task.env["COMMAND"], "delete --suppress")
1717
self.assertEqual(task.bash_command, expected_bash_path)
1818

1919
def test_index_ingest_oai_marc(self):
2020
task = DAG.get_task("index_updates_oai_marc")
21-
expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh"
21+
expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh "
2222
self.assertEqual(task.env["COMMAND"], "ingest")
2323
self.assertEqual(task.bash_command, expected_bash_path)
2424

tests/catalog_production_oai_harvest_dag_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ def setUp(self):
1212

1313
def test_index_deletes_oai_marc(self):
1414
task = DAG.get_task("index_deletes_oai_marc")
15-
expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh"
15+
expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh "
1616
self.assertEqual(task.env["COMMAND"], "delete --suppress")
1717
self.assertEqual(task.bash_command, expected_bash_path)
1818

1919
def test_index_ingest_oai_marc(self):
2020
task = DAG.get_task("index_updates_oai_marc")
21-
expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh"
21+
expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh "
2222
self.assertEqual(task.env["COMMAND"], "ingest")
2323
self.assertEqual(task.bash_command, expected_bash_path)

0 commit comments

Comments (0)