Add Lance reader stage#2111
Conversation
VibhuJawa
left a comment
There was a problem hiding this comment.
Please fix these things
b5f9ac2 to
0c5f4df
Compare
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
daf8b22 to
9a39e0d
Compare
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
9a39e0d to
bc20753
Compare
| scanner_kwargs["columns"] = fields | ||
| return scanner_kwargs | ||
|
|
||
| def read_task( |
There was a problem hiding this comment.
Main class to review
Greptile SummaryThis PR introduces a
Confidence Score: 4/5Safe to merge; all identified concerns are non-blocking and do not affect correctness for the documented use cases. The version-pinning design is solid and well-tested. The key-parsing pattern in _dataset_kwargs / _scanner_kwargs (pop-then-catch-all) is functional but fragile to future extension. The Python-level row-address loop in utils/lance.py will be noticeably slow on large partitions. None of these affect correctness for the current feature scope. nemo_curator/stages/text/io/reader/lance.py (read_kwargs key-parsing contract and silent empty-dataset path) and nemo_curator/utils/lance.py (vectorisation of fragment-ID extraction). Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant U as User
participant LR as LanceReader (CompositeStage)
participant LP as LancePartitioningStage
participant Lance as lance.dataset
participant LRS as LanceReaderStage
participant Utils as nemo_curator/utils/lance
U->>LR: LanceReader(path, read_kwargs, ...)
LR->>LR: decompose()
LR-->>LP: LancePartitioningStage(path, read_kwargs)
LR-->>LRS: LanceReaderStage(path, fields, read_kwargs)
note over LP: Fan-out stage
LP->>Lance: "lance.dataset(path, **dataset_kwargs)"
Lance-->>LP: dataset (version N)
LP->>LP: enumerate fragments, chunk by fragments_per_partition
LP-->>LRS: "LanceReadTask(frag_ids, version=N) x M"
note over LRS: Per-task execution
LRS->>Lance: "lance.dataset(path, version=N)"
Lance-->>LRS: versioned dataset
LRS->>LRS: detect blob-v2 columns
LRS->>Lance: dataset.scanner(fragments, with_row_address).to_table()
Lance-->>LRS: pa.Table with _rowaddr
opt blob-v2 columns present
LRS->>Lance: "dataset.read_blobs(column, addresses, preserve_order=True)"
Lance-->>LRS: blob payloads
LRS->>LRS: table.set_column(blob_array)
end
opt include_lance_metadata
LRS->>Utils: add_lance_metadata_columns(table)
Utils->>Utils: rename _rowaddr to __lance_rowaddr
Utils->>Utils: shift_right rowaddrs to get __lance_fragid
Utils-->>LRS: enriched pa.Table
end
LRS->>LRS: attach schema JSON to metadata
LRS-->>U: "DocumentBatch(data=pa.Table, _metadata={lance:{schema,version,...}})"
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant U as User
participant LR as LanceReader (CompositeStage)
participant LP as LancePartitioningStage
participant Lance as lance.dataset
participant LRS as LanceReaderStage
participant Utils as nemo_curator/utils/lance
U->>LR: LanceReader(path, read_kwargs, ...)
LR->>LR: decompose()
LR-->>LP: LancePartitioningStage(path, read_kwargs)
LR-->>LRS: LanceReaderStage(path, fields, read_kwargs)
note over LP: Fan-out stage
LP->>Lance: "lance.dataset(path, **dataset_kwargs)"
Lance-->>LP: dataset (version N)
LP->>LP: enumerate fragments, chunk by fragments_per_partition
LP-->>LRS: "LanceReadTask(frag_ids, version=N) x M"
note over LRS: Per-task execution
LRS->>Lance: "lance.dataset(path, version=N)"
Lance-->>LRS: versioned dataset
LRS->>LRS: detect blob-v2 columns
LRS->>Lance: dataset.scanner(fragments, with_row_address).to_table()
Lance-->>LRS: pa.Table with _rowaddr
opt blob-v2 columns present
LRS->>Lance: "dataset.read_blobs(column, addresses, preserve_order=True)"
Lance-->>LRS: blob payloads
LRS->>LRS: table.set_column(blob_array)
end
opt include_lance_metadata
LRS->>Utils: add_lance_metadata_columns(table)
Utils->>Utils: rename _rowaddr to __lance_rowaddr
Utils->>Utils: shift_right rowaddrs to get __lance_fragid
Utils-->>LRS: enriched pa.Table
end
LRS->>LRS: attach schema JSON to metadata
LRS-->>U: "DocumentBatch(data=pa.Table, _metadata={lance:{schema,version,...}})"
|
| def lance_fragment_ids_from_row_addresses(rowaddr_column: pa.ChunkedArray) -> pa.Array: | ||
| rowaddrs = rowaddr_column.combine_chunks().cast(pa.uint64()) | ||
| return pa.array([int(value) >> 32 for value in rowaddrs.to_pylist()], type=pa.uint64()) |
There was a problem hiding this comment.
The fragment-ID extraction converts the entire column to a Python list and performs the bit-shift in a CPython loop. For a table with millions of rows this can add measurable latency. PyArrow's compute layer performs the same shift vectorised in C++ without materialising a Python list.
| def lance_fragment_ids_from_row_addresses(rowaddr_column: pa.ChunkedArray) -> pa.Array: | |
| rowaddrs = rowaddr_column.combine_chunks().cast(pa.uint64()) | |
| return pa.array([int(value) >> 32 for value in rowaddrs.to_pylist()], type=pa.uint64()) | |
| def lance_fragment_ids_from_row_addresses(rowaddr_column: pa.ChunkedArray) -> pa.Array: | |
| import pyarrow.compute as pc | |
| rowaddrs = rowaddr_column.combine_chunks().cast(pa.uint64()) | |
| return pc.shift_right(rowaddrs, 32).cast(pa.uint64()) |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| def decompose(self) -> list[ProcessingStage]: | ||
| if self.task_type != "document": | ||
| msg = f"Converting DocumentBatch to {self.task_type} is not supported yet." | ||
| raise NotImplementedError(msg) | ||
|
|
||
| return [ | ||
| LancePartitioningStage( | ||
| path=self.path, | ||
| fragments_per_partition=self.fragments_per_partition, | ||
| fragment_ids=self.fragment_ids, | ||
| read_kwargs=self.read_kwargs, | ||
| ), | ||
| LanceReaderStage( | ||
| path=self.path, | ||
| fields=self.fields, | ||
| read_kwargs=self.read_kwargs, | ||
| include_lance_metadata=self.include_lance_metadata, | ||
| ), |
There was a problem hiding this comment.
Shared
read_kwargs dict passed to both sub-stages
decompose() passes self.read_kwargs (a single dict object) to both LancePartitioningStage and LanceReaderStage. Each stage copies the dict in its own __post_init__, so the current code is safe. The risk is in LanceReaderStage._dataset_kwargs / _scanner_kwargs, which pop() from their local copy of read_kwargs in a specific order: dataset keys are consumed first, then scanner keys, then all remaining keys are forwarded to the scanner via scanner_kwargs.update(read_kwargs). If a future caller or subclass omits that local dict(read_kwargs or {}) copy in read_task, or if new dataset-level keys are added without a corresponding pop() in _dataset_kwargs, those keys will silently leak into the scanner and produce confusing Lance errors. Documenting the key-parsing contract (which keys go where) or asserting on unrecognised keys after both _dataset_kwargs and _scanner_kwargs have run would make this boundary explicit.
sarahyurick
left a comment
There was a problem hiding this comment.
Hi, just walking through the PR and leaving some minor comments for now. Will try to do more of a deep dive soon.
| msg = f"No data read from files in task {task.task_id}" | ||
| raise ValueError(msg) | ||
| def _effective_read_kwargs(self) -> dict[str, Any]: | ||
| return dict(self.read_kwargs or {}) |
There was a problem hiding this comment.
Nit but I don't see a reason for having a 1 line helper function.
| ) | ||
|
|
||
| def _output_metadata(self, task: ReaderTask, _output: ReaderOutput) -> dict[str, Any]: | ||
| return task._metadata |
There was a problem hiding this comment.
Same comment as above.
| return dataset_kwargs | ||
|
|
||
| def process(self, _: EmptyTask) -> list[LanceReadTask]: | ||
| import lance |
| allow_empty: Whether filtered reads may return empty tables without raising. | ||
| """ | ||
|
|
||
| path: str = "" |
There was a problem hiding this comment.
Make it a hard requirement instead of having a check in the post init:
| path: str = "" | |
| path: str |
| return output.metadata if output.metadata is not None else task._metadata | ||
|
|
||
| def _restore_blob_v2_columns(self, dataset: object, table: pa.Table, blob_columns: list[str]) -> pa.Table: | ||
| import lance |
There was a problem hiding this comment.
Same here, should it be a top-level import? We can lazy-load the LanceReader so that lance is not a hard dependency.
| for column in blob_columns: | ||
| payloads = [ | ||
| payload | ||
| for _, payload in dataset.read_blobs(column, addresses=rowaddrs, preserve_order=True) # type: ignore[attr-defined] |
There was a problem hiding this comment.
General question does # type: ignore[attr-defined] matter for the codebase? Like will it break without it?
| import lance | ||
| from lance.schema import schema_to_json |
There was a problem hiding this comment.
Similar comment as above.
| ) | ||
| from nemo_curator.tasks import EmptyTask | ||
|
|
||
| pytest.importorskip("lance") |
There was a problem hiding this comment.
Should we create a @pytest.mark.lance or something instead? I kinda worry about using importorskip because unless someone is explicitly checking the relevant CI job then it could just silently skip or something.
Split from #2106. This is PR 1 of 3 in the Lance IO stack.
What changed:
Stack:
Validation:
Note: the full JSONL ID-generator reader test path was not used for this focused validation because the local untracked outputs/ benchmark artifacts make Ray runtime-env packaging exceed 512 MiB.