
Commit c4bc2cb

update ingest script to handle big lists (#1680)
* Update ingest script to handle larger lists of files.
* Add a task execution timeout. Fail if any task takes longer than 10 hours.
* Fix test.
* Fix tests.
1 parent 3ee3453 commit c4bc2cb


5 files changed: +26 -14 lines changed


cob_datapipeline/catalog_preproduction_oai_harvest_dag.py

Lines changed: 4 additions & 3 deletions
@@ -80,7 +80,8 @@
     "start_date": pendulum.datetime(2018, 12, 13, tz="America/New_York"),
     "on_failure_callback": [slackpostonfail],
     "retries": 0,
-    "retry_delay": timedelta(minutes=10)
+    "retry_delay": timedelta(minutes=10),
+    "execution_timeout": timedelta(hours=10)
 }
 
 DAG = airflow.DAG(
@@ -184,7 +185,7 @@
 
 INDEX_UPDATES_OAI_MARC = BashOperator(
     task_id="index_updates_oai_marc",
-    bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh ",
+    bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh",
     env={**os.environ, **{
         "AWS_ACCESS_KEY_ID": AIRFLOW_S3.login,
         "AWS_SECRET_ACCESS_KEY": AIRFLOW_S3.password,
@@ -214,7 +215,7 @@
 
 INDEX_DELETES_OAI_MARC = BashOperator(
     task_id="index_deletes_oai_marc",
-    bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh ",
+    bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh",
     env={**os.environ, **{
         "AWS_ACCESS_KEY_ID": AIRFLOW_S3.login,
         "AWS_SECRET_ACCESS_KEY": AIRFLOW_S3.password,

cob_datapipeline/catalog_production_oai_harvest_dag.py

Lines changed: 2 additions & 2 deletions
@@ -176,7 +176,7 @@
 
 INDEX_UPDATES_OAI_MARC = BashOperator(
     task_id="index_updates_oai_marc",
-    bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh ",
+    bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh",
     env={**os.environ, **{
         "AWS_ACCESS_KEY_ID": AIRFLOW_S3.login,
         "AWS_SECRET_ACCESS_KEY": AIRFLOW_S3.password,
@@ -206,7 +206,7 @@
 
 INDEX_DELETES_OAI_MARC = BashOperator(
     task_id="index_deletes_oai_marc",
-    bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh ",
+    bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_marc.sh",
     env={**os.environ, **{
         "AWS_ACCESS_KEY_ID": AIRFLOW_S3.login,
         "AWS_SECRET_ACCESS_KEY": AIRFLOW_S3.password,

cob_datapipeline/scripts/ingest_marc.sh

Lines changed: 16 additions & 5 deletions
@@ -18,14 +18,14 @@ gem install bundler
 bundle config set force_ruby_platform true
 bundle install --without=debug
 
-data_in=$(echo $DATA | jq -r '.[]')
-
 if [ -z "$COMMAND" ]; then
   COMMAND=ingest
 fi
 
-for file in $data_in
-do
+ingest_marc_file() {
+  set -eo pipefail # Ensure failures stop execution inside this function
+  cd tmp/cob_index || exit 1
+  file="$1"
   echo "Indexing file: "$file
   bundle exec cob_index $COMMAND $(aws s3 presign s3://$BUCKET/$file)
   processed_file=$(echo $file | sed 's/new-updated/processed-new-updated/' | sed 's/deleted/processed-deleted/')
@@ -34,4 +34,15 @@ do
   if [ "$file" != "$processed_file" ]; then
     aws s3 mv s3://$BUCKET/$file s3://$BUCKET/$processed_file
   fi
-done
+}
+
+export -f ingest_marc_file
+export COMMAND
+export BUCKET
+export SOLR_AUTH_USER
+export SOLR_AUTH_PASSWORD
+export SOLR_URL
+export ALMAOAI_LAST_HARVEST_FROM_DATE
+
+echo "$DATA" | jq -r '.[]' | xargs -n 1 -I {} bash -c 'ingest_marc_file "{}"'
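
The heart of this change is replacing the `for file in $data_in` loop, which expands the entire list at once, with a function exported via `export -f` and invoked once per line by xargs, so the file list is streamed one element at a time. A minimal, self-contained sketch of that pattern, assuming jq is installed and using a hypothetical process_item as a stand-in for ingest_marc_file:

#!/usr/bin/env bash
# Minimal sketch of the export -f + xargs pattern, assuming jq is available.
# `process_item` is a hypothetical stand-in for ingest_marc_file.
set -eo pipefail

process_item() {
  item="$1"
  echo "Processing: $item"
}

# bash -c starts a fresh shell, which only sees exported functions and vars.
export -f process_item

DATA='["file1.xml","file2.xml","file3.xml"]'

# jq emits one array element per line; xargs launches one bash per element,
# so the list never has to be expanded into a single in-shell word list.
echo "$DATA" | jq -r '.[]' | xargs -I {} bash -c 'process_item "$1"' _ {}

Note that the sketch passes the item as a positional argument ("$1") rather than splicing {} into the command string; that sidesteps quoting pitfalls with unusual file names. The committed script splices {} directly, which is adequate for the S3 key names it processes.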

tests/catalog_preproduction_oai_harvest_dag_test.py

Lines changed: 2 additions & 2 deletions
@@ -12,13 +12,13 @@ def setUp(self):
 
     def test_index_deletes_oai_marc(self):
         task = DAG.get_task("index_deletes_oai_marc")
-        expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh "
+        expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh"
         self.assertEqual(task.env["COMMAND"], "delete --suppress")
         self.assertEqual(task.bash_command, expected_bash_path)
 
     def test_index_ingest_oai_marc(self):
         task = DAG.get_task("index_updates_oai_marc")
-        expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh "
+        expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh"
         self.assertEqual(task.env["COMMAND"], "ingest")
         self.assertEqual(task.bash_command, expected_bash_path)
 

tests/catalog_production_oai_harvest_dag_test.py

Lines changed: 2 additions & 2 deletions
@@ -12,12 +12,12 @@ def setUp(self):
 
     def test_index_deletes_oai_marc(self):
         task = DAG.get_task("index_deletes_oai_marc")
-        expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh "
+        expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh"
         self.assertEqual(task.env["COMMAND"], "delete --suppress")
         self.assertEqual(task.bash_command, expected_bash_path)
 
     def test_index_ingest_oai_marc(self):
         task = DAG.get_task("index_updates_oai_marc")
-        expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh "
+        expected_bash_path = self.airflow_home + "/dags/cob_datapipeline/scripts/ingest_marc.sh"
         self.assertEqual(task.env["COMMAND"], "ingest")
         self.assertEqual(task.bash_command, expected_bash_path)
