8 changes: 3 additions & 5 deletions README.md
@@ -22,13 +22,11 @@ Data Designer helps you create synthetic datasets that go beyond simple LLM prom

---

### ⚠️ Security Notice: LiteLLM Supply-Chain Incident (2026-03-24)
### 📣 Heads-up: async engine is now the default

On March 24, 2026, malicious versions of `litellm` ([1.82.7 and 1.82.8](https://github.com/BerriAI/litellm/issues/24518)) were published to PyPI containing a credential stealer. The compromised packages were available for [approximately five hours](https://www.okta.com/blog/threat-intelligence/litellm-supply-chain-attack--an-explainer-for-identity-pros/) (10:39 – 16:00 UTC) before being removed.
Data Designer now runs pipelines on a cell-level async engine that overlaps independent columns and adapts concurrency per (provider, model). On most pipelines this is faster with no config changes; on slow self-hosted endpoints, set `inference_parameters.timeout` to your real per-request latency. See [Architecture & Performance → Async Engine](https://nvidia-nemo.github.io/DataDesigner/latest/concepts/architecture-and-performance/#async-engine) for the behaviors worth knowing about.

The only Data Designer releases that could resolve to these versions are **v0.2.2** (Dec 2025) and **v0.2.3** (Jan 2026), which carried a looser `litellm<2` upper bound. These are nearly three months old and have been superseded by eight subsequent releases — both have been yanked from PyPI as a precaution. All other releases (v0.3.0 – v0.5.3) pinned `litellm` to `>=1.73.6,<1.80.12` and were never compatible with 1.82.x. Starting with v0.5.4, `litellm` is no longer a dependency.

To have been impacted through Data Designer, you would need to have had one of these two old versions explicitly pinned *and* run a fresh `pip install` or dependency-cache update that resolved `litellm` during the five-hour window on March 24. If you believe you may be affected, see [BerriAI's incident report](https://github.com/BerriAI/litellm/issues/24518) for remediation steps.
If you hit anything unexpected, fall back to the legacy sync engine for one transitional release with `DATA_DESIGNER_ASYNC_ENGINE=0`, and please [open an issue](https://github.com/NVIDIA-NeMo/DataDesigner/issues/new) so we can fix the async path.

---

76 changes: 63 additions & 13 deletions docs/concepts/architecture-and-performance.md
@@ -47,20 +47,20 @@ This guide explains the architecture, execution model, and how to tune performan

## Execution Model

!!! note "Dataset Builder"
This describes Data Designer's current **`DatasetBuilder`**, which generates columns sequentially within batches. Other dataset generation strategies are in development.
!!! note "Two execution engines"
The default execution path is the **async engine**, which dispatches work at the cell level and overlaps independent columns — see [Async Engine](#async-engine) below for its semantics. The legacy **sync engine** is still available for one transitional release via `DATA_DESIGNER_ASYNC_ENGINE=0` and is what this section describes. The configuration knobs documented below (`buffer_size`, `max_parallel_requests`, AIMD throttle config, error handling) apply to both engines; the differences are flagged inline.

Data Designer processes datasets in **batches**, with **parallel** operations within each batch.
The sync engine processes datasets in **batches**, with **parallel** operations within each batch.

### How It Works
### How It Works (sync engine)

**Step 1: Split into batches**

Your dataset is divided into batches of `buffer_size` records. Each batch is processed completely before moving to the next.
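The split is plain ceiling division. A quick sanity check of the arithmetic (illustrative only, not the library's internal code):

```python
import math

buffer_size, num_records = 50, 120
num_batches = math.ceil(num_records / buffer_size)  # 3 batches
# Every batch holds buffer_size records except a possibly smaller final one.
sizes = [min(buffer_size, num_records - i * buffer_size) for i in range(num_batches)]
print(sizes)  # [50, 50, 20]
```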

**Step 2: Process columns sequentially**

Within a batch, columns are generated one at a time following the dependency graph. The order depends on column dependencies—expression columns may come before LLM columns if the LLM columns depend on them.
Within a batch, columns are generated one at a time following the dependency graph. The order depends on column dependencies—expression columns may come before LLM columns if the LLM columns depend on them. (The async engine relaxes this: columns whose per-cell dependencies are satisfied can run concurrently with columns earlier in the order.)
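A sketch of such an ordering — the column classes and fields below are illustrative, not the exact API; see the column-config reference for the real names:

```python
# Illustrative only: the expression column feeds the LLM prompt,
# so the dependency graph schedules it first.
config_builder.add_column(
    dd.ExpressionColumn(name="full_name", expr="{{ first_name }} {{ last_name }}")
)
config_builder.add_column(
    dd.LLMTextColumn(
        name="bio",
        prompt="Write a one-line bio for {{ full_name }}.",  # depends on full_name
        model_alias="my-model",
    )
)
```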

Example workflow:

@@ -93,9 +93,9 @@ Within each column, cells are processed **in parallel** up to the configured lim

| Concept | Description |
|---------|-------------|
| **Batching** | Records are split into batches of `buffer_size`. Each batch completes entirely before the next begins. |
| **Sequential columns** | Within a batch, columns are generated one at a time, respecting the dependency graph. |
| **Parallel cells** | Within a column, individual cells (records) are generated in parallel up to the configured limit. |
| **Batching** | Records are split into batches of `buffer_size`. In the sync engine, each batch completes entirely before the next begins; in the async engine, multiple row groups (the async equivalent) can be in flight concurrently. |
| **Sequential columns** | Sync-engine only: columns within a batch are generated one at a time, respecting the dependency graph. The async engine schedules at the cell level instead. |
| **Parallel cells** | Within a column, individual cells (records) are generated in parallel up to the configured limit. Same on both engines. |

### Concurrency Formula

@@ -116,8 +116,8 @@ concurrent_requests = min(

This means Data Designer automatically finds the right concurrency level for your server without manual tuning.

!!! note "Sync engine caveat"
AIMD adaptive concurrency is fully active on the **async engine** path. On the current **sync engine** path, 429 responses are first retried at the HTTP transport layer; AIMD only engages as a fallback if transport retries are exhausted. In practice the concurrency limit stays near `max_parallel_requests` for most sync workloads. The async engine is landing soon and will be the recommended path for production workloads.
!!! note "Engine paths"
AIMD adaptive concurrency is fully active on the default **async engine**. The legacy **sync engine** is available for one transitional release via `DATA_DESIGNER_ASYNC_ENGINE=0`; on that path 429s are first retried at the HTTP transport layer and AIMD only engages as a fallback. See [Async engine](#async-engine) below.

**Example**: With `buffer_size=100` and `max_parallel_requests=32`, Data Designer starts sending up to 32 requests in parallel. If the server returns 429s, concurrency drops automatically (e.g., to 24, then 18) and recovers once the server catches up.
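The adjustment pattern can be sketched with a toy controller. The real AIMD constants, decrease factor, and windowing differ — this only illustrates the additive-increase / multiplicative-decrease shape:

```python
# Toy AIMD loop: add 1 on each success, halve on a 429 (real constants differ).
limit = 32
trace = []
for status in [200, 200, 429, 200, 200, 200, 429, 200]:
    limit = max(1, limit // 2) if status == 429 else limit + 1
    trace.append(limit)
print(trace)  # [33, 34, 17, 18, 19, 20, 10, 11]
```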

@@ -169,7 +169,7 @@ model = dd.ModelConfig(

**Default**: 4

**When to increase**: Your inference backend has high throughput capacity, you're using a cloud API with generous rate limits, or you're running vLLM/TensorRT-LLM with multiple GPUs. With AIMD, setting an aggressively high value is safer than before — the system will self-correct downward if the server can't keep up. (On the async engine the salvage queue reclaims failed rows; on the sync engine the initial burst of 429s before AIMD stabilizes can drop rows, so start with a more conservative ceiling if you're using the sync path.)
**When to increase**: Your inference backend has high throughput capacity, you're using a cloud API with generous rate limits, or you're running vLLM/TensorRT-LLM with multiple GPUs. With AIMD, setting an aggressively high value is safer than before — the system will self-correct downward if the server can't keep up. The salvage queue on the async engine (default) reclaims failed rows; on the sync engine the initial burst of 429s before AIMD stabilizes can drop rows, so start with a more conservative ceiling if you've opted into sync.

**When to decrease**: You want to cap resource usage to a known safe level, or you want more predictable/debuggable execution.

@@ -201,8 +201,8 @@ designer.set_run_config(run_config)

Data Designer uses an AIMD (Additive Increase / Multiplicative Decrease) controller to automatically adjust concurrency per model based on rate-limit feedback from the inference server. The defaults work well for most workloads. Override them via `ThrottleConfig` only when you understand the trade-offs.

!!! note "Sync engine caveat"
Adaptive throttling is fully active on the **async engine** path, where 429 responses propagate directly to the AIMD controller. On the **sync engine** path, 429s are first retried at the HTTP transport layer; `ThrottleConfig` settings only take effect as a fallback if transport retries are exhausted. The async engine is landing soon and will be the recommended path for production workloads.
!!! note "Engine paths"
Adaptive throttling is fully active on the default **async engine**, where 429 responses propagate directly to the AIMD controller. On the legacy **sync engine** (`DATA_DESIGNER_ASYNC_ENGINE=0`), 429s are first retried at the HTTP transport layer; `ThrottleConfig` settings only take effect as a fallback if transport retries are exhausted.

```python
import data_designer.config as dd
@@ -260,6 +260,56 @@ designer.set_run_config(run_config)

---

## Async Engine

The async engine is the default execution path. It dispatches work at the cell level rather than the column level, so independent columns overlap in time and per-(provider, model) AIMD pools tune themselves independently. See the [Async All the Way Down](../devnotes/posts/async-all-the-way-down.md) dev note for the full architecture.

### Per-model timeouts drive every deadline

The `inference_parameters.timeout` field on a `ModelConfig` sets the per-request HTTP timeout. The same value also drives the sync→async bridge that custom columns use when they call `model.generate()`. There is no separate queue-wait deadline — waits scale with provider speed and AIMD's adaptive concurrency. Slow self-hosted endpoints (e.g. large models on a single GPU) only need this one knob raised:

```python
import data_designer.config as dd

config_builder.add_model_config(
dd.ModelConfig(
alias="slow-model",
model="my/slow-model",
provider="my-provider",
inference_parameters=dd.ChatCompletionInferenceParams(
timeout=600,
),
)
)
```

### Run outcomes

A run can finish with fewer records than requested when non-retryable errors drop rows. Inspect `len(result.load_dataset())` to detect a shortfall.
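A minimal check, assuming a `dd_instance` and `config_builder` are already set up:

```python
result = dd_instance.create(config_builder, num_records=1000)
df = result.load_dataset()
if len(df) < 1000:
    print(f"{1000 - len(df)} rows were dropped by non-retryable errors")
```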

If the rate of non-retryable errors crosses `RunConfig.shutdown_error_rate`, generation stops early and raises `DataDesignerEarlyShutdownError` (a subclass of `DataDesignerGenerationError`). Catch it separately when a typed retry path is appropriate:

```python
from data_designer.interface.errors import DataDesignerEarlyShutdownError

try:
result = dd_instance.create(config_builder, num_records=1000)
except DataDesignerEarlyShutdownError:
# e.g. retry against a different model alias
...
```

### Opting out

!!! warning "Deprecated"
`DATA_DESIGNER_ASYNC_ENGINE=0` selects the legacy sync engine. This is a deprecated escape hatch for the transitional release and will be removed in a future version. The opt-out also emits a `DeprecationWarning` at run time so it shows up in your logs.

```bash
DATA_DESIGNER_ASYNC_ENGINE=0 python my_pipeline.py
```

---

## Common Problems

| Problem | Symptom | Solution |
14 changes: 14 additions & 0 deletions docs/concepts/custom_columns.md
@@ -55,6 +55,9 @@ Model aliases are validated before generation starts. If an alias doesn't exist

For `full_column`, set `generation_strategy=dd.GenerationStrategy.FULL_COLUMN`.

!!! note "Concurrent dispatch"
Sync `cell_by_cell` generators are dispatched concurrently across rows under the async engine. Module-level mutable state (counters, caches, non-thread-safe HTTP clients) needs synchronization or per-row instantiation. For network-bound work, prefer `async def fn(row)` — the engine runs it directly on its event loop and skips the thread bridge.
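For example, a `cell_by_cell` generator that keeps a module-level counter needs a lock once rows dispatch concurrently. A self-contained sketch — the parameter names are illustrative:

```python
import threading

_lock = threading.Lock()
_rows_seen = 0  # module-level mutable state shared across concurrent rows

def my_generator(row, column_config, models):
    """Sketch of a sync cell_by_cell generator (parameter names illustrative)."""
    global _rows_seen
    with _lock:  # guard the counter: rows run concurrently under the async engine
        _rows_seen += 1
    return {**row, "row_index": _rows_seen}
```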

## The Decorator

```python
@@ -175,6 +178,17 @@ models = data_designer.get_models(["my-model"])
result = my_generator({"name": "Alice"}, None, models)
```

In unit tests that mock model clients, use `MagicMock(spec=ModelFacade)` so async methods are auto-detected:

```python
from unittest.mock import MagicMock
from data_designer.engine.models.facade import ModelFacade

mock_model = MagicMock(spec=ModelFacade)
```

Mocking only `generate()` will silently no-op under the async engine because the bridge routes through `agenerate()`.
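The auto-detection itself is standard `unittest.mock` behavior, and can be seen with a local stand-in class (`Facade` below is illustrative, not the real `ModelFacade`):

```python
import asyncio
from unittest.mock import MagicMock

class Facade:  # stand-in with the same sync/async split as the real facade
    def generate(self): ...
    async def agenerate(self): ...

m = MagicMock(spec=Facade)
# Because agenerate is async on the spec, it is auto-specced as an AsyncMock,
# so configuring return_value works for awaited calls:
m.agenerate.return_value = ("stubbed", [])
print(asyncio.run(m.agenerate()))  # ('stubbed', [])
```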

## See Also

- [Column Configs Reference](../code_reference/column_configs.md)
3 changes: 3 additions & 0 deletions docs/concepts/processors.md
@@ -24,6 +24,9 @@ Processors can run at three stages, determined by which callback methods they im
!!! info "Full Schema Available During Generation"
Each batch carries the full dataset schema during generation. Post-batch schema changes such as column dropping only alter past batches, so all columns remain accessible to generators while building follow-up batches.

!!! warning "Row-count changes under the async engine"
The async engine (default) enforces row-count invariance in `process_before_batch()` and `process_after_batch()` — a processor returning a different row count raises `DatasetGenerationError`. Run row-filtering or expansion logic in `process_after_generation()`, which operates on the final dataset and supports row-count changes. The legacy sync engine (opt-out via `DATA_DESIGNER_ASYNC_ENGINE=0`) is permissive about row-count changes at all stages.
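A hypothetical row-filtering processor, sketched to show the safe stage (class name and DataFrame schema are assumptions; only the callback name comes from the docs above):

```python
import pandas as pd

class DropEmptyText:
    """Hypothetical processor: filters rows only in process_after_generation,
    the one stage where the async engine permits row-count changes."""

    def process_after_generation(self, dataset: pd.DataFrame) -> pd.DataFrame:
        return dataset[dataset["text"].str.len() > 0].reset_index(drop=True)

df = pd.DataFrame({"text": ["ok", "", "also ok"]})
print(len(DropEmptyText().process_after_generation(df)))  # 2
```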

A processor can implement any combination of these callbacks. The built-in processors use `process_after_batch()` by default.

## Processor Types
8 changes: 8 additions & 0 deletions docs/devnotes/posts/async-all-the-way-down.md
@@ -266,6 +266,14 @@ Pipelines with independent columns or multi-model setups will see the largest ga

The dependencies were always per-cell. Now the engine schedules them that way.

---

## **Update**

The async engine is now the default execution path. Set `DATA_DESIGNER_ASYNC_ENGINE=0` to opt back into the legacy sync engine for one transitional release. The [Architecture & Performance](../../concepts/architecture-and-performance.md#async-engine) page covers the configuration knobs and behaviors worth knowing about.

---

Key Resources:

1. [NeMo Data Designer on GitHub](https://github.com/NVIDIA-NeMo/DataDesigner)
Original file line number Diff line number Diff line change
@@ -16,6 +16,14 @@

_T = TypeVar("_T")

# Preserved deliberately. Two other 300s deadlines were retired in the
# async-default flip (PR #592): the throttle queue-wait and the
# ``_AsyncBridgedModelFacade`` bridge in ``custom.py`` — both have
# ``ModelFacade`` context and could derive a per-call deadline from
# ``inference_parameters.timeout``. This generic ``ColumnGenerator.generate()``
# fallback has no facade reachable, so a defensive backstop stays here for
# now. Wiring a per-call timeout through to ``_run_coroutine_sync`` is
# tracked as a structural follow-up.
SYNC_BRIDGE_TIMEOUT = 300

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@

import data_designer.lazy_heavy_imports as lazy
from data_designer.config.column_configs import CustomColumnConfig, GenerationStrategy
from data_designer.engine.column_generators.generators.base import SYNC_BRIDGE_TIMEOUT, ColumnGenerator
from data_designer.engine.column_generators.generators.base import ColumnGenerator
from data_designer.engine.column_generators.utils.errors import CustomColumnGenerationError
from data_designer.engine.models.errors import RETRYABLE_MODEL_ERRORS, ModelTimeoutError
from data_designer.logging import LOG_INDENT
@@ -23,6 +23,30 @@

logger = logging.getLogger(__name__)

# Floor for the derived sync→async bridge timeout. Defends against pathologically
# small `inference_parameters.timeout` values; under normal operation the per-call
# derivation in ``_compute_bridge_timeout`` exceeds this floor.
# Module-level so tests can patch it for fast feedback.
_BRIDGE_TIMEOUT_FLOOR_S: float = 60.0


def _compute_bridge_timeout(
request_timeout: float,
max_correction_steps: int,
max_conversation_restarts: int = 0,
) -> float:
"""Derive the sync→async bridge deadline for one logical generation.

One bridged call can fire up to ``(1 + max_conversation_restarts)`` full
attempts, and each attempt can fire up to ``(1 + max_correction_steps)``
requests against the model. The 1.5x buffer absorbs HTTP connect/teardown
and serialization overhead. The floor (``_BRIDGE_TIMEOUT_FLOOR_S``) keeps
the deadline sane when ``inference_parameters.timeout`` is small or the
factory default.
"""
attempts = (1 + max_conversation_restarts) * (1 + max_correction_steps)
return max(_BRIDGE_TIMEOUT_FLOOR_S, attempts * request_timeout * 1.5)
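# Worked example of the derivation above: with ``inference_parameters.timeout``
# of 120 s, ``max_correction_steps=2`` and ``max_conversation_restarts=1``,
# attempts = (1 + 1) * (1 + 2) = 6, so the bridge deadline is
# max(60.0, 6 * 120 * 1.5) = 1080.0 seconds.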


class _AsyncBridgedModelFacade:
"""Proxy that bridges ``model.generate()`` to ``model.agenerate()`` in async engine mode.
@@ -63,19 +87,30 @@ def generate(self, *args: Any, **kwargs: Any) -> tuple[Any, list]:

from data_designer.engine.dataset_builders.utils.async_concurrency import ensure_async_engine_loop

# Honor a per-call ``timeout=`` override (passed straight through to the
# HTTP layer); fall back to the model's configured ``inference_parameters.timeout``
# via ``facade.request_timeout`` when no override is set.
per_request_timeout_override = kwargs.get("timeout")
per_request_timeout = (
float(per_request_timeout_override) if per_request_timeout_override is not None else facade.request_timeout
)
correction_steps = int(kwargs.get("max_correction_steps", 0) or 0)
conversation_restarts = int(kwargs.get("max_conversation_restarts", 0) or 0)
bridge_timeout = _compute_bridge_timeout(per_request_timeout, correction_steps, conversation_restarts)

loop = ensure_async_engine_loop()
future = asyncio.run_coroutine_threadsafe(facade.agenerate(*args, **kwargs), loop)
try:
return future.result(timeout=SYNC_BRIDGE_TIMEOUT)
return future.result(timeout=bridge_timeout)
except concurrent.futures.TimeoutError as exc:
future.cancel()
# Demoted to debug: the raised ModelTimeoutError already surfaces
# the timeout at the scheduler with full context, and the throttled
# degraded-provider WARN is the user-facing signal under sustained
# bridge timeouts. Per-event WARN was noise on top of those.
logger.debug("Async model bridge timed out after %ss; coroutine cancelled", SYNC_BRIDGE_TIMEOUT)
logger.debug("Async model bridge timed out after %.0fs; coroutine cancelled", bridge_timeout)
# Raise as ModelTimeoutError so the scheduler classifies it retryable.
raise ModelTimeoutError(f"model.generate() bridge timed out after {SYNC_BRIDGE_TIMEOUT}s") from exc
raise ModelTimeoutError(f"model.generate() bridge timed out after {bridge_timeout:.0f}s") from exc

def __getattr__(self, name: str) -> Any:
return getattr(object.__getattribute__(self, "_facade"), name)