Add Lance reader and writer stages #2106

Closed
VibhuJawa wants to merge 16 commits into NVIDIA-NeMo:main from VibhuJawa:feat/lance-integration
@VibhuJawa VibhuJawa commented Jun 24, 2026

Contributor

Description

Adds a first-pass Lance integration for Curator text IO:

  • LanceReader partitions Lance datasets by fragment and returns Arrow-backed DocumentBatches.
  • LanceWriter writes Curator batches to Lance and records checkpointed commit payloads.
  • LanceAnnotationWriter appends/validates annotation columns with Lance add_columns(...), writes fragment-local row updates, and records checkpointed commit payloads.
  • Writer-side driver commit helpers live in writer/lance.py and publish checkpointed writes with commit_lance_checkpoint(...) and annotation-column updates with commit_lance_annotation_checkpoint(...).
  • Shared Lance metadata/checkpoint utilities live under nemo_curator/utils/lance.py and reuse Curator filesystem/hash helpers.
  • Includes focused Lance functionality coverage for read filters/projection, blob v2 round trips, write checkpoint commits, annotation prepare/update, duplicate row-address rejection, and split-fragment update rejection.
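
As a rough illustration of the fragment-partitioned read described in the first bullet, the sketch below groups fragment IDs into per-task descriptors that all pin the same dataset version. It is not the PR's implementation: `FragmentTask`, `partition_fragments`, and `fragments_per_task` are hypothetical names, and the real stage presumably works with Lance fragment objects rather than bare integers.

```python
from dataclasses import dataclass, field


@dataclass
class FragmentTask:
    """Hypothetical task descriptor: which fragments to scan, at which version."""
    fragment_ids: tuple
    metadata: dict = field(default_factory=dict)


def partition_fragments(fragment_ids, version, fragments_per_task=1):
    """Group fragment IDs into tasks that all pin the same dataset version."""
    if fragments_per_task < 1:
        raise ValueError("fragments_per_task must be >= 1")
    ordered = sorted(fragment_ids)
    tasks = []
    for start in range(0, len(ordered), fragments_per_task):
        chunk = tuple(ordered[start:start + fragments_per_task])
        # Every task carries the same version so all readers see one snapshot.
        tasks.append(FragmentTask(chunk, {"lance": {"version": version}}))
    return tasks


tasks = partition_fragments([3, 0, 2, 1, 4], version=7, fragments_per_task=2)
for task in tasks:
    print(task.fragment_ids, task.metadata["lance"]["version"])
```

Pinning one version across all tasks means a concurrent write to the dataset cannot change what an in-flight pipeline reads.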

Usage

from nemo_curator.stages.text.io.reader import LanceReader
from nemo_curator.stages.text.io.writer import LanceWriter, commit_lance_checkpoint

reader = LanceReader(
    "s3://bucket/dataset.lance",
    fields=["url", "text", "content_zlib"],
    read_kwargs={"filter": "snapshot_id = 'CC-MAIN-2025-26'"},
)

writer = LanceWriter(
    path="s3://bucket/output.lance",
    commit_path="s3://bucket/checkpoints/output",
    mode="overwrite",
)

# Run the Curator pipeline with the writer, then commit on the driver.
version = commit_lance_checkpoint(
    "s3://bucket/output.lance",
    "s3://bucket/checkpoints/output",
)
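
The two-phase flow above (workers record checkpointed commit payloads, then the driver publishes them) can be sketched with plain files. This is only an illustration of the pattern under stated assumptions: `write_checkpoint_record`, `commit_checkpoint`, the JSON layout, and the `_COMMITTED` marker are all hypothetical and are not the PR's `commit_lance_checkpoint(...)` implementation, which publishes to a Lance dataset rather than returning a list.

```python
import json
import tempfile
from pathlib import Path


def write_checkpoint_record(commit_dir: Path, task_id: str, payload: dict) -> Path:
    """Worker side: stage one record per task; a retried task overwrites its own record."""
    commit_dir.mkdir(parents=True, exist_ok=True)
    record = commit_dir / f"{task_id}.json"
    tmp = commit_dir / f"{task_id}.json.tmp"
    tmp.write_text(json.dumps(payload))
    tmp.replace(record)  # rename is atomic on POSIX filesystems
    return record


def commit_checkpoint(commit_dir: Path) -> list:
    """Driver side: gather every staged record once, then mark the checkpoint committed."""
    marker = commit_dir / "_COMMITTED"
    if marker.exists():
        raise RuntimeError("checkpoint already committed")
    records = [json.loads(p.read_text()) for p in sorted(commit_dir.glob("*.json"))]
    # A real driver would publish `records` to the dataset in a single commit here.
    marker.write_text(str(len(records)))
    return records


with tempfile.TemporaryDirectory() as tmp_dir:
    commit_dir = Path(tmp_dir) / "checkpoints" / "output"
    write_checkpoint_record(commit_dir, "task-0", {"fragment": 0, "rows": 100})
    write_checkpoint_record(commit_dir, "task-1", {"fragment": 1, "rows": 80})
    write_checkpoint_record(commit_dir, "task-1", {"fragment": 1, "rows": 80})  # retry
    published = commit_checkpoint(commit_dir)
    print(len(published), sum(r["rows"] for r in published))
```

Keying records by task ID makes worker retries harmless, and keeping the publish step on the driver means readers never observe a partially written dataset.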

Append annotation columns back to an existing Lance dataset:

import pyarrow as pa

from nemo_curator.stages.text.io.writer import LanceAnnotationWriter, commit_lance_annotation_checkpoint

annotation_writer = LanceAnnotationWriter(
    path="s3://bucket/dataset.lance",
    commit_path="s3://bucket/checkpoints/annotations",
    schema=pa.schema([pa.field("word_count", pa.int64())]),
    create_columns=True,
)
annotation_writer.prepare()

# Run the Curator pipeline with `annotation_writer`, then commit on the driver.
version = commit_lance_annotation_checkpoint(
    "s3://bucket/dataset.lance",
    "s3://bucket/checkpoints/annotations",
)
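
The duplicate row-address and split-fragment rejections listed in the description can be illustrated without Lance itself. The sketch below assumes Lance's 64-bit row-address layout (fragment ID in the upper 32 bits, row offset in the lower 32 bits); `split_row_address` and `validate_row_addresses` are hypothetical helpers, not the code under review.

```python
def split_row_address(row_address: int) -> tuple:
    """Split a 64-bit row address into (fragment_id, row_offset)."""
    return row_address >> 32, row_address & 0xFFFFFFFF


def validate_row_addresses(row_addresses: list, fragment_id: int) -> None:
    """Reject updates that repeat a row or reach outside the given fragment."""
    seen = set()
    for address in row_addresses:
        owner, offset = split_row_address(address)
        if owner != fragment_id:
            raise ValueError(
                f"row offset {offset} belongs to fragment {owner}, not {fragment_id}"
            )
        if address in seen:
            raise ValueError(
                f"duplicate row address for fragment {fragment_id}, offset {offset}"
            )
        seen.add(address)


fragment = 5
good = [(fragment << 32) | offset for offset in range(3)]
validate_row_addresses(good, fragment)  # passes silently

# First case repeats a row; second case mixes in a row from fragment 6.
for bad in (good + [good[0]], good + [(6 << 32) | 0]):
    try:
        validate_row_addresses(bad, fragment)
    except ValueError as err:
        print("rejected:", err)
```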

Testing

  • uv run ruff check nemo_curator/utils/lance.py nemo_curator/stages/text/io/reader/lance.py nemo_curator/stages/text/io/writer/lance.py nemo_curator/stages/text/io/writer/__init__.py tests/stages/text/io/reader/test_lance.py tests/stages/text/io/writer/test_lance.py pyproject.toml
  • bash -n tests/L0_Unit_Test_CPU.sh
  • uv run pytest tests/stages/text/io/reader/test_lance.py tests/stages/text/io/writer/test_lance.py -q

Checklist

  • I am familiar with the Contributing Guide.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

copy-pr-bot Bot commented Jun 24, 2026


This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@VibhuJawa

Contributor Author

@claude review

@VibhuJawa VibhuJawa changed the title from "Feat/lance integration" to "Add Lance reader and writer stages" Jun 24, 2026
VibhuJawa added 12 commits June 24, 2026 04:01
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>

Copilot AI left a comment

Contributor

Pull request overview

Adds first-pass Lance integration for Curator text IO, enabling fragment-partitioned reading into Arrow-backed DocumentBatch tasks, checkpointed writes/annotation updates, and driver-side commit helpers to publish staged writes.

Changes:

  • Introduces LanceReader (partition + read stages) with support for filter/projection, blob v2 restoration, and optional Lance metadata columns.
  • Adds LanceWriter and LanceAnnotationWriter plus checkpoint/commit helpers (commit_lance_checkpoint, commit_lance_annotation_checkpoint) to publish staged writes.
  • Wires in a new optional lance extra (lance-ray>=0.4), updates CI/unit test env sync, and adds targeted pytest coverage.

Reviewed changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 4 comments.

Show a summary per file
| File | Description |
| --- | --- |
| uv.lock | Adds locked dependencies for lance-ray and its transitive requirements; registers lance extra and includes it in all. |
| pyproject.toml | Defines the new optional dependency extra lance and includes it under all. |
| tests/L0_Unit_Test_CPU.sh | Ensures CPU unit test env installs the lance extra. |
| nemo_curator/stages/text/io/reader/__init__.py | Exposes LanceReader from the text IO reader package. |
| nemo_curator/stages/text/io/writer/__init__.py | Exposes LanceWriter / LanceAnnotationWriter from the text IO writer package. |
| nemo_curator/stages/text/io/reader/lance.py | Implements Lance dataset partitioning by fragment and fragment-scoped scanning into DocumentBatch (with optional metadata + blob v2 restore). |
| nemo_curator/stages/text/io/writer/lance.py | Implements staged fragment writes and fragment-local annotation updates with checkpoint record emission. |
| nemo_curator/stages/text/io/lance_utils.py | Adds checkpoint record/marker read-write helpers and reserved Lance metadata column constants. |
| nemo_curator/stages/text/io/lance_commit.py | Adds driver-side commit helpers to publish checkpointed writes/updates to a Lance dataset. |
| tests/stages/text/io/reader/test_lance.py | Adds coverage for partitioning/version pinning, filter/projection behavior, blob v2, and metadata columns. |
| tests/stages/text/io/writer/test_lance.py | Adds coverage for checkpoint commits, blob v2 round trips, annotation prepare/update flows, and rejection of invalid update sets. |

Comment thread nemo_curator/stages/text/io/reader/lance.py
Comment on lines +254 to +256
)

def _validate_unique_rowaddrs(table: pa.Table, fragment_id: int) -> None:
Comment thread nemo_curator/stages/text/io/writer/lance_commit.py Outdated
Comment thread nemo_curator/stages/text/io/writer/lance_commit.py Outdated
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>

@VibhuJawa VibhuJawa left a comment

Contributor Author

Initial review

Comment thread nemo_curator/stages/text/io/reader/lance.py
Comment thread nemo_curator/stages/text/io/writer/lance.py Outdated
Comment thread nemo_curator/stages/text/io/writer/lance.py Outdated
Comment thread nemo_curator/stages/text/io/writer/lance.py
Comment thread nemo_curator/stages/text/io/writer/lance.py
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
@VibhuJawa VibhuJawa force-pushed the feat/lance-integration branch from 90c9c47 to 10c0de8 June 24, 2026 04:35

@VibhuJawa VibhuJawa left a comment

Contributor Author

Some more cleanup

import lance
from lance.schema import schema_to_json

version = (task._metadata.get("lance") or {}).get("version")

Contributor Author

It's fine; let's just assume we will get the version from task._metadata.get("lance").
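
For context, the `or {}` in the line under review guards against the `lance` key being absent or explicitly `None`; the simplification suggested here drops that guard. A small sketch of the difference, using a plain dict in place of `task._metadata` (both helper names are hypothetical, and `version_strict` is only one possible simplified form):

```python
def version_defensive(metadata: dict):
    """Mirror of the reviewed line: tolerate a missing or None "lance" entry."""
    return (metadata.get("lance") or {}).get("version")


def version_strict(metadata: dict):
    """Simplified form: assume the reader stage always populated metadata["lance"]."""
    return metadata["lance"]["version"]


populated = {"lance": {"version": 12}}
print(version_defensive(populated), version_strict(populated))

# The defensive form returns None instead of raising when the entry is absent.
print(version_defensive({}), version_defensive({"lance": None}))
```

The strict form fails loudly with a `KeyError` or `TypeError` if an upstream stage did not set the metadata, which may be preferable when a missing version indicates a pipeline bug.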

table = dataset.scanner(**scanner_kwargs).to_table()
if table.num_rows == 0:
    return None
lance_schema = pa.schema([dataset.schema.field(name) for name in table.column_names if name in dataset.schema.names])

Contributor Author

Why don't we just get the schema from dataset.schema?

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
@VibhuJawa

Contributor Author

Closing in favor of 3 PRs; I have started #2111, #2112, and #2113.

@VibhuJawa VibhuJawa closed this Jun 24, 2026