diff --git a/.gitignore b/.gitignore index 1b3422a..923746c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,14 @@ # Original source: https://github.com/github/gitignore/blob/main/Python.gitignore +# Needed for ray exclusion +.git + # Exclude data folder data/ +# Exclude model +hyp_best_train_weights_final.h5 + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] @@ -71,3 +77,6 @@ venv.bak/ # Cython debug symbols cython_debug/ + +# PyCharm settings +.idea/ diff --git a/environment_maple.yml b/environment_maple.yml index 221aff3..83f1a97 100644 --- a/environment_maple.yml +++ b/environment_maple.yml @@ -10,8 +10,8 @@ dependencies: - shapely - pip=23.3.1 - pip: - - tensorflow[and-cuda]==2.15.1 + - tensorflow[and-cuda] - opencv-python - ray - - pandas==2.1.4 + - pandas - pyarrow diff --git a/environment_maple_delta.yml b/environment_maple_delta.yml new file mode 100644 index 0000000..754c4a5 --- /dev/null +++ b/environment_maple_delta.yml @@ -0,0 +1,17 @@ +name: maple_py310_ray +channels: + - defaults + - conda-forge +dependencies: + - gdal + - fiona + - pyshp + - scikit-image + - shapely + - pip=23.3.1 + - tensorflow + - pandas + - pyarrow + - requests + - pip: + - ray[default] diff --git a/environment_maple_osx-64.yml b/environment_maple_osx-64.yml new file mode 100644 index 0000000..d98773f --- /dev/null +++ b/environment_maple_osx-64.yml @@ -0,0 +1,17 @@ +name: maple_py310_ray_osx-64 +channels: + - defaults +dependencies: + - python=3.10 + - gdal + - fiona + - pyshp + - scikit-image + - shapely + - pip=23.3.1 + - pip: + - tensorflow + - opencv-python + - ray[default] + - pandas==2.1.4 + - pyarrow diff --git a/environment_maple_radiant_pdg.yml b/environment_maple_radiant_pdg.yml new file mode 100644 index 0000000..6ece98a --- /dev/null +++ b/environment_maple_radiant_pdg.yml @@ -0,0 +1,17 @@ +name: maple_py310_ray +channels: + - defaults + - conda-forge +dependencies: + - python=3.10 + - fiona + - pyshp + - scikit-image + - shapely + - pip=23.3.1 + - pip: + - tensorflow==2.15.1 + - opencv-python + - ray + - pandas==2.1.4 + - pyarrow diff --git a/mpl_config.py b/mpl_config.py index 31639b6..135ce49 100644 --- a/mpl_config.py +++ b/mpl_config.py @@ -48,8 +48,9 @@ def __init__( self.TEMP_W_IMG_DIR = self.ROOT_DIR + r"/data/water_mask/temp" self.OUTPUT_IMAGE_DIR = self.ROOT_DIR + r"/data/output_img" self.WORKER_ROOT = self.ROOT_DIR + r"/data" - self.MODEL_DIR = self.ROOT_DIR + r"/local_dir/datasets/logs" + self.MODEL_DIR = self.ROOT_DIR + r"/data/ray_model_weights" self.RAY_SHAPEFILES = self.ROOT_DIR + r"/data/ray_shapefiles" + self.MODEL_WEIGHTS_DIR = self.ROOT_DIR + r"/data/ray_model_weights" # ADDED to include inference cleaning post-processing self.CLEAN_DATA_DIR = self.ROOT_DIR + r"/data/cln_data" diff --git a/mpl_workflow_create_dir_struct.py b/mpl_workflow_create_dir_struct.py index 9d5f6ca..a4e3461 100644 --- a/mpl_workflow_create_dir_struct.py +++ b/mpl_workflow_create_dir_struct.py @@ -102,6 +102,9 @@ def create_maple_dir_structure(): create_dir_path(os.path.join(config.WORKER_ROOT, "neighbors/")) create_dir_path(os.path.join(config.WORKER_ROOT, "projected_shp/")) create_dir_path(config.CLEAN_DATA_DIR) + create_dir_path(config.MODEL_DIR) + create_dir_path(config.MODEL_WEIGHTS_DIR) + create_dir_path(config.RAY_SHAPEFILES) if __name__ == "__main__": diff --git a/ray_kuberay_submit.py b/ray_kuberay_submit.py new file mode 100644 index 0000000..96c95eb --- /dev/null +++ b/ray_kuberay_submit.py @@ -0,0 +1,66 @@ +import argparse +import logging +import time + +from ray.job_submission import JobSubmissionClient, JobStatus + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Launch Ray job using JobSubmissionClient" + ) + + # Required Arguments + parser.add_argument( + "--host", + required=True, + default="http://localhost:8000/", + help="Clowder host", + ) + + parser.add_argument( + "--dataset_id", + required=True, + help="Clowder dataset from which to get input files and model weights", + ) + + parser.add_argument( + "--key", + required=True, + help="Clowder API key", + ) + + parser.add_argument( + "--env", + required=True, + default="environment_maple.yml", + help="Conda yml environment", + ) + args = parser.parse_args() + + client = JobSubmissionClient("http://127.0.0.1:8265") + job_id = client.submit_job( + # Entrypoint shell command to execute + entrypoint=f"python ray_maple_workflow.py --host {args.host} --dataset_id {args.dataset_id} --key {args.key}", + # Path to the local directory that contains the script.py file + runtime_env={"working_dir": "./", "conda": args.env} + ) + print(job_id) + + + def wait_until_status(job_id, status_to_wait_for, timeout_seconds=10000): + start = time.time() + while time.time() - start <= timeout_seconds: + status = client.get_job_status(job_id) + print(f"status: {status}") + if status in status_to_wait_for: + break + time.sleep(1) + + + wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}) + logs = client.get_job_logs(job_id) + print(logs) + + # Finish + logging.warning("Done") diff --git a/ray_maple_workflow.py b/ray_maple_workflow.py index ba8c18f..c3206b0 100644 --- a/ray_maple_workflow.py +++ b/ray_maple_workflow.py @@ -15,6 +15,7 @@ import argparse import os +import sys from typing import Any, Dict import tensorflow as tf @@ -25,6 +26,8 @@ import ray_infer_tiles as ray_inference import ray_write_shapefiles import ray_tile_and_stitch_util +from mpl_workflow_create_dir_struct import create_maple_dir_structure +from ray_stage_input import download_input_files_clowder, download_model_weights_clowder, upload_output_clowder def create_geotiff_images_dataset(input_image_dir: str) -> ray.data.Dataset: @@ -42,6 +45,26 @@ def add_image_name(row: Dict[str, Any]) -> Dict[str, Any]: description="Extract IWPs from satellite image scenes using MAPLE." ) + # Required Arguments + parser.add_argument( + "--host", + required=True, + default="http://localhost:8000/", + help="Clowder host", + ) + + parser.add_argument( + "--dataset_id", + required=True, + help="Clowder dataset from which to get input files and model weights", + ) + + parser.add_argument( + "--key", + required=True, + help="Clowder API key", + ) + # Optional Arguments parser.add_argument( "--image", @@ -62,7 +85,7 @@ def add_image_name(row: Dict[str, Any]) -> Dict[str, Any]: parser.add_argument( "--weight_file", required=False, - default="hyp_best_train_weights_final.h5", + default="data/ray_model_weights/hyp_best_train_weights_final.h5", help="The file path to where the model weights can be found. Should be " "relative to the root directory.", ) @@ -83,6 +106,18 @@ def add_image_name(row: Dict[str, Any]) -> Dict[str, Any]: config = MPL_Config( args.root_dir, args.weight_file, num_gpus_per_core=args.gpus_per_core ) + host = args.host + dataset_id = args.dataset_id + key = args.key + + print("Create required directory structure") + create_maple_dir_structure() + + print("Stage input files locally") + download_input_files_clowder(host, dataset_id, key, config.INPUT_IMAGE_DIR) + + print("Stage model weights locally") + download_model_weights_clowder(host, dataset_id, key, config.MODEL_WEIGHTS_DIR) print("0. Load geotiffs into ray dataset") dataset = create_geotiff_images_dataset( @@ -109,6 +144,8 @@ def add_image_name(row: Dict[str, Any]) -> Dict[str, Any]: print("5. Write shapefiles") shapefiles_dataset = data_per_image.map( fn=ray_write_shapefiles.WriteShapefiles, fn_constructor_kwargs={"shpfile_output_dir": config.RAY_SHAPEFILES}, concurrency=2) + print("6. Upload shapefiles") + upload_output_clowder(host, dataset_id, key, config.RAY_SHAPEFILES) print("Done writing shapefiles", shapefiles_dataset.schema()) # Once you are done you can check the output on ArcGIS (win) or else you can check in QGIS (nx) Add the image and the diff --git a/ray_stage_input.py b/ray_stage_input.py new file mode 100644 index 0000000..d50654d --- /dev/null +++ b/ray_stage_input.py @@ -0,0 +1,74 @@ +import json +import os +import posixpath + +import requests + + +def download_input_files_clowder(host, dataset_id, key, directory): + headers = {'Content-Type': 'application/json', + 'X-API-KEY': key} + url = posixpath.join(host, "api/v2/datasets/%s/files" % dataset_id) + + result = requests.get(url, headers=headers) + result.raise_for_status() + + files = json.loads(result.text)["data"] + print(files) + + # # Loop through dataset and download all file "locally" + for file_dict in files: + # Use the correct key depending on the Clowder version + extension = "." + file_dict['content_type']['content_type'].split("/")[1] + if file_dict['name'] != 'hyp_best_train_weights_final.h5': + _download_file(host, key, file_dict['id'], file_dict['name'], directory) + + +def download_model_weights_clowder (host, dataset_id, key, directory): + headers = {'Content-Type': 'application/json', + 'X-API-KEY': key} + url = posixpath.join(host, "api/v2/datasets/%s/files" % dataset_id) + + result = requests.get(url, headers=headers) + result.raise_for_status() + + files = json.loads(result.text)["data"] + print(files) + + for file_dict in files: + # Use the correct key depending on the Clowder version + extension = "." + file_dict['content_type']['content_type'].split("/")[1] + if file_dict['name'] == 'hyp_best_train_weights_final.h5': + _download_file(host, key, file_dict['id'], file_dict['name'], directory) + + +def upload_output_clowder(host, dataset_id, key, directory): + headers = {'Content-Type': 'application/json', + 'X-API-KEY': key} + url = posixpath.join(host, "api/v2/datasets/%s/filesMultiple" % dataset_id) + + files = [] + dir_list = os.listdir(directory) + print("Output files found: ", dir_list) + for file in dir_list: + files.append(("files", open(file, "rb"))) + response = requests.post( + url, + headers=headers, + files=files, + ) + response.raise_for_status() + + +def _download_file(host, key, file_id, name, directory): + url = posixpath.join(host, f'api/v2/files/{file_id}') + headers = {"X-API-KEY": key} + file_path = os.path.join(directory, name) + + r = requests.get(url, stream=True, headers=headers) + + with open(file_path, 'wb') as f: + for chunk in r.iter_content(chunk_size=16 * 1024): + f.write(chunk) + f.close() +