diff --git a/README.md b/README.md index bf38f79f6..872e4f4c4 100644 --- a/README.md +++ b/README.md @@ -22,13 +22,11 @@ Data Designer helps you create synthetic datasets that go beyond simple LLM prom --- -### ⚠️ Security Notice: LiteLLM Supply-Chain Incident (2026-03-24) +### 📣 Heads-up: async engine is now the default -On March 24, 2026, malicious versions of `litellm` ([1.82.7 and 1.82.8](https://github.com/BerriAI/litellm/issues/24518)) were published to PyPI containing a credential stealer. The compromised packages were available for [approximately five hours](https://www.okta.com/blog/threat-intelligence/litellm-supply-chain-attack--an-explainer-for-identity-pros/) (10:39 – 16:00 UTC) before being removed. +Data Designer now runs pipelines on a cell-level async engine that overlaps independent columns and adapts concurrency per (provider, model). On most pipelines this is faster with no config changes; on slow self-hosted endpoints, set `inference_parameters.timeout` to your real per-request latency. See [Architecture & Performance → Async Engine](https://nvidia-nemo.github.io/DataDesigner/latest/concepts/architecture-and-performance/#async-engine) for the behaviors worth knowing about. -The only Data Designer releases that could resolve to these versions are **v0.2.2** (Dec 2025) and **v0.2.3** (Jan 2026), which carried a looser `litellm<2` upper bound. These are nearly three months old and have been superseded by eight subsequent releases — both have been yanked from PyPI as a precaution. All other releases (v0.3.0 – v0.5.3) pinned `litellm` to `>=1.73.6,<1.80.12` and were never compatible with 1.82.x. Starting with v0.5.4, `litellm` is no longer a dependency. - -To have been impacted through Data Designer, you would need to have had one of these two old versions explicitly pinned *and* run a fresh `pip install` or dependency-cache update that resolved `litellm` during the five-hour window on March 24. 
If you believe you may be affected, see [BerriAI's incident report](https://github.com/BerriAI/litellm/issues/24518) for remediation steps. +If you hit anything unexpected, fall back to the legacy sync engine for one transitional release with `DATA_DESIGNER_ASYNC_ENGINE=0`, and please [open an issue](https://github.com/NVIDIA-NeMo/DataDesigner/issues/new) so we can fix the async path. --- diff --git a/docs/concepts/architecture-and-performance.md b/docs/concepts/architecture-and-performance.md index 6590ac1f1..69f500c22 100644 --- a/docs/concepts/architecture-and-performance.md +++ b/docs/concepts/architecture-and-performance.md @@ -47,12 +47,12 @@ This guide explains the architecture, execution model, and how to tune performan ## Execution Model -!!! note "Dataset Builder" - This describes Data Designer's current **`DatasetBuilder`**, which generates columns sequentially within batches. Other dataset generation strategies are in development. +!!! note "Two execution engines" + The default execution path is the **async engine**, which dispatches work at the cell level and overlaps independent columns — see [Async Engine](#async-engine) below for its semantics. The legacy **sync engine** is still available for one transitional release via `DATA_DESIGNER_ASYNC_ENGINE=0` and is what this section describes. The configuration knobs documented below (`buffer_size`, `max_parallel_requests`, AIMD throttle config, error handling) apply to both engines; the differences are flagged inline. -Data Designer processes datasets in **batches**, with **parallel** operations within each batch. +The sync engine processes datasets in **batches**, with **parallel** operations within each batch. -### How It Works +### How It Works (sync engine) **Step 1: Split into batches** @@ -60,7 +60,7 @@ Your dataset is divided into batches of `buffer_size` records. 
Each batch is pro **Step 2: Process columns sequentially** -Within a batch, columns are generated one at a time following the dependency graph. The order depends on column dependencies—expression columns may come before LLM columns if the LLM columns depend on them. +Within a batch, columns are generated one at a time following the dependency graph. The order depends on column dependencies—expression columns may come before LLM columns if the LLM columns depend on them. (The async engine relaxes this: columns whose per-cell dependencies are satisfied can run concurrently with columns earlier in the order.) Example workflow: @@ -93,9 +93,9 @@ Within each column, cells are processed **in parallel** up to the configured lim | Concept | Description | |---------|-------------| -| **Batching** | Records are split into batches of `buffer_size`. Each batch completes entirely before the next begins. | -| **Sequential columns** | Within a batch, columns are generated one at a time, respecting the dependency graph. | -| **Parallel cells** | Within a column, individual cells (records) are generated in parallel up to the configured limit. | +| **Batching** | Records are split into batches of `buffer_size`. In the sync engine, each batch completes entirely before the next begins; in the async engine, multiple row groups (the async equivalent) can be in flight concurrently. | +| **Sequential columns** | Sync-engine only: columns within a batch are generated one at a time, respecting the dependency graph. The async engine schedules at the cell level instead. | +| **Parallel cells** | Within a column, individual cells (records) are generated in parallel up to the configured limit. Same on both engines. | ### Concurrency Formula @@ -116,8 +116,8 @@ concurrent_requests = min( This means Data Designer automatically finds the right concurrency level for your server without manual tuning. -!!! 
note "Sync engine caveat" - AIMD adaptive concurrency is fully active on the **async engine** path. On the current **sync engine** path, 429 responses are first retried at the HTTP transport layer; AIMD only engages as a fallback if transport retries are exhausted. In practice the concurrency limit stays near `max_parallel_requests` for most sync workloads. The async engine is landing soon and will be the recommended path for production workloads. +!!! note "Engine paths" + AIMD adaptive concurrency is fully active on the default **async engine**. The legacy **sync engine** is available for one transitional release via `DATA_DESIGNER_ASYNC_ENGINE=0`; on that path 429s are first retried at the HTTP transport layer and AIMD only engages as a fallback. See [Async engine](#async-engine) below. **Example**: With `buffer_size=100` and `max_parallel_requests=32`, Data Designer starts sending up to 32 requests in parallel. If the server returns 429s, concurrency drops automatically (e.g., to 24, then 18) and recovers once the server catches up. @@ -169,7 +169,7 @@ model = dd.ModelConfig( **Default**: 4 -**When to increase**: Your inference backend has high throughput capacity, you're using a cloud API with generous rate limits, or you're running vLLM/TensorRT-LLM with multiple GPUs. With AIMD, setting an aggressively high value is safer than before — the system will self-correct downward if the server can't keep up. (On the async engine the salvage queue reclaims failed rows; on the sync engine the initial burst of 429s before AIMD stabilizes can drop rows, so start with a more conservative ceiling if you're using the sync path.) +**When to increase**: Your inference backend has high throughput capacity, you're using a cloud API with generous rate limits, or you're running vLLM/TensorRT-LLM with multiple GPUs. With AIMD, setting an aggressively high value is safer than before — the system will self-correct downward if the server can't keep up. 
The salvage queue on the async engine (default) reclaims failed rows; on the sync engine the initial burst of 429s before AIMD stabilizes can drop rows, so start with a more conservative ceiling if you've opted into sync. **When to decrease**: You want to cap resource usage to a known safe level, or you want more predictable/debuggable execution. @@ -201,8 +201,8 @@ designer.set_run_config(run_config) Data Designer uses an AIMD (Additive Increase / Multiplicative Decrease) controller to automatically adjust concurrency per model based on rate-limit feedback from the inference server. The defaults work well for most workloads. Override them via `ThrottleConfig` only when you understand the trade-offs. -!!! note "Sync engine caveat" - Adaptive throttling is fully active on the **async engine** path, where 429 responses propagate directly to the AIMD controller. On the **sync engine** path, 429s are first retried at the HTTP transport layer; `ThrottleConfig` settings only take effect as a fallback if transport retries are exhausted. The async engine is landing soon and will be the recommended path for production workloads. +!!! note "Engine paths" + Adaptive throttling is fully active on the default **async engine**, where 429 responses propagate directly to the AIMD controller. On the legacy **sync engine** (`DATA_DESIGNER_ASYNC_ENGINE=0`), 429s are first retried at the HTTP transport layer; `ThrottleConfig` settings only take effect as a fallback if transport retries are exhausted. ```python import data_designer.config as dd @@ -260,6 +260,56 @@ designer.set_run_config(run_config) --- +## Async Engine + +The async engine is the default execution path. It dispatches work at the cell level rather than the column level, so independent columns overlap in time and per-(provider, model) AIMD pools tune themselves independently. See the [Async All the Way Down](../devnotes/posts/async-all-the-way-down.md) dev note for the full architecture. 
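For intuition, the AIMD behavior this page describes can be sketched as a toy controller. This is illustrative only: `AIMDLimiter`, the 0.75 backoff factor, and the +1 additive step are assumptions chosen to reproduce the "32, then 24, then 18" example above, not Data Designer's actual controller (which is configured via `ThrottleConfig`):

```python
class AIMDLimiter:
    """Toy AIMD (Additive Increase / Multiplicative Decrease) limiter.

    Illustrative sketch of the documented behavior, not engine code.
    """

    def __init__(self, ceiling: int, floor: int = 1, step: int = 1, backoff: float = 0.75):
        self.ceiling = ceiling  # max_parallel_requests acts as the hard cap
        self.floor = floor      # never drop below this many in-flight requests
        self.step = step        # additive increase per successful response
        self.backoff = backoff  # multiplicative decrease on a 429
        self.limit = ceiling    # start optimistic; self-correct downward

    def on_success(self) -> None:
        self.limit = min(self.ceiling, self.limit + self.step)

    def on_rate_limit(self) -> None:
        self.limit = max(self.floor, int(self.limit * self.backoff))


limiter = AIMDLimiter(ceiling=32)
limiter.on_rate_limit()  # server returned 429: 32 -> 24
limiter.on_rate_limit()  # still throttled: 24 -> 18
limiter.on_success()     # server caught up: creep back toward the ceiling
```

The sketch shows why setting an aggressively high `max_parallel_requests` is safe under AIMD: a burst of 429s halves-ish the limit quickly, while recovery is deliberately gradual.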
+ +### Per-model timeouts drive every deadline + +The `inference_parameters.timeout` field on a `ModelConfig` sets the per-request HTTP timeout. The same value also drives the sync→async bridge that custom columns use when they call `model.generate()`. There is no separate queue-wait deadline — waits scale with provider speed and AIMD's adaptive concurrency. Slow self-hosted endpoints (e.g. large models on a single GPU) only need this one knob raised: + +```python +import data_designer.config as dd + +config_builder.add_model_config( + dd.ModelConfig( + alias="slow-model", + model="my/slow-model", + provider="my-provider", + inference_parameters=dd.ChatCompletionInferenceParams( + timeout=600, + ), + ) +) +``` + +### Run outcomes + +A run can finish with fewer records than requested when non-retryable errors drop rows. Inspect `len(result.load_dataset())` to detect a short run. + +If the rate of non-retryable errors crosses `RunConfig.shutdown_error_rate`, generation stops early and raises `DataDesignerEarlyShutdownError` (a subclass of `DataDesignerGenerationError`). Catch it separately when a typed retry path is appropriate: + +```python +from data_designer.interface.errors import DataDesignerEarlyShutdownError + +try: + result = dd_instance.create(config_builder, num_records=1000) +except DataDesignerEarlyShutdownError: + # e.g. retry against a different model alias + ... +``` + +### Opting out + +!!! warning "Deprecated" + `DATA_DESIGNER_ASYNC_ENGINE=0` selects the legacy sync engine. This is a deprecated escape hatch for the transitional release and will be removed in a future version. The opt-out also emits a `DeprecationWarning` at run time so it shows up in your logs.
+ +```bash +DATA_DESIGNER_ASYNC_ENGINE=0 python my_pipeline.py +``` + +--- + ## Common Problems | Problem | Symptom | Solution | diff --git a/docs/concepts/custom_columns.md b/docs/concepts/custom_columns.md index 983444855..447c6f96d 100644 --- a/docs/concepts/custom_columns.md +++ b/docs/concepts/custom_columns.md @@ -55,6 +55,9 @@ Model aliases are validated before generation starts. If an alias doesn't exist For `full_column`, set `generation_strategy=dd.GenerationStrategy.FULL_COLUMN`. +!!! note "Concurrent dispatch" + Sync `cell_by_cell` generators are dispatched concurrently across rows under the async engine. Module-level mutable state (counters, caches, non-thread-safe HTTP clients) needs synchronization or per-row instantiation. For network-bound work, prefer `async def fn(row)` — the engine runs it directly on its event loop and skips the thread bridge. + ## The Decorator ```python @@ -175,6 +178,17 @@ models = data_designer.get_models(["my-model"]) result = my_generator({"name": "Alice"}, None, models) ``` +In unit tests that mock model clients, use `MagicMock(spec=ModelFacade)` so async methods are auto-detected: + +```python +from unittest.mock import MagicMock +from data_designer.engine.models.facade import ModelFacade + +mock_model = MagicMock(spec=ModelFacade) +``` + +Mocking only `generate()` will silently no-op under the async engine because the bridge routes through `agenerate()`. + ## See Also - [Column Configs Reference](../code_reference/column_configs.md) diff --git a/docs/concepts/processors.md b/docs/concepts/processors.md index 2b7d17fb1..1b0f75943 100644 --- a/docs/concepts/processors.md +++ b/docs/concepts/processors.md @@ -24,6 +24,9 @@ Processors can run at three stages, determined by which callback methods they im !!! info "Full Schema Available During Generation" Each batch carries the full dataset schema during generation. 
Post-batch schema changes such as column dropping only alter past batches, so all columns remain accessible to generators while building follow-up batches. +!!! warning "Row-count changes under the async engine" + The async engine (default) enforces row-count invariance in `process_before_batch()` and `process_after_batch()` — a processor returning a different row count raises `DatasetGenerationError`. Run row-filtering or expansion logic in `process_after_generation()`, which operates on the final dataset and supports row-count changes. The legacy sync engine (opt-out via `DATA_DESIGNER_ASYNC_ENGINE=0`) is permissive about row-count changes at all stages. + A processor can implement any combination of these callbacks. The built-in processors use `process_after_batch()` by default. ## Processor Types diff --git a/docs/devnotes/posts/async-all-the-way-down.md b/docs/devnotes/posts/async-all-the-way-down.md index e9bd1d72c..14338337b 100644 --- a/docs/devnotes/posts/async-all-the-way-down.md +++ b/docs/devnotes/posts/async-all-the-way-down.md @@ -266,6 +266,14 @@ Pipelines with independent columns or multi-model setups will see the largest ga The dependencies were always per-cell. Now the engine schedules them that way. +--- + +## **Update** + +The async engine is now the default execution path. Set `DATA_DESIGNER_ASYNC_ENGINE=0` to opt back into the legacy sync engine for one transitional release. The [Architecture & Performance](../../concepts/architecture-and-performance.md#async-engine) page covers the configuration knobs and behaviors worth knowing about. + +--- + Key Resources: 1. 
[NeMo Data Designer on GitHub](https://github.com/NVIDIA-NeMo/DataDesigner) diff --git a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py index 500db3a3a..ff4c8de5f 100644 --- a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py +++ b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py @@ -16,6 +16,14 @@ _T = TypeVar("_T") +# Preserved deliberately. Two other 300s deadlines were retired in the +# async-default flip (PR #592): the throttle queue-wait and the +# ``_AsyncBridgedModelFacade`` bridge in ``custom.py`` — both have +# ``ModelFacade`` context and could derive a per-call deadline from +# ``inference_parameters.timeout``. This generic ``ColumnGenerator.generate()`` +# fallback has no facade reachable, so a defensive backstop stays here for +# now. Wiring a per-call timeout through to ``_run_coroutine_sync`` is +# tracked as a structural follow-up. 
SYNC_BRIDGE_TIMEOUT = 300 if TYPE_CHECKING: diff --git a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py index 87be65548..6206b3674 100644 --- a/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py +++ b/packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py @@ -13,7 +13,7 @@ import data_designer.lazy_heavy_imports as lazy from data_designer.config.column_configs import CustomColumnConfig, GenerationStrategy -from data_designer.engine.column_generators.generators.base import SYNC_BRIDGE_TIMEOUT, ColumnGenerator +from data_designer.engine.column_generators.generators.base import ColumnGenerator from data_designer.engine.column_generators.utils.errors import CustomColumnGenerationError from data_designer.engine.models.errors import RETRYABLE_MODEL_ERRORS, ModelTimeoutError from data_designer.logging import LOG_INDENT @@ -23,6 +23,30 @@ logger = logging.getLogger(__name__) +# Floor for the derived sync→async bridge timeout. Defends against pathologically +# small `inference_parameters.timeout` values; under normal operation the per-call +# derivation in ``_compute_bridge_timeout`` exceeds this floor. +# Module-level so tests can patch it for fast feedback. +_BRIDGE_TIMEOUT_FLOOR_S: float = 60.0 + + +def _compute_bridge_timeout( + request_timeout: float, + max_correction_steps: int, + max_conversation_restarts: int = 0, +) -> float: + """Derive the sync→async bridge deadline for one logical generation. + + One bridged call can fire up to ``(1 + max_conversation_restarts)`` full + attempts, and each attempt can fire up to ``(1 + max_correction_steps)`` + requests against the model. The 1.5x buffer absorbs HTTP connect/teardown + and serialization overhead. 
The floor (``_BRIDGE_TIMEOUT_FLOOR_S``) keeps + the deadline sane when ``inference_parameters.timeout`` is small or the + factory default. + """ + attempts = (1 + max_conversation_restarts) * (1 + max_correction_steps) + return max(_BRIDGE_TIMEOUT_FLOOR_S, attempts * request_timeout * 1.5) + class _AsyncBridgedModelFacade: """Proxy that bridges ``model.generate()`` to ``model.agenerate()`` in async engine mode. @@ -63,19 +87,30 @@ def generate(self, *args: Any, **kwargs: Any) -> tuple[Any, list]: from data_designer.engine.dataset_builders.utils.async_concurrency import ensure_async_engine_loop + # Honor a per-call ``timeout=`` override (passed straight through to the + # HTTP layer); fall back to the model's configured ``inference_parameters.timeout`` + # via ``facade.request_timeout`` when no override is set. + per_request_timeout_override = kwargs.get("timeout") + per_request_timeout = ( + float(per_request_timeout_override) if per_request_timeout_override is not None else facade.request_timeout + ) + correction_steps = int(kwargs.get("max_correction_steps", 0) or 0) + conversation_restarts = int(kwargs.get("max_conversation_restarts", 0) or 0) + bridge_timeout = _compute_bridge_timeout(per_request_timeout, correction_steps, conversation_restarts) + loop = ensure_async_engine_loop() future = asyncio.run_coroutine_threadsafe(facade.agenerate(*args, **kwargs), loop) try: - return future.result(timeout=SYNC_BRIDGE_TIMEOUT) + return future.result(timeout=bridge_timeout) except concurrent.futures.TimeoutError as exc: future.cancel() # Demoted to debug: the raised ModelTimeoutError already surfaces # the timeout at the scheduler with full context, and the throttled # degraded-provider WARN is the user-facing signal under sustained # bridge timeouts. Per-event WARN was noise on top of those. 
- logger.debug("Async model bridge timed out after %ss; coroutine cancelled", SYNC_BRIDGE_TIMEOUT) + logger.debug("Async model bridge timed out after %.0fs; coroutine cancelled", bridge_timeout) # Raise as ModelTimeoutError so the scheduler classifies it retryable. - raise ModelTimeoutError(f"model.generate() bridge timed out after {SYNC_BRIDGE_TIMEOUT}s") from exc + raise ModelTimeoutError(f"model.generate() bridge timed out after {bridge_timeout:.0f}s") from exc def __getattr__(self, name: str) -> Any: return getattr(object.__getattribute__(self, "_facade"), name) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py index 2120589b8..688ec529b 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py @@ -192,6 +192,14 @@ def __init__( # via the partial_row_groups property as a structured signal. self._partial_row_groups: list[int] = [] + # First non-retryable error encountered, if any. Surfaced via the + # ``first_non_retryable_error`` property so the interface can include + # the original cause in user-facing errors when a run produces 0 records + # (e.g. a deterministic seed-source failure). Sync engine preserved this + # context naturally because the from_scratch task raised; the async + # engine drops rows and continues, losing the cause unless we capture it. + self._first_non_retryable_error: Exception | None = None + # Pre-compute row-group sizes for O(1) lookup self._rg_size_map: dict[int, int] = dict(row_groups) @@ -250,6 +258,16 @@ def partial_row_groups(self) -> tuple[int, ...]: """ return tuple(self._partial_row_groups) + @property + def first_non_retryable_error(self) -> Exception | None: + """The first non-retryable error captured by the scheduler, if any. 
+ + Surfaced so callers can preserve the original cause when a run produces + 0 records due to deterministic failures (e.g. invalid seed sources). + Returns ``None`` for runs that completed without non-retryable errors. + """ + return self._first_non_retryable_error + def _spawn_worker(self, coro: Coroutine[Any, Any, None]) -> asyncio.Task: """Create a tracked worker task that auto-removes itself on completion.""" task = asyncio.create_task(coro) @@ -862,6 +880,11 @@ async def _execute_task_inner_impl(self, task: Task) -> None: if retryable: self._deferred.append(task) else: + # Capture the first non-retryable error for the interface to surface + # as the root cause when the run produces 0 records (e.g. deterministic + # seed failures). Subsequent failures are still logged below. + if self._first_non_retryable_error is None: + self._first_non_retryable_error = exc # Non-retryable: drop the affected row(s) if task.row_index is not None: self._drop_row(task.row_group, task.row_index, exclude_columns={task.column}) @@ -898,7 +921,17 @@ async def _run_from_scratch(self, task: Task, generator: ColumnGenerator) -> Any if isinstance(generator, FromScratchColumnGenerator): result_df = await generator.agenerate_from_scratch(rg_size) else: - result_df = await generator.agenerate(lazy.pd.DataFrame()) + # Non-FromScratch generators dispatched as seeds (no upstream columns) + # operate on existing buffer rows — same contract as the sync engine's + # FULL_COLUMN path. Pass an ``rg_size``-row snapshot so the generator + # produces ``rg_size`` rows back, instead of an empty DataFrame which + # would yield zero values and fail ``update_batch``. 
+ if self._buffer_manager is not None: + records = [self._buffer_manager.get_row(task.row_group, ri) for ri in range(rg_size)] + input_df = lazy.pd.DataFrame(records) + else: + input_df = lazy.pd.DataFrame(index=range(rg_size)) + result_df = await generator.agenerate(input_df) # Write results to buffer (include side-effect columns) if self._buffer_manager is not None: diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py index 96470977c..985ca2c09 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py @@ -66,17 +66,14 @@ logger = logging.getLogger(__name__) -DATA_DESIGNER_ASYNC_ENGINE = os.environ.get("DATA_DESIGNER_ASYNC_ENGINE", "0") == "1" +# Async engine is the default execution path. Set ``DATA_DESIGNER_ASYNC_ENGINE=0`` +# to opt back into the legacy sync engine for one transitional release; the sync +# path is scheduled for removal afterwards. +DATA_DESIGNER_ASYNC_ENGINE = os.environ.get("DATA_DESIGNER_ASYNC_ENGINE", "1") == "1" if DATA_DESIGNER_ASYNC_ENGINE: import asyncio - import sys - if sys.version_info < (3, 11): - raise RuntimeError( - "DATA_DESIGNER_ASYNC_ENGINE requires Python 3.11+ (asyncio.TaskGroup). " - f"Current version: {sys.version_info.major}.{sys.version_info.minor}" - ) from data_designer.engine.dataset_builders.async_scheduler import ( DEFAULT_TASK_POOL_SIZE, LLM_WAIT_POOL_MULTIPLIER, @@ -123,6 +120,10 @@ def __init__( # ``-1`` means "no async run has executed yet" so callers can # distinguish "0 records produced" from "never ran". self._actual_num_records: int = -1 + # First non-retryable error captured by the scheduler in the most recent + # async run, if any. 
Used by the interface to surface the original cause + # when a run produces 0 records due to deterministic failures. + self._first_non_retryable_error: Exception | None = None self._data_designer_config = compile_data_designer_config(data_designer_config, resource_provider) self._column_configs = compile_dataset_builder_column_configs(self._data_designer_config) @@ -160,6 +161,11 @@ def actual_num_records(self) -> int: """Records actually written by the most recent async run (-1 if no run yet).""" return self._actual_num_records + @property + def first_non_retryable_error(self) -> Exception | None: + """First non-retryable error captured by the scheduler in the most recent run.""" + return self._first_non_retryable_error + def set_processor_runner(self, processors: list[Processor]) -> None: """Replace the processor runner with a new one using the given processors.""" self._processor_runner = ProcessorRunner( @@ -271,6 +277,7 @@ def _reset_run_state(self) -> None: self._early_shutdown = False self._partial_row_groups = () self._actual_num_records = -1 + self._first_non_retryable_error = None self._task_traces = [] def _build_async_preview(self, generators: list[ColumnGenerator], num_records: int) -> pd.DataFrame: @@ -297,6 +304,7 @@ def _build_async_preview(self, generators: list[ColumnGenerator], num_records: i self._early_shutdown = scheduler.early_shutdown self._partial_row_groups = scheduler.partial_row_groups self._actual_num_records = buffer_manager.actual_num_records + self._first_non_retryable_error = scheduler.first_non_retryable_error if not buffer_manager.has_row_group(0): return lazy.pd.DataFrame() @@ -371,6 +379,7 @@ def on_complete(final_path: Path | str | None) -> None: self._early_shutdown = scheduler.early_shutdown self._partial_row_groups = scheduler.partial_row_groups self._actual_num_records = buffer_manager.actual_num_records + self._first_non_retryable_error = scheduler.first_non_retryable_error # Emit telemetry try: diff --git 
a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_concurrency.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_concurrency.py index 9546814fc..a7d1883c6 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_concurrency.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/async_concurrency.py @@ -6,7 +6,7 @@ Async counterpart to ``concurrency.py``. Same operational contract (callbacks with optional context, error aggregation, shutdown thresholds), different runtime model. The sync module runs callables in a ``ThreadPoolExecutor``; -this module runs coroutines in ``asyncio.TaskGroup`` on a dedicated loop +this module runs coroutines via ``asyncio.gather`` on a dedicated loop thread. Callers stay synchronous. Architecture: @@ -17,7 +17,7 @@ ``ExecutorResults`` model as the sync executor. Caller Thread ──► run() ──► run_coroutine_threadsafe ──► Background Loop - (TaskGroup) + (gather) Singleton Event Loop: The background loop is a process-wide singleton. Async-stateful @@ -181,9 +181,19 @@ async def _run_all(self, work_items: list[tuple[Coroutine[Any, Any, Any], dict | self._semaphore = asyncio.Semaphore(self._max_workers) self._shutdown_event = asyncio.Event() - async with asyncio.TaskGroup() as tg: - for i, (coro, context) in enumerate(work_items): - tg.create_task(self._run_task(i, coro, context)) + # gather-with-explicit-cancel: equivalent to asyncio.TaskGroup but available on 3.10. + # _run_task swallows its own exceptions into error_trap, so children don't raise into + # gather under normal operation. The except-block preserves TaskGroup's "cancel siblings + # on parent cancellation or unexpected child raise" semantics for safety. 
+ tasks = [asyncio.create_task(self._run_task(i, coro, ctx)) for i, (coro, ctx) in enumerate(work_items)] + try: + await asyncio.gather(*tasks) + except BaseException: + for t in tasks: + if not t.done(): + t.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + raise if not self._disable_early_shutdown and self._results.early_shutdown: self._raise_task_error() diff --git a/packages/data-designer-engine/src/data_designer/engine/models/clients/throttle_manager.py b/packages/data-designer-engine/src/data_designer/engine/models/clients/throttle_manager.py index e8f720c0c..fb57c32ab 100644 --- a/packages/data-designer-engine/src/data_designer/engine/models/clients/throttle_manager.py +++ b/packages/data-designer-engine/src/data_designer/engine/models/clients/throttle_manager.py @@ -28,7 +28,6 @@ class ThrottleDomain(str, Enum): # --------------------------------------------------------------------------- DEFAULT_MIN_LIMIT: int = 1 -DEFAULT_ACQUIRE_TIMEOUT: float = 300.0 CAPACITY_POLL_INTERVAL: float = 0.05 @@ -330,9 +329,17 @@ def acquire_sync( provider_name: str, model_id: str, domain: ThrottleDomain, - timeout: float = DEFAULT_ACQUIRE_TIMEOUT, + timeout: float | None = None, ) -> None: - deadline = time.monotonic() + timeout + """Block until a permit is available. + + ``timeout=None`` (the default) waits indefinitely; the per-request HTTP + timeout (``inference_parameters.timeout``) is the only deadline that bounds + actual work, and queue waits scale naturally with provider speed and + AIMD's adaptive concurrency. Pass an explicit float for tests or for + support cases where a queue-wait deadline is genuinely desired. 
+ """ + deadline = (time.monotonic() + timeout) if timeout is not None else None wait = self.try_acquire(provider_name=provider_name, model_id=model_id, domain=domain) if wait == 0.0: return @@ -352,13 +359,17 @@ def acquire_sync( ) try: while True: - remaining = deadline - time.monotonic() - if remaining <= 0 or wait > remaining: - raise TimeoutError( - f"Throttle acquire timed out after {timeout:.0f}s " - f"for {provider_name}/{model_id} [{domain.value}]" - ) - time.sleep(min(wait, remaining)) + if deadline is not None: + remaining = deadline - time.monotonic() + if remaining <= 0 or wait > remaining: + raise TimeoutError( + f"Throttle acquire timed out after {timeout:.0f}s " + f"for {provider_name}/{model_id} [{domain.value}]" + ) + sleep_for = min(wait, remaining) + else: + sleep_for = wait + time.sleep(sleep_for) wait = self.try_acquire(provider_name=provider_name, model_id=model_id, domain=domain) if wait == 0.0: return @@ -379,9 +390,17 @@ async def acquire_async( provider_name: str, model_id: str, domain: ThrottleDomain, - timeout: float = DEFAULT_ACQUIRE_TIMEOUT, + timeout: float | None = None, ) -> None: - deadline = time.monotonic() + timeout + """Block until a permit is available. + + ``timeout=None`` (the default) waits indefinitely; the per-request HTTP + timeout (``inference_parameters.timeout``) is the only deadline that bounds + actual work, and queue waits scale naturally with provider speed and + AIMD's adaptive concurrency. Pass an explicit float for tests or for + support cases where a queue-wait deadline is genuinely desired. 
+ """ + deadline = (time.monotonic() + timeout) if timeout is not None else None wait = self.try_acquire(provider_name=provider_name, model_id=model_id, domain=domain) if wait == 0.0: return @@ -401,13 +420,17 @@ async def acquire_async( ) try: while True: - remaining = deadline - time.monotonic() - if remaining <= 0 or wait > remaining: - raise TimeoutError( - f"Throttle acquire timed out after {timeout:.0f}s " - f"for {provider_name}/{model_id} [{domain.value}]" - ) - await asyncio.sleep(min(wait, remaining)) + if deadline is not None: + remaining = deadline - time.monotonic() + if remaining <= 0 or wait > remaining: + raise TimeoutError( + f"Throttle acquire timed out after {timeout:.0f}s " + f"for {provider_name}/{model_id} [{domain.value}]" + ) + sleep_for = min(wait, remaining) + else: + sleep_for = wait + await asyncio.sleep(sleep_for) wait = self.try_acquire(provider_name=provider_name, model_id=model_id, domain=domain) if wait == 0.0: return diff --git a/packages/data-designer-engine/src/data_designer/engine/models/facade.py b/packages/data-designer-engine/src/data_designer/engine/models/facade.py index 3083133a7..885430300 100644 --- a/packages/data-designer-engine/src/data_designer/engine/models/facade.py +++ b/packages/data-designer-engine/src/data_designer/engine/models/facade.py @@ -151,6 +151,17 @@ def model_alias(self) -> str: def max_parallel_requests(self) -> int: return self._model_config.inference_parameters.max_parallel_requests + @property + def request_timeout(self) -> float: + """Effective per-request HTTP timeout in seconds. + + Mirrors the fallback in ``clients/factory.py`` so callers that want to + derive bounded waits (e.g. the sync→async bridge) get a value that + matches what the client is actually using. 
+ """ + raw = self._model_config.inference_parameters.timeout + return float(raw) if raw is not None else 60.0 + @property def usage_stats(self) -> ModelUsageStats: return self._usage_stats diff --git a/packages/data-designer-engine/src/data_designer/engine/resources/resource_provider.py b/packages/data-designer-engine/src/data_designer/engine/resources/resource_provider.py index 28199bc71..a3642fd9a 100644 --- a/packages/data-designer-engine/src/data_designer/engine/resources/resource_provider.py +++ b/packages/data-designer-engine/src/data_designer/engine/resources/resource_provider.py @@ -90,6 +90,7 @@ def create_resource_provider( run_config: RunConfig | None = None, mcp_providers: list[MCPProviderT] | None = None, tool_configs: list[ToolConfig] | None = None, + client_concurrency_mode: ClientConcurrencyMode | None = None, ) -> ResourceProvider: """Factory function for creating a ResourceProvider instance. @@ -134,11 +135,17 @@ def create_resource_provider( mcp_provider_registry=mcp_provider_registry, ) - client_concurrency_mode = ( - ClientConcurrencyMode.ASYNC - if os.environ.get("DATA_DESIGNER_ASYNC_ENGINE", "0") == "1" - else ClientConcurrencyMode.SYNC - ) + # Default the client mode from the env var when the caller hasn't decided. + # The interface (DataDesigner) computes the mode based on env var AND the + # config (e.g. allow_resize columns force a sync fallback) and passes the + # result explicitly. Direct callers of this factory still get the env-var + # default for backward compatibility. 
+ if client_concurrency_mode is None: + client_concurrency_mode = ( + ClientConcurrencyMode.ASYNC + if os.environ.get("DATA_DESIGNER_ASYNC_ENGINE", "1") == "1" + else ClientConcurrencyMode.SYNC + ) effective_run_config = run_config or RunConfig() diff --git a/packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py b/packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py index 9aa0afd5e..ec7f78509 100644 --- a/packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py +++ b/packages/data-designer-engine/tests/engine/column_generators/generators/test_custom.py @@ -20,7 +20,11 @@ from data_designer.config.column_configs import CustomColumnConfig, GenerationStrategy from data_designer.config.custom_column import custom_column_generator -from data_designer.engine.column_generators.generators.custom import CustomColumnGenerator, _AsyncBridgedModelFacade +from data_designer.engine.column_generators.generators.custom import ( + CustomColumnGenerator, + _AsyncBridgedModelFacade, + _compute_bridge_timeout, +) from data_designer.engine.column_generators.utils.errors import CustomColumnGenerationError from data_designer.engine.models.clients.errors import SyncClientUnavailableError from data_designer.engine.models.errors import RETRYABLE_MODEL_ERRORS, ModelTimeoutError @@ -540,6 +544,7 @@ def test_async_bridge_falls_back_to_agenerate_on_sync_client_error() -> None: facade.generate.side_effect = SyncClientUnavailableError( "Sync methods are not available on an async-mode HttpModelClient." ) + facade.request_timeout = 60.0 async def fake_agenerate(*args: Any, **kwargs: Any) -> tuple: return ("async_result", list(args), kwargs) @@ -586,6 +591,10 @@ def test_async_bridge_timeout_raises_model_timeout_error() -> None: facade.generate.side_effect = SyncClientUnavailableError( "Sync methods are not available on an async-mode HttpModelClient." 
     )
+    # Bridge deadline = facade.request_timeout scaled by the correction-step and
+    # restart budgets (×1.5), floored at _BRIDGE_TIMEOUT_FLOOR_S. Patch the floor
+    # down so this test finishes in milliseconds, not the production 60s default.
+    facade.request_timeout = 0.01
 
     async def hangs_forever(*args: Any, **kwargs: Any) -> tuple:
         await asyncio.sleep(60)
@@ -604,7 +613,7 @@ async def hangs_forever(*args: Any, **kwargs: Any) -> tuple:
             "data_designer.engine.dataset_builders.utils.async_concurrency.ensure_async_engine_loop",
             return_value=engine_loop,
         ),
-        patch("data_designer.engine.column_generators.generators.custom.SYNC_BRIDGE_TIMEOUT", 0.05),
+        patch("data_designer.engine.column_generators.generators.custom._BRIDGE_TIMEOUT_FLOOR_S", 0.05),
         pytest.raises(ModelTimeoutError, match="bridge timed out"),
     ):
         proxy.generate("hello")
@@ -626,3 +635,78 @@ async def call_from_loop() -> None:
 
     with pytest.raises(RuntimeError, match="Use 'await model.agenerate\\(\\)'"):
         asyncio.run(call_from_loop())
+
+
+@pytest.mark.parametrize(
+    "request_timeout,correction_steps,conversation_restarts,expected",
+    [
+        (60.0, 0, 0, 90.0),  # 1 * 1 * 60 * 1.5 = 90, above floor
+        (60.0, 2, 0, 270.0),  # 3 * 1 * 60 * 1.5 = 270
+        (60.0, 0, 2, 270.0),  # 1 * 3 * 60 * 1.5 = 270 — restarts contribute too
+        (60.0, 1, 1, 360.0),  # 2 * 2 * 60 * 1.5 = 360 — corrections × restarts compound
+        (10.0, 0, 0, 60.0),  # 1 * 1 * 10 * 1.5 = 15, clamped to 60s floor
+    ],
+    ids=[
+        "no-corrections-no-restarts",
+        "corrections-only",
+        "restarts-only",
+        "corrections-and-restarts-compound",
+        "small-clamped-to-floor",
+    ],
+)
+def test_compute_bridge_timeout(
+    request_timeout: float, correction_steps: int, conversation_restarts: int, expected: float
+) -> None:
+    """Bridge deadline = max(floor, (1+restarts) * (1+corrections) * request_timeout * 1.5)."""
+    assert _compute_bridge_timeout(request_timeout, correction_steps, conversation_restarts) == expected
+
+
+@pytest.mark.parametrize(
+    "kwargs,expected_per_request",
+    [
+        ({}, 
60.0), # No override; bridge uses facade.request_timeout + ({"timeout": 600.0}, 600.0), # Per-call timeout overrides the model default + ], + ids=["no-override-uses-facade-default", "override-uses-per-call-value"], +) +def test_async_bridge_honors_per_call_timeout(kwargs: dict[str, object], expected_per_request: float) -> None: + """``model.generate(timeout=...)`` must drive the bridge deadline, not just the facade default.""" + facade = Mock() + facade.generate.side_effect = SyncClientUnavailableError("sync unavailable") + facade.request_timeout = 60.0 # would be used if no override + + captured: dict[str, float] = {} + + async def fake_agenerate(*_args: object, **_kwargs: object) -> tuple: + return ("ok", [], {}) + + facade.agenerate = fake_agenerate + proxy = _AsyncBridgedModelFacade(facade) + + real_compute = _compute_bridge_timeout + + def capture_compute(per_request: float, *args: object, **inner: object) -> float: + captured["per_request"] = per_request + return real_compute(per_request, *args, **inner) + + engine_loop = asyncio.new_event_loop() + engine_thread = threading.Thread(target=engine_loop.run_forever, daemon=True) + engine_thread.start() + + try: + with ( + patch( + "data_designer.engine.dataset_builders.utils.async_concurrency.ensure_async_engine_loop", + return_value=engine_loop, + ), + patch( + "data_designer.engine.column_generators.generators.custom._compute_bridge_timeout", + capture_compute, + ), + ): + proxy.generate("hello", **kwargs) + finally: + engine_loop.call_soon_threadsafe(engine_loop.stop) + engine_thread.join(timeout=5) + + assert captured["per_request"] == expected_per_request diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py index 1e241a454..8c796374a 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py +++ 
b/packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py @@ -38,6 +38,19 @@ import pandas as pd +@pytest.fixture(autouse=True) +def _force_sync_engine(monkeypatch: pytest.MonkeyPatch) -> None: + """Pin tests in this file to the legacy sync engine. + + These tests use Mock-based stub resource providers that don't satisfy the + contracts expected by the async task-queue scheduler (e.g. the registry's + ``get_aggregate_max_parallel_requests()`` returns a Mock instead of an int). + They cover sync-engine behavior; the async path has dedicated coverage in + ``test_async_builder_integration.py`` and ``test_async_scheduler.py``. + """ + monkeypatch.setattr(builder_mod, "DATA_DESIGNER_ASYNC_ENGINE", False) + + @pytest.fixture def stub_test_column_configs(): return [ @@ -617,6 +630,7 @@ class StubScheduler: traces: list[object] = [] early_shutdown: bool = False partial_row_groups: tuple[int, ...] = () + first_non_retryable_error: Exception | None = None async def run(self) -> None: return None diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_async_concurrency.py b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_async_concurrency.py index ed7e9acfa..4baa829af 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_async_concurrency.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/utils/test_async_concurrency.py @@ -2,13 +2,9 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio -import sys import pytest -# asyncio.TaskGroup requires Python 3.11+ -pytestmark = pytest.mark.skipif(sys.version_info < (3, 11), reason="asyncio.TaskGroup requires Python 3.11+") - from data_designer.engine.dataset_builders.utils.async_concurrency import ( AsyncConcurrentExecutor, ) diff --git a/packages/data-designer-engine/tests/engine/models/clients/test_throttle_manager.py b/packages/data-designer-engine/tests/engine/models/clients/test_throttle_manager.py index 
1a619731e..11a24edb7 100644
--- a/packages/data-designer-engine/tests/engine/models/clients/test_throttle_manager.py
+++ b/packages/data-designer-engine/tests/engine/models/clients/test_throttle_manager.py
@@ -3,6 +3,7 @@
 
 from __future__ import annotations
 
+import asyncio
 import threading
 import time
 
@@ -503,6 +504,53 @@ async def test_acquire_async_raises_timeout_when_at_capacity() -> None:
         await tm.acquire_async(provider_name=PROVIDER, model_id=MODEL, domain=DOMAIN, timeout=0.0)
 
 
+@pytest.mark.asyncio
+async def test_acquire_async_default_no_deadline_waits_for_release() -> None:
+    """``timeout=None`` (the default) waits for the permit instead of raising.
+
+    Issue #551: the previous 300s default produced spurious ``ModelTimeoutError``
+    cascades on slow endpoints with deep queues; now queue waits scale with
+    provider speed and only the HTTP timeout bounds actual work. The
+    ``timeout=0.0`` case is covered by ``test_acquire_async_raises_timeout_when_at_capacity``.
+    """
+    tm = ThrottleManager()
+    tm.register(provider_name=PROVIDER, model_id=MODEL, alias="a1", max_parallel_requests=1)
+    tm.try_acquire(provider_name=PROVIDER, model_id=MODEL, domain=DOMAIN)
+
+    async def release_after(delay: float) -> None:
+        await asyncio.sleep(delay)
+        tm.release_success(provider_name=PROVIDER, model_id=MODEL, domain=DOMAIN)
+
+    # Hold a strong reference to the task so the loop's weak-ref bookkeeping
+    # can't GC it before the inner await observes the release.
+    release_task = asyncio.create_task(release_after(0.05))
+    try:
+        # asyncio.wait_for caps the test runtime; the inner acquire_async passes None.
+ await asyncio.wait_for( + tm.acquire_async(provider_name=PROVIDER, model_id=MODEL, domain=DOMAIN), + timeout=2.0, + ) + finally: + await release_task + + +def test_acquire_sync_default_no_deadline_waits_for_release() -> None: + """Sync counterpart: ``timeout=None`` default blocks until release.""" + tm = ThrottleManager() + tm.register(provider_name=PROVIDER, model_id=MODEL, alias="a1", max_parallel_requests=1) + tm.try_acquire(provider_name=PROVIDER, model_id=MODEL, domain=DOMAIN) + + def release_after(delay: float) -> None: + time.sleep(delay) + tm.release_success(provider_name=PROVIDER, model_id=MODEL, domain=DOMAIN) + + threading.Thread(target=release_after, args=(0.05,), daemon=True).start() + start = time.monotonic() + tm.acquire_sync(provider_name=PROVIDER, model_id=MODEL, domain=DOMAIN) + elapsed = time.monotonic() - start + assert 0.04 < elapsed < 2.0, f"expected ~0.05s wait, got {elapsed:.3f}s" + + # --- Thread safety --- diff --git a/packages/data-designer/src/data_designer/interface/data_designer.py b/packages/data-designer/src/data_designer/interface/data_designer.py index 09ab43b0a..cd928f611 100644 --- a/packages/data-designer/src/data_designer/interface/data_designer.py +++ b/packages/data-designer/src/data_designer/interface/data_designer.py @@ -4,6 +4,7 @@ from __future__ import annotations import logging +import warnings from pathlib import Path from typing import TYPE_CHECKING @@ -35,9 +36,10 @@ from data_designer.config.utils.info import InfoType, InterfaceInfo from data_designer.engine.analysis.dataset_profiler import DataDesignerDatasetProfiler, DatasetProfilerConfig from data_designer.engine.compiler import compile_data_designer_config -from data_designer.engine.dataset_builders.dataset_builder import DatasetBuilder +from data_designer.engine.dataset_builders.dataset_builder import DATA_DESIGNER_ASYNC_ENGINE, DatasetBuilder from data_designer.engine.mcp.io import list_tool_names from data_designer.engine.model_provider import 
ModelProviderRegistry, resolve_model_provider_registry +from data_designer.engine.models.clients.adapters.http_model_client import ClientConcurrencyMode from data_designer.engine.resources.person_reader import ( PersonReader, create_person_reader, @@ -258,6 +260,15 @@ def create( "warnings above (and any 'Provider showing degraded performance' logs) for " "the contributing failures." ) from e + # Surface the original task error when the run produced 0 records due to a + # deterministic non-retryable failure (e.g. bad seed source). Without this, + # the user sees a generic FileNotFoundError-on-parquet that obscures the cause. + # ``actual_num_records`` is set only on the async path; sync runs leave it at + # ``-1`` and ``first_non_retryable_error`` at ``None``, so this branch is + # async-only by construction. + root_cause = builder.first_non_retryable_error + if root_cause is not None and builder.actual_num_records == 0: + raise DataDesignerGenerationError(f"🛑 {type(root_cause).__name__}: {root_cause}") from root_cause raise DataDesignerGenerationError( f"🛑 Failed to load generated dataset — all records may have been dropped " f"due to generation failures. Check the warnings above for details. Original error: {e}" @@ -276,6 +287,9 @@ def create( "🛑 Dataset is empty — early shutdown was triggered before any records " "could complete. Check the warnings above for the contributing failures." ) + root_cause = builder.first_non_retryable_error + if root_cause is not None and builder.actual_num_records == 0: + raise DataDesignerGenerationError(f"🛑 {type(root_cause).__name__}: {root_cause}") from root_cause raise DataDesignerGenerationError( "🛑 Dataset is empty — all records were dropped due to generation failures. " "Check the warnings above for details on which columns failed." @@ -347,6 +361,9 @@ def preview( "🛑 Preview is empty — early shutdown was triggered before any records " "could complete. Check the warnings above for the contributing failures." 
) + root_cause = builder.first_non_retryable_error + if root_cause is not None and builder.actual_num_records == 0: + raise DataDesignerGenerationError(f"🛑 {type(root_cause).__name__}: {root_cause}") from root_cause raise DataDesignerGenerationError( "🛑 Dataset is empty — all records were dropped due to generation or processing failures. " "Check the warnings above for details on which columns failed." @@ -548,7 +565,39 @@ def _create_resource_provider( run_config=self._run_config, mcp_providers=self._mcp_providers, tool_configs=config_builder.tool_configs, + client_concurrency_mode=self._resolve_client_concurrency_mode(config_builder), ) + @staticmethod + def _resolve_client_concurrency_mode(config_builder: DataDesignerConfigBuilder) -> ClientConcurrencyMode: + """Pick the model-client mode that matches the engine the run will use. + + The async engine is the default, but ``allow_resize=True`` columns force + a sync-engine fallback (see ``DatasetBuilder._resolve_async_compatibility``). + Without aligning the client mode here, those runs would create async-only + clients and then call sync methods on them — raising ``SyncClientUnavailableError`` + from inside the sync engine. Match the client mode to the actual engine + choice so the fallback path is functional. + """ + if not DATA_DESIGNER_ASYNC_ENGINE: + # Deliberate opt-out via env var. Surface the deprecation so users + # know the sync path is going away. Mirror the ``allow_resize`` shape + # in ``_resolve_async_compatibility``: emit both a ``logger.warning`` + # (visible in the project's logging output) and a ``DeprecationWarning`` + # (programmatic signal callers can filter on). The ``allow_resize`` + # auto-fallback has its own warning from the builder layer; we don't + # double-warn here. + msg = ( + "DATA_DESIGNER_ASYNC_ENGINE=0 selects the legacy sync engine, which is " + "deprecated and will be removed in a future release. Unset the variable " + "(or set it to 1) to use the async engine." 
+ ) + logger.warning(f"⚠️ {msg}") + warnings.warn(msg, DeprecationWarning, stacklevel=3) + return ClientConcurrencyMode.SYNC + if any(c.allow_resize for c in config_builder.get_column_configs()): + return ClientConcurrencyMode.SYNC + return ClientConcurrencyMode.ASYNC + def _get_interface_info(self, model_providers: list[ModelProvider]) -> InterfaceInfo: return InterfaceInfo(model_providers=model_providers) diff --git a/packages/data-designer/tests/interface/test_data_designer.py b/packages/data-designer/tests/interface/test_data_designer.py index f3bc44044..d382f0fae 100644 --- a/packages/data-designer/tests/interface/test_data_designer.py +++ b/packages/data-designer/tests/interface/test_data_designer.py @@ -6,6 +6,7 @@ import contextlib import json import logging +import warnings from datetime import datetime from pathlib import Path from typing import Any @@ -16,8 +17,9 @@ import data_designer.interface.data_designer as dd_mod import data_designer.lazy_heavy_imports as lazy -from data_designer.config.column_configs import ExpressionColumnConfig, SamplerColumnConfig +from data_designer.config.column_configs import CustomColumnConfig, ExpressionColumnConfig, SamplerColumnConfig from data_designer.config.config_builder import DataDesignerConfigBuilder +from data_designer.config.custom_column import custom_column_generator from data_designer.config.errors import InvalidConfigError from data_designer.config.models import ModelProvider from data_designer.config.processors import DropColumnsProcessorConfig @@ -385,6 +387,84 @@ def stub_seed_reader(): return StubHuggingFaceSeedReader() +def _builder_with_allow_resize() -> DataDesignerConfigBuilder: + """Config with one allow_resize=True column — forces sync-engine fallback.""" + + @custom_column_generator() + def _expander(row: dict) -> list[dict]: + return [{**row, "item": i} for i in range(2)] + + builder = DataDesignerConfigBuilder() + builder.add_column( + SamplerColumnConfig( + name="seed", + 
sampler_type=SamplerType.CATEGORY, + params=CategorySamplerParams(values=["a"]), + ) + ) + builder.add_column( + CustomColumnConfig( + name="item", + generator_function=_expander, + allow_resize=True, + ) + ) + return builder + + +@pytest.mark.parametrize( + "env_value,with_allow_resize,expected,expect_deprecation", + [ + ("1", False, "async", False), + ("1", True, "sync", False), + ("0", False, "sync", True), + ], + ids=[ + "async-on-no-fallback-uses-async-clients", + "async-on-allow-resize-falls-back-to-sync-clients", + "async-off-uses-sync-clients-and-warns", + ], +) +def test_resolve_client_concurrency_mode_matches_engine_choice( + env_value: str, + with_allow_resize: bool, + expected: str, + expect_deprecation: bool, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Client mode must match the engine the run will actually use. + + Without this alignment, a sync-fallback run (e.g. ``allow_resize=True``) + would be left with async-only clients and call sync methods on them, + raising ``SyncClientUnavailableError`` from inside the sync engine. + + The ``DATA_DESIGNER_ASYNC_ENGINE=0`` opt-out path also emits a + ``DeprecationWarning`` so users on the legacy sync engine see a + pre-removal signal in their logs. The auto-fallback path + (``allow_resize=True``) does not double-warn here; the builder layer + emits its own warning when the run actually executes. 
+ """ + monkeypatch.setattr(dd_mod, "DATA_DESIGNER_ASYNC_ENGINE", env_value == "1") + builder = _builder_with_allow_resize() if with_allow_resize else DataDesignerConfigBuilder() + if not with_allow_resize: + builder.add_column( + SamplerColumnConfig( + name="seed", + sampler_type=SamplerType.CATEGORY, + params=CategorySamplerParams(values=["a"]), + ) + ) + + if expect_deprecation: + with pytest.warns(DeprecationWarning, match="legacy sync engine"): + mode = DataDesigner._resolve_client_concurrency_mode(builder) + else: + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + mode = DataDesigner._resolve_client_concurrency_mode(builder) + assert mode.value == expected + + def test_init_with_custom_secret_resolver(stub_artifact_path, stub_model_providers): """Test DataDesigner initialization with custom secret resolver.""" designer = DataDesigner( @@ -733,8 +813,13 @@ def test_preview_raises_error_when_profiler_fails( assert isinstance(exc_info.value.__cause__, ValueError) -def _patch_builder_state(*, early_shutdown: bool, actual_num_records: int = 0) -> contextlib.ExitStack: - """Patch DatasetBuilder.early_shutdown / actual_num_records as PropertyMocks.""" +def _patch_builder_state( + *, + early_shutdown: bool, + actual_num_records: int = 0, + first_non_retryable_error: Exception | None = None, +) -> contextlib.ExitStack: + """Patch DatasetBuilder.early_shutdown / actual_num_records / first_non_retryable_error.""" stack = contextlib.ExitStack() stack.enter_context( patch( @@ -750,6 +835,13 @@ def _patch_builder_state(*, early_shutdown: bool, actual_num_records: int = 0) - return_value=actual_num_records, ) ) + stack.enter_context( + patch( + "data_designer.engine.dataset_builders.dataset_builder.DatasetBuilder.first_non_retryable_error", + new_callable=PropertyMock, + return_value=first_non_retryable_error, + ) + ) return stack @@ -883,6 +975,54 @@ def test_create_error_dispatch_on_load_outcome( assert 
isinstance(exc_info.value.__cause__, FileNotFoundError) +@pytest.mark.parametrize( + "load_side_effect", + ["raises", "empty_df"], + ids=["load-raises-filenotfound", "load-returns-empty-df"], +) +def test_create_surfaces_first_non_retryable_error_when_zero_records( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_sampler_only_config_builder: DataDesignerConfigBuilder, + stub_managed_assets_path: Path, + load_side_effect: str, +) -> None: + """When 0 records were produced due to a deterministic non-retryable error + (no early-shutdown), surface that error's message instead of a wrapped + FileNotFoundError on the parquet path. The interface chains the original + exception via ``__cause__`` so callers still have full context. + """ + data_designer = _make_data_designer(stub_artifact_path, stub_model_providers, stub_managed_assets_path) + root_cause = ValueError("invalid seed source: no rows after hydration") + + if load_side_effect == "raises": + load_patch = patch( + "data_designer.engine.storage.artifact_storage.ArtifactStorage.load_dataset_with_dropped_columns", + side_effect=FileNotFoundError("No parquet files found"), + ) + else: + load_patch = patch( + "data_designer.engine.storage.artifact_storage.ArtifactStorage.load_dataset_with_dropped_columns", + return_value=lazy.pd.DataFrame(), + ) + + with ( + load_patch, + _patch_builder_state( + early_shutdown=False, + actual_num_records=0, + first_non_retryable_error=root_cause, + ), + ): + with pytest.raises(DataDesignerGenerationError, match="invalid seed source") as exc_info: + data_designer.create(stub_sampler_only_config_builder, num_records=10) + + # Original cause is preserved via __cause__, not lost behind the parquet error. + assert exc_info.value.__cause__ is root_cause + # The typed DataDesignerEarlyShutdownError must NOT fire here — the gate didn't trip. 
+ assert not isinstance(exc_info.value, DataDesignerEarlyShutdownError) + + def test_preview_raises_generation_error_when_dataset_is_empty( stub_artifact_path, stub_model_providers, stub_sampler_only_config_builder, stub_managed_assets_path ): diff --git a/tests_e2e/src/data_designer_e2e_tests/plugins/regex_filter/impl.py b/tests_e2e/src/data_designer_e2e_tests/plugins/regex_filter/impl.py index 4b7a82ccc..79d8528c9 100644 --- a/tests_e2e/src/data_designer_e2e_tests/plugins/regex_filter/impl.py +++ b/tests_e2e/src/data_designer_e2e_tests/plugins/regex_filter/impl.py @@ -14,9 +14,14 @@ class RegexFilterProcessor(Processor[RegexFilterProcessorConfig]): - """Filters batch rows based on a regex pattern.""" + """Filters rows based on a regex pattern. - def process_before_batch(self, data: pd.DataFrame) -> pd.DataFrame: + Runs at the ``process_after_generation`` stage so row-count changes are + applied to the final dataset. The pre-/post-batch stages enforce row-count + invariance under the async engine. + """ + + def process_after_generation(self, data: pd.DataFrame) -> pd.DataFrame: compiled = re.compile(self.config.pattern) mask = data[self.config.column].astype(str).apply(lambda v: bool(compiled.search(v))) if self.config.invert: