Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions packages/data-designer/src/data_designer/cli/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

from __future__ import annotations

import click
import typer

from data_designer.cli.controllers.generation_controller import GenerationController
from data_designer.config.utils.constants import DEFAULT_NUM_RECORDS
from data_designer.interface.results import SUPPORTED_EXPORT_FORMATS


def create_command(
Expand Down Expand Up @@ -35,6 +37,17 @@ def create_command(
"-o",
help="Path where generated artifacts will be stored. Defaults to ./artifacts.",
),
output_format: str | None = typer.Option(
None,
"--output-format",
"-f",
click_type=click.Choice(list(SUPPORTED_EXPORT_FORMATS)),
help=(
"Export the dataset to a single file after generation. "
"Supported formats: jsonl, csv, parquet. "
"The file is written to <artifact-path>/<dataset-name>/dataset.<format>."
),
),
) -> None:
"""Create a full dataset and save results to disk.

Expand All @@ -60,4 +73,5 @@ def create_command(
num_records=num_records,
dataset_name=dataset_name,
artifact_path=artifact_path,
output_format=output_format,
)
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def run_create(
num_records: int,
dataset_name: str,
artifact_path: str | None,
output_format: str | None = None,
) -> None:
"""Load config, create a full dataset, and save results to disk.

Expand All @@ -124,6 +125,8 @@ def run_create(
num_records: Number of records to generate.
dataset_name: Name for the generated dataset folder.
artifact_path: Path where generated artifacts will be stored, or None for default.
output_format: If set, export the dataset to a single file in this format after
generation. One of 'jsonl', 'csv', 'parquet'.
"""
config_builder = self._load_config(config_source)

Expand All @@ -147,16 +150,27 @@ def run_create(
print_error(f"Dataset creation failed: {e}")
raise typer.Exit(code=1)

dataset = results.load_dataset()
num_records = results.count_records()

analysis = results.load_analysis()
if analysis is not None:
console.print()
analysis.to_report()

console.print()
print_success(f"Dataset created — {len(dataset)} record(s) generated")
console.print(f" Artifacts saved to: [bold]{results.artifact_storage.base_dataset_path}[/bold]")

if output_format is not None:
Comment thread
przemekboruta marked this conversation as resolved.
export_path = Path(results.artifact_storage.base_dataset_path) / f"dataset.{output_format}"
try:
results.export(export_path)
except Exception as e:
print_error(f"Export failed: {e}")
raise typer.Exit(code=1)
console.print(f" Exported to: [bold]{export_path}[/bold]")

console.print()
print_success(f"Dataset created — {num_records} record(s) generated")
console.print()

def _load_config(self, config_source: str) -> DataDesignerConfigBuilder:
Expand Down
118 changes: 117 additions & 1 deletion packages/data-designer/src/data_designer/interface/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal, get_args

import data_designer.lazy_heavy_imports as lazy
from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults
from data_designer.config.config_builder import DataDesignerConfigBuilder
from data_designer.config.dataset_metadata import DatasetMetadata
from data_designer.config.errors import InvalidFileFormatError
from data_designer.config.utils.visualization import WithRecordSamplerMixin
from data_designer.engine.dataset_builders.errors import ArtifactStorageError
from data_designer.engine.storage.artifact_storage import ArtifactStorage
Expand All @@ -19,6 +21,9 @@

from data_designer.engine.dataset_builders.utils.task_model import TaskTrace

# Formats that `DatasetCreationResults.export` can write. SUPPORTED_EXPORT_FORMATS
# is derived from the Literal via get_args() so the runtime choice list (used by
# the CLI's --output-format option) can never drift out of sync with the type.
ExportFormat = Literal["jsonl", "csv", "parquet"]
SUPPORTED_EXPORT_FORMATS: tuple[str, ...] = get_args(ExportFormat)


class DatasetCreationResults(WithRecordSamplerMixin):
"""Results container for a Data Designer dataset creation run.
Expand Down Expand Up @@ -69,6 +74,18 @@ def load_dataset(self) -> pd.DataFrame:
"""
return self.artifact_storage.load_dataset()

def count_records(self) -> int:
    """Return the total number of records in the generated dataset.

    Only the Parquet footer metadata of each batch file is inspected — no
    data pages are read — so memory usage stays constant regardless of
    dataset size.

    Returns:
        Total row count across all batch parquet files.
    """
    total = 0
    for batch_file in sorted(self.artifact_storage.final_dataset_path.glob("batch_*.parquet")):
        total += lazy.pq.read_metadata(batch_file).num_rows
    return total

def load_processor_dataset(self, processor_name: str) -> pd.DataFrame:
"""Load the dataset generated by a processor.

Expand All @@ -95,6 +112,57 @@ def get_path_to_processor_artifacts(self, processor_name: str) -> Path:
raise ArtifactStorageError(f"Processor {processor_name} has no artifacts.")
return self.artifact_storage.processors_outputs_path / processor_name

def export(self, path: Path | str, *, format: ExportFormat | None = None) -> Path:
    """Export the generated dataset to a single file by streaming batch files.

    The output format is inferred from the file extension when *format* is
    omitted. Pass *format* explicitly to override the extension (e.g. write a
    ``.txt`` file as JSONL).

    Unlike :meth:`load_dataset`, this method never materialises the full dataset
    in memory — it reads batch parquet files one at a time and appends each to
    the output file, keeping peak memory proportional to a single batch.

    Args:
        path: Output file path. The exact path is used as-is; the extension is
            not rewritten.
        format: Output format. One of ``'jsonl'``, ``'csv'``, or ``'parquet'``.
            When omitted, the format is inferred from the file extension.

    Returns:
        Path to the written file.

    Raises:
        InvalidFileFormatError: If the format cannot be determined or is not
            one of the supported values.
        ArtifactStorageError: If no batch parquet files are found.

    Example:
        >>> results = data_designer.create(config, num_records=1000)
        >>> results.export("output.jsonl")
        PosixPath('output.jsonl')
        >>> results.export("output.csv")
        PosixPath('output.csv')
        >>> results.export("output.txt", format="jsonl")
        PosixPath('output.txt')
    """
    path = Path(path)
    # An explicit format wins; otherwise fall back to the (lowercased) extension.
    resolved_format: str = format if format is not None else path.suffix.lstrip(".").lower()
    if resolved_format not in SUPPORTED_EXPORT_FORMATS:
        raise InvalidFileFormatError(
            f"Unsupported export format: {resolved_format!r}. Choose one of: {', '.join(SUPPORTED_EXPORT_FORMATS)}."
        )
    batch_files = sorted(self.artifact_storage.final_dataset_path.glob("batch_*.parquet"))
    if not batch_files:
        raise ArtifactStorageError("No batch parquet files found to export.")
    # Dispatch to the streaming writer for the resolved format.
    if resolved_format == "jsonl":
        _export_jsonl(batch_files, path)
    elif resolved_format == "csv":
        _export_csv(batch_files, path)
    elif resolved_format == "parquet":
        _export_parquet(batch_files, path)
    return path

def push_to_hub(
self,
repo_id: str,
Expand Down Expand Up @@ -140,3 +208,51 @@ def push_to_hub(
description=description,
tags=tags,
)


def _export_jsonl(batch_files: list[Path], output: Path) -> None:
    """Write *batch_files* to *output* as JSONL, one record per line.

    Each batch is appended in turn so peak memory stays proportional to one batch.

    Args:
        batch_files: Batch parquet files to stream, in order.
        output: Destination JSONL file; overwritten if it exists.
    """
    with output.open("w", encoding="utf-8") as f:
        for batch_file in batch_files:
            chunk = lazy.pd.read_parquet(batch_file)
            content = chunk.to_json(orient="records", lines=True, force_ascii=False, date_format="iso")
            f.write(content)
            # pandas' to_json(lines=True) does not terminate the final record with
            # a newline, so add one to keep consecutive batches separated. Skip
            # empty batches (content == ""): writing a bare "\n" would emit a
            # blank line, which strict JSONL parsers reject.
            if content and not content.endswith("\n"):
                f.write("\n")


def _export_csv(batch_files: list[Path], output: Path) -> None:
    """Write *batch_files* to *output* as CSV with a single header row.

    The first batch creates the file and writes the header; subsequent batches
    are appended header-less so the result parses as one table.
    """
    first = True
    for batch_file in batch_files:
        frame = lazy.pd.read_parquet(batch_file)
        frame.to_csv(output, mode="w" if first else "a", header=first, index=False)
        first = False


def _export_parquet(batch_files: list[Path], output: Path) -> None:
    """Write *batch_files* to *output* as a single Parquet file.

    A unified schema is computed up front so that columns with minor type
    drift across batches (e.g. ``int64`` vs ``float64``) are cast to one
    consistent schema instead of failing the write.

    Raises:
        InvalidFileFormatError: If batch schemas have incompatible column names or
            types that cannot be unified or cast.
    """
    per_batch_schemas = [lazy.pq.read_schema(batch_file) for batch_file in batch_files]
    try:
        # promote_options="permissive" tolerates minor numeric drift (e.g. int64 → double).
        target_schema = lazy.pa.unify_schemas(per_batch_schemas, promote_options="permissive")
    except (lazy.pa.lib.ArrowInvalid, lazy.pa.lib.ArrowTypeError) as e:
        raise InvalidFileFormatError(f"Cannot unify batch schemas for parquet export: {e}") from e
    with lazy.pq.ParquetWriter(output, target_schema) as writer:
        for batch_file in batch_files:
            # Read outside the try so genuine read errors propagate unchanged;
            # only cast/write failures are translated into InvalidFileFormatError.
            table = lazy.pq.read_table(batch_file)
            try:
                writer.write_table(table.cast(target_schema))
            except (lazy.pa.lib.ArrowInvalid, ValueError) as e:
                raise InvalidFileFormatError(f"Cannot cast batch {batch_file.name} to unified schema: {e}") from e
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ def test_create_command_delegates_to_controller(mock_ctrl_cls: MagicMock) -> Non
mock_ctrl = MagicMock()
mock_ctrl_cls.return_value = mock_ctrl

create_command(config_source="config.yaml", num_records=10, dataset_name="dataset", artifact_path=None)
create_command(
config_source="config.yaml", num_records=10, dataset_name="dataset", artifact_path=None, output_format=None
)

mock_ctrl_cls.assert_called_once()
mock_ctrl.run_create.assert_called_once_with(
config_source="config.yaml",
num_records=10,
dataset_name="dataset",
artifact_path=None,
output_format=None,
)


Expand All @@ -40,13 +43,15 @@ def test_create_command_passes_custom_options(mock_ctrl_cls: MagicMock) -> None:
num_records=100,
dataset_name="my_data",
artifact_path="/custom/output",
output_format=None,
)

mock_ctrl.run_create.assert_called_once_with(
config_source="config.py",
num_records=100,
dataset_name="my_data",
artifact_path="/custom/output",
output_format=None,
)


Expand All @@ -56,11 +61,37 @@ def test_create_command_default_artifact_path_is_none(mock_ctrl_cls: MagicMock)
mock_ctrl = MagicMock()
mock_ctrl_cls.return_value = mock_ctrl

create_command(config_source="config.yaml", num_records=5, dataset_name="ds", artifact_path=None)
create_command(
config_source="config.yaml", num_records=5, dataset_name="ds", artifact_path=None, output_format=None
)

mock_ctrl.run_create.assert_called_once_with(
config_source="config.yaml",
num_records=5,
dataset_name="ds",
artifact_path=None,
output_format=None,
)


@patch("data_designer.cli.commands.create.GenerationController")
def test_create_command_passes_output_format(mock_ctrl_cls: MagicMock) -> None:
    """Test create_command forwards --output-format to the controller."""
    controller = MagicMock()
    mock_ctrl_cls.return_value = controller

    # Identical kwargs drive the call and the expectation, so the forwarding
    # contract is asserted in one place.
    expected_kwargs = dict(
        config_source="config.yaml",
        num_records=10,
        dataset_name="dataset",
        artifact_path=None,
        output_format="jsonl",
    )
    create_command(**expected_kwargs)

    controller.run_create.assert_called_once_with(**expected_kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ def _make_mock_preview_results(num_records: int) -> MagicMock:
def _make_mock_create_results(num_records: int, base_path: str = "/output/artifacts/dataset") -> MagicMock:
"""Create a mock CreateResults with the given number of records."""
mock_results = MagicMock()
mock_dataset = MagicMock()
mock_dataset.__len__ = MagicMock(return_value=num_records)
mock_results.load_dataset.return_value = mock_dataset
mock_results.count_records.return_value = num_records
mock_results.artifact_storage.base_dataset_path = base_path
return mock_results

Expand Down Expand Up @@ -772,3 +770,50 @@ def test_run_create_skips_report_when_analysis_is_none(mock_load_config: MagicMo
# load_analysis() returns None, so to_report() must not be called.
# If the code ignores the None check, an AttributeError propagates and the test fails.
mock_results.load_analysis.assert_called_once()


@patch(f"{_CTRL}.DataDesigner")
@patch(f"{_CTRL}.load_config_builder")
def test_run_create_with_output_format_happy_path(mock_load_config: MagicMock, mock_dd_cls: MagicMock) -> None:
    """export() is called with the correct path and format when --output-format is given."""
    mock_load_config.return_value = MagicMock(spec=DataDesignerConfigBuilder)
    designer = MagicMock()
    mock_dd_cls.return_value = designer
    results = _make_mock_create_results(5)
    designer.create.return_value = results

    GenerationController().run_create(
        config_source="config.yaml",
        num_records=5,
        dataset_name="dataset",
        artifact_path=None,
        output_format="jsonl",
    )

    # The export target lives under the dataset's artifact directory and is
    # named dataset.<format>.
    expected_path = Path("/output/artifacts/dataset") / "dataset.jsonl"
    results.export.assert_called_once_with(expected_path)


@patch(f"{_CTRL}.DataDesigner")
@patch(f"{_CTRL}.load_config_builder")
def test_run_create_export_failure_exits(mock_load_config: MagicMock, mock_dd_cls: MagicMock) -> None:
    """If export() raises, run_create exits with code 1."""
    mock_load_config.return_value = MagicMock(spec=DataDesignerConfigBuilder)
    designer = MagicMock()
    mock_dd_cls.return_value = designer
    results = _make_mock_create_results(5)
    # Simulate a failing export so the error path is exercised.
    results.export.side_effect = RuntimeError("disk full")
    designer.create.return_value = results

    with pytest.raises(typer.Exit) as exc_info:
        GenerationController().run_create(
            config_source="config.yaml",
            num_records=5,
            dataset_name="dataset",
            artifact_path=None,
            output_format="csv",
        )

    assert exc_info.value.exit_code == 1
1 change: 1 addition & 0 deletions packages/data-designer/tests/cli/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ def test_app_dispatches_lazy_create_command(mock_controller_cls: Mock) -> None:
num_records=DEFAULT_NUM_RECORDS,
dataset_name="dataset",
artifact_path=None,
output_format=None,
)
Loading
Loading