diff --git a/CHANGELOG.md b/CHANGELOG.md index 44c7a1f..07c0762 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,12 @@ This will result in only those donwloads to be counted by users, not extractors. - Ask not to track a download from an extractor. +## Unreleased + +### Added +- Add support for `EXTRACTOR_KEY` and `CLOWDER_EMAIL` environment variables to register +an extractor for just one user. + ## 2.6.0 - 2022-06-14 This will change how clowder sees the extractors. If you have an extractor, and you specify diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 252cedd..14543be 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -63,7 +63,7 @@ class Connector(object): """ def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True, - mounted_paths=None, clowder_url=None, max_retry=10): + mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): self.extractor_name = extractor_name self.extractor_info = extractor_info self.check_message = check_message @@ -74,6 +74,10 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m else: self.mounted_paths = mounted_paths self.clowder_url = clowder_url + self.clowder_email = clowder_email + self.extractor_key = extractor_key + if extractor_key: + self.extractor_info["unique_key"] = extractor_key self.max_retry = max_retry filename = 'notifications.json' @@ -625,15 +629,18 @@ class RabbitMQConnector(Connector): def __init__(self, extractor_name, extractor_info, rabbitmq_uri, rabbitmq_key=None, rabbitmq_queue=None, check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, - heartbeat=5*60, clowder_url=None, max_retry=10): + heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths, clowder_url, max_retry) + ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email) self.rabbitmq_uri = rabbitmq_uri self.rabbitmq_key = rabbitmq_key if rabbitmq_queue is None: self.rabbitmq_queue = extractor_info['name'] else: self.rabbitmq_queue = rabbitmq_queue + self.extractor_key = extractor_key + if extractor_key: + self.rabbitmq_queue = "private.%s.%s" % (extractor_key, self.rabbitmq_queue) self.channel = None self.connection = None self.consumer_tag = None @@ -659,7 +666,7 @@ def connect(self): self.channel.queue_declare(queue='error.'+self.rabbitmq_queue, durable=True) # start the extractor announcer - self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.rabbitmq_queue, self.heartbeat) + self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.clowder_email, self.rabbitmq_queue, self.heartbeat) self.announcer.start_thread() def listen(self): @@ -765,10 +772,11 @@ def on_message(self, channel, method, header, body): class RabbitMQBroadcast: - def __init__(self, rabbitmq_uri, extractor_info, rabbitmq_queue, heartbeat): + def __init__(self, rabbitmq_uri, extractor_info, clowder_email, rabbitmq_queue, heartbeat): self.active = True self.rabbitmq_uri = rabbitmq_uri self.extractor_info = extractor_info + self.clowder_email = clowder_email self.rabbitmq_queue = rabbitmq_queue self.heartbeat = heartbeat self.id = str(uuid.uuid4()) @@ -798,6 +806,7 @@ def send_heartbeat(self): message = { 'id': self.id, 'queue': self.rabbitmq_queue, + 'owner': self.clowder_email, 'extractor_info': self.extractor_info } next_heartbeat = time.time() diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index 27ac81d..60a3a87 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -68,6 +68,8 @@ def __init__(self): rabbitmq_uri = os.getenv('RABBITMQ_URI', "amqp://guest:guest@127.0.0.1/%2f") clowder_url = os.getenv("CLOWDER_URL", "") registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "") + extractor_key = os.getenv("EXTRACTOR_KEY", "") + clowder_email = os.getenv("CLOWDER_EMAIL", "") logging_config = os.getenv("LOGGING") mounted_paths = os.getenv("MOUNTED_PATHS", "{}") input_file_path = os.getenv("INPUT_FILE_PATH") @@ -90,6 +92,12 @@ def __init__(self): help='pickle file that needs to be processed (only needed for HPC)') self.parser.add_argument('--clowderURL', nargs='?', dest='clowder_url', default=clowder_url, help='Clowder host URL') + self.parser.add_argument('--key', '-k', dest="extractor_key", + default=extractor_key, + help='Unique key to use for extractor queue ID (sets extractor to private)') + self.parser.add_argument('--user', '-u', dest="clowder_email", + default=clowder_email, + help='Email address of Clowder user who will initially be assigned ownership (ignored if no --key provided)') self.parser.add_argument('--rabbitmqURI', nargs='?', dest='rabbitmq_uri', default=rabbitmq_uri, help='rabbitMQ URI (default=%s)' % rabbitmq_uri.replace("%", "%%")) self.parser.add_argument('--rabbitmqQUEUE', nargs='?', dest='rabbitmq_queuename', @@ -169,7 +177,9 @@ def start(self): mounted_paths=json.loads(self.args.mounted_paths), clowder_url=self.args.clowder_url, max_retry=self.args.max_retry, - heartbeat=self.args.heartbeat) + heartbeat=self.args.heartbeat, + extractor_key=self.args.extractor_key, + clowder_email=self.args.clowder_email) connector.connect() threading.Thread(target=connector.listen, name="RabbitMQConnector").start()