
DEVO-1167 Create airflow dag for gencon solr collections #1711


Open · wants to merge 6 commits into base: main
85 changes: 85 additions & 0 deletions cob_datapipeline/gencon_ingest_dag.py
@@ -0,0 +1,85 @@
"""Airflow DAG to index GENCON Databases into Solr."""
from datetime import timedelta

import airflow
import pendulum
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from tulflow import tasks

"""
INIT SYSTEMWIDE VARIABLES

Check for the existence of systemwide variables shared across tasks; they can be
initialized here if not found (i.e., on a new installation) when defaults exist.
"""

AIRFLOW_HOME = Variable.get("AIRFLOW_HOME")
AIRFLOW_USER_HOME = Variable.get("AIRFLOW_USER_HOME")

SCHEDULE_INTERVAL = Variable.get("GENCON_INDEX_SCHEDULE_INTERVAL")

GENCON_INDEX_VERSION = Variable.get("GENCON_INDEX_VERSION")
GENCON_INDEX_PATH = Variable.get("GENCON_INDEX_PATH")
GENCON_TEMP_PATH = Variable.get("GENCON_TEMP_PATH")
GENCON_CSV_S3 = Variable.get("GENCON_CSV_S3")

# Get S3 data bucket variables
AIRFLOW_S3 = BaseHook.get_connection("AIRFLOW_S3")
AIRFLOW_DATA_BUCKET = Variable.get("AIRFLOW_DATA_BUCKET")

# Get Solr URL & Collection Name for indexing info; error out if not entered
SOLR_CONN = BaseHook.get_connection("SOLRCLOUD-WRITER")
SOLR_CONFIG = Variable.get("GENCON_SOLR_CONFIG", deserialize_json=True)
CONFIGSET = SOLR_CONFIG.get("configset")
ALIAS = CONFIGSET + "-prod"
REPLICATION_FACTOR = SOLR_CONFIG.get("replication_factor")

# CREATE DAG
DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": pendulum.datetime(2018, 12, 13, tz="UTC"),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

DAG = airflow.DAG(
    "gencon_index",
    default_args=DEFAULT_ARGS,
    catchup=False,
    max_active_runs=1,
    schedule=SCHEDULE_INTERVAL
)

"""
CREATE TASKS
Tasks with all logic contained in a single operator can be declared here.
Tasks with custom logic are relegated to individual Python files.
"""

INDEX_GENCON = BashOperator(
    task_id="index_gencon",
    # Trailing space keeps Airflow from treating the .sh path as a Jinja template file.
    bash_command="/opt/airflow/dags/cob_datapipeline/scripts/ingest_gencon.sh ",
    retries=1,
    env={
        "AWS_ACCESS_KEY_ID": AIRFLOW_S3.login,
        "AWS_SECRET_ACCESS_KEY": AIRFLOW_S3.password,
        "BUCKET": AIRFLOW_DATA_BUCKET,
        "GIT_BRANCH": GENCON_INDEX_VERSION,
        "HOME": AIRFLOW_USER_HOME,
        "GENCON_INDEX_PATH": GENCON_INDEX_PATH,
        "GENCON_TEMP_PATH": GENCON_TEMP_PATH,
        "GENCON_CSV_S3": GENCON_CSV_S3,
        "SOLR_AUTH_USER": SOLR_CONN.login if SOLR_CONN.login else "",
        "SOLR_AUTH_PASSWORD": SOLR_CONN.password if SOLR_CONN.password else "",
        "SOLR_WEB_URL": tasks.get_solr_url(SOLR_CONN, CONFIGSET),
    },
    dag=DAG
)
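
For local testing, the Variables and Connection this DAG reads could be seeded with the Airflow CLI before triggering a run; a minimal sketch, where the host, port, and credentials are placeholders rather than real production values:

# Illustrative only -- hostname, port, and credentials are made up.
airflow variables set GENCON_INDEX_SCHEDULE_INTERVAL "@monthly"
airflow variables set GENCON_INDEX_VERSION "main"
airflow variables set GENCON_SOLR_CONFIG '{"configset": "gencon50-v3.0.1", "replication_factor": 4}'
airflow connections add "SOLRCLOUD-WRITER" \
    --conn-type http \
    --conn-host solr.example.edu \
    --conn-port 8983 \
    --conn-login solr_user \
    --conn-password solr_pass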
2 changes: 2 additions & 0 deletions cob_datapipeline/scripts/.bundle/config
@@ -0,0 +1,2 @@
---
BUNDLE_WITHOUT: "debug"
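
For context, this checked-in config tells Bundler to skip the "debug" gem group during bundle install; it is the same setting that would be written by running the following from the scripts directory (a sketch, assuming Bundler 2.x):

bundle config set --local without "debug"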
1 change: 1 addition & 0 deletions cob_datapipeline/scripts/.ruby-version
@@ -0,0 +1 @@
3.4.0
40 changes: 40 additions & 0 deletions cob_datapipeline/scripts/ingest_gencon.sh
@@ -0,0 +1,40 @@
#!/usr/bin/env bash

set -eo pipefail

# Export / set environment variables passed in by the task so they are picked up by subprocesses
export AIRFLOW_USER_HOME="/home/airflow"
export HOME="$AIRFLOW_USER_HOME"

source "$HOME/.bashrc"

export PATH="$AIRFLOW_USER_HOME/.local/bin:$PATH"
export PATH="$AIRFLOW_USER_HOME/.rbenv/shims:$AIRFLOW_USER_HOME/.rbenv/bin:$PATH"

# Inject the Solr auth credentials into the URL: "https://host" -> "https://user:pass@host"
export SOLR_URL="${SOLR_WEB_URL//\/\////$SOLR_AUTH_USER:$SOLR_AUTH_PASSWORD@}"
# Check out the index repo under the task's working directory
# (overrides the GENCON_INDEX_PATH value passed in from the DAG env)
export GENCON_INDEX_PATH="$PWD/gencon_index"

echo ">>> My Dreictory: $PWD"

# Get the raw CSV files from S3 (exclude everything else so only *.csv is synced)
aws s3 sync "$GENCON_CSV_S3" "$GENCON_TEMP_PATH" --exclude "*" --include "*.csv"

if [[ ! -d "$GENCON_INDEX_PATH" ]]; then
  git clone https://github.com/tulibraries/gencon_index.git "$GENCON_INDEX_PATH"
  cd "$GENCON_INDEX_PATH"
else
  # If the repository already exists locally, navigate to its directory and pull the latest changes.
  if [[ -d "$GENCON_INDEX_PATH/.git" ]]; then
    cd "$GENCON_INDEX_PATH"
    git pull origin main
  else
    echo "Error: Local 'gencon_index' directory is not a Git repository."
    exit 1
  fi
fi

bundle config set force_ruby_platform true
bundle install --without=debug

ruby harvest_all.rb
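
For reference, the SOLR_URL line above uses bash pattern substitution (${var//pattern/replacement}) to splice the credentials in after the URL scheme; a minimal sketch of the transformation, with placeholder values:

SOLR_WEB_URL="https://solr.example.edu:8983/solr/gencon50-v3.0.1"
SOLR_AUTH_USER="reader"
SOLR_AUTH_PASSWORD="secret"
# Replaces the "//" after the scheme with "//reader:secret@"
echo "${SOLR_WEB_URL//\/\////$SOLR_AUTH_USER:$SOLR_AUTH_PASSWORD@}"
# prints: https://reader:secret@solr.example.edu:8983/solr/gencon50-v3.0.1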
11 changes: 10 additions & 1 deletion variables.json
@@ -78,6 +78,15 @@
"configset": "tul_cob-web-0",
"replication_factor": 4
},
"GENCON_INDEX_SCHEDULE_INTERVAL": "@monthly",
"GENCON_INDEX_VERSION": "main",
"GENCON_INDEX_PATH": "/opt/airflow/dags/cob_datapipeline/scripts/gencon_index",
"GENCON_TEMP_PATH": "/tmp/gencon",
"GENCON_CSV_S3": "s3://tulib-airflow-prod/gencon",
"GENCON_SOLR_CONFIG": {
"configset": "gencon50-v3.0.1",
"replication_factor": 4
},
"almaoai_last_num_oai_delete_recs": 1349,
"almaoai_last_num_oai_update_recs": 34655
}
}
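
Assuming the deployment loads this file into Airflow with the standard CLI (an assumption, not confirmed by this PR), the new defaults could be picked up with a single import; the path is relative to the repo root:

airflow variables import variables.json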