Add Lance writer stage #2112
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Greptile Summary
This PR adds

Confidence Score: 3/5
The writer's two-phase commit is not fully crash-safe for append mode, and the checkpoint format relies on pickle-deserializing data from remote storage. The append-mode double-commit window (crash after [...])
nemo_curator/stages/text/io/writer/lance.py and nemo_curator/utils/lance.py deserve the most attention, specifically the commit_lance_checkpoint crash-recovery logic and checkpoint marker validation.
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Pipeline
participant LanceWriter
participant lance_ray as lance_ray.write_fragment
participant CheckpointFS as Checkpoint Storage
participant Commit as commit_lance_checkpoint
participant LanceDB as Lance Dataset
Pipeline->>LanceWriter: process(DocumentBatch)
LanceWriter->>lance_ray: write_fragment([table], path, schema)
lance_ray-->>LanceWriter: [(LanceFragment, schema), ...]
LanceWriter->>LanceWriter: pickle+base64 encode each fragment
LanceWriter->>CheckpointFS: write_lance_checkpoint_record(record_id, JSON)
LanceWriter-->>Pipeline: FileGroupTask(record_paths)
Note over Pipeline,Commit: After all batches processed
Pipeline->>Commit: commit_lance_checkpoint(path, commit_path)
Commit->>CheckpointFS: read_lance_checkpoint (check _COMMITTED marker)
CheckpointFS-->>Commit: records (or committed_version if marker exists)
Commit->>Commit: decode fragments via pickle.loads
Commit->>LanceDB: LanceFragmentCommitter.on_write_complete([payloads])
LanceDB-->>Commit: version N
Commit->>CheckpointFS: "write_lance_checkpoint_marker(version=N)"
Commit-->>Pipeline: version N
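The writer-side steps in the diagram (pickle and base64-encode each fragment, then persist it as a JSON record) can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: a plain dict stands in for the real fragment object, and `encode_record` / `decode_fragment` are hypothetical helper names. The record keys (`kind`, `task_id`, `fragment_index`, `mode`, `fragment`) mirror the ones read by the code under review.

```python
import base64
import json
import pickle


def encode_record(task_id: str, fragment_index: int, fragment: object, mode: str) -> str:
    """Serialize one checkpoint record as JSON, with the fragment pickled and base64-encoded."""
    payload = base64.b64encode(pickle.dumps(fragment)).decode("ascii")
    return json.dumps(
        {
            "kind": "lance_write",
            "task_id": task_id,
            "fragment_index": fragment_index,
            "mode": mode,
            "fragment": payload,
        }
    )


def decode_fragment(record_json: str) -> object:
    """Reverse encode_record. Only safe when the record comes from a trusted location."""
    record = json.loads(record_json)
    return pickle.loads(base64.b64decode(record["fragment"]))  # noqa: S301


# A plain dict is used as a stand-in for the real fragment metadata (assumption).
fake_fragment = {"fragment_id": 7, "num_rows": 128}
record_json = encode_record("task-0", 0, fake_fragment, "append")
restored = decode_fragment(record_json)
```

The record survives a round trip through JSON, which is what lets the commit step run in a separate process from the writers.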
Reviews (1): Last reviewed commit: "Add Lance writer stage"
    def commit_lance_checkpoint(
        path: str,
        commit_path: str,
        *,
        storage_options: dict[str, Any] | None = None,
        checkpoint_storage_options: dict[str, Any] | None = None,
    ) -> int:
        """Commit records written by ``LanceWriter`` and return the Lance version."""
        import lance
        from lance_ray import LanceFragmentCommitter

        records, committed_version = read_lance_checkpoint(commit_path, "lance_write", checkpoint_storage_options)
        if committed_version is not None:
            return committed_version

        _validate_checkpoint_path(records, path)
        mode = str(_single_checkpoint_value(records, "mode", "write mode"))
        fragments = _decode_write_fragments(records)
        schema = fragments[0][1]

        committer = LanceFragmentCommitter(path, schema=schema, mode=mode, storage_options=storage_options)
        if mode == "append":
            committer.on_write_start(schema)
        fragment_payloads = [(pickle.dumps(fragment), pickle.dumps(schema)) for fragment, schema in fragments]
        committer.on_write_complete([fragment_payloads])
        version = lance.dataset(path, storage_options=storage_options).version
        write_lance_checkpoint_marker(commit_path, version, checkpoint_storage_options)
        return version
Potential double-commit for "append" mode on retry
If the process crashes after committer.on_write_complete succeeds but before write_lance_checkpoint_marker writes the _COMMITTED marker, a subsequent retry will call commit_lance_checkpoint again. Because the marker is absent, read_lance_checkpoint returns the original records. For "append" mode, on_write_start(schema) is called again — this time it reads the already-incremented dataset version as the new read_version — and on_write_complete commits the same fragments a second time, appending duplicate rows. "create" and "overwrite" modes are likely idempotent or will error; append is not.
A safer pattern is to read and store the post-commit version atomically with the commit (e.g. pass it through LanceFragmentCommitter), or to check whether the committed version's history already contains the expected fragments before issuing a second on_write_complete.
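One way to picture the second suggestion is a toy model in which every commit carries an id that is stored atomically with the rows, so a retry can detect that the commit already landed. This is only a sketch under stated assumptions: `ToyDataset`, `commit_once`, and the `commit_id` argument are hypothetical, the code does not use Lance or lance-ray, and whether such an id can be attached to a real Lance commit has not been verified here.

```python
class ToyDataset:
    """In-memory stand-in for a versioned dataset; this is not the Lance API."""

    def __init__(self) -> None:
        self.version = 0
        self.rows: list = []
        self.committed: dict = {}  # commit_id -> version produced by that commit

    def append(self, commit_id: str, rows: list) -> int:
        # Rows and the commit id are recorded in the same step, i.e. atomically.
        self.rows.extend(rows)
        self.version += 1
        self.committed[commit_id] = self.version
        return self.version


def commit_once(dataset, marker: dict, commit_id: str, rows: list, crash_before_marker: bool = False) -> int:
    """Append rows at most once, even if an earlier attempt died before writing the marker."""
    if "version" in marker:
        return marker["version"]  # fast path: marker already written
    if commit_id in dataset.committed:
        version = dataset.committed[commit_id]  # commit landed earlier; only the marker is missing
    else:
        version = dataset.append(commit_id, rows)
    if crash_before_marker:
        raise RuntimeError("simulated crash between commit and marker write")
    marker["version"] = version
    return version


dataset, marker = ToyDataset(), {}
try:
    commit_once(dataset, marker, "job-42", ["a", "b"], crash_before_marker=True)
except RuntimeError:
    pass  # first attempt committed the rows but never wrote the marker

version = commit_once(dataset, marker, "job-42", ["a", "b"])  # retry
```

In this model the retry finds `job-42` in the dataset's own history and skips the second append, so the rows are written once and the marker is backfilled.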
    def _decode_write_fragments(records: list[dict[str, Any]]) -> list[tuple[object, pa.Schema]]:
        from lance.schema import json_to_schema

        return [
            (pickle.loads(base64.b64decode(record["fragment"])), json_to_schema(record["schema"]))  # noqa: S301
            for record in sorted(
                records, key=lambda record: (str(record.get("task_id", "")), record.get("fragment_index", 0))
            )
        ]
Deserialized pickle data originates from persistent storage
_decode_write_fragments calls pickle.loads on base64-decoded bytes read from JSON files under commit_path. The # noqa: S301 suppresses the linter warning, but the underlying concern is real: unlike lance-ray's use of pickle for in-memory Ray object passing, these payloads are persisted to disk or remote object storage between process invocations. If an attacker gains write access to commit_path (a realistic threat when commit_path is an S3/GCS prefix shared across a job), they can replace any *.json checkpoint file with a maliciously crafted pickle payload, and commit_lance_checkpoint will execute arbitrary code.
Consider storing the fragment metadata using Lance's own JSON serialization (lance.LanceFragment.metadata() / json_to_schema) rather than pickle, or adding an HMAC over the file contents to detect tampering.
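The HMAC option could look roughly like the sketch below, which uses only the Python standard library. The helper names `sign_record` and `load_verified_record`, the envelope layout, and the demo key are assumptions for illustration; how the key would be distributed to writers and to the commit step is out of scope here.

```python
import hashlib
import hmac
import json


def sign_record(record: dict, key: bytes) -> str:
    """Wrap a checkpoint record in an envelope carrying an HMAC-SHA256 tag over its JSON body."""
    body = json.dumps(record, sort_keys=True)
    tag = hmac.new(key, body.encode("utf-8"), hashlib.sha256).hexdigest()
    return json.dumps({"body": body, "hmac": tag})


def load_verified_record(signed: str, key: bytes) -> dict:
    """Return the record only if the tag matches; raise before any payload is decoded."""
    envelope = json.loads(signed)
    expected = hmac.new(key, envelope["body"].encode("utf-8"), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected, envelope["hmac"]):
        raise ValueError("checkpoint record failed HMAC verification")
    return json.loads(envelope["body"])


key = b"demo-key-not-for-production"  # placeholder key for the example only
signed = sign_record({"kind": "lance_write", "fragment": "AAAA"}, key)
record = load_verified_record(signed, key)

# Simulate an attacker swapping the payload without knowing the key.
tampered = signed.replace("AAAA", "BBBB")
try:
    load_verified_record(tampered, key)
    tamper_detected = False
except ValueError:
    tamper_detected = True
```

The important ordering is that verification happens before `pickle.loads` is ever called on the payload, so a modified file is rejected rather than executed.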
    def read_lance_checkpoint(
        commit_path: str,
        kind: str,
        storage_options: dict[str, Any] | None = None,
    ) -> tuple[list[dict[str, Any]], int | None]:
        fs, fs_path = _checkpoint_fs_path(commit_path, storage_options)
        marker_path = _checkpoint_path(fs_path, _COMMITTED_MARKER)
        if fs.exists(marker_path):
            with fs.open(marker_path) as stream:
                return [], int(json.loads(stream.read())["version"])

        records = []
        for record_path in sorted(fs.glob(_checkpoint_path(fs_path, _RECORDS_DIR, "*.json"))):
            with fs.open(record_path) as stream:
                record = json.loads(stream.read())
            if record.get("kind") == kind:
                records.append(record)
        if not records:
            msg = f"No {kind} checkpoint records found under {commit_path}"
            raise ValueError(msg)
        return records, None
_COMMITTED marker check skips dataset_path validation
When the _COMMITTED marker exists, read_lance_checkpoint returns early with the stored version number and an empty records list. The caller (commit_lance_checkpoint) never reaches _validate_checkpoint_path, so there is no verification that the marker corresponds to the same path argument. If commit_path is accidentally reused for a different dataset (e.g. due to a misconfigured pipeline), commit_lance_checkpoint will silently return the wrong version for that dataset. Storing the dataset_path in the marker JSON and validating it on read would catch this class of misconfiguration.
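A minimal sketch of that validation, using the local filesystem in place of the PR's checkpoint storage abstraction. The helper names `write_marker` and `read_marker` and the example dataset URIs are hypothetical; only the `_COMMITTED` file name and the `version` and `dataset_path` keys come from the discussion above.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def write_marker(commit_dir: Path, dataset_path: str, version: int) -> None:
    """Record which dataset the commit belongs to alongside the committed version."""
    payload = {"dataset_path": dataset_path, "version": version}
    (commit_dir / "_COMMITTED").write_text(json.dumps(payload))


def read_marker(commit_dir: Path, dataset_path: str) -> Optional[int]:
    """Return the committed version, or None if no marker exists; reject a foreign marker."""
    marker = commit_dir / "_COMMITTED"
    if not marker.exists():
        return None
    payload = json.loads(marker.read_text())
    if payload.get("dataset_path") != dataset_path:
        msg = f"marker under {commit_dir} belongs to {payload.get('dataset_path')!r}, not {dataset_path!r}"
        raise ValueError(msg)
    return int(payload["version"])


with tempfile.TemporaryDirectory() as tmp:
    commit_dir = Path(tmp)
    write_marker(commit_dir, "s3://bucket/dataset-a.lance", 3)
    same_dataset_version = read_marker(commit_dir, "s3://bucket/dataset-a.lance")
    try:
        read_marker(commit_dir, "s3://bucket/dataset-b.lance")
        mismatch_detected = False
    except ValueError:
        mismatch_detected = True
```

With this check, reusing a commit directory for a different dataset fails loudly instead of returning a version number that belongs to another dataset.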
        "sentence-transformers",
    ]

    lance = ["lance-ray>=0.4"]
lance (distributed as pylance) is imported directly in nemo_curator/stages/text/io/reader/lance.py and nemo_curator/stages/text/io/writer/lance.py, but only lance-ray is listed as a direct dependency. The pylance package is currently a transitive dep of lance-ray, but this will silently break if lance-ray ever changes its dependency graph. Declaring pylance explicitly pins the contract.
    - lance = ["lance-ray>=0.4"]
    + lance = ["lance-ray>=0.4", "pylance"]
Split from #2106. This is PR 2 of 3 in the Lance IO stack.
Stacked on #2111. The branch is based on feat/lance-reader; until #2111 merges, GitHub may show reader changes in this PR when viewed against main. For the exact PR2 delta, compare:
VibhuJawa/NeMo-Curator@feat/lance-reader...feat/lance-writer
What changed:
Validation: