Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Private extractor support #105

Merged
merged 15 commits into from
May 24, 2024
Merged
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ This will result in only those downloads to be counted by users, not extractors.

- Ask not to track a download from an extractor.

## Unreleased

### Added
- Add support for `EXTRACTOR_KEY` and `CLOWDER_EMAIL` environment variables to register
an extractor for just one user.

## 2.6.0 - 2022-06-14

This will change how clowder sees the extractors. If you have an extractor, and you specify
Expand Down
19 changes: 14 additions & 5 deletions pyclowder/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Connector(object):
"""

def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
mounted_paths=None, clowder_url=None, max_retry=10):
mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
self.extractor_name = extractor_name
self.extractor_info = extractor_info
self.check_message = check_message
Expand All @@ -74,6 +74,10 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
else:
self.mounted_paths = mounted_paths
self.clowder_url = clowder_url
self.clowder_email = clowder_email
self.extractor_key = extractor_key
if extractor_key:
self.extractor_info["unique_key"] = extractor_key
self.max_retry = max_retry

filename = 'notifications.json'
Expand Down Expand Up @@ -625,15 +629,18 @@ class RabbitMQConnector(Connector):
def __init__(self, extractor_name, extractor_info,
rabbitmq_uri, rabbitmq_key=None, rabbitmq_queue=None,
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None,
heartbeat=5*60, clowder_url=None, max_retry=10):
heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
ssl_verify, mounted_paths, clowder_url, max_retry)
ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email)
self.rabbitmq_uri = rabbitmq_uri
self.rabbitmq_key = rabbitmq_key
if rabbitmq_queue is None:
self.rabbitmq_queue = extractor_info['name']
else:
self.rabbitmq_queue = rabbitmq_queue
self.extractor_key = extractor_key
if extractor_key:
self.rabbitmq_queue = "private.%s.%s" % (extractor_key, self.rabbitmq_queue)
self.channel = None
self.connection = None
self.consumer_tag = None
Expand All @@ -659,7 +666,7 @@ def connect(self):
self.channel.queue_declare(queue='error.'+self.rabbitmq_queue, durable=True)

# start the extractor announcer
self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.rabbitmq_queue, self.heartbeat)
self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.clowder_email, self.rabbitmq_queue, self.heartbeat)
self.announcer.start_thread()

def listen(self):
Expand Down Expand Up @@ -765,10 +772,11 @@ def on_message(self, channel, method, header, body):


class RabbitMQBroadcast:
def __init__(self, rabbitmq_uri, extractor_info, rabbitmq_queue, heartbeat):
def __init__(self, rabbitmq_uri, extractor_info, clowder_email, rabbitmq_queue, heartbeat):
self.active = True
self.rabbitmq_uri = rabbitmq_uri
self.extractor_info = extractor_info
self.clowder_email = clowder_email
self.rabbitmq_queue = rabbitmq_queue
self.heartbeat = heartbeat
self.id = str(uuid.uuid4())
Expand Down Expand Up @@ -798,6 +806,7 @@ def send_heartbeat(self):
message = {
'id': self.id,
'queue': self.rabbitmq_queue,
'owner': self.clowder_email,
'extractor_info': self.extractor_info
}
next_heartbeat = time.time()
Expand Down
12 changes: 11 additions & 1 deletion pyclowder/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def __init__(self):
rabbitmq_uri = os.getenv('RABBITMQ_URI', "amqp://guest:[email protected]/%2f")
clowder_url = os.getenv("CLOWDER_URL", "")
registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "")
extractor_key = os.getenv("EXTRACTOR_KEY", "")
clowder_email = os.getenv("CLOWDER_EMAIL", "")
logging_config = os.getenv("LOGGING")
mounted_paths = os.getenv("MOUNTED_PATHS", "{}")
input_file_path = os.getenv("INPUT_FILE_PATH")
Expand All @@ -90,6 +92,12 @@ def __init__(self):
help='pickle file that needs to be processed (only needed for HPC)')
self.parser.add_argument('--clowderURL', nargs='?', dest='clowder_url', default=clowder_url,
help='Clowder host URL')
self.parser.add_argument('--key', '-k', dest="extractor_key",
default=extractor_key,
help='Unique key to use for extractor queue ID (sets extractor to private)')
self.parser.add_argument('--user', '-u', dest="clowder_email",
default=clowder_email,
help='Email address of Clowder user who will initially be assigned ownership (ignored if no --key provided)')
self.parser.add_argument('--rabbitmqURI', nargs='?', dest='rabbitmq_uri', default=rabbitmq_uri,
help='rabbitMQ URI (default=%s)' % rabbitmq_uri.replace("%", "%%"))
self.parser.add_argument('--rabbitmqQUEUE', nargs='?', dest='rabbitmq_queuename',
Expand Down Expand Up @@ -169,7 +177,9 @@ def start(self):
mounted_paths=json.loads(self.args.mounted_paths),
clowder_url=self.args.clowder_url,
max_retry=self.args.max_retry,
heartbeat=self.args.heartbeat)
heartbeat=self.args.heartbeat,
extractor_key=self.args.extractor_key,
clowder_email=self.args.clowder_email)
connector.connect()
threading.Thread(target=connector.listen, name="RabbitMQConnector").start()

Expand Down
Loading