Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit. Hold shift + click to select a range.
a14a99d
docs: add implementation plan for resume mechanism
przemekboruta Apr 13, 2026
d595ed3
feat(storage): add resume flag and clear_partial_results()
przemekboruta Apr 13, 2026
05e7354
feat(batch-manager): add start_batch param to start()
przemekboruta Apr 13, 2026
7024b10
feat(builder): implement resume logic in DatasetBuilder
przemekboruta Apr 13, 2026
9a3a6d9
feat(interface): expose resume on DataDesigner.create()
przemekboruta Apr 13, 2026
ad7b949
test: add tests for resume mechanism
przemekboruta Apr 13, 2026
6e5f73c
feat(builder): extend resume to async engine (DATA_DESIGNER_ASYNC_ENG…
przemekboruta Apr 13, 2026
48548f3
fix(builder): skip after-generation processors when resume finds data…
przemekboruta Apr 13, 2026
5532bd6
fix(builder): use filesystem count for initial_total_num_batches on a…
przemekboruta Apr 13, 2026
e86dce5
feat(results): add export() method and --output-format CLI flag
przemekboruta Apr 13, 2026
4c6c96c
fix(builder): handle resume when metadata.json missing (interrupted b…
przemekboruta Apr 14, 2026
9d5b8cc
docs(interface): fix resume docstring — async engine is supported
przemekboruta Apr 14, 2026
59f4e3b
fix(builder): derive initial_actual_num_records from filesystem in as…
przemekboruta Apr 14, 2026
fbf6bdb
feat(resume): replace resume: bool with ResumeMode enum (NEVER/ALWAYS…
przemekboruta Apr 30, 2026
b6e4c5c
fix(resume): invalidate resolved_dataset_name cache when IF_POSSIBLE …
przemekboruta May 1, 2026
1073275
fix(builder): move partial-completion warning before return in _build…
przemekboruta May 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,14 @@ def reset(self, delete_files: bool = False) -> None:
except OSError as e:
raise DatasetBatchManagementError(f"🛑 Failed to delete directory {dir_path}: {e}")

def start(self, *, num_records: int, buffer_size: int) -> None:
def start(
self,
*,
num_records: int,
buffer_size: int,
start_batch: int = 0,
initial_actual_num_records: int = 0,
) -> None:
if num_records <= 0:
raise DatasetBatchManagementError("🛑 num_records must be positive.")
if buffer_size <= 0:
Expand All @@ -169,6 +176,8 @@ def start(self, *, num_records: int, buffer_size: int) -> None:
if remaining_records := num_records % buffer_size:
self._num_records_list.append(remaining_records)
self.reset()
self._current_batch_number = start_batch
self._actual_num_records = initial_actual_num_records

def write(self) -> Path | None:
"""Write the current batch to a parquet file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ class RowGroupBufferManager:
exclusively by the async scheduler.
"""

def __init__(
    self,
    artifact_storage: ArtifactStorage,
    initial_actual_num_records: int = 0,
    initial_total_num_batches: int = 0,
) -> None:
    """Initialize the buffer manager.

    Args:
        artifact_storage: Storage backend the buffered row groups are written to.
        initial_actual_num_records: Record count carried over from a resumed run
            (0 for a fresh run).
        initial_total_num_batches: Batch count carried over from a resumed run
            (0 for a fresh run).
    """
    # row_group -> list of row dicts currently buffered in memory
    self._buffers: dict[int, list[dict]] = {}
    # row_group -> expected number of rows in that group
    self._row_group_sizes: dict[int, int] = {}
    # row_group -> indices of rows dropped from that group
    self._dropped: dict[int, set[int]] = {}
    self._artifact_storage = artifact_storage
    # Seeded from a prior interrupted run when resuming, so totals stay accurate.
    self._actual_num_records: int = initial_actual_num_records
    self._total_num_batches: int = initial_total_num_batches

def init_row_group(self, row_group: int, size: int) -> None:
"""Allocate a buffer for *row_group* with *size* empty rows."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ class BatchStage(StrEnum):
PROCESSORS_OUTPUTS = "processors_outputs_path"


class ResumeMode(StrEnum):
    """How a new generation run treats an existing dataset at the same path.

    NEVER: never resume; if the dataset path already holds data, this session
        writes to a fresh timestamped dataset name instead.
    ALWAYS: resume the existing dataset; raises if no existing dataset is found.
    IF_POSSIBLE: resume when existing data is present, otherwise start fresh.
    """

    NEVER = "never"
    ALWAYS = "always"
    IF_POSSIBLE = "if_possible"


class ArtifactStorage(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)

Expand All @@ -47,6 +53,7 @@ class ArtifactStorage(BaseModel):
partial_results_folder_name: str = "tmp-partial-parquet-files"
dropped_columns_folder_name: str = "dropped-columns-parquet-files"
processors_outputs_folder_name: str = PROCESSORS_OUTPUTS_FOLDER_NAME
resume: ResumeMode = ResumeMode.NEVER
_media_storage: MediaStorage = PrivateAttr(default=None)

@property
Expand All @@ -67,12 +74,19 @@ def artifact_path_exists(self) -> bool:
def resolved_dataset_name(self) -> str:
    """Resolve the dataset directory name to use for this session.

    If the dataset path already contains files and resume is enabled
    (ALWAYS or IF_POSSIBLE), the existing name is reused so generation can
    pick up where it left off. Without resume, a timestamped name is chosen
    to avoid clobbering the existing data.

    Returns:
        The dataset directory name (existing, or a new timestamped one).

    Raises:
        ArtifactStorageError: If resume is ALWAYS but there is no existing
            dataset to resume from.
    """
    dataset_path = self.artifact_path / self.dataset_name
    # any() short-circuits at the first entry instead of materializing the
    # whole directory listing just to test for emptiness.
    if dataset_path.exists() and any(dataset_path.iterdir()):
        if self.resume in (ResumeMode.ALWAYS, ResumeMode.IF_POSSIBLE):
            return self.dataset_name
        new_dataset_name = f"{self.dataset_name}_{datetime.now().strftime('%m-%d-%Y_%H%M%S')}"
        logger.info(
            f"📂 Dataset path {str(dataset_path)!r} already exists. Dataset from this session"
            f"\n\t\t will be saved to {str(self.artifact_path / new_dataset_name)!r} instead."
        )
        return new_dataset_name
    if self.resume == ResumeMode.ALWAYS:
        raise ArtifactStorageError(
            f"🛑 Cannot resume: no existing dataset found at {str(dataset_path)!r}. "
            "Run without resume=ResumeMode.ALWAYS to start a new generation."
        )
    return self.dataset_name

@property
Expand Down Expand Up @@ -204,6 +218,11 @@ def load_dataset_with_dropped_columns(self) -> pd.DataFrame:
df = lazy.pd.concat([df, df_dropped], axis=1)
return df

def clear_partial_results(self) -> None:
    """Delete leftover partial-result files from a previously interrupted run."""
    partial_dir = self.partial_results_path
    if not partial_dir.exists():
        return
    shutil.rmtree(partial_dir)

def move_partial_result_to_final_file_path(self, batch_number: int) -> Path:
partial_result_path = self.create_batch_file_path(batch_number, batch_stage=BatchStage.PARTIAL_RESULT)
if not partial_result_path.exists():
Expand Down
Loading
Loading