Skip to content

Modify download apis for minio mounted fs #117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions pyclowder/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Connector(object):
"""

def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
mounted_paths=None, minio_mounted_path=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
self.extractor_name = extractor_name
self.extractor_info = extractor_info
self.check_message = check_message
Expand All @@ -73,6 +73,10 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
self.mounted_paths = {}
else:
self.mounted_paths = mounted_paths
if minio_mounted_path is None:
self.minio_mounted_path = ''
else:
self.minio_mounted_path = minio_mounted_path
self.clowder_url = clowder_url
self.clowder_email = clowder_email
self.extractor_key = extractor_key
Expand Down Expand Up @@ -268,8 +272,13 @@ def _build_resource(self, body, host, secret_key, clowder_version):
"metadata": body['metadata']
}

def _check_for_local_file(self, file_metadata):
def _check_for_local_file(self, file_metadata, file_id=None):
""" Try to get pointer to locally accessible copy of file for extractor."""
# Check if file is present in a minio mount (only valid for Clowder v2)
if self.minio_mounted_path and file_id:
minio_file_path = self.minio_mounted_path + "/" + file_id
if os.path.isfile(minio_file_path):
return minio_file_path

# first check if file is accessible locally
if 'filepath' in file_metadata:
Expand All @@ -278,7 +287,6 @@ def _check_for_local_file(self, file_metadata):
# first simply check if file is present locally
if os.path.isfile(file_path):
return file_path

# otherwise check any mounted paths...
if len(self.mounted_paths) > 0:
for source_path in self.mounted_paths:
Expand Down Expand Up @@ -317,7 +325,6 @@ def _prepare_dataset(self, host, secret_key, resource):
temp_link_dir = tempfile.mkdtemp()
tmp_dirs_created.append(temp_link_dir)

# first check if any files in dataset accessible locally
ds_file_list = pyclowder.datasets.get_file_list(self, host, secret_key, resource["id"])
for ds_file in ds_file_list:
file_path = self._check_for_local_file(ds_file)
Expand All @@ -333,7 +340,7 @@ def _prepare_dataset(self, host, secret_key, resource):

# Also get file metadata in format expected by extractor
(file_md_dir, file_md_tmp) = self._download_file_metadata(host, secret_key, ds_file['id'],
ds_file['filepath'])
ds_file['filepath'])
located_files.append(file_path)
located_files.append(file_md_tmp)
tmp_files_created.append(file_md_tmp)
Expand Down Expand Up @@ -427,7 +434,7 @@ def _process_message(self, body):
try:
if check_result != pyclowder.utils.CheckMessage.bypass:
file_metadata = pyclowder.files.download_info(self, host, secret_key, resource["id"])
file_path = self._check_for_local_file(file_metadata)
file_path = self._check_for_local_file(file_metadata, resource["id"])
if not file_path:
file_path = pyclowder.files.download(self, host, secret_key, resource["id"],
resource["intermediate_id"],
Expand Down Expand Up @@ -628,10 +635,10 @@ class RabbitMQConnector(Connector):
# pylint: disable=too-many-arguments
def __init__(self, extractor_name, extractor_info,
rabbitmq_uri, rabbitmq_key=None, rabbitmq_queue=None,
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None,
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, minio_mounted_path=None,
heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email)
ssl_verify, mounted_paths, minio_mounted_path, clowder_url, max_retry, extractor_key, clowder_email)
self.rabbitmq_uri = rabbitmq_uri
self.rabbitmq_key = rabbitmq_key
if rabbitmq_queue is None:
Expand Down Expand Up @@ -756,7 +763,7 @@ def on_message(self, channel, method, header, body):
job_id = None

self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, job_id, self.check_message,
self.process_message, self.ssl_verify, self.mounted_paths, self.clowder_url,
self.process_message, self.ssl_verify, self.mounted_paths, self.minio_mounted_path, self.clowder_url,
method, header, body)
self.worker.start_thread(json_body)

Expand Down Expand Up @@ -836,10 +843,10 @@ class RabbitMQHandler(Connector):
"""

def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True,
mounted_paths=None, clowder_url=None, method=None, header=None, body=None, max_retry=10):
mounted_paths=None, minio_mounted_path=None, clowder_url=None, method=None, header=None, body=None, max_retry=10):

super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message,
ssl_verify, mounted_paths, clowder_url, max_retry)
ssl_verify, mounted_paths, minio_mounted_path,clowder_url, max_retry)
self.method = method
self.header = header
self.body = body
Expand Down
5 changes: 5 additions & 0 deletions pyclowder/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self):
clowder_email = os.getenv("CLOWDER_EMAIL", "")
logging_config = os.getenv("LOGGING")
mounted_paths = os.getenv("MOUNTED_PATHS", "{}")
minio_mounted_path = os.getenv("MINIO_MOUNTED_PATH", "")
input_file_path = os.getenv("INPUT_FILE_PATH")
output_file_path = os.getenv("OUTPUT_FILE_PATH")
connector_default = "RabbitMQ"
Expand Down Expand Up @@ -105,6 +106,8 @@ def __init__(self):
help='rabbitMQ queue name (default=%s)' % rabbitmq_queuename)
self.parser.add_argument('--mounts', '-m', dest="mounted_paths", default=mounted_paths,
help="dictionary of {'remote path':'local path'} mount mappings")
self.parser.add_argument('--minio-mount', dest="minio_mounted_path", default=minio_mounted_path,
help="path to mount Minio storage")
self.parser.add_argument('--input-file-path', '-ifp', dest="input_file_path", default=input_file_path,
help="Full path to local input file to be processed (used by Big Data feature)")
self.parser.add_argument('--output-file-path', '-ofp', dest="output_file_path", default=output_file_path,
Expand Down Expand Up @@ -175,6 +178,7 @@ def start(self):
rabbitmq_key=rabbitmq_key,
rabbitmq_queue=self.args.rabbitmq_queuename,
mounted_paths=json.loads(self.args.mounted_paths),
minio_mounted_path=self.args.minio_mounted_path,
clowder_url=self.args.clowder_url,
max_retry=self.args.max_retry,
heartbeat=self.args.heartbeat,
Expand All @@ -193,6 +197,7 @@ def start(self):
process_message=self.process_message,
picklefile=self.args.hpc_picklefile,
mounted_paths=json.loads(self.args.mounted_paths),
minio_mounted_path=self.args.minio_mounted_path,
max_retry=self.args.max_retry)
threading.Thread(target=connector.listen, name="HPCConnector").start()

Expand Down
8 changes: 8 additions & 0 deletions pyclowder/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ def download(connector, host, key, fileid, intermediatefileid=None, ext="", trac
tracking -- should the download action be tracked
"""
client = ClowderClient(host=host, key=key)
# Check if minio mounted path is set
minio_mounted_path = os.getenv("MINIO_MOUNTED_PATH", "")
if minio_mounted_path:
# Check if the file is stored in Minio mount path
minio_file_path = minio_mounted_path + "/" + fileid
if os.path.isfile(minio_file_path):
return minio_file_path
# Else download the file from Clowder
inputfilename = files.download(connector, client, fileid, intermediatefileid, ext)
return inputfilename

Expand Down
Loading