From 694c9702ba3cdbf24fd7529c5a5168cbe4bdc4ec Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Sat, 22 Feb 2025 16:36:02 +0800 Subject: [PATCH 01/15] docs: Add the Slurm agent fn task example usage Signed-off-by: JiangJiaWei1103 --- examples/slurm_agent/README.md | 7 ++ examples/slurm_agent/requirements.in | 1 + examples/slurm_agent/slurm_agent/__init__.py | 0 .../slurm_agent/slurm_agent_example_usage.py | 94 +++++++++++++++++++ 4 files changed, 102 insertions(+) create mode 100644 examples/slurm_agent/README.md create mode 100644 examples/slurm_agent/requirements.in create mode 100644 examples/slurm_agent/slurm_agent/__init__.py create mode 100644 examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py diff --git a/examples/slurm_agent/README.md b/examples/slurm_agent/README.md new file mode 100644 index 000000000..641a19b11 --- /dev/null +++ b/examples/slurm_agent/README.md @@ -0,0 +1,7 @@ +(slurm_agent)= + +# Slurm agent + +```{eval-rst} +.. tags:: Integration, HighPerformanceComputing, Advanced +``` \ No newline at end of file diff --git a/examples/slurm_agent/requirements.in b/examples/slurm_agent/requirements.in new file mode 100644 index 000000000..3aa09dcd0 --- /dev/null +++ b/examples/slurm_agent/requirements.in @@ -0,0 +1 @@ +flytekitplugins-slurm diff --git a/examples/slurm_agent/slurm_agent/__init__.py b/examples/slurm_agent/slurm_agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py new file mode 100644 index 000000000..9f1941a57 --- /dev/null +++ b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py @@ -0,0 +1,94 @@ +# %% [markdown] +# (slurm_agent_example_usage)= +# +# # Slurm agent example usage +# The Slurm agent enables seamless integration between Flyte workflows and Slurm-managed high-performance computing (HPC) clusters, allowing users to take advantage of Slurm’s powerful 
resource allocation, scheduling, and monitoring capabilities. +# +# The following examples demonstrate how to run different types of tasks using the Slurm agent. Let’s start by importing the necessary packages. +# %% +import os + +from flytekit import task, workflow +from flytekitplugins.slurm import SlurmFunction + +# %% [markdown] +# ## Slurm Function Task +# `SlurmFunctionTask` is a highly flexible task type that allows you to run a user-defined task function on a Slurm cluster. To configure this task, you need to specify the following fields: +# * `ssh_config`: Options of SSH client connection. +# * Authentication is done via key pair verification. For available options, please refer to [here](https://github.com/JiangJiaWei1103/flytekit/blob/d0d59d3f809bad89a7567ce49d95c84c3f38bf5f/plugins/flytekit-slurm/flytekitplugins/slurm/ssh_utils.py#L21-L39). +# * `sbatch_conf` (optional): Options of `sbatch` command. If not provided, defaults to an empty dict. +# * For available options, please refer to the [official Slurm documentation](https://slurm.schedmd.com/sbatch.html). +# * `script` (optional): A user-defined script where `{task.fn}` serves as a placeholder for the task function execution. +# * You should insert `{task.fn}` at the desired execution point within the script. If no script is provided, the task function will be executed directly. +# %% +@task( + task_config=SlurmFunction( + ssh_config={ + "host": "aws", + "username": "ubuntu", + "client_keys": ["~/.ssh/private_key.pem"], + }, + sbatch_conf={ + "partition": "debug", + "job-name": "tiny-slurm", + "output": "/home/ubuntu/fn_task.log" + }, + script="""#!/bin/bash -i + +echo [TEST SLURM FN TASK 1] Run the first user-defined task function... + +# Setup env vars +export MY_ENV_VAR=123 + +# Source the virtual env +. 
/home/ubuntu/.cache/pypoetry/virtualenvs/demo-4A8TrTN7-py3.12/bin/activate + +# Run the user-defined task function +{task.fn} +""" + ) +) +def plus_one(x: int) -> int: + print(os.getenv("MY_ENV_VAR")) + return x + 1 + + +@task( + task_config=SlurmFunction( + ssh_config={ + "host": "aws", + "username": "ubuntu", + }, + script="""#!/bin/bash -i + +echo [TEST SLURM FN TASK 2] Run the second user-defined task function... + +. /home/ubuntu/.cache/pypoetry/virtualenvs/demo-4A8TrTN7-py3.12/bin/activate +{task.fn} +""" + ) +) +def greet(year: int) -> str: + return f"Hello {year}!!!" + + +@workflow +def wf(x: int) -> str: + x = plus_one(x=x) + msg = greet(year=x) + return msg + + +# %% [markdown] +# Finally, you can execute the workflow locally as below: +# %% +if __name__ == "__main__": + from flytekit.clis.sdk_in_container import pyflyte + from click.testing import CliRunner + + runner = CliRunner() + path = os.path.realpath(__file__) + + print(f">>> LOCAL EXEC <<<") + result = runner.invoke(pyflyte.main, ["run", "--raw-output-data-prefix", "s3://my-flyte-slurm-agent", path, "wf", "--x", 2024]) + print(result.output) From e065265cce773d066114951f20d8f0460028e36a Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Tue, 11 Mar 2025 23:52:22 +0800 Subject: [PATCH 02/15] docs: Add example usage for SlurmTask and SlurmShellTask Signed-off-by: JiangJiaWei1103 --- .../slurm_agent/slurm_agent_example_usage.py | 152 +++++++++++++++--- 1 file changed, 132 insertions(+), 20 deletions(-) diff --git a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py index 9f1941a57..3d64cdc48 100644 --- a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py +++ b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py @@ -1,28 +1,73 @@ # %% [markdown] # (slurm_agent_example_usage)= # -# # Slurm agent example usage +# # Slurm agent example usage # The Slurm agent enables seamless integration between Flyte 
workflows and Slurm-managed high-performance computing (HPC) clusters, allowing users to take advantage of Slurm’s powerful resource allocation, scheduling, and monitoring capabilities. # -# The following examples demonstrate how to run different types of tasks using the Slurm agent. Let’s start by importing the necessary packages. +# The following examples demonstrate how to run different types of tasks using the Slurm agent, covering both basic and advanced use cases. Let’s start by importing the necessary packages. # %% import os from flytekit import task, workflow -from flytekitplugins.slurm import SlurmFunction +from flytekitplugins.slurm import Slurm, SlurmFunction, SlurmRemoteScript, SlurmShellTask, SlurmTask # %% [markdown] -# ## Slurm Function Task -# `SlurmFunctionTask` is a highly flexible task type that allows you to run a user-defined task function on a Slurm cluster. To configure this task, you need to specify the following fields: +# ## `SlurmTask` +# First, `SlurmTask` is the most basic use case, allowing users to directly run a pre-existing shell script on the Slurm cluster. To configure this task, you need to specify the following fields: # * `ssh_config`: Options of SSH client connection. # * Authentication is done via key pair verification. For available options, please refer to [here](https://github.com/JiangJiaWei1103/flytekit/blob/d0d59d3f809bad89a7567ce49d95c84c3f38bf5f/plugins/flytekit-slurm/flytekitplugins/slurm/ssh_utils.py#L21-L39). +# * `batch_script_path`: Path to the shell script on the Slurm cluster. # * `sbatch_conf` (optional): Options of `sbatch` command. If not provided, defaults to an empty dict. # * For available options, please refer to the [official Slurm documentation](https://slurm.schedmd.com/sbatch.html). -# * `script` (optional): A user-defined script where `{task.fn}` serves as a placeholder for the task function execution. -# * You should insert `{task.fn}` at the desired execution point within the script. 
If no script is provided, the task function will be executed directly. +# * `batch_script_args` (optional): Additional arguments for the batch script on Slurm cluster. # %% -@task( - task_config=SlurmFunction( +slurm_task = SlurmTask( + name="basic", + task_config=SlurmRemoteScript( + ssh_config={ + "host": "aws", + "username": "ubuntu", + }, + sbatch_conf={ + "partition": "debug", + "job-name": "job0", + }, + batch_script_path="/home/ubuntu/echo.sh", + ), +) + + +@workflow +def wf() -> None: + slurm_task() + + +# %% [markdown] +# Then, you can execute the workflow locally as below: +# %% +if __name__ == "__main__": + from click.testing import CliRunner + from flytekit.clis.sdk_in_container import pyflyte + + runner = CliRunner() + path = os.path.realpath(__file__) + + print(">>> LOCAL EXEC <<<") + result = runner.invoke(pyflyte.main, ["run", path, "wf"]) + print(result.output) + + +# %% [markdown] +# ## `SlurmShellTask` +# Instead of running a pre-existing shell script on the Slurm cluster, `SlurmShellTask` allows users to define the script content within the interface as shown below: +# %% +shell_task = SlurmShellTask( + name="shell", + script="""#!/bin/bash -i + +echo [TEST SLURM SHELL TASK 1] Run the user-defined script... +""", + task_config=Slurm( ssh_config={ "host": "aws", "username": "ubuntu", @@ -30,9 +75,74 @@ }, sbatch_conf={ "partition": "debug", - "job-name": "tiny-slurm", - "output": "/home/ubuntu/fn_task.log" + "job-name": "job1", + }, + ), +) + + +shell_task_with_args = SlurmShellTask( + name="shell", + script="""#!/bin/bash -i + +echo [TEST SLURM SHELL TASK 2] Run the user-defined script with args... 
+echo Arg1: $1 +echo Arg2: $2 +echo Arg3: $3 +""", + task_config=Slurm( + ssh_config={ + "host": "aws", + "username": "ubuntu", }, + sbatch_conf={ + "partition": "debug", + "job-name": "job2", + }, + batch_script_args=["0", "a", "xyz"], + ), +) + + +@workflow +def wf() -> None: + shell_task() + shell_task_with_args() + + +# %% [markdown] +# Once again, execute the workflow locally to view the results: +# %% +if __name__ == "__main__": + from click.testing import CliRunner + from flytekit.clis.sdk_in_container import pyflyte + + runner = CliRunner() + path = os.path.realpath(__file__) + + print(">>> LOCAL EXEC <<<") + result = runner.invoke(pyflyte.main, ["run", path, "wf"]) + print(result.output) + + +# %% [markdown] +# ## `SlurmFunctionTask` +# Finally, `SlurmFunctionTask` is a highly flexible task type that allows you to run a user-defined task function on a Slurm cluster. To configure this task, you need to specify the following fields: +# * `ssh_config`: Options of SSH client connection. +# * Authentication is done via key pair verification. For available options, please refer to [here](https://github.com/JiangJiaWei1103/flytekit/blob/d0d59d3f809bad89a7567ce49d95c84c3f38bf5f/plugins/flytekit-slurm/flytekitplugins/slurm/ssh_utils.py#L21-L39). +# * `sbatch_conf` (optional): Options of `sbatch` command. If not provided, defaults to an empty dict. +# * For available options, please refer to the [official Slurm documentation](https://slurm.schedmd.com/sbatch.html). +# * `script` (optional): A user-defined script where `{task.fn}` serves as a placeholder for the task function execution. +# * You should insert `{task.fn}` at the desired execution point within the script. If no script is provided, the task function will be executed directly. 
+# %% +@task( + task_config=SlurmFunction( + ssh_config={ + "host": "aws", + "username": "ubuntu", + "client_keys": ["~/.ssh/private_key.pem"], + }, + sbatch_conf={"partition": "debug", "job-name": "job3", "output": "/home/ubuntu/fn_task.log"}, script="""#!/bin/bash -i echo [TEST SLURM FN TASK 1] Run the first user-defined task function... @@ -41,14 +151,14 @@ export MY_ENV_VAR=123 # Source the virtual env -. /home/ubuntu/.cache/pypoetry/virtualenvs/demo-4A8TrTN7-py3.12/bin/activate +. /home/ubuntu/.cache/pypoetry/virtualenvs/demo-4A8TrTN7-py3.12/bin/activate # Run the user-defined task function {task.fn} -""" +""", ) ) -def plus_one(x: int) -> int: +def plus_one(x: int) -> int: print(os.getenv("MY_ENV_VAR")) return x + 1 @@ -63,9 +173,9 @@ def plus_one(x: int) -> int: echo [TEST SLURM FN TASK 2] Run the second user-defined task function... -. /home/ubuntu/.cache/pypoetry/virtualenvs/demo-4A8TrTN7-py3.12/bin/activate +. /home/ubuntu/.cache/pypoetry/virtualenvs/demo-4A8TrTN7-py3.12/bin/activate {task.fn} -""" +""", ) ) def greet(year: int) -> str: @@ -80,15 +190,17 @@ def wf(x: int) -> str: # %% [markdown] -# Finally, you can execute the workflow locally as below: +# Let's execute the workflow: # %% if __name__ == "__main__": - from flytekit.clis.sdk_in_container import pyflyte from click.testing import CliRunner + from flytekit.clis.sdk_in_container import pyflyte runner = CliRunner() path = os.path.realpath(__file__) - print(f">>> LOCAL EXEC <<<") - result = runner.invoke(pyflyte.main, ["run", "--raw-output-data-prefix", "s3://my-flyte-slurm-agent", path, "wf", "--x", 2024]) + print(">>> LOCAL EXEC <<<") + result = runner.invoke( + pyflyte.main, ["run", "--raw-output-data-prefix", "s3://my-flyte-slurm-agent", path, "wf", "--x", 2024] + ) print(result.output) From 85b700889bfec9a6c726de5af89bf203ceeca4cc Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Thu, 13 Mar 2025 00:18:34 +0800 Subject: [PATCH 03/15] docs: Add install command Signed-off-by: 
JiangJiaWei1103 --- examples/slurm_agent/README.md | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/examples/slurm_agent/README.md b/examples/slurm_agent/README.md index 641a19b11..743a70024 100644 --- a/examples/slurm_agent/README.md +++ b/examples/slurm_agent/README.md @@ -4,4 +4,22 @@ ```{eval-rst} .. tags:: Integration, HighPerformanceComputing, Advanced -``` \ No newline at end of file +``` + +## Installation + +To install the Slurm agent, run the following command: + +```{eval-rst} +.. prompt:: bash + + pip install flytekitplugins-slurm +``` + +## Example usage + + +## Local testing + + +## Flyte deployment configuration From 142530e81979d410d07b52f3c9c89d8d365f67ce Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Mon, 17 Mar 2025 16:14:24 +0800 Subject: [PATCH 04/15] update host name Signed-off-by: Future-Outlier --- .../slurm_agent/slurm_agent_example_usage.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py index 3d64cdc48..2aefe49a8 100644 --- a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py +++ b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py @@ -25,7 +25,7 @@ name="basic", task_config=SlurmRemoteScript( ssh_config={ - "host": "aws", + "host": "ec2-11-22-33-444.us-west-2.compute.amazonaws.com", "username": "ubuntu", }, sbatch_conf={ @@ -69,7 +69,7 @@ def wf() -> None: """, task_config=Slurm( ssh_config={ - "host": "aws", + "host": "ec2-11-22-33-444.us-west-2.compute.amazonaws.com", "username": "ubuntu", "client_keys": ["~/.ssh/private_key.pem"], }, @@ -92,7 +92,7 @@ def wf() -> None: """, task_config=Slurm( ssh_config={ - "host": "aws", + "host": "ec2-11-22-33-444.us-west-2.compute.amazonaws.com", "username": "ubuntu", }, sbatch_conf={ @@ -138,7 +138,7 @@ def wf() -> None: @task( task_config=SlurmFunction( ssh_config={ - "host": 
"aws", + "host": "ec2-11-22-33-444.us-west-2.compute.amazonaws.com", "username": "ubuntu", "client_keys": ["~/.ssh/private_key.pem"], }, @@ -166,7 +166,7 @@ def plus_one(x: int) -> int: @task( task_config=SlurmFunction( ssh_config={ - "host": "aws", + "host": "ec2-11-22-33-444.us-west-2.compute.amazonaws.com", "username": "ubuntu", }, script="""#!/bin/bash -i From 90af9e89c7481133c88e8d7c6f5cf14fc5f331ce Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Mon, 17 Mar 2025 23:02:22 +0800 Subject: [PATCH 05/15] refactor: Define unique workflow names Signed-off-by: JiangJiaWei1103 --- .../slurm_agent/slurm_agent_example_usage.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py index 2aefe49a8..06d0142e7 100644 --- a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py +++ b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py @@ -38,7 +38,7 @@ @workflow -def wf() -> None: +def basic_wf() -> None: slurm_task() @@ -53,7 +53,7 @@ def wf() -> None: path = os.path.realpath(__file__) print(">>> LOCAL EXEC <<<") - result = runner.invoke(pyflyte.main, ["run", path, "wf"]) + result = runner.invoke(pyflyte.main, ["run", path, "basic_wf"]) print(result.output) @@ -105,7 +105,7 @@ def wf() -> None: @workflow -def wf() -> None: +def shell_wf() -> None: shell_task() shell_task_with_args() @@ -121,7 +121,7 @@ def wf() -> None: path = os.path.realpath(__file__) print(">>> LOCAL EXEC <<<") - result = runner.invoke(pyflyte.main, ["run", path, "wf"]) + result = runner.invoke(pyflyte.main, ["run", path, "shell_wf"]) print(result.output) @@ -183,7 +183,7 @@ def greet(year: int) -> str: @workflow -def wf(x: int) -> str: +def function_wf(x: int) -> str: x = plus_one(x=x) msg = greet(year=x) return msg @@ -201,6 +201,6 @@ def wf(x: int) -> str: print(">>> LOCAL EXEC <<<") result = runner.invoke( - pyflyte.main, ["run", 
"--raw-output-data-prefix", "s3://my-flyte-slurm-agent", path, "wf", "--x", 2024] + pyflyte.main, ["run", "--raw-output-data-prefix", "s3://my-flyte-slurm-agent", path, "function_wf", "--x", 2024] ) print(result.output) From d9519bb120b964cb44960781ae6e48ee1be4b2e9 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Mon, 17 Mar 2025 23:30:07 +0800 Subject: [PATCH 06/15] docs: Add three main components for DL training wf example Signed-off-by: JiangJiaWei1103 --- .../slurm_agent/slurm_agent_example_usage.py | 185 ++++++++++++++++++ 1 file changed, 185 insertions(+) diff --git a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py index 06d0142e7..333453fcf 100644 --- a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py +++ b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py @@ -204,3 +204,188 @@ def function_wf(x: int) -> str: pyflyte.main, ["run", "--raw-output-data-prefix", "s3://my-flyte-slurm-agent", path, "function_wf", "--x", 2024] ) print(result.output) + + +# %% [markdown] +# ## Train and Evaluate a DL Model with `SlurmFunctionTask` +# The following example demonstrates how `SlurmFunctionTask` can be integrated into a standard deep learning model training workflow. At the highest level, this workflow consists of three main components: +# * `dataset`: Manage dataset downloading and data preprocessing (MNIST is used as an example). +# * `model`: Define the deep learning model architecture (e.g., a convolutional neural network). +# * `trainer`: Handle the training process, including `train_epoch` and `eval_epoch`. +# +# Let’s first take a closer look at each component before diving into the main training workflow. 
+# +# ### Dataset +# %% +from typing import Tuple + +from torch.utils.data import Dataset +from torchvision import datasets, transforms + + +def get_dataset(download_path: str = "/tmp/torch_data") -> Tuple[Dataset, Dataset]: + """Process data and build training and validation sets. + + Args: + download_path: Directory to store the raw data. + + Returns: + A tuple (tr_ds, val_ds), where tr_ds is a training set and val_ds is a validation set. + """ + # Define data processing pipeline + transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]) + + tr_ds = datasets.MNIST(root=download_path, train=True, download=True, transform=transform) + val_ds = datasets.MNIST(root=download_path, train=True, download=True, transform=transform) + + return tr_ds, val_ds + + +# %% [markdown] +# ### Model +# %% +from typing import Dict + +import torch.nn as nn +from torch import Tensor + + +class Model(nn.Module): + def __init__(self) -> None: + super(Model, self).__init__() + + self.cnn_encoder = nn.Sequential( + # Block 1 + nn.Conv2d(in_channels=1, out_channels=16, kernel_size=5, padding=0), + nn.ReLU(), + nn.MaxPool2d(kernel_size=2), + # Block 2 + nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5, padding=0), + nn.ReLU(), + nn.MaxPool2d(kernel_size=2), + ) + self.clf = nn.Linear(32 * 4 * 4, 10) + + def forward(self, inputs: Dict[str, Tensor]) -> Tensor: + x = inputs["x"] + bs = x.size(0) + + x = self.cnn_encoder(x) + x = x.reshape(bs, -1) + logits = self.clf(x) + + return logits + + +# %% [markdown] +# ### Trainer +# %% +import gc +from typing import Tuple + +import torch +import torch.nn as nn +from torch.optim import Optimizer +from torch.utils.data import DataLoader +from tqdm import tqdm + + +def train_epoch( + tr_loader: DataLoader, model: nn.Module, loss_fn: nn.Module, optimizer: Optimizer, debug: bool = False +) -> float: + """Run training for one epoch. + + Args: + tr_loader: Training dataloader. + model: Model instance. 
+ loss_fn: Loss criterion. + optimizer: Optimizer. + debug: If True, run one batch only. + + Returns: + The average training loss over batches. + """ + tr_loss_tot = 0.0 + + model.train() + for i, batch_data in tqdm(enumerate(tr_loader), total=len(tr_loader)): + optimizer.zero_grad(set_to_none=True) + + # Retrieve batched raw data + x, y = batch_data + inputs = {"x": x} + + # Forward pass + logits = model(inputs) + + # Derive loss + loss = loss_fn(logits, y) + tr_loss_tot += loss.item() + + # Backpropagation + loss.backward() + + optimizer.step() + + del x, y, inputs, logits + _ = gc.collect() + + if debug: + break + + tr_loss_avg = tr_loss_tot / len(tr_loader) + + return tr_loss_avg + + +@torch.no_grad() +def eval_epoch( + eval_loader: DataLoader, model: nn.Module, loss_fn: nn.Module, debug: bool = False +) -> Tuple[float, float]: + """Run evaluation for one epoch. + + Args: + eval_loader: Evaluation dataloader. + model: Model instance. + loss_fn: Loss criterion. + debug: If True, run one batch only. + + Returns: + A tuple (eval_loss_avg, acc), where eval_loss_avg is the average evaluation loss over batches + and acc is the accuracy. 
+ """ + eval_loss_tot = 0 + y_true, y_pred = [], [] + + model.eval() + for i, batch_data in tqdm(enumerate(eval_loader), total=len(eval_loader)): + # Retrieve batched raw data + x, y = batch_data + inputs = {"x": x} + + # Forward pass + logits = model(inputs) + + # Derive loss + loss = loss_fn(logits, y) + eval_loss_tot += loss.item() + + # Record batched output + y_true.append(y.detach()) + y_pred.append(logits.detach()) + + del x, y, inputs, logits + _ = gc.collect() + + if debug: + break + + eval_loss_avg = eval_loss_tot / len(eval_loader) + + # Derive accuracy + y_true = torch.cat(y_true, dim=0) + y_pred = torch.cat(y_pred, dim=0) + y_pred = torch.argmax(y_pred, dim=1) + acc = (y_true == y_pred).sum() / len(y_true) + + return eval_loss_avg, acc From 8888b3a62b24efcf727d359a67acc0009fb04342 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Mon, 17 Mar 2025 23:48:16 +0800 Subject: [PATCH 07/15] docs: Add DL model training wf Signed-off-by: JiangJiaWei1103 --- .../slurm_agent/slurm_agent_example_usage.py | 214 ++++++++++++++++++ 1 file changed, 214 insertions(+) diff --git a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py index 333453fcf..a160fd2c8 100644 --- a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py +++ b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py @@ -389,3 +389,217 @@ def eval_epoch( acc = (y_true == y_pred).sum() / len(y_true) return eval_loss_avg, acc + + +# %% [markdown] +# ### Deep Learning Model Training Workflow +# Once the three main components are in place, you can now train your deep learning model using GPUs on the Slurm cluster with the highly flexible `SlurmFunctionTask`. 
+# %% +import os +from pathlib import Path +from typing import Dict, Optional + +import torch +import torch.nn as nn +import torch.optim as optim +from flytekit import task, workflow +from flytekit.types.file import FlyteFile +from flytekitplugins.slurm import SlurmFunction +from torch.utils.data import DataLoader + + +@task( + task_config=SlurmFunction( + ssh_config={ + "host": "ec2-11-22-33-444.us-west-2.compute.amazonaws.com", + "username": "ubuntu", + }, + sbatch_conf={ + "partition": "debug", + "job-name": "process-data", + "output": "/home/ubuntu/dp.log", + }, + script="""#!/bin/bash -i + +echo "Process and build torch datasets..." +{task.fn} +""", + ) +) +def process_data(raw_data_path: str) -> str: + # Download the MNIST dataset but ignore the torch training and validation datasets, + # which are built in the `train` function below + _ = get_dataset(download_path=raw_data_path) + proc_data_path = raw_data_path + + return proc_data_path + + +@task( + task_config=SlurmFunction( + ssh_config={ + "host": "ec2-11-22-33-444.us-west-2.compute.amazonaws.com", + "username": "ubuntu", + }, + sbatch_conf={ + "partition": "debug", + "job-name": "train-model", + "output": "/home/ubuntu/train.log", + }, + script="""#!/bin/bash -i + +echo "Training process..." +{task.fn} +""", + ) +) +def train( + data_path: str, + epochs: int = 5, + batch_size: int = 32, + lr: float = 1e-3, + ckpt_path: Optional[str] = None, + debug: bool = False, +) -> FlyteFile: + # -------------------------------------------------------------------------------- + # HARD-CODE CUDA DEVICE AND ASSERT IT'S AVAILABLE + # -------------------------------------------------------------------------------- + device = torch.device("cuda") + assert torch.cuda.is_available(), "Requested GPU but no CUDA device found!" 
+ print(f"[train] Using device: {device}") + + ckpt_path = Path("./output") if ckpt_path is None else Path(ckpt_path) + ckpt_path.mkdir(exist_ok=True) + model_path = ckpt_path / "model.pth" + + # Build dataloaders + tr_ds, val_ds = get_dataset(download_path=data_path) + tr_loader = DataLoader(tr_ds, batch_size=batch_size, shuffle=True, drop_last=True) + val_loader = DataLoader(val_ds, batch_size=batch_size * 4, shuffle=False) + + # Build model + model = Model() + + # Build loss criterion + loss_fn = nn.CrossEntropyLoss() + + # Build solvers + # Optimizer + optimizer = optim.Adam(model.parameters(), lr=lr) + # LR scheduler + # lr_skd = None + + # Run training and evaluation + best_score = 1e16 + for ep in range(epochs): + tr_loss = train_epoch( + tr_loader=tr_loader, + model=model, + loss_fn=loss_fn, + optimizer=optimizer, + debug=debug, + ) + val_loss, acc = eval_epoch(eval_loader=val_loader, model=model, loss_fn=loss_fn, debug=debug) + + # Save model ckpt + if val_loss < best_score: + best_score = val_loss + torch.save(model.state_dict(), model_path) + + print(f"Epoch [{ep+1}/{epochs}] TRAIN LOSS {tr_loss:.4f} | VAL LOSS {val_loss:.4f} | ACC {acc:.4f}") + + return FlyteFile(path=model_path) + + +@task( + task_config=SlurmFunction( + ssh_config={ + "host": "ec2-11-22-33-444.us-west-2.compute.amazonaws.com", + "username": "ubuntu", + }, + sbatch_conf={ + "partition": "debug", + "job-name": "eval-model", + "output": "/home/ubuntu/eval.log", + "gres": "gpu:1", + }, + script="""#!/bin/bash -i + +echo "Evaluation process..." 
+{task.fn} +""", + ) +) +@torch.no_grad() +def run_infer(data_path: str, model_path: FlyteFile) -> Dict[str, float]: + # -------------------------------------------------------------------------------- + # HARD-CODE CUDA DEVICE AND ASSERT IT'S AVAILABLE + # -------------------------------------------------------------------------------- + # Build validation dataloader + _, val_ds = get_dataset(download_path=data_path) + val_loader = DataLoader(val_ds, batch_size=2048, shuffle=False) + + # Load model + model = Model() + model.load_state_dict(torch.load(model_path.download())) + + y_true, y_pred = [], [] + model.eval() + for i, batch_data in tqdm(enumerate(val_loader), total=len(val_loader)): + # Retrieve batched raw data + x, y = batch_data + inputs = {"x": x} + + # Forward pass + logits = model(inputs) + + # Record batched output + y_true.append(y.detach()) + y_pred.append(logits.detach()) + + # Derive accuracy + y_true = torch.cat(y_true, dim=0) + y_pred = torch.cat(y_pred, dim=0).argmax(dim=1) + prf_report = {"acc": ((y_true == y_pred).sum() / len(y_true)).item()} + + return prf_report + + +@workflow +def dl_wf( + raw_data_path: str, + epochs: int = 1, + debug: bool = True, +) -> Dict[str, float]: + proc_data_path = process_data(raw_data_path=raw_data_path) + output_path = train(data_path=proc_data_path, epochs=epochs, debug=debug) + prf_report = run_infer(data_path=proc_data_path, model_path=output_path) + + return prf_report + + +# %% [markdown] +# Run the following code snippet and enjoy your training journey! 
+# %% +if __name__ == "__main__": + from click.testing import CliRunner + from flytekit.clis.sdk_in_container import pyflyte + + runner = CliRunner() + path = os.path.realpath(__file__) + + # Local run + print(">>> LOCAL EXEC <<<") + result = runner.invoke( + pyflyte.main, + [ + "run", + "--raw-output-data-prefix", + "s3://my-flyte-slurm-agent", + path, + "dl_wf", + "--raw_data_path", + "/tmp/torch_data", + ], + ) + print(result.output) From 5aa9db5f79c63a35f56139362c344144149a2f89 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Wed, 19 Mar 2025 21:00:30 +0800 Subject: [PATCH 08/15] docs: Complete index page Signed-off-by: JiangJiaWei1103 --- examples/slurm_agent/README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/examples/slurm_agent/README.md b/examples/slurm_agent/README.md index 743a70024..63a850ff6 100644 --- a/examples/slurm_agent/README.md +++ b/examples/slurm_agent/README.md @@ -18,8 +18,28 @@ To install the Slurm agent, run the following command: ## Example usage +For the example usage of different Slurm task types, please see {doc}`Slurm agent example usage`. ## Local testing +To test the Slurm agent locally, create a class for the agent task that inherits from [AsyncAgentExecutorMixin](https://github.com/flyteorg/flytekit/blob/cd6bd01ad0ba6688afc71a33a59ece53f90e841a/flytekit/extend/backend/base_agent.py#L3). This mixin can handle asynchronous tasks and allows flytekit to mimic FlytePropeller's behavior in calling the agent. For more information, see "[Testing agents locally](https://docs.flyte.org/en/latest/flyte_agents/testing_agents_in_a_local_python_environment.html)". + +```{note} +In some cases, you will need to store credentials in your local environment when testing locally. +``` ## Flyte deployment configuration + +```{note} +If you are using a managed deployment of Flyte, you will need to contact your deployment administrator to configure agents in your deployment. 
+``` + +To enable the Slurm agent in your Flyte deployment, see the {ref}`Slurm agent deployment guide`. + + +```{toctree} +:maxdepth: -1 +:hidden: + +slurm_agent_example_usage +``` \ No newline at end of file From 3c37213ea46f82ab57d93e8509ffa8986feb7d59 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Thu, 20 Mar 2025 20:39:31 +0800 Subject: [PATCH 09/15] Lint Signed-off-by: JiangJiaWei1103 --- examples/slurm_agent/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/slurm_agent/README.md b/examples/slurm_agent/README.md index 63a850ff6..47111e906 100644 --- a/examples/slurm_agent/README.md +++ b/examples/slurm_agent/README.md @@ -42,4 +42,4 @@ To enable the Slurm agent in your Flyte deployment, see the {ref}`Slurm agent de :hidden: slurm_agent_example_usage -``` \ No newline at end of file +``` From 0003649f758138142c3e19c352faaa70d0e24db3 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Thu, 20 Mar 2025 20:51:25 +0800 Subject: [PATCH 10/15] Add missing python pkgs Signed-off-by: JiangJiaWei1103 --- examples/slurm_agent/requirements.in | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/slurm_agent/requirements.in b/examples/slurm_agent/requirements.in index 3aa09dcd0..cd775be33 100644 --- a/examples/slurm_agent/requirements.in +++ b/examples/slurm_agent/requirements.in @@ -1 +1,3 @@ flytekitplugins-slurm +torch +torchvision From 61e1a5c74a2375e66b76c9b9fc7b491cba082a1b Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Thu, 20 Mar 2025 20:55:01 +0800 Subject: [PATCH 11/15] Add tqdm to requirements Signed-off-by: JiangJiaWei1103 --- examples/slurm_agent/requirements.in | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/slurm_agent/requirements.in b/examples/slurm_agent/requirements.in index cd775be33..10f287bba 100644 --- a/examples/slurm_agent/requirements.in +++ b/examples/slurm_agent/requirements.in @@ -1,3 +1,4 @@ flytekitplugins-slurm torch torchvision +tqdm From 
9b3ed42f5f3dae4c5a41dbb52e282f2c39cdc6f6 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Thu, 20 Mar 2025 21:09:08 +0800 Subject: [PATCH 12/15] Add the Slurm agent to integrations Signed-off-by: JiangJiaWei1103 --- docs/integrations/index.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/integrations/index.md b/docs/integrations/index.md index f2dad8a56..194292699 100644 --- a/docs/integrations/index.md +++ b/docs/integrations/index.md @@ -118,6 +118,8 @@ Native backend plugins can be executed without any external service dependencies - Run sensor jobs in your workflows with the sensor agent. * - {doc}`Snowflake agent ` - Run Snowflake jobs in your workflows with the Snowflake agent. +* - {doc}`Slurm agent ` + - Run Slurm jobs in your workflows with the Slurm agent. ``` (external_service_backend_plugins)= From 875dfcab5572b211540fe41f978c873e7166ab36 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 21 Mar 2025 10:49:41 +0800 Subject: [PATCH 13/15] push Signed-off-by: Future-Outlier --- docs/integrations/index.md | 1 + .../memray_plugin/memray_example.py | 5 ++-- examples/slurm_agent/Dockerfile | 23 +++++++++++++++++++ 3 files changed, 27 insertions(+), 2 deletions(-) create mode 100644 examples/slurm_agent/Dockerfile diff --git a/docs/integrations/index.md b/docs/integrations/index.md index 194292699..3fab7785d 100644 --- a/docs/integrations/index.md +++ b/docs/integrations/index.md @@ -248,6 +248,7 @@ OpenAI batch agent PERIAN Job Platform agent Sensor agent Snowflake agent +Slurm agent ``` ```{toctree} diff --git a/examples/memray_plugin/memray_plugin/memray_example.py b/examples/memray_plugin/memray_plugin/memray_example.py index 69351a2f0..9e6124f99 100644 --- a/examples/memray_plugin/memray_plugin/memray_example.py +++ b/examples/memray_plugin/memray_plugin/memray_example.py @@ -5,10 +5,11 @@ # Memray tracks and reports memory allocations, both in python code and in compiled extension modules. 
# This Memray Profiling plugin enables memory tracking on the Flyte task level and renders a memgraph profiling graph on Flyte Deck. # %% -from flytekit import workflow, task, ImageSpec -from flytekitplugins.memray import memray_profiling import time +from flytekit import ImageSpec, task, workflow +from flytekitplugins.memray import memray_profiling + # %% [markdown] # First, we use `ImageSpec` to construct a container that contains the dependencies for the # tasks, we want to profile: diff --git a/examples/slurm_agent/Dockerfile b/examples/slurm_agent/Dockerfile new file mode 100644 index 000000000..0c46be23a --- /dev/null +++ b/examples/slurm_agent/Dockerfile @@ -0,0 +1,23 @@ +# ###################### +# NOTE: For CI/CD only # +######################## +FROM python:3.11-slim-buster +LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytesnacks + +WORKDIR /root +ENV VENV /opt/venv +ENV LANG C.UTF-8 +ENV LC_ALL C.UTF-8 +ENV PYTHONPATH /root + +# Install Python dependencies +COPY requirements.in /root +RUN pip install -r /root/requirements.in + +# Copy the actual code +COPY . /root/ + +# This tag is supplied by the build script and will be used to determine the version +# when registering tasks, workflows, and launch plans +ARG tag +ENV FLYTE_INTERNAL_IMAGE $tag From 60a36001dc1c0cbdf75599b1d3b466e99d8b184a Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 21 Mar 2025 10:56:53 +0800 Subject: [PATCH 14/15] update Signed-off-by: Future-Outlier --- docs/integrations/index.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/integrations/index.md b/docs/integrations/index.md index 3fab7785d..e6c0616c4 100644 --- a/docs/integrations/index.md +++ b/docs/integrations/index.md @@ -116,10 +116,10 @@ Native backend plugins can be executed without any external service dependencies - Execute tasks on PERIAN Job Platform. * - {doc}`Sensor agent ` - Run sensor jobs in your workflows with the sensor agent. 
-* - {doc}`Snowflake agent ` - - Run Snowflake jobs in your workflows with the Snowflake agent. * - {doc}`Slurm agent ` - Run Slurm jobs in your workflows with the Slurm agent. +* - {doc}`Snowflake agent ` + - Run Snowflake jobs in your workflows with the Snowflake agent. ``` (external_service_backend_plugins)= @@ -247,8 +247,8 @@ Memory Machine Cloud agent OpenAI batch agent PERIAN Job Platform agent Sensor agent -Snowflake agent Slurm agent +Snowflake agent ``` ```{toctree} From d382dffc09114a7db9ce1fbf403025fb96879ee9 Mon Sep 17 00:00:00 2001 From: JiangJiaWei1103 Date: Thu, 27 Mar 2025 20:18:48 +0800 Subject: [PATCH 15/15] fix: Use unique task name for each shell task instance Signed-off-by: JiangJiaWei1103 --- examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py index a160fd2c8..cf259945b 100644 --- a/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py +++ b/examples/slurm_agent/slurm_agent/slurm_agent_example_usage.py @@ -62,7 +62,7 @@ def basic_wf() -> None: # Instead of running a pre-existing shell script on the Slurm cluster, `SlurmShellTask` allows users to define the script content within the interface as shown below: # %% shell_task = SlurmShellTask( - name="shell", + name="shell0", script="""#!/bin/bash -i echo [TEST SLURM SHELL TASK 1] Run the user-defined script... @@ -82,7 +82,7 @@ def basic_wf() -> None: shell_task_with_args = SlurmShellTask( - name="shell", + name="shell1", script="""#!/bin/bash -i echo [TEST SLURM SHELL TASK 2] Run the user-defined script with args...