Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions examples/stress/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Stress Examples

This directory contains ad hoc stress and failure-mode examples for Flyte and Union dogfood testing.

## Primary Entry Point

Use [sleep_fanout_harness_wrapper.sh](sleep_fanout_harness_wrapper.sh) for multi-run `core-sleep` fanout tests:

```bash
examples/stress/sleep_fanout_harness_wrapper.sh \
--config ~/.flyte/config-dogfood.yaml \
--total-runs 10 \
--submit-concurrency 10 \
--n-children 1000 \
--sleep-duration 600 \
--poll-interval 1 \
--run-env _F_MAX_QPS=150 \
--run-env _F_CTRL_WORKERS=20 \
--run-env _F_P_CNC=1000
```

This wrapper:
- submits many top-level `sleep_fanout` runs through `flyte run`
- tracks aggregate child visibility and running counts
- prints parent-run counts (`p_live`, `p_run`) and child creation rate (`create_rps`, `rps/p`)

The underlying task definitions live in [sleep_fanout.py](sleep_fanout.py), and the local submit helper lives in [sleep_fanout_harness.py](sleep_fanout_harness.py).

## Key Files

- [sleep_fanout.py](sleep_fanout.py): `core-sleep` leaf task, parent fanout task, and swarm submit task definitions.
- [sleep_fanout_harness.py](sleep_fanout_harness.py): local async submit harness used by the wrapper.
- [runs_per_second.py](runs_per_second.py): launch-rate test helper.
- [fanout_concurrency.py](fanout_concurrency.py): simple fanout/concurrency experiment.
- [large_fanout.py](large_fanout.py): wide fanout example.
- [duplicate_action_id.py](duplicate_action_id.py): action-id collision / dedupe behavior probe.
- [crash_recovery_trace.py](crash_recovery_trace.py), [long_recovery.py](long_recovery.py), [fast_crasher.py](fast_crasher.py): controller and recovery failure scenarios.
- [cpu_gremlin.py](cpu_gremlin.py), [network_gremlin.py](network_gremlin.py): fault-injection style workload examples.
- [large_file_io.py](large_file_io.py), [large_dir_io.py](large_dir_io.py), [benchmark/large_io_comparison.py](benchmark/large_io_comparison.py): large I/O stress examples.
- [scale_test_same_image.py](scale_test_same_image.py), [scale_test_varied_images.py](scale_test_varied_images.py), [image_builds.py](image_builds.py): image build and scale tests.

## Notes

- `sleep_fanout` leaves use the `core-sleep` plugin, so the children run in leaseworker instead of creating task pods.
- Parent resource defaults for fanout are controlled in `sleep_fanout.py` via:
- `FLYTE_STRESS_FANOUT_CPU_REQUEST`
- `FLYTE_STRESS_FANOUT_CPU_LIMIT`
- `FLYTE_STRESS_FANOUT_MEMORY_REQUEST`
- `FLYTE_STRESS_FANOUT_MEMORY_LIMIT`
- Remote image contents come from the built wheel in `dist/`, not directly from local `src/`. If the wrapper warns that `src/flyte` is newer than the wheel, rebuild the wheel before relying on SDK changes in remote runs.
102 changes: 97 additions & 5 deletions examples/stress/sleep_fanout.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,104 @@
import asyncio
import os
from datetime import timedelta

import flyte
import flyte.report
from flyte.extras import Sleep

# Image-selection knobs, read once at import time. They are forwarded to
# nested runs via _STRESS_RUNTIME_ENV so children resolve the same image.
_STRESS_IMAGE_REGISTRY = os.getenv("FLYTE_STRESS_IMAGE_REGISTRY")
_STRESS_IMAGE_NAME = os.getenv("FLYTE_STRESS_IMAGE_NAME")
_STRESS_IMAGE_PLATFORMS = tuple(
    p.strip() for p in os.getenv("FLYTE_STRESS_IMAGE_PLATFORMS", "linux/amd64").split(",") if p.strip()
)
# Only forward knobs that are actually set: the `if v` filter drops None and
# empty-string values so children do not see blank overrides.
_STRESS_RUNTIME_ENV = {
    k: v
    for k, v in {
        "FLYTE_STRESS_IMAGE_REGISTRY": _STRESS_IMAGE_REGISTRY,
        "FLYTE_STRESS_IMAGE_NAME": _STRESS_IMAGE_NAME,
        "FLYTE_STRESS_IMAGE_PLATFORMS": ",".join(_STRESS_IMAGE_PLATFORMS),
    }.items()
    if v
}

# Let remote runs redirect image builds to a writable registry without
# touching the task definitions. For dogfood, this can point at the shared ECR
# repo used for ad hoc SDK test images. Default to amd64-only so the first
# build is faster and matches the dogfood cluster architecture.
stress_image = flyte.Image.from_debian_base(
    python_version=(3, 12),
    registry=_STRESS_IMAGE_REGISTRY,
    name=_STRESS_IMAGE_NAME,
    platform=_STRESS_IMAGE_PLATFORMS,
)


def _fanout_resources() -> flyte.Resources:
    """Build the parent fanout task's resource requests/limits.

    Defaults match the distributed harness shape; override the
    FLYTE_STRESS_FANOUT_* env vars when testing a single huge parent that
    needs much more headroom.
    """
    env = os.getenv
    cpu_bounds = (
        int(env("FLYTE_STRESS_FANOUT_CPU_REQUEST", "1")),
        int(env("FLYTE_STRESS_FANOUT_CPU_LIMIT", "2")),
    )
    memory_bounds = (
        env("FLYTE_STRESS_FANOUT_MEMORY_REQUEST", "2Gi"),
        env("FLYTE_STRESS_FANOUT_MEMORY_LIMIT", "4Gi"),
    )
    return flyte.Resources(cpu=cpu_bounds, memory=memory_bounds)


def _controller_tuning_env() -> dict[str, str]:
env: dict[str, str] = {}
for key in (
"_F_MAX_QPS",
"_F_CTRL_WORKERS",
"_F_P_CNC",
"_U_USE_ACTIONS",
"_F_TRACE_SUBMIT",
"_F_TRACE_SUBMIT_LIMIT",
):
value = os.getenv(key)
if value is not None:
env[key] = value
return env


def _nested_run_env() -> dict[str, str]:
    """Env vars to inject into nested runs: image knobs plus controller tuning.

    Controller-tuning values win on key collisions, mirroring dict-merge order.
    """
    merged = dict(_STRESS_RUNTIME_ENV)
    merged.update(_controller_tuning_env())
    return merged


def _controller_tuning_summary() -> str:
    """One-line log summary of the controller tuning knobs ("<unset>" when absent)."""
    env = _controller_tuning_env()
    parts = ["controller_env"]
    for key in (
        "_F_MAX_QPS",
        "_F_CTRL_WORKERS",
        "_F_P_CNC",
        "_U_USE_ACTIONS",
        "_F_TRACE_SUBMIT",
        "_F_TRACE_SUBMIT_LIMIT",
    ):
        parts.append(f"{key}={env.get(key, '<unset>')}")
    return " ".join(parts)

# Leaves run in leaseworker via the core-sleep plugin: no task pods are created,
# so we can fan out wide without paying pod-startup cost.
sleep_env = flyte.TaskEnvironment(
    name="sleep_fanout_leaf",
    image=stress_image,
    env_vars=_STRESS_RUNTIME_ENV,  # propagate image-selection knobs to leaves
    plugin_config=Sleep(),  # core-sleep plugin configuration for leaf tasks
)

# Parent fanout environment. NOTE(review): as rendered, this call had two
# `resources=` keywords (the old hard-coded "50m"/"200Mi" line left over in
# the diff plus the new `_fanout_resources()` call) — a duplicate-keyword
# SyntaxError. Only the new, env-var-driven value is kept.
fanout_env = flyte.TaskEnvironment(
    name="sleep_fanout",
    image=stress_image,
    env_vars=_STRESS_RUNTIME_ENV,
    resources=_fanout_resources(),
    depends_on=[sleep_env],
)

# Swarm driver environment: a single pod that submits many top-level
# sleep_fanout runs (see submit_runs below).
swarm_env = flyte.TaskEnvironment(
    name="sleep_fanout_swarm",
    image=stress_image,
    env_vars=_STRESS_RUNTIME_ENV,
    resources=flyte.Resources(cpu=1, memory="500Mi"),
    # presumably so the fanout tasks it launches deploy with it — confirm
    depends_on=[fanout_env],
)
Expand All @@ -41,8 +120,15 @@ async def sleep_fanout(
All leaves run in leaseworker via the core-sleep plugin, so no task pods
are created.
"""
print(
f"fanout_inputs n_children={n_children} "
f"sleep_duration={sleep_duration} "
f"sleep_seconds={sleep_duration.total_seconds()}",
flush=True,
)
print(_controller_tuning_summary(), flush=True)
await asyncio.gather(*(sleep_leaf(duration=sleep_duration) for _ in range(n_children)))
print(f"Done. Total leaves: {n_children}")
print(f"Done. Total leaves: {n_children}", flush=True)
return n_children


Expand All @@ -60,13 +146,19 @@ async def submit_runs(
from aiolimiter import AsyncLimiter

limiter = AsyncLimiter(max_rps, 1)
child_run_env = _nested_run_env()

async def submit_one() -> str:
async def submit_one(idx: int) -> str:
async with limiter:
run = await flyte.run.aio(sleep_fanout, n_children=n_children, sleep_duration=sleep_duration)
run = await flyte.run.aio(
sleep_fanout.override(env_vars=child_run_env),
n_children=n_children,
sleep_duration=sleep_duration,
)
print(f"submitted_run idx={idx} url={run.url}", flush=True)
return run.url

urls = await asyncio.gather(*(submit_one() for _ in range(n_runs)))
urls = await asyncio.gather(*(submit_one(i) for i in range(n_runs)))
print(f"Swarm worker done. Submitted {len(urls)} runs at <= {max_rps} rps.")
return list(urls)

Expand Down
117 changes: 105 additions & 12 deletions examples/stress/sleep_fanout_harness.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Submit N copies of the sleep_fanout `main` task as fast as possible.
Submit N copies of the `sleep_fanout` task through the `flyte run` CLI.

Each run spawns n_children core-sleep leaves in leaseworker (no task pods).
Submissions are launched with a bounded semaphore to cap in-flight TCP
Expand All @@ -9,34 +9,123 @@

import argparse
import asyncio
import os
import pathlib
import re
import shutil
import time
from datetime import timedelta

from sleep_fanout import sleep_fanout as sleep_fanout_main
# Patterns for extracting the run identity from `flyte run` CLI output:
# prefer the full run URL, fall back to the bare run name.
RUN_URL_RE = re.compile(r"URL:\s+(\S+/runs/[^/?\s]+)")
RUN_NAME_RE = re.compile(r"Created Run:\s+([^\s]+)")
# Optional file to which submitted run names/URLs are appended, one per line.
RUNS_FILE = os.getenv("FLYTE_HARNESS_RUNS_FILE")
# Repo root is two levels above this file (examples/stress/ -> repo root).
REPO_ROOT = pathlib.Path(__file__).resolve().parents[2]
LOCAL_SDK_SRC = REPO_ROOT / "src"
# flyte CLI to invoke: explicit override, then PATH lookup, then bare name.
FLYTE_BIN = os.getenv("FLYTE_HARNESS_FLYTE_BIN") or shutil.which("flyte") or "flyte"
# Truthy values opt in to injecting the local SDK source onto PYTHONPATH.
FORCE_LOCAL_SDK = os.getenv("FLYTE_HARNESS_FORCE_LOCAL_SDK", "").lower() in {"1", "true", "yes", "on"}

import flyte

def _subprocess_env() -> dict[str, str]:
    """Environment for spawned `flyte` CLI subprocesses.

    When FORCE_LOCAL_SDK is enabled, prepend the repo's src/ directory to
    PYTHONPATH so the CLI imports the local SDK tree.
    """
    env = dict(os.environ)
    if FORCE_LOCAL_SDK:
        prefix = str(LOCAL_SDK_SRC)
        current = env.get("PYTHONPATH")
        env["PYTHONPATH"] = f"{prefix}:{current}" if current else prefix
    return env


def _append_run_name(path: str, name: str) -> None:
with open(path, "a", encoding="utf-8") as f:
f.write(f"{name}\n")


async def submit_one(sem: asyncio.Semaphore, idx: int, n_children: int, sleep_duration: timedelta) -> str | None:
    """Submit one sleep_fanout run through the `flyte run` CLI.

    NOTE(review): as rendered in the diff, this body interleaved removed
    pre-change lines (the in-process `flyte.with_runcontext("remote").run.aio`
    call and its `return run.url`) with the new subprocess-based code, and
    carried a duplicated, unreachable `return None`. This is the reconstructed
    post-change implementation with that residue removed.

    Args:
        sem: bounds the number of in-flight submissions.
        idx: submission index, used only for log prefixes.
        n_children: fanout width passed through to the task.
        sleep_duration: per-leaf sleep, serialized as an ISO-8601 duration.

    Returns:
        The run URL (preferred) or run name parsed from CLI output, or None
        on any failure. Never raises: errors are printed so one bad
        submission does not abort the harness.
    """
    async with sem:
        # Default the action-based submission path on unless already set.
        os.environ.setdefault("_U_USE_ACTIONS", "1")
        config = os.getenv("FLYTE_HARNESS_CONFIG", os.path.expanduser("~/.flyte/config-dogfood.yaml"))
        image_builder = os.getenv("FLYTE_HARNESS_IMAGE_BUILDER", "remote")
        project = os.getenv("FLYTE_HARNESS_PROJECT", "")
        domain = os.getenv("FLYTE_HARNESS_DOMAIN", "")
        # Forward only the controller-tuning vars that are set and non-empty.
        run_env_keys = tuple(
            k
            for k in (
                "_F_MAX_QPS",
                "_F_CTRL_WORKERS",
                "_F_P_CNC",
                "_U_USE_ACTIONS",
                "_F_TRACE_SUBMIT",
                "_F_TRACE_SUBMIT_LIMIT",
            )
            if os.getenv(k)
        )

        cmd = [FLYTE_BIN, "-c", config, "--image-builder", image_builder, "run"]
        if project:
            cmd.extend(["-p", project])
        if domain:
            cmd.extend(["-d", domain])
        for key in run_env_keys:
            cmd.extend(["--env", f"{key}={os.environ[key]}"])
        cmd.extend(
            [
                "examples/stress/sleep_fanout.py",
                "sleep_fanout",
                "--n_children",
                str(n_children),
                "--sleep_duration",
                # ISO-8601 duration, e.g. PT600S for ten minutes.
                f"PT{int(sleep_duration.total_seconds())}S",
            ]
        )

        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
                env=_subprocess_env(),
            )

            # Stream stdout line-by-line; stderr is merged into the same pipe.
            output_lines: list[str] = []
            assert proc.stdout is not None
            while True:
                line = await proc.stdout.readline()
                if not line:
                    break
                output_lines.append(line.decode("utf-8", errors="replace").rstrip())

            rc = await proc.wait()
            output = "\n".join(output_lines)
            if rc != 0:
                print(f"[{idx}] submit failed rc={rc} output={output!r}", flush=True)
                return None

            url_match = RUN_URL_RE.search(output)
            if url_match:
                return url_match.group(1)

            name_match = RUN_NAME_RE.search(output)
            if name_match:
                return name_match.group(1)

            # Distinguish "URL present but regex missed it" from "no run id at all".
            if "/runs/" in output:
                print(f"[{idx}] submit failed: partial run URL parse failure output={output!r}", flush=True)
            else:
                print(f"[{idx}] submit failed: could not parse run id from output={output!r}", flush=True)
            return None
        except Exception as e:
            cause = getattr(e, "__cause__", None)
            print(f"[{idx}] submit failed: {type(e).__name__}: {e!r} cause={cause!r}", flush=True)
            return None


async def submit_many(total: int, concurrency: int, n_children: int, sleep_duration: timedelta) -> None:
async def submit_many(total: int, concurrency: int, n_children: int, sleep_duration: timedelta) -> int:
sem = asyncio.Semaphore(concurrency)
start = time.monotonic()
submitted = 0
failed = 0
runs_file_lock = asyncio.Lock()

async def wrapped(i: int):
nonlocal submitted, failed
Expand All @@ -45,7 +134,10 @@ async def wrapped(i: int):
failed += 1
else:
submitted += 1
print(f"[{i}] {name}", flush=True)
if RUNS_FILE:
async with runs_file_lock:
await asyncio.to_thread(_append_run_name, RUNS_FILE, name)
print(f"submitted_run idx={i} url={name}", flush=True)
done = submitted + failed
if done % 100 == 0:
elapsed = time.monotonic() - start
Expand All @@ -57,6 +149,7 @@ async def wrapped(i: int):
elapsed = time.monotonic() - start
rps = submitted / elapsed if elapsed > 0 else 0
print(f"\nDone. submitted={submitted} failed={failed} elapsed={elapsed:.2f}s rps={rps:.2f}")
return 1 if failed else 0


# python stress/sleep_fanout_harness.py --total 25000 --concurrency 500 --n_children 10 --sleep_seconds 10
Expand All @@ -68,15 +161,15 @@ def main() -> None:
parser.add_argument("--sleep_seconds", type=int, default=10)
args = parser.parse_args()

flyte.init_from_config()
asyncio.run(
rc = asyncio.run(
submit_many(
total=args.total,
concurrency=args.concurrency,
n_children=args.n_children,
sleep_duration=timedelta(seconds=args.sleep_seconds),
)
)
raise SystemExit(rc)


if __name__ == "__main__":
Expand Down
Loading
Loading