From f2a071a4b71f5189d612ebb51461de3fe9ebff6a Mon Sep 17 00:00:00 2001 From: Craig Willis Date: Mon, 11 Jun 2018 21:51:25 -0500 Subject: [PATCH 1/2] Added etcd locks --- .../globusmonitor/globus_uploader_service.py | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/scripts/globusmonitor/globus_uploader_service.py b/scripts/globusmonitor/globus_uploader_service.py index 9de894c0..47c3375f 100644 --- a/scripts/globusmonitor/globus_uploader_service.py +++ b/scripts/globusmonitor/globus_uploader_service.py @@ -15,6 +15,7 @@ from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from io import BlockingIOError from urllib3.filepost import encode_multipart_formdata +import etcd, traceback from pyclowder.datasets import download_metadata from terrautils.metadata import clean_metadata @@ -50,6 +51,8 @@ current_task = None +etcd_client = etcd.Client(host='etcd.terraref', port=4001) + # ---------------------------------------------------------- # SHARED UTILS # ---------------------------------------------------------- @@ -184,7 +187,7 @@ def writeTaskToDatabase(task): recv = task['received'] comp = task['completed'] guser = task['user'] - jbody = json.dumps(task['contents']) + jbody = json.dumps(task['contents']).replace("'", "") # Attempt to insert, update if globus ID already exists q_insert = "INSERT INTO globus_tasks (globus_id, status, received, completed, globus_user, contents) " \ @@ -385,6 +388,10 @@ def notifyClowderOfCompletedTask(task): else: c_sensor, c_date, c_year, c_month = ds, None, None, None + logger.info("Acquiring lock for %s" % ds) + lock = etcd.Lock(etcd_client, ds) + lock.acquire(blocking=True, lock_ttl=300) + logger.info("Acquired lock") # Get dataset from clowder, or create & associate with collections try: @@ -398,6 +405,11 @@ def notifyClowderOfCompletedTask(task): response = "RETRY" continue + + logger.info("Releasing lock") + lock.release() + logger.info("Lock release") + if dsid: dsFileList = fetchDatasetFileList(dsid, sess) # Only send files not already present in dataset by path @@ -481,7 +493,7 @@ def notifyClowderOfCompletedTask(task): headers={'Content-Type':header}, data=content) - if fi.status_code in [104, 500, 502, 504]: + if fi.status_code in [500, 502, 504]: logger.error("[%s] failed to attach files (%s: %s)" % (ds, fi.status_code, fi.text)) updatedTask['contents'][ds]['files'][datasetMDFile]['retry'] = "%s: %s" % (fi.status_code, fi.text) response = "RETRY" @@ -554,6 +566,12 @@ def clowderSubmissionLoop(): logger.error("Connection reset on %s; marking RETRY (%s)" % (globusID, str(e))) task['status'] = 'RETRY' writeTaskToDatabase(task) + except etcd.EtcdException as e: + logger.error("Lock error on %s; marking RETRY (%s)" % (globusID, str(e))) + task['status'] = 'RETRY' + traceback.print_exc() + writeTaskToDatabase(task) + time.sleep(10000) except requests.ConnectionError as e: logger.error("Connection error on %s; marking RETRY (%s)" % (globusID, str(e))) task['status'] = 'RETRY' @@ -591,6 +609,12 @@ def clowderSubmissionLoop(): logger.error("Connection reset on %s; marking RETRY (%s)" % (globusID, str(e))) task['status'] = 'RETRY' writeTaskToDatabase(task) + except etcd.EtcdException as e: + logger.error("Lock error on %s; marking RETRY (%s)" % (globusID, str(e))) + task['status'] = 'RETRY' + traceback.print_exc() + writeTaskToDatabase(task) + time.sleep(10000) except requests.ConnectionError as e: logger.error("Connection error on %s; marking RETRY (%s)" % (globusID, str(e))) task['status'] = 'RETRY' From f09f2222dfeee1e778fa89ee84053325242298b7 Mon Sep 17 00:00:00 2001 From: Craig Willis Date: Thu, 9 Aug 2018 12:51:15 -0500 Subject: [PATCH 2/2] Changed lock name --- scripts/globusmonitor/Dockerfile_uploader | 4 +++- .../globusmonitor/globus_uploader_service.py | 17 +++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/scripts/globusmonitor/Dockerfile_uploader b/scripts/globusmonitor/Dockerfile_uploader index 6af94f04..9792d18a 100644 --- a/scripts/globusmonitor/Dockerfile_uploader +++ b/scripts/globusmonitor/Dockerfile_uploader @@ -6,7 +6,9 @@ RUN apt-get -y update \ && pip install flask-restful \ python-logstash \ globusonline-transfer-api-client \ - psycopg2 + psycopg2 python-etcd + + COPY *.py *.json /home/globusmonitor/ diff --git a/scripts/globusmonitor/globus_uploader_service.py b/scripts/globusmonitor/globus_uploader_service.py index 5d3984f1..eb3fc837 100644 --- a/scripts/globusmonitor/globus_uploader_service.py +++ b/scripts/globusmonitor/globus_uploader_service.py @@ -12,6 +12,7 @@ import signal import psycopg2 import socket +import re from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from io import BlockingIOError from urllib3.filepost import encode_multipart_formdata @@ -51,7 +52,7 @@ current_task = None -etcd_client = etcd.Client(host='etcd.terraref', port=4001) +etcd_client = etcd.Client(host='etcd2.terraref', port=4001) # ---------------------------------------------------------- # SHARED UTILS @@ -386,9 +387,12 @@ def notifyClowderOfCompletedTask(task): else: c_sensor, c_date, c_year, c_month = ds, None, None, None - logger.info("Acquiring lock for %s" % ds) - lock = etcd.Lock(etcd_client, ds) - lock.acquire(blocking=True, lock_ttl=300) + + + lockname = re.sub(" |_|-", "", ds) + logger.info("Acquiring lock for task %s" % lockname) + lock = etcd.Lock(etcd_client, lockname) + lock.acquire(blocking=True, lock_ttl=300, timeout=60) logger.info("Acquired lock") # Get dataset from clowder, or create & associate with collections @@ -403,10 +407,9 @@ def notifyClowderOfCompletedTask(task): response = "RETRY" continue - logger.info("Releasing lock") lock.release() - logger.info("Lock release") + logger.info("Lock released") if dsid: dsFileList = fetchDatasetFileList(dsid, sess) @@ -569,7 +572,6 @@ def clowderSubmissionLoop(): task['status'] = 'RETRY' traceback.print_exc() writeTaskToDatabase(task) - time.sleep(10000) except requests.ConnectionError as e: logger.error("Connection error on %s; marking RETRY (%s)" % (globusID, str(e))) task['status'] = 'RETRY' @@ -612,7 +614,6 @@ def clowderSubmissionLoop(): task['status'] = 'RETRY' traceback.print_exc() writeTaskToDatabase(task) - time.sleep(10000) except requests.ConnectionError as e: logger.error("Connection error on %s; marking RETRY (%s)" % (globusID, str(e))) task['status'] = 'RETRY'