From d8e5e268c4bae52e3946a6f1ad0d9bd9e4f257b7 Mon Sep 17 00:00:00 2001 From: vismayak Date: Thu, 9 Jan 2025 12:37:07 -0600 Subject: [PATCH 1/7] first attempt at accessing miniofs --- pyclowder/files.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pyclowder/files.py b/pyclowder/files.py index cfe3b27..b78068c 100644 --- a/pyclowder/files.py +++ b/pyclowder/files.py @@ -38,7 +38,7 @@ def get_download_url(connector, host, key, fileid, intermediatefileid=None, ext= # pylint: disable=too-many-arguments -def download(connector, host, key, fileid, intermediatefileid=None, ext="", tracking=True): +def download(connector, host, key, fileid, intermediatefileid=None, ext="", tracking=True, mounted_filesystem = False, mounted_dir = "None"): """Download file to be processed from Clowder. Keyword arguments: @@ -49,7 +49,14 @@ def download(connector, host, key, fileid, intermediatefileid=None, ext="", trac intermediatefileid -- either same as fileid, or the intermediate file to be used ext -- the file extension, the downloaded file will end with this extension tracking -- should the download action be tracked + mounted_filesystem -- if the minio storage is mounted to the local filesystem """ + + if mounted_filesystem: + if mounted_dir == "None": + mounted_dir = "/clowderfs" + inputfilename = "/clowderfs/" + fileid + return inputfilename client = ClowderClient(host=host, key=key) inputfilename = files.download(connector, client, fileid, intermediatefileid, ext) return inputfilename From 38ed787685d0a135a8ff8894a4fd2987c8dcbed5 Mon Sep 17 00:00:00 2001 From: Vismayak Mohanarajan Date: Thu, 9 Jan 2025 14:00:50 -0600 Subject: [PATCH 2/7] Revert "first attempt at accessing miniofs" This reverts commit d8e5e268c4bae52e3946a6f1ad0d9bd9e4f257b7. 
--- pyclowder/files.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/pyclowder/files.py b/pyclowder/files.py index b78068c..cfe3b27 100644 --- a/pyclowder/files.py +++ b/pyclowder/files.py @@ -38,7 +38,7 @@ def get_download_url(connector, host, key, fileid, intermediatefileid=None, ext= # pylint: disable=too-many-arguments -def download(connector, host, key, fileid, intermediatefileid=None, ext="", tracking=True, mounted_filesystem = False, mounted_dir = "None"): +def download(connector, host, key, fileid, intermediatefileid=None, ext="", tracking=True): """Download file to be processed from Clowder. Keyword arguments: @@ -49,14 +49,7 @@ def download(connector, host, key, fileid, intermediatefileid=None, ext="", trac intermediatefileid -- either same as fileid, or the intermediate file to be used ext -- the file extension, the downloaded file will end with this extension tracking -- should the download action be tracked - mounted_filesystem -- if the minio storage is mounted to the local filesystem """ - - if mounted_filesystem: - if mounted_dir == "None": - mounted_dir = "/clowderfs" - inputfilename = "/clowderfs/" + fileid - return inputfilename client = ClowderClient(host=host, key=key) inputfilename = files.download(connector, client, fileid, intermediatefileid, ext) return inputfilename From f42b3443f4fdb5ac938772eeea16af1372b1b762 Mon Sep 17 00:00:00 2001 From: vismayak Date: Thu, 9 Jan 2025 14:57:37 -0600 Subject: [PATCH 3/7] tested extractor on single file and is working --- pyclowder/connectors.py | 28 +++++++++++++++++++--------- pyclowder/extractors.py | 5 +++++ 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 14543be..eea9990 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -63,7 +63,7 @@ class Connector(object): """ def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True, - 
mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): + mounted_paths=None, minio_mounted_path=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): self.extractor_name = extractor_name self.extractor_info = extractor_info self.check_message = check_message @@ -73,6 +73,10 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m self.mounted_paths = {} else: self.mounted_paths = mounted_paths + if minio_mounted_path is None: + self.minio_mounted_path = '' + else: + self.minio_mounted_path = minio_mounted_path self.clowder_url = clowder_url self.clowder_email = clowder_email self.extractor_key = extractor_key @@ -268,8 +272,14 @@ def _build_resource(self, body, host, secret_key, clowder_version): "metadata": body['metadata'] } - def _check_for_local_file(self, file_metadata): + def _check_for_local_file(self, file_metadata, file_id=None): """ Try to get pointer to locally accessible copy of file for extractor.""" + # Check if file is present in a minio mount (only valid for Clowder v2) + if self.minio_mounted_path and file_id: + minio_file_path = self.minio_mounted_path + "/" + file_id + print("Checking for minio local file: %s" % minio_file_path) + if os.path.isfile(minio_file_path): + return minio_file_path # first check if file is accessible locally if 'filepath' in file_metadata: @@ -278,7 +288,7 @@ def _check_for_local_file(self, file_metadata): # first simply check if file is present locally if os.path.isfile(file_path): return file_path - + # otherwise check any mounted paths... 
if len(self.mounted_paths) > 0: for source_path in self.mounted_paths: @@ -427,7 +437,7 @@ def _process_message(self, body): try: if check_result != pyclowder.utils.CheckMessage.bypass: file_metadata = pyclowder.files.download_info(self, host, secret_key, resource["id"]) - file_path = self._check_for_local_file(file_metadata) + file_path = self._check_for_local_file(file_metadata, resource["id"]) if not file_path: file_path = pyclowder.files.download(self, host, secret_key, resource["id"], resource["intermediate_id"], @@ -628,10 +638,10 @@ class RabbitMQConnector(Connector): # pylint: disable=too-many-arguments def __init__(self, extractor_name, extractor_info, rabbitmq_uri, rabbitmq_key=None, rabbitmq_queue=None, - check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, + check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, minio_mounted_path=None, heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email) + ssl_verify, mounted_paths, minio_mounted_path, clowder_url, max_retry, extractor_key, clowder_email) self.rabbitmq_uri = rabbitmq_uri self.rabbitmq_key = rabbitmq_key if rabbitmq_queue is None: @@ -756,7 +766,7 @@ def on_message(self, channel, method, header, body): job_id = None self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, job_id, self.check_message, - self.process_message, self.ssl_verify, self.mounted_paths, self.clowder_url, + self.process_message, self.ssl_verify, self.mounted_paths, self.minio_mounted_path, self.clowder_url, method, header, body) self.worker.start_thread(json_body) @@ -836,10 +846,10 @@ class RabbitMQHandler(Connector): """ def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True, - 
mounted_paths=None, clowder_url=None, method=None, header=None, body=None, max_retry=10): + mounted_paths=None, minio_mounted_path=None, clowder_url=None, method=None, header=None, body=None, max_retry=10): super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths, clowder_url, max_retry) + ssl_verify, mounted_paths, minio_mounted_path,clowder_url, max_retry) self.method = method self.header = header self.body = body diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index 60a3a87..ee1b480 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -72,6 +72,7 @@ def __init__(self): clowder_email = os.getenv("CLOWDER_EMAIL", "") logging_config = os.getenv("LOGGING") mounted_paths = os.getenv("MOUNTED_PATHS", "{}") + minio_mounted_path = os.getenv("MINIO_MOUNTED_PATH", "") input_file_path = os.getenv("INPUT_FILE_PATH") output_file_path = os.getenv("OUTPUT_FILE_PATH") connector_default = "RabbitMQ" @@ -105,6 +106,8 @@ def __init__(self): help='rabbitMQ queue name (default=%s)' % rabbitmq_queuename) self.parser.add_argument('--mounts', '-m', dest="mounted_paths", default=mounted_paths, help="dictionary of {'remote path':'local path'} mount mappings") + self.parser.add_argument('--minio-mount', dest="minio_mounted_path", default=minio_mounted_path, + help="path to mount Minio storage") self.parser.add_argument('--input-file-path', '-ifp', dest="input_file_path", default=input_file_path, help="Full path to local input file to be processed (used by Big Data feature)") self.parser.add_argument('--output-file-path', '-ofp', dest="output_file_path", default=output_file_path, @@ -175,6 +178,7 @@ def start(self): rabbitmq_key=rabbitmq_key, rabbitmq_queue=self.args.rabbitmq_queuename, mounted_paths=json.loads(self.args.mounted_paths), + minio_mounted_path=self.args.minio_mounted_path, clowder_url=self.args.clowder_url, max_retry=self.args.max_retry, heartbeat=self.args.heartbeat, @@ 
-193,6 +197,7 @@ def start(self): process_message=self.process_message, picklefile=self.args.hpc_picklefile, mounted_paths=json.loads(self.args.mounted_paths), + minio_mounted_path=self.args.minio_mounted_path, max_retry=self.args.max_retry) threading.Thread(target=connector.listen, name="HPCConnector").start() From df8ff14c4d6532093cc617d10e2dfc8539f31ea4 Mon Sep 17 00:00:00 2001 From: vismayak Date: Thu, 9 Jan 2025 15:13:30 -0600 Subject: [PATCH 4/7] Working on getting filepaths for local dataset --- pyclowder/connectors.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index eea9990..1a510b7 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -277,7 +277,6 @@ def _check_for_local_file(self, file_metadata, file_id=None): # Check if file is present in a minio mount (only valid for Clowder v2) if self.minio_mounted_path and file_id: minio_file_path = self.minio_mounted_path + "/" + file_id - print("Checking for minio local file: %s" % minio_file_path) if os.path.isfile(minio_file_path): return minio_file_path @@ -327,7 +326,14 @@ def _prepare_dataset(self, host, secret_key, resource): temp_link_dir = tempfile.mkdtemp() tmp_dirs_created.append(temp_link_dir) - # first check if any files in dataset accessible locally + # Check if miniomounted path is set and if the file is in the minio mounted path + if self.minio_mounted_path: + for file in resource['files']: + file_path = self._check_for_local_file(file, file['id']) + if file_path: + # print("Found file locally: %s" % file_path) + located_files.append(file_path) + #check if any files in dataset accessible locally ds_file_list = pyclowder.datasets.get_file_list(self, host, secret_key, resource["id"]) for ds_file in ds_file_list: file_path = self._check_for_local_file(ds_file) From 1edecd502ee0f4cf8001e3f9eed14e03acc90f2b Mon Sep 17 00:00:00 2001 From: vismayak Date: Mon, 13 Jan 2025 22:18:22 -0600 Subject: [PATCH 5/7] 
Tried working with connectors for datasets but just modified download for files --- pyclowder/connectors.py | 26 +++++++++++++++++++------- pyclowder/files.py | 8 ++++++++ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 1a510b7..05314dc 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -309,6 +309,8 @@ def _download_file_metadata(self, host, secret_key, fileid, filepath): (fd, md_file) = tempfile.mkstemp(suffix=md_name, dir=md_dir) with os.fdopen(fd, "wb") as tmp_file: + print("Writing metadata to %s" % md_file) + print(file_md) tmp_file.write(json.dumps(file_md)) return (md_dir, md_file) @@ -327,12 +329,21 @@ def _prepare_dataset(self, host, secret_key, resource): tmp_dirs_created.append(temp_link_dir) # Check if miniomounted path is set and if the file is in the minio mounted path - if self.minio_mounted_path: - for file in resource['files']: - file_path = self._check_for_local_file(file, file['id']) - if file_path: - # print("Found file locally: %s" % file_path) - located_files.append(file_path) + # if self.minio_mounted_path: + # for file in resource['files']: + # file_path = self._check_for_local_file(file, file['id']) + # if not file_path: + # missing_files.append(file) + # else: + # md_file_path = file['name'].split('.')[0] + "_metadata.json" + # (file_md_dir, file_md_tmp) = self._download_file_metadata(host, secret_key, file['id'], md_file_path) + # located_files.append(file_path) + # located_files.append(file_md_tmp) + # tmp_files_created.append(file_md_tmp) + # tmp_dirs_created.append(file_md_dir) + + # else: + #check if any files in dataset accessible locally ds_file_list = pyclowder.datasets.get_file_list(self, host, secret_key, resource["id"]) for ds_file in ds_file_list: @@ -349,7 +360,7 @@ def _prepare_dataset(self, host, secret_key, resource): # Also get file metadata in format expected by extrator (file_md_dir, file_md_tmp) = 
self._download_file_metadata(host, secret_key, ds_file['id'], - ds_file['filepath']) + ds_file['filepath']) located_files.append(file_path) located_files.append(file_md_tmp) tmp_files_created.append(file_md_tmp) @@ -393,6 +404,7 @@ def _prepare_dataset(self, host, secret_key, resource): except Exception as e: logger.exception("No files found and download failed") + print("File paths: %s" % file_paths) return (file_paths, tmp_files_created, tmp_dirs_created) # pylint: disable=too-many-branches,too-many-statements diff --git a/pyclowder/files.py b/pyclowder/files.py index cfe3b27..55b1b0b 100644 --- a/pyclowder/files.py +++ b/pyclowder/files.py @@ -51,6 +51,14 @@ def download(connector, host, key, fileid, intermediatefileid=None, ext="", trac tracking -- should the download action be tracked """ client = ClowderClient(host=host, key=key) + # Check if minio mounted path is set + minio_mounted_path = os.getenv("MINIO_MOUNTED_PATH", "") + if minio_mounted_path: + # Check if the file is stored in Minio mount path + minio_file_path = minio_mounted_path + "/" + fileid + if os.path.isfile(minio_file_path): + return minio_file_path + # Else download the file from Clowder inputfilename = files.download(connector, client, fileid, intermediatefileid, ext) return inputfilename From 9b4f052b25d87a48ca6a31445ba73f6f3318daa1 Mon Sep 17 00:00:00 2001 From: vismayak Date: Tue, 18 Mar 2025 13:01:56 -0500 Subject: [PATCH 6/7] Cleaning up --- pyclowder/connectors.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 05314dc..9b94e51 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -309,8 +309,6 @@ def _download_file_metadata(self, host, secret_key, fileid, filepath): (fd, md_file) = tempfile.mkstemp(suffix=md_name, dir=md_dir) with os.fdopen(fd, "wb") as tmp_file: - print("Writing metadata to %s" % md_file) - print(file_md) tmp_file.write(json.dumps(file_md)) return (md_dir, md_file) @@ 
-328,23 +326,6 @@ def _prepare_dataset(self, host, secret_key, resource): temp_link_dir = tempfile.mkdtemp() tmp_dirs_created.append(temp_link_dir) - # Check if miniomounted path is set and if the file is in the minio mounted path - # if self.minio_mounted_path: - # for file in resource['files']: - # file_path = self._check_for_local_file(file, file['id']) - # if not file_path: - # missing_files.append(file) - # else: - # md_file_path = file['name'].split('.')[0] + "_metadata.json" - # (file_md_dir, file_md_tmp) = self._download_file_metadata(host, secret_key, file['id'], md_file_path) - # located_files.append(file_path) - # located_files.append(file_md_tmp) - # tmp_files_created.append(file_md_tmp) - # tmp_dirs_created.append(file_md_dir) - - # else: - - #check if any files in dataset accessible locally ds_file_list = pyclowder.datasets.get_file_list(self, host, secret_key, resource["id"]) for ds_file in ds_file_list: file_path = self._check_for_local_file(ds_file) @@ -404,7 +385,6 @@ def _prepare_dataset(self, host, secret_key, resource): except Exception as e: logger.exception("No files found and download failed") - print("File paths: %s" % file_paths) return (file_paths, tmp_files_created, tmp_dirs_created) # pylint: disable=too-many-branches,too-many-statements From e933b3aa978a960cdd51faaff855739446ed8044 Mon Sep 17 00:00:00 2001 From: vismayak Date: Tue, 18 Mar 2025 13:02:39 -0500 Subject: [PATCH 7/7] Cleaning up --- pyclowder/connectors.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 9b94e51..d1af961 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -287,7 +287,6 @@ def _check_for_local_file(self, file_metadata, file_id=None): # first simply check if file is present locally if os.path.isfile(file_path): return file_path - # otherwise check any mounted paths... if len(self.mounted_paths) > 0: for source_path in self.mounted_paths: