diff --git a/cob_datapipeline/gencon_ingest_dag.py b/cob_datapipeline/gencon_ingest_dag.py
new file mode 100644
index 00000000..f6c2c3a5
--- /dev/null
+++ b/cob_datapipeline/gencon_ingest_dag.py
@@ -0,0 +1,85 @@
+"""Airflow DAG to index GENCON Databases into Solr."""
+from datetime import datetime, timedelta
+import airflow
+import os
+import pendulum
+from airflow.models import Variable
+from airflow.hooks.base import BaseHook
+from airflow.operators.bash import BashOperator
+from tulflow import tasks
+from airflow.providers.slack.notifications.slack import send_slack_notification
+from airflow.models.connection import Connection
+from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
+
+"""
+INIT SYSTEMWIDE VARIABLES
+
+check for existence of systemwide variables shared across tasks that can be
+initialized here if not found (i.e. if this is a new installation) & defaults exist
+"""
+
+AIRFLOW_HOME = Variable.get("AIRFLOW_HOME")
+AIRFLOW_USER_HOME = Variable.get("AIRFLOW_USER_HOME")
+
+SCHEDULE_INTERVAL = Variable.get("GENCON_INDEX_SCHEDULE_INTERVAL")
+
+GENCON_INDEX_VERSION = Variable.get("GENCON_INDEX_VERSION")
+GENCON_INDEX_PATH = Variable.get("GENCON_INDEX_PATH")
+GENCON_TEMP_PATH = Variable.get("GENCON_TEMP_PATH")
+GENCON_CSV_S3 = Variable.get("GENCON_CSV_S3")
+
+# Get S3 data bucket variables
+AIRFLOW_S3 = BaseHook.get_connection("AIRFLOW_S3")
+AIRFLOW_DATA_BUCKET = Variable.get("AIRFLOW_DATA_BUCKET")
+
+# Get Solr URL & Collection Name for indexing info; error out if not entered
+SOLR_CONN = BaseHook.get_connection("SOLRCLOUD-WRITER")
+SOLR_CONFIG = Variable.get("GENCON_SOLR_CONFIG", deserialize_json=True)
+CONFIGSET = SOLR_CONFIG.get("configset")
+ALIAS = CONFIGSET + "-prod"
+REPLICATION_FACTOR = SOLR_CONFIG.get("replication_factor")
+
+# CREATE DAG
+DEFAULT_ARGS = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": pendulum.datetime(2018, 12, 13, tz="UTC"),
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 2,
+    "retry_delay": timedelta(minutes=5),
+}
+
+DAG = airflow.DAG(
+    "gencon_index",
+    default_args=DEFAULT_ARGS,
+    catchup=False,
+    max_active_runs=1,
+    schedule=SCHEDULE_INTERVAL
+)
+
+"""
+CREATE TASKS
+Tasks with all logic contained in a single operator can be declared here.
+Tasks with custom logic are relegated to individual Python files.
+""" + +INDEX_GENCON = BashOperator( + task_id="index_gencon", + bash_command="/opt/airflow/dags/cob_datapipeline/scripts/ingest_gencon.sh ", + retries=1, + env={ + "AWS_ACCESS_KEY_ID": AIRFLOW_S3.login, + "AWS_SECRET_ACCESS_KEY": AIRFLOW_S3.password, + "BUCKET": AIRFLOW_DATA_BUCKET, + "GIT_BRANCH": GENCON_INDEX_VERSION, + "HOME": AIRFLOW_USER_HOME, + "GENCON_INDEX_PATH": GENCON_INDEX_PATH, + "GENCON_TEMP_PATH": GENCON_TEMP_PATH, + "GENCON_CSV_S3": GENCON_CSV_S3, + "SOLR_AUTH_USER": SOLR_CONN.login if SOLR_CONN.login else "", + "SOLR_AUTH_PASSWORD": SOLR_CONN.password if SOLR_CONN.password else "", + "SOLR_WEB_URL": tasks.get_solr_url(SOLR_CONN, CONFIGSET), + }, + dag=DAG +) diff --git a/cob_datapipeline/scripts/.bundle/config b/cob_datapipeline/scripts/.bundle/config new file mode 100644 index 00000000..c708cfc3 --- /dev/null +++ b/cob_datapipeline/scripts/.bundle/config @@ -0,0 +1,2 @@ +--- +BUNDLE_WITHOUT: "debug" diff --git a/cob_datapipeline/scripts/.ruby-version b/cob_datapipeline/scripts/.ruby-version new file mode 100644 index 00000000..18091983 --- /dev/null +++ b/cob_datapipeline/scripts/.ruby-version @@ -0,0 +1 @@ +3.4.0 diff --git a/cob_datapipeline/scripts/ingest_gencon.sh b/cob_datapipeline/scripts/ingest_gencon.sh new file mode 100755 index 00000000..7967f4c3 --- /dev/null +++ b/cob_datapipeline/scripts/ingest_gencon.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +set -eo pipefail + +# export / set all environment variables passed here by task for pick-up by subprocess +export AIRFLOW_USER_HOME="/home/airflow" +export HOME=$AIRFLOW_USER_HOME + +source $HOME/.bashrc + +export PATH="$AIRFLOW_USER_HOME/.local/bin:$PATH" +export PATH="$AIRFLOW_USER_HOME/.rbenv/shims:$AIRFLOW_USER_HOME/.rbenv/bin:$PATH" + +export SOLR_URL="${SOLR_WEB_URL//\/\////$SOLR_AUTH_USER:$SOLR_AUTH_PASSWORD@}" +export GENCON_INDEX_PATH="$PWD/gencon_index" + +echo ">>> My Dreictory: $PWD" + +# Get the raw CSV files from S3 +aws s3 sync $GENCON_CSV_S3 $GENCON_TEMP_PATH --include "*.csv" + +if [[ ! -d "$GENCON_INDEX_PATH" ]]; then + git clone https://github.com/tulibraries/gencon_index.git $GENCON_INDEX_PATH + cd $GENCON_INDEX_PATH +else + # If the repository already exists locally, navigate to its directory and pull the latest changes. + + if [[ -d "$GENCON_INDEX_PATH/.git" ]]; then + cd $GENCON_INDEX_PATH + git pull origin main + else + echo "Error: Local 'gencon_index' directory is not a Git repository." + exit 1; + fi +fi + +bundle config set force_ruby_platform true +bundle install --without=debug + +ruby harvest_all.rb diff --git a/variables.json b/variables.json index 14ff34ed..09b77154 100644 --- a/variables.json +++ b/variables.json @@ -78,6 +78,15 @@ "configset": "tul_cob-web-0", "replication_factor": 4 }, + "GENCON_INDEX_SCHEDULE_INTERVAL": "@monthly", + "GENCON_INDEX_VERSION": "main", + "GENCON_INDEX_PATH": "/opt/airflow/dags/cob_datapipeline/scripts/gencon_index", + "GENCON_TEMP_PATH": "/tmp/gencon", + "GENCON_CSV_S3": "s3://tulib-airflow-prod/gencon", + "GENCON_SOLR_CONFIG": { + "configset": "gencon50-v3.0.1", + "replication_factor": 4 + }, "almaoai_last_num_oai_delete_recs": 1349, "almaoai_last_num_oai_update_recs": 34655 -} \ No newline at end of file +}