diff --git a/scripts/globusmonitor/Dockerfile_uploader b/scripts/globusmonitor/Dockerfile_uploader
index 6af94f04..9792d18a 100644
--- a/scripts/globusmonitor/Dockerfile_uploader
+++ b/scripts/globusmonitor/Dockerfile_uploader
@@ -6,7 +6,9 @@ RUN apt-get -y update \
     && pip install flask-restful \
         python-logstash \
         globusonline-transfer-api-client \
-        psycopg2
+        psycopg2 python-etcd
+
+
 COPY *.py *.json /home/globusmonitor/
diff --git a/scripts/globusmonitor/globus_uploader_service.py b/scripts/globusmonitor/globus_uploader_service.py
index 2efeea60..eb3fc837 100644
--- a/scripts/globusmonitor/globus_uploader_service.py
+++ b/scripts/globusmonitor/globus_uploader_service.py
@@ -12,9 +12,11 @@ import signal
 import psycopg2
 import socket
+import re
 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 from io import BlockingIOError
 from urllib3.filepost import encode_multipart_formdata
+import etcd, traceback
 
 from pyclowder.datasets import download_metadata
 from terrautils.metadata import clean_metadata
@@ -50,6 +52,8 @@
 current_task = None
 
+etcd_client = etcd.Client(host='etcd2.terraref', port=4001)
+
 # ----------------------------------------------------------
 # SHARED UTILS
 # ----------------------------------------------------------
@@ -208,7 +212,7 @@ def writeTaskToDatabase(task):
     recv = task['received']
     comp = task['completed']
     guser = task['user']
-    jbody = json.dumps(task['contents'])
+    jbody = json.dumps(task['contents']).replace("'", "")
 
     # Attempt to insert, update if globus ID already exists
     q_insert = "INSERT INTO globus_tasks (globus_id, status, received, completed, globus_user, contents) " \
@@ -384,6 +388,13 @@ def notifyClowderOfCompletedTask(task):
             c_sensor, c_date, c_year, c_month = ds, None, None, None
+
+        lockname = re.sub(" |_|-", "", ds)
+        logger.info("Acquiring lock for task %s" % lockname)
+        lock = etcd.Lock(etcd_client, lockname)
+        lock.acquire(blocking=True, lock_ttl=300, timeout=60)
+        logger.info("Acquired lock")
+
         # Get dataset from clowder, or create & associate with collections
         try:
             hierarchy_host = clowder_host + ("/" if not clowder_host.endswith("/") else "")
@@ -396,6 +407,10 @@ def notifyClowderOfCompletedTask(task):
             response = "RETRY"
             continue
 
+        logger.info("Releasing lock")
+        lock.release()
+        logger.info("Lock released")
+
         if dsid:
             dsFileList = fetchDatasetFileList(dsid, sess)
             # Only send files not already present in dataset by path
@@ -479,7 +494,7 @@ def notifyClowderOfCompletedTask(task):
                                          headers={'Content-Type':header}, data=content)
 
-                    if fi.status_code in [104, 500, 502, 504]:
+                    if fi.status_code in [500, 502, 504]:
                         logger.error("[%s] failed to attach files (%s: %s)" % (ds, fi.status_code, fi.text))
                         updatedTask['contents'][ds]['files'][datasetMDFile]['retry'] = "%s: %s" % (fi.status_code, fi.text)
                         response = "RETRY"
@@ -552,6 +567,11 @@ def clowderSubmissionLoop():
                     logger.error("Connection reset on %s; marking RETRY (%s)" % (globusID, str(e)))
                     task['status'] = 'RETRY'
                     writeTaskToDatabase(task)
+                except etcd.EtcdException as e:
+                    logger.error("Lock error on %s; marking RETRY (%s)" % (globusID, str(e)))
+                    task['status'] = 'RETRY'
+                    traceback.print_exc()
+                    writeTaskToDatabase(task)
                 except requests.ConnectionError as e:
                     logger.error("Connection error on %s; marking RETRY (%s)" % (globusID, str(e)))
                     task['status'] = 'RETRY'
                     writeTaskToDatabase(task)
@@ -589,6 +609,11 @@ def clowderSubmissionLoop():
                 logger.error("Connection reset on %s; marking RETRY (%s)" % (globusID, str(e)))
                 task['status'] = 'RETRY'
                 writeTaskToDatabase(task)
+            except etcd.EtcdException as e:
+                logger.error("Lock error on %s; marking RETRY (%s)" % (globusID, str(e)))
+                task['status'] = 'RETRY'
+                traceback.print_exc()
+                writeTaskToDatabase(task)
             except requests.ConnectionError as e:
                 logger.error("Connection error on %s; marking RETRY (%s)" % (globusID, str(e)))
                 task['status'] = 'RETRY'