
Submit to Ray cluster #20

Draft: wants to merge 2 commits into base: try-ray-inference
9 changes: 9 additions & 0 deletions .gitignore
@@ -1,8 +1,14 @@
# Original source: https://github.com/github/gitignore/blob/main/Python.gitignore

# Needed for ray exclusion
.git

# Exclude data folder
data/

# Exclude model
hyp_best_train_weights_final.h5

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@@ -71,3 +77,6 @@ venv.bak/

# Cython debug symbols
cython_debug/

# PyCharm settings
.idea/
4 changes: 2 additions & 2 deletions environment_maple.yml
@@ -10,8 +10,8 @@ dependencies:
   - shapely
   - pip=23.3.1
   - pip:
-    - tensorflow[and-cuda]==2.15.1
+    - tensorflow[and-cuda]
     - opencv-python
     - ray
-    - pandas==2.1.4
+    - pandas
     - pyarrow
17 changes: 17 additions & 0 deletions environment_maple_delta.yml
@@ -0,0 +1,17 @@
name: maple_py310_ray
channels:
  - defaults
  - conda-forge
dependencies:
  - gdal
  - fiona
  - pyshp
  - scikit-image
  - shapely
  - pip=23.3.1
  - tensorflow
  - pandas
  - pyarrow
  - requests
  - pip:
    - ray[default]
17 changes: 17 additions & 0 deletions environment_maple_osx-64.yml
@@ -0,0 +1,17 @@
name: maple_py310_ray_osx-64
channels:
  - defaults
dependencies:
  - python=3.10
  - gdal
  - fiona
  - pyshp
  - scikit-image
  - shapely
  - pip=23.3.1
  - pip:
    - tensorflow
    - opencv-python
    - ray[default]
    - pandas==2.1.4
    - pyarrow
17 changes: 17 additions & 0 deletions environment_maple_radiant_pdg.yml
@@ -0,0 +1,17 @@
name: maple_py310_ray
channels:
  - defaults
  - conda-forge
dependencies:
  - python=3.10
  - fiona
  - pyshp
  - scikit-image
  - shapely
  - pip=23.3.1
  - pip:
    - tensorflow==2.15.1
    - opencv-python
    - ray
    - pandas==2.1.4
    - pyarrow
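
A quick way to confirm that any of these environments resolved correctly is to import the key libraries inside the activated env and start a throwaway local Ray session. This is a minimal verification sketch, not part of the PR:

import ray
import tensorflow as tf

print("ray", ray.__version__)
print("tensorflow", tf.__version__)

# Start and stop a local Ray instance just to confirm the install works.
ray.init(ignore_reinit_error=True)
print(ray.cluster_resources())
ray.shutdown()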
3 changes: 2 additions & 1 deletion mpl_config.py
@@ -48,8 +48,9 @@ def __init__(
         self.TEMP_W_IMG_DIR = self.ROOT_DIR + r"/data/water_mask/temp"
         self.OUTPUT_IMAGE_DIR = self.ROOT_DIR + r"/data/output_img"
         self.WORKER_ROOT = self.ROOT_DIR + r"/data"
-        self.MODEL_DIR = self.ROOT_DIR + r"/local_dir/datasets/logs"
+        self.MODEL_DIR = self.ROOT_DIR + r"/data/ray_model_weights"
         self.RAY_SHAPEFILES = self.ROOT_DIR + r"/data/ray_shapefiles"
+        self.MODEL_WEIGHTS_DIR = self.ROOT_DIR + r"/data/ray_model_weights"

         # ADDED to include inference cleaning post-processing
         self.CLEAN_DATA_DIR = self.ROOT_DIR + r"/data/cln_data"
3 changes: 3 additions & 0 deletions mpl_workflow_create_dir_struct.py
@@ -102,6 +102,9 @@ def create_maple_dir_structure():
    create_dir_path(os.path.join(config.WORKER_ROOT, "neighbors/"))
    create_dir_path(os.path.join(config.WORKER_ROOT, "projected_shp/"))
    create_dir_path(config.CLEAN_DATA_DIR)
    create_dir_path(config.MODEL_DIR)
    create_dir_path(config.MODEL_WEIGHTS_DIR)
    create_dir_path(config.RAY_SHAPEFILES)


if __name__ == "__main__":
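For context, create_dir_path is the repo's own helper defined earlier in mpl_workflow_create_dir_struct.py and not shown in this diff. A plausible minimal definition (an assumption for illustration only) is just an idempotent mkdir:

import os

def create_dir_path(path: str) -> None:
    # Create the directory, including parents, if it does not already exist.
    os.makedirs(path, exist_ok=True)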
66 changes: 66 additions & 0 deletions ray_kuberay_submit.py
@@ -0,0 +1,66 @@
import argparse
import logging
import time

from ray.job_submission import JobSubmissionClient, JobStatus


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Launch Ray job using JobSubmissionClient"
    )

    # Required Arguments
    parser.add_argument(
        "--host",
        required=True,
        default="http://localhost:8000/",
        help="Clowder host",
    )

    parser.add_argument(
        "--dataset_id",
        required=True,
        help="Clowder dataset from which to get input files and model weights",
    )

    parser.add_argument(
        "--key",
        required=True,
        help="Clowder API key",
    )

    parser.add_argument(
        "--env",
        required=True,
        default="environment_maple.yml",
        help="Conda yml environment",
    )
    args = parser.parse_args()

    client = JobSubmissionClient("http://127.0.0.1:8265")
    job_id = client.submit_job(
        # Entrypoint shell command to execute
        entrypoint=f"python ray_maple_workflow.py --host {args.host} --dataset_id {args.dataset_id} --key {args.key}",
        # Path to the local directory that contains ray_maple_workflow.py
        runtime_env={"working_dir": "./", "conda": args.env}
    )
    print(job_id)

    def wait_until_status(job_id, status_to_wait_for, timeout_seconds=10000):
        start = time.time()
        while time.time() - start <= timeout_seconds:
            status = client.get_job_status(job_id)
            print(f"status: {status}")
            if status in status_to_wait_for:
                break
            time.sleep(1)

    wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
    logs = client.get_job_logs(job_id)
    print(logs)

    # Finish
    logging.warning("Done")
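
If live output is preferred over the polling loop above, the same Ray Jobs SDK also exposes an async log tail. A minimal sketch, assuming the same dashboard address; the job id value is a placeholder:

import asyncio

from ray.job_submission import JobSubmissionClient


async def tail_logs(address: str, job_id: str) -> None:
    client = JobSubmissionClient(address)
    # tail_job_logs yields chunks of log text as the job runs
    async for lines in client.tail_job_logs(job_id):
        print(lines, end="")


# asyncio.run(tail_logs("http://127.0.0.1:8265", "raysubmit_XXXX"))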
39 changes: 38 additions & 1 deletion ray_maple_workflow.py
@@ -15,6 +15,7 @@

import argparse
import os
import sys
from typing import Any, Dict

import tensorflow as tf
@@ -25,6 +26,8 @@
import ray_infer_tiles as ray_inference
import ray_write_shapefiles
import ray_tile_and_stitch_util
from mpl_workflow_create_dir_struct import create_maple_dir_structure
from ray_stage_input import download_input_files_clowder, download_model_weights_clowder, upload_output_clowder


def create_geotiff_images_dataset(input_image_dir: str) -> ray.data.Dataset:
@@ -42,6 +45,26 @@ def add_image_name(row: Dict[str, Any]) -> Dict[str, Any]:
description="Extract IWPs from satellite image scenes using MAPLE."
)

# Required Arguments
parser.add_argument(
"--host",
required=True,
default="http://localhost:8000/",
help="Clowder host",
)

parser.add_argument(
"--dataset_id",
required=True,
help="Clowder dataset from which to get input files and model weights",
)

parser.add_argument(
"--key",
required=True,
help="Clowder API key",
)

# Optional Arguments
parser.add_argument(
"--image",
@@ -62,7 +85,7 @@ def add_image_name(row: Dict[str, Any]) -> Dict[str, Any]:
     parser.add_argument(
         "--weight_file",
         required=False,
-        default="hyp_best_train_weights_final.h5",
+        default="data/ray_model_weights/hyp_best_train_weights_final.h5",
         help="The file path to where the model weights can be found. Should be "
         "relative to the root directory.",
     )
@@ -83,6 +106,18 @@ def add_image_name(row: Dict[str, Any]) -> Dict[str, Any]:
    config = MPL_Config(
        args.root_dir, args.weight_file, num_gpus_per_core=args.gpus_per_core
    )
    host = args.host
    dataset_id = args.dataset_id
    key = args.key

    print("Create required directory structure")
    create_maple_dir_structure()

    print("Stage input files locally")
    download_input_files_clowder(host, dataset_id, key, config.INPUT_IMAGE_DIR)

    print("Stage model weights locally")
    download_model_weights_clowder(host, dataset_id, key, config.MODEL_WEIGHTS_DIR)

    print("0. Load geotiffs into ray dataset")
    dataset = create_geotiff_images_dataset(
@@ -109,6 +144,8 @@ def add_image_name(row: Dict[str, Any]) -> Dict[str, Any]:
print("5. Write shapefiles")
shapefiles_dataset = data_per_image.map(
fn=ray_write_shapefiles.WriteShapefiles, fn_constructor_kwargs={"shpfile_output_dir": config.RAY_SHAPEFILES}, concurrency=2)
print("6. Upload shapefiles")
upload_output_clowder(host, dataset_id, key, config.RAY_SHAPEFILES)
print("Done writing shapefiles", shapefiles_dataset.schema())

# Once you are done you can check the output on ArcGIS (win) or else you can check in QGIS (nx) Add the image and the
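One subtlety in steps 5 and 6 above: Ray Datasets execute lazily, so the map that writes shapefiles is not guaranteed to have run before the upload step scans config.RAY_SHAPEFILES. A sketch of forcing execution first, assuming Ray Data's materialize() API; this is not part of the diff:

# Force the lazy map() pipeline to run so the shapefiles exist on disk
# before upload_output_clowder scans the output directory (sketch only).
shapefiles_dataset = data_per_image.map(
    fn=ray_write_shapefiles.WriteShapefiles,
    fn_constructor_kwargs={"shpfile_output_dir": config.RAY_SHAPEFILES},
    concurrency=2,
).materialize()
upload_output_clowder(host, dataset_id, key, config.RAY_SHAPEFILES)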
74 changes: 74 additions & 0 deletions ray_stage_input.py
@@ -0,0 +1,74 @@
import json
import os
import posixpath

import requests


def download_input_files_clowder(host, dataset_id, key, directory):
    headers = {'Content-Type': 'application/json',
               'X-API-KEY': key}
    url = posixpath.join(host, "api/v2/datasets/%s/files" % dataset_id)

    result = requests.get(url, headers=headers)
    result.raise_for_status()

    files = json.loads(result.text)["data"]
    print(files)

    # Loop through the dataset and download all files locally
    for file_dict in files:
        # Use the correct key depending on the Clowder version
        extension = "." + file_dict['content_type']['content_type'].split("/")[1]
        if file_dict['name'] != 'hyp_best_train_weights_final.h5':
            _download_file(host, key, file_dict['id'], file_dict['name'], directory)


def download_model_weights_clowder(host, dataset_id, key, directory):
    headers = {'Content-Type': 'application/json',
               'X-API-KEY': key}
    url = posixpath.join(host, "api/v2/datasets/%s/files" % dataset_id)

    result = requests.get(url, headers=headers)
    result.raise_for_status()

    files = json.loads(result.text)["data"]
    print(files)

    for file_dict in files:
        # Use the correct key depending on the Clowder version
        extension = "." + file_dict['content_type']['content_type'].split("/")[1]
        if file_dict['name'] == 'hyp_best_train_weights_final.h5':
            _download_file(host, key, file_dict['id'], file_dict['name'], directory)


def upload_output_clowder(host, dataset_id, key, directory):
    # Let requests set the multipart Content-Type (with boundary) itself;
    # only the API key header is needed here.
    headers = {'X-API-KEY': key}
    url = posixpath.join(host, "api/v2/datasets/%s/filesMultiple" % dataset_id)

    files = []
    dir_list = os.listdir(directory)
    print("Output files found: ", dir_list)
    for file in dir_list:
        # Open each file by its full path, not just its bare name
        files.append(("files", open(os.path.join(directory, file), "rb")))
    response = requests.post(
        url,
        headers=headers,
        files=files,
    )
    response.raise_for_status()


def _download_file(host, key, file_id, name, directory):
    url = posixpath.join(host, f'api/v2/files/{file_id}')
    headers = {"X-API-KEY": key}
    file_path = os.path.join(directory, name)

    r = requests.get(url, stream=True, headers=headers)
    r.raise_for_status()

    with open(file_path, 'wb') as f:
        for chunk in r.iter_content(chunk_size=16 * 1024):
            f.write(chunk)
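
A hypothetical end-to-end use of these helpers; the host, key, dataset id, and directories below are placeholder values, not real credentials or paths from this PR:

from ray_stage_input import (
    download_input_files_clowder,
    download_model_weights_clowder,
    upload_output_clowder,
)

host = "http://localhost:8000/"
key = "REPLACE_WITH_CLOWDER_API_KEY"
dataset_id = "REPLACE_WITH_DATASET_ID"

# Pull scenes and model weights down, then push shapefiles back after inference.
download_input_files_clowder(host, dataset_id, key, "data/input_img")
download_model_weights_clowder(host, dataset_id, key, "data/ray_model_weights")
upload_output_clowder(host, dataset_id, key, "data/ray_shapefiles")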