Skip to content

Commit 4dfc386

Browse files
authored
Merge branch 'main' into adil/user-logs-propagation
2 parents 5657a6b + b3a2732 commit 4dfc386

45 files changed

Lines changed: 23376 additions & 22861 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
"""
2+
Demo: reusable container + concurrent tasks, so log lines from different
3+
actions are interleaved in the same worker process. Each line should be
4+
prefixed with its own [run][action] thanks to the LogRecord factory in
5+
flyte._logging.
6+
7+
Workers exercise three logger configs to confirm context stamping works
8+
regardless of how the user obtains a logger:
9+
10+
1. flyte.logger — the canonical user-facing logger
11+
2. logging.getLogger("flyte.user.myapp") — child of flyte.user, inherits handler
12+
3. logging.getLogger("myapp") — fully independent logger w/ its own handler
13+
"""
14+
15+
import asyncio
16+
import logging
17+
import sys
18+
19+
import flyte
20+
21+
# One worker process, many concurrent invocations on it. With replicas=1 and
22+
# concurrency=8, all worker tasks below land in the same Python process, so
23+
# their log output is genuinely interleaved on a single stderr.
24+
env = flyte.TaskEnvironment(
25+
name="reuse_concurrent_logging",
26+
resources=flyte.Resources(cpu="1", memory="500Mi"),
27+
reusable=flyte.ReusePolicy(
28+
replicas=1,
29+
concurrency=8,
30+
idle_ttl=60,
31+
scaledown_ttl=60,
32+
),
33+
image=flyte.Image.from_debian_base().with_pip_packages("unionai-reuse"),
34+
)
35+
36+
# Variant 2: child of flyte.user. No handler/level setup needed — propagation
37+
# carries records up to flyte.user's handler, so [run][action] prefixing and
38+
# the user log level both apply automatically.
39+
inherited_logger = logging.getLogger("flyte.user.myapp")
40+
41+
# Variant 3: independent logger with its own StreamHandler. The flyte
42+
# LogRecordFactory still stamps run_name/action_name on every record, so the
43+
# formatter below can reference them via %(run_name)s / %(action_name)s even
44+
# though this logger lives outside the flyte.* namespace.
45+
independent_logger = logging.getLogger("myapp")
46+
if not independent_logger.handlers:
47+
_h = logging.StreamHandler(sys.stderr)
48+
_h.setFormatter(logging.Formatter("[%(run_name)s][%(action_name)s] myapp: %(message)s"))
49+
independent_logger.addHandler(_h)
50+
independent_logger.setLevel(logging.INFO)
51+
independent_logger.propagate = False
52+
53+
54+
@env.task
55+
async def worker_flyte_logger(label: str, ticks: int = 3) -> str:
56+
"""Variant 1: the canonical flyte.logger."""
57+
flyte.logger.info("starting label=%s", label)
58+
for i in range(ticks):
59+
await asyncio.sleep(1)
60+
flyte.logger.info("label=%s tick=%d/%d", label, i + 1, ticks)
61+
flyte.logger.info("done label=%s", label)
62+
return label
63+
64+
65+
@env.task
66+
async def worker_inherited_logger(label: str, ticks: int = 3) -> str:
67+
"""Variant 2: child of flyte.user — formatting is inherited."""
68+
inherited_logger.info("starting label=%s", label)
69+
for i in range(ticks):
70+
await asyncio.sleep(1)
71+
inherited_logger.info("label=%s tick=%d/%d", label, i + 1, ticks)
72+
inherited_logger.info("done label=%s", label)
73+
return label
74+
75+
76+
@env.task
77+
async def worker_independent_logger(label: str, ticks: int = 3) -> str:
78+
"""Variant 3: a fully independent stdlib logger — record factory still stamps context."""
79+
independent_logger.info("starting label=%s", label)
80+
for i in range(ticks):
81+
await asyncio.sleep(1)
82+
independent_logger.info("label=%s tick=%d/%d", label, i + 1, ticks)
83+
independent_logger.info("done label=%s", label)
84+
return label
85+
86+
87+
# The fan-out parent itself doesn't need to share the reuse pool — clone_with
88+
# turns reuse off for it and depends on the worker env for image + registration.
89+
@env.clone_with(name="reuse_concurrent_main", reusable=None, depends_on=[env]).task
90+
async def main(n: int = 2) -> list[str]:
91+
"""
92+
Dispatches a mix of all three worker variants so logs from each logger
93+
config interleave on the reused container's stderr.
94+
"""
95+
flyte.logger.info("dispatching %d of each worker variant", n)
96+
coros: list = []
97+
for i in range(n):
98+
coros.append(worker_flyte_logger(label=f"flyte-{i}"))
99+
coros.append(worker_inherited_logger(label=f"inherited-{i}"))
100+
coros.append(worker_independent_logger(label=f"independent-{i}"))
101+
results = await asyncio.gather(*coros)
102+
flyte.logger.info("all workers finished: %s", results)
103+
return results
104+
105+
106+
if __name__ == "__main__":
107+
flyte.init_from_config()
108+
run = flyte.run(main, n=2)
109+
print(run.url)
110+
run.wait()

examples/sandbox/raise_error.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""
2+
Sandbox — Code That Raises an Error
3+
===================================
4+
5+
Minimal example: a sandbox whose code always raises. Run remotely to see
6+
the failure surface as a task error.
7+
"""
8+
9+
import flyte
10+
import flyte.sandbox
11+
from flyte.sandbox import sandbox_environment
12+
13+
env = flyte.TaskEnvironment(
14+
name="raise-error-demo",
15+
depends_on=[sandbox_environment],
16+
)
17+
18+
19+
boom_sandbox = flyte.sandbox.create(
20+
name="boom",
21+
code='raise ValueError(f"boom: x={x}")',
22+
inputs={"x": int},
23+
outputs={"result": int},
24+
cache="disable",
25+
)
26+
27+
28+
@env.task
29+
async def trigger_failure(x: int = 1) -> int:
30+
return await boom_sandbox.run.aio(x=x)
31+
32+
33+
if __name__ == "__main__":
34+
flyte.init_from_config()
35+
run = flyte.with_runcontext(mode="remote").run(trigger_failure, x=42)
36+
print("Run URL:", run.url)

0 commit comments

Comments
 (0)