From 659d0939a243863115adc18afc9f75b45f314d88 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Wed, 15 Jan 2025 15:10:22 -0500
Subject: [PATCH 01/10] Add etcd to the deployment

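The etcd service backs torchrun's etcd-v2 rendezvous for multi-node training
(see the DDP patches later in this series); --enable-v2 is needed because that
rendezvous backend speaks the etcd v2 API. A quick sanity check of the endpoint
could look like the sketch below (illustrative only; host and port assumed from
the port mapping in this compose file):

    import json
    import urllib.request

    # /version is always served; /v2/keys only answers when --enable-v2 is set
    print(urllib.request.urlopen("http://localhost:2379/version").read())
    print(json.loads(urllib.request.urlopen("http://localhost:2379/v2/keys").read()))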
---
 deploy/docker-compose-CeleryExecutor.yml | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/deploy/docker-compose-CeleryExecutor.yml b/deploy/docker-compose-CeleryExecutor.yml
index 5324e8c..41efd58 100644
--- a/deploy/docker-compose-CeleryExecutor.yml
+++ b/deploy/docker-compose-CeleryExecutor.yml
@@ -215,6 +215,18 @@ services:
     <<: *airflow-common-env
     command: python utils/memory_monitor.py amqp://rabbitmq worker-client-queue
 
+  etcd:
+    image: quay.io/coreos/etcd:v3.5.17
+    ports:
+      - "2379:2379/tcp"
+    command: >
+      /usr/local/bin/etcd
+      --data-dir /var/lib/etcd
+      --enable-v2
+      --listen-client-urls http://0.0.0.0:2379
+      --advertise-client-urls http://0.0.0.0:2379
+      --initial-cluster-state new
+
   proxy:
     image: nginx:1.23.0-alpine
     environment:

From a582934c9348f8bc680ad267f1836689900e0d36 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Thu, 16 Jan 2025 16:12:40 -0500
Subject: [PATCH 02/10] Use host network for training container

This makes it easier for the trainers to communicate with each other.
---
 dags/training.py  | 1 +
 dags/worker_op.py | 1 +
 2 files changed, 2 insertions(+)

diff --git a/dags/training.py b/dags/training.py
index 4cfaedf..333f8e0 100644
--- a/dags/training.py
+++ b/dags/training.py
@@ -104,6 +104,7 @@ def training_op(dag: DAG, queue="deepem-gpu") -> Operator:
         dag=dag,
         qos=False,
         shm_size=4 * (2 ** 30),  # 4 GB
+        network_mode="host",
     )
 
 
diff --git a/dags/worker_op.py b/dags/worker_op.py
index 3a4ac40..17ed0a0 100644
--- a/dags/worker_op.py
+++ b/dags/worker_op.py
@@ -26,4 +26,5 @@ def worker_op(**kwargs):
         retry_delay=kwargs.get("retry_delay", default_args.get("retry_delay", 60)),
         retry_exponential_backoff=kwargs.get("retry_exponential_backoff", default_args.get("retry_exponential_backoff", False)),
         shm_size=kwargs.get("shm_size", None),
+        network_mode=kwargs.get("network_mode", None),
     )

From 15bd9f5aaf60ad9cbb4aabacb1d1f3a1b5de237e Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Thu, 16 Jan 2025 16:18:07 -0500
Subject: [PATCH 03/10] Disable ipv6 on the workers

This is for DDP: for some reason DDP prefers to use IPv6 to communicate, and
disabling IPv6 is simpler than extending the deployment to support it.
---
 cloud/google/workers.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/cloud/google/workers.py b/cloud/google/workers.py
index ed340bf..eb9aacd 100644
--- a/cloud/google/workers.py
+++ b/cloud/google/workers.py
@@ -26,6 +26,13 @@ def GenerateWorkerStartupScript(context, hostname_nfs_server, env_variables, cmd
     startup_script = f'''
 #!/bin/bash
 set -e
+
+echo "net.ipv6.conf.all.disable_ipv6 = 1" >> /etc/sysctl.conf
+echo "net.ipv6.conf.default.disable_ipv6 = 1" >> /etc/sysctl.conf
+echo "net.ipv6.conf.lo.disable_ipv6 = 1" >> /etc/sysctl.conf
+
+sysctl -p
+
 mount -t tmpfs -o size=80%,noatime tmpfs /tmp
 mkdir -p /var/log/airflow/logs
 chmod 777 /var/log/airflow/logs

From b8dde297eec29d781fc6dced49ee453bd6997c68 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Thu, 16 Jan 2025 16:25:48 -0500
Subject: [PATCH 04/10] DDP support for deepem training

Use torchrun to set up the environment and launch the training script.

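For illustration (values assumed, not part of the patch): with two trainer
nodes, gpu_ids=[0, 1] and rank 0, make_argstr assembles a command roughly like

    torchrun --nproc_per_node=2 --nnodes=2 --node_rank=0 \
        --rdzv_id=<uuid> --rdzv_backend=etcd-v2 \
        --rdzv_endpoint=<REDIS_SERVER>:2379 \
        /DeepEM/deepem/train/run.py <DeepEM training args...>

with the remaining DeepEM arguments generated from training_param as before.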
---
 dags/training.py | 29 +++++++++++++++++++++--------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/dags/training.py b/dags/training.py
index 333f8e0..3f2d71d 100644
--- a/dags/training.py
+++ b/dags/training.py
@@ -3,6 +3,7 @@
 import json
 import os
+import uuid
 from datetime import datetime
 
 from airflow import DAG
@@ -19,8 +20,12 @@
 PARAM = Variable.get("training_param", {}, deserialize_json=True)
 
 DEEPEM_IMAGE = PARAM.get("deepem_image", "zettaai/deepem")
-SKIP_EXPORT = PARAM.pop("skip_export", False)
+
+if "rdzv_id" not in PARAM:
+    PARAM["rdzv_id"] = str(uuid.uuid4())
+    Variable.set("training_param", PARAM, serialize_json=True)
+
+SKIP_EXPORT = PARAM.pop("skip_export", False)
 
 default_args = dict(
     owner="seuronbot",
@@ -59,7 +64,12 @@ def prep_parameters() -> dict:
     return param
 
 
-def make_argstr(param: dict) -> str:
+def make_argstr(param: dict, num_trainers: int, rank: int, rdzv_id: str) -> str:
+
+    launch_command = ["torchrun", f"--nproc_per_node={len(param['gpu_ids'])}",
+                      f"--nnodes={num_trainers}", f"--node_rank={rank}", f"--rdzv_id={rdzv_id}",
+                      "--rdzv_backend=etcd-v2", f"--rdzv_endpoint={os.environ['REDIS_SERVER']}:2379",
+                      "/DeepEM/deepem/train/run.py"]
 
     def format_arg(item) -> str:
         k, v = item
@@ -72,15 +82,17 @@ def format_arg(item) -> str:
         else:
             return f"--{k} {v}"
 
-    return " ".join(map(format_arg, param.items()))
+    return " ".join(launch_command + list(map(format_arg, param.items())))
 
 
-def training_op(dag: DAG, queue="deepem-gpu") -> Operator:
+def training_op(dag: DAG, rank=0, queue="deepem-gpu") -> Operator:
     param = prep_parameters()
 
     wandb_api_key = param.pop("WANDB_API_KEY", None)
     environment = {"WANDB_API_KEY": wandb_api_key} if wandb_api_key else None
+    num_trainers = param.pop("NUM_TRAINERS", 1)
+    rdzv_id = param.pop("rdzv_id", None)
 
     # these variables will be mounted in the containers
     mount_secrets = param.pop("MOUNT_SECRETS", [])
     variables = []
@@ -90,8 +102,8 @@ def training_op(dag: DAG, queue="deepem-gpu") -> Operator:
     return worker_op(
         variables=variables,
         mount_point=param.pop("MOUNT_PATH", default_mount_path),
-        task_id="training",
-        command=make_argstr(param),
+        task_id=f"training_{rank}",
+        command=make_argstr(param, num_trainers, rank, rdzv_id),
         use_gpus=True,
         environment=environment,
         force_pull=True,
@@ -156,11 +168,12 @@ def report_model() -> None:
 )
 
 collect_metrics = collect_metrics_op(training_dag)
-scale_up = scale_up_cluster_op(training_dag, "training", "deepem-gpu", 1, 1, "cluster")
+num_trainers = PARAM["NUM_TRAINERS"]
+scale_up = scale_up_cluster_op(training_dag, "training", "deepem-gpu", num_trainers, num_trainers, "cluster")
 scale_down = scale_down_cluster_op(
     training_dag, "training", "deepem-gpu", 0, "cluster", trigger_rule="all_done"
 )
-training = training_op(training_dag)
+training = [training_op(training_dag, i) for i in range(num_trainers)]
 report_training = PythonOperator(
     task_id="report_model",
     python_callable=report_model,

From 513e07d7e18de7ab5e8443a24383975f4d0f9570 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Thu, 16 Jan 2025 21:26:39 -0500
Subject: [PATCH 05/10] Reset rdzv_id when the rank 0 node fails

It does not necessarily run the rank 0 processes, but it should still be fine.
---
 dags/training.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/dags/training.py b/dags/training.py
index 3f2d71d..c46e775 100644
--- a/dags/training.py
+++ b/dags/training.py
@@ -36,6 +36,13 @@
 )
 
 
+def reset_rdzv_id(context):
+    from airflow.models import Variable
+    param = Variable.get("training_param", {}, deserialize_json=True)
+    param["rdzv_id"] = str(uuid.uuid4())
+    Variable.set("training_param", param, serialize_json=True)
+
+
 def prep_parameters() -> dict:
     """Modify the user-supplied parameters to be used as a command for DeepEM."""
     param = PARAM.copy()
@@ -107,6 +114,7 @@ def training_op(dag: DAG, rank=0, queue="deepem-gpu") -> Operator:
         use_gpus=True,
         environment=environment,
         force_pull=True,
+        on_retry_callback=reset_rdzv_id if rank == 0 else None,
         on_failure_callback=task_failure_alert,
         on_success_callback=task_done_alert,
         image=DEEPEM_IMAGE,

From 81da544250d1ae2b31444fff9463497f08c69133 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Wed, 19 Mar 2025 13:38:17 -0400
Subject: [PATCH 06/10] Common variable for deepem queue

---
 dags/training.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/dags/training.py b/dags/training.py
index c46e775..dc48b46 100644
--- a/dags/training.py
+++ b/dags/training.py
@@ -20,6 +20,7 @@
 PARAM = Variable.get("training_param", {}, deserialize_json=True)
 
 DEEPEM_IMAGE = PARAM.get("deepem_image", "zettaai/deepem")
+training_cluster = "deepem-gpu"
 
 if "rdzv_id" not in PARAM:
     PARAM["rdzv_id"] = str(uuid.uuid4())
     Variable.set("training_param", PARAM, serialize_json=True)
@@ -92,7 +93,7 @@ def format_arg(item) -> str:
     return " ".join(launch_command + list(map(format_arg, param.items())))
 
 
-def training_op(dag: DAG, rank=0, queue="deepem-gpu") -> Operator:
+def training_op(dag: DAG, rank=0, queue=training_cluster) -> Operator:
     param = prep_parameters()
 
     wandb_api_key = param.pop("WANDB_API_KEY", None)
     environment = {"WANDB_API_KEY": wandb_api_key} if wandb_api_key else None
@@ -177,9 +178,9 @@ def report_model() -> None:
 )
 
 collect_metrics = collect_metrics_op(training_dag)
 num_trainers = PARAM["NUM_TRAINERS"]
-scale_up = scale_up_cluster_op(training_dag, "training", "deepem-gpu", num_trainers, num_trainers, "cluster")
+scale_up = scale_up_cluster_op(training_dag, "training", training_cluster, num_trainers, num_trainers, "cluster")
 scale_down = scale_down_cluster_op(
-    training_dag, "training", "deepem-gpu", 0, "cluster", trigger_rule="all_done"
+    training_dag, "training", training_cluster, 0, "cluster", trigger_rule="all_done"
 )
 training = [training_op(training_dag, i) for i in range(num_trainers)]
 report_training = PythonOperator(

From 31a74398f544c28ee34f851a3970f83b94938528 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Wed, 19 Mar 2025 13:40:52 -0400
Subject: [PATCH 07/10] Limit the number of nodes to the cluster size limit

---
 dags/training.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/dags/training.py b/dags/training.py
index dc48b46..595f724 100644
--- a/dags/training.py
+++ b/dags/training.py
@@ -10,6 +10,7 @@
 from airflow.utils.weight_rule import WeightRule
 from airflow.operators.python import PythonOperator
 from airflow.models import Variable, BaseOperator as Operator
+from airflow.hooks.base_hook import BaseHook
 
 from worker_op import worker_op
 from helper_ops import scale_up_cluster_op, scale_down_cluster_op, collect_metrics_op
@@ -20,8 +21,11 @@
 PARAM = Variable.get("training_param", {}, deserialize_json=True)
 
 DEEPEM_IMAGE = PARAM.get("deepem_image", "zettaai/deepem")
+cluster_info = json.loads(BaseHook.get_connection("InstanceGroups").extra)
 training_cluster = "deepem-gpu"
 
+max_trainers = sum(c['max_size'] for c in cluster_info[training_cluster])
+
 if "rdzv_id" not in PARAM:
     PARAM["rdzv_id"] = str(uuid.uuid4())
     Variable.set("training_param", PARAM, serialize_json=True)
@@ -99,7 +103,7 @@ def training_op(dag: DAG, rank=0, queue=training_cluster) -> Operator:
     wandb_api_key = param.pop("WANDB_API_KEY", None)
     environment = {"WANDB_API_KEY": wandb_api_key} if wandb_api_key else None
-    num_trainers = param.pop("NUM_TRAINERS", 1)
+    num_trainers = min(param.pop("NUM_TRAINERS", 1), max_trainers)
     rdzv_id = param.pop("rdzv_id", None)
 
     # these variables will be mounted in the containers
     mount_secrets = param.pop("MOUNT_SECRETS", [])
     variables = []
@@ -177,7 +181,8 @@ def report_model() -> None:
 )
 
 collect_metrics = collect_metrics_op(training_dag)
-num_trainers = PARAM["NUM_TRAINERS"]
+num_trainers = min(PARAM.get("NUM_TRAINERS", 1), max_trainers)
+
 scale_up = scale_up_cluster_op(training_dag, "training", training_cluster, num_trainers, num_trainers, "cluster")
 scale_down = scale_down_cluster_op(
     training_dag, "training", training_cluster, 0, "cluster", trigger_rule="all_done"
 )

From b301cb80846158c5794a53be741ba8b60ba4e38e Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Wed, 19 Mar 2025 13:47:45 -0400
Subject: [PATCH 08/10] Compatible with old deepem images

If a custom entrypoint is set in the image, do not launch with torchrun.

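A rough way to reproduce the check by hand (sketch only; image name and docker
SDK usage assumed, mirroring common/docker_helper.has_custom_entrypoint):

    import docker

    client = docker.from_env()
    image = client.images.pull("zettaai/deepem", tag="latest")
    # None/[] means no custom entrypoint, so the DAG may prepend the torchrun launcher
    print(image.attrs.get("Config", {}).get("Entrypoint"))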
---
 common/docker_helper.py       | 14 ++++++++++++++
 dags/training.py              | 12 ++++++++----
 slackbot/training_commands.py | 11 +++++++++++
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/common/docker_helper.py b/common/docker_helper.py
index 9026706..a9bd54e 100644
--- a/common/docker_helper.py
+++ b/common/docker_helper.py
@@ -13,6 +13,20 @@ def health_check_info(image_name):
         return False
 
 
+def has_custom_entrypoint(image_name):
+    image = pull_image(image_name)
+    try:
+        entrypoint = image.attrs.get("Config", {}).get("Entrypoint", None)
+
+        if entrypoint:
+            return True
+        else:
+            return False
+    except Exception as e:
+        print(f"Error: {e}")
+        return True
+
+
 def pull_image(image_name):
     import docker
     import traceback
diff --git a/dags/training.py b/dags/training.py
index 595f724..3ac7516 100644
--- a/dags/training.py
+++ b/dags/training.py
@@ -78,10 +78,14 @@ def prep_parameters() -> dict:
 
 def make_argstr(param: dict, num_trainers: int, rank: int, rdzv_id: str) -> str:
 
-    launch_command = ["torchrun", f"--nproc_per_node={len(param['gpu_ids'])}",
-                      f"--nnodes={num_trainers}", f"--node_rank={rank}", f"--rdzv_id={rdzv_id}",
-                      "--rdzv_backend=etcd-v2", f"--rdzv_endpoint={os.environ['REDIS_SERVER']}:2379",
-                      "/DeepEM/deepem/train/run.py"]
+    torchrun_launcher = param.pop("TORCHRUN_LAUNCHER", None)
+    if torchrun_launcher:
+        launch_command = ["torchrun", f"--nproc_per_node={len(param['gpu_ids'])}",
+                          f"--nnodes={num_trainers}", f"--node_rank={rank}", f"--rdzv_id={rdzv_id}",
+                          "--rdzv_backend=etcd-v2", f"--rdzv_endpoint={os.environ['REDIS_SERVER']}:2379",
+                          "/DeepEM/deepem/train/run.py"]
+    else:
+        launch_command = []
 
     def format_arg(item) -> str:
         k, v = item
diff --git a/slackbot/training_commands.py b/slackbot/training_commands.py
index c0eda31..4a6919a 100644
--- a/slackbot/training_commands.py
+++ b/slackbot/training_commands.py
@@ -9,6 +9,7 @@
 from airflow_api import run_dag
 from bot_utils import replyto, download_json
 from airflow_api import get_variable, set_variable
+from common import docker_helper
 
 
 @SeuronBot.on_message("update training parameters",
@@ -25,6 +26,16 @@ def update_training_parameters(msg: dict) -> None:
     except Exception as e:
         replyto(msg, f"Error parsing parameters: {e}")
 
+    replyto(msg, "Download deepem image and check for custom entrypoint")
+    deepem_image = json_obj.get("deepem_image", "zettaai/deepem")
+    if docker_helper.has_custom_entrypoint(deepem_image):
+        replyto(msg, ":disappointed:Custom entrypoint found, disable DDP")
+        json_obj["TORCHRUN_LAUNCHER"] = False
+        json_obj["NUM_TRAINERS"] = 1
+    else:
+        replyto(msg, ":cool:Launch training script with torchrun")
+        json_obj["TORCHRUN_LAUNCHER"] = True
+
     set_variable("training_param", json_obj, serialize_json=True)
     replyto(msg, "Parameters successfully updated")

From b2483b1e080e1d14f93534ac63093fcc4c4fb827 Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Wed, 19 Mar 2025 23:21:49 -0400
Subject: [PATCH 09/10] Supply default gpu_ids

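With this change gpu_ids no longer has to be listed in training_param; it
defaults to one id per GPU configured for the deepem-gpu instance group. A
minimal parameter payload could then look roughly like the sketch below
(field names taken from this DAG, values purely illustrative):

    training_param = {
        "deepem_image": "zettaai/deepem",  # image checked for a custom entrypoint
        "NUM_TRAINERS": 2,                 # capped at the cluster size limit
        # "gpu_ids" omitted: defaults to list(range(gpuWorkerAcceleratorCount))
    }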
---
 dags/training.py          | 6 ++++++
 pipeline/init_pipeline.py | 2 ++
 2 files changed, 8 insertions(+)

diff --git a/dags/training.py b/dags/training.py
index 3ac7516..9af9c3c 100644
--- a/dags/training.py
+++ b/dags/training.py
@@ -30,6 +30,12 @@
     PARAM["rdzv_id"] = str(uuid.uuid4())
     Variable.set("training_param", PARAM, serialize_json=True)
 
+if "gpu_ids" not in PARAM:
+    num_gpus = cluster_info[training_cluster][0]['gpuWorkerAcceleratorCount']
+    PARAM["gpu_ids"] = list(range(num_gpus))
+    Variable.set("training_param", PARAM, serialize_json=True)
+
+
 SKIP_EXPORT = PARAM.pop("skip_export", False)
 
 default_args = dict(
diff --git a/pipeline/init_pipeline.py b/pipeline/init_pipeline.py
index d8929ff..6cfe977 100644
--- a/pipeline/init_pipeline.py
+++ b/pipeline/init_pipeline.py
@@ -44,6 +44,8 @@ def parse_metadata():
                 worker_setting['workerConcurrencies'] = c['workerConcurrencies']
             else:
                 worker_setting['concurrency'] = c.get('concurrency', 1)
+            if c['type'] == 'deepem-gpu':
+                worker_setting['gpuWorkerAcceleratorCount'] = c.get('gpuWorkerAcceleratorCount', 1)
             instance_groups[c['type']].append(worker_setting)
         elif item["key"] == "easyseg-worker":
             worker = json.loads(item["value"])

From 5246afd4a73e60545d5dd64fa8ef9f1cf5696d7f Mon Sep 17 00:00:00 2001
From: Ran Lu
Date: Fri, 21 Mar 2025 13:47:54 -0400
Subject: [PATCH 10/10] Skip all training tasks if one fails

---
 dags/training.py | 34 +++++++++++++++++++++++++++++++++-
 1 file changed, 33 insertions(+), 1 deletion(-)

diff --git a/dags/training.py b/dags/training.py
index 9af9c3c..902ae47 100644
--- a/dags/training.py
+++ b/dags/training.py
@@ -11,6 +11,8 @@
 from airflow.operators.python import PythonOperator
 from airflow.models import Variable, BaseOperator as Operator
 from airflow.hooks.base_hook import BaseHook
+from airflow.utils.state import State
+from airflow.models import TaskInstance
 
 from worker_op import worker_op
 from helper_ops import scale_up_cluster_op, scale_down_cluster_op, collect_metrics_op
@@ -47,6 +49,36 @@
 )
+def skip_parallel_tasks(context):
+    task_failure_alert(context)
+
+    slack_message(":exclamation: Stop the rest of training nodes...")
+
+    task_instance = context['task_instance']
+    dag_run = context['dag_run']
+
+    # Get all tasks in the parallel_tasks group
+    parallel_task_ids = [
+        t.task_id for t in dag_run.dag.tasks
+        if t.task_id.startswith('training_') and t.task_id != task_instance.task_id
+    ]
+
+    # Mark all other running parallel tasks as skipped
+    for task_id in parallel_task_ids:
+        ti = TaskInstance.get_task_instance(
+            task_id=task_id,
+            dag_id=dag_run.dag_id,
+            run_id=dag_run.run_id,
+            map_index=-1,
+        )
+
+        # Only modify tasks that aren't already in a terminal state
+        if ti and ti.state not in State.finished:
+            ti.set_state(State.SKIPPED)
+
+    slack_message(":exclamation: Training cluster stopped")
+
+
 def reset_rdzv_id(context):
     from airflow.models import Variable
     param = Variable.get("training_param", {}, deserialize_json=True)
     param["rdzv_id"] = str(uuid.uuid4())
     Variable.set("training_param", param, serialize_json=True)
@@ -130,7 +162,7 @@ def training_op(dag: DAG, rank=0, queue=training_cluster) -> Operator:
         environment=environment,
         force_pull=True,
         on_retry_callback=reset_rdzv_id if rank == 0 else None,
-        on_failure_callback=task_failure_alert,
+        on_failure_callback=skip_parallel_tasks,
         on_success_callback=task_done_alert,
         image=DEEPEM_IMAGE,
         priority_weight=100_000,