diff --git a/nemo_curator/stages/text/io/reader/__init__.py b/nemo_curator/stages/text/io/reader/__init__.py index 973757b078..4d27af23b1 100644 --- a/nemo_curator/stages/text/io/reader/__init__.py +++ b/nemo_curator/stages/text/io/reader/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. from nemo_curator.stages.text.io.reader.jsonl import JsonlReader +from nemo_curator.stages.text.io.reader.lance import LanceReader from nemo_curator.stages.text.io.reader.parquet import ParquetReader -__all__ = ["JsonlReader", "ParquetReader"] +__all__ = ["JsonlReader", "LanceReader", "ParquetReader"] diff --git a/nemo_curator/stages/text/io/reader/base.py b/nemo_curator/stages/text/io/reader/base.py index 551dc463be..ff0042c796 100644 --- a/nemo_curator/stages/text/io/reader/base.py +++ b/nemo_curator/stages/text/io/reader/base.py @@ -15,10 +15,11 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, TypeAlias, TypeVar import numpy as np import pandas as pd +import pyarrow as pa import ray from loguru import logger @@ -28,13 +29,23 @@ from nemo_curator.backends.utils import RayStageSpecKeys from nemo_curator.stages.base import ProcessingStage from nemo_curator.tasks import DocumentBatch, FileGroupTask +from nemo_curator.tasks.tasks import Task + +ReaderTask = TypeVar("ReaderTask", bound=Task) +ReaderData: TypeAlias = pd.DataFrame | pa.Table + + +@dataclass(frozen=True) +class ReaderOutput: + data: ReaderData + metadata: dict[str, Any] | None = None @dataclass -class BaseReader(ProcessingStage[FileGroupTask, DocumentBatch]): - """Common base for tabular file readers. +class BaseReader(ProcessingStage[ReaderTask, DocumentBatch]): + """Common base for tabular readers. - Subclasses must implement the read_data method. + Subclasses must implement read_task for their input task type. """ fields: list[str] | None = None @@ -42,6 +53,7 @@ class BaseReader(ProcessingStage[FileGroupTask, DocumentBatch]): name: str = "" _generate_ids: bool = False _assign_ids: bool = False + allow_empty: bool = False def __post_init__(self) -> None: if self._generate_ids and self._assign_ids: @@ -52,7 +64,7 @@ def inputs(self) -> tuple[list[str], list[str]]: return [], [] def outputs(self) -> tuple[list[str], list[str]]: - output_fields = self.fields or [] + output_fields = list(self.fields or []) if self._generate_ids or self._assign_ids: from nemo_curator.stages.deduplication.id_generator import CURATOR_DEDUP_ID_STR @@ -72,24 +84,16 @@ def setup(self, _: WorkerMetadata | None = None) -> None: ) raise RuntimeError(msg) from None - def process(self, task: FileGroupTask) -> DocumentBatch: - # Merge read kwargs with storage options precedence: task.storage_options > self.read_kwargs - effective_read_kwargs: dict[str, Any] = {} - if self.read_kwargs: - effective_read_kwargs.update(self.read_kwargs) - - # Read the files - result = self.read_data(task.data, effective_read_kwargs, self.fields) + def process(self, task: ReaderTask) -> DocumentBatch: + output = self.read_task(task, self._effective_read_kwargs(), self.fields) + self._validate_result(task, output.data) + return self._document_batch(task, output) - # Validate the result - if ( - (result is None) - or (hasattr(result, "empty") and result.empty) - or (hasattr(result, "num_rows") and result.num_rows == 0) - ): - msg = f"No data read from files in task {task.task_id}" - raise ValueError(msg) + def _effective_read_kwargs(self) -> dict[str, Any]: + return dict(self.read_kwargs or {}) + def _document_batch(self, task: ReaderTask, output: ReaderOutput) -> DocumentBatch: + result = output.data # Apply IDs only for Pandas DataFrames if isinstance(result, pd.DataFrame): if self._generate_ids: @@ -100,16 +104,29 @@ def process(self, task: FileGroupTask) -> DocumentBatch: return DocumentBatch( dataset_name=task.dataset_name, data=result, - _metadata=task._metadata, + _metadata=self._output_metadata(task, output), + _stage_perf=task._stage_perf, ) + def _output_metadata(self, task: ReaderTask, _output: ReaderOutput) -> dict[str, Any]: + return task._metadata + + def _validate_result(self, task: ReaderTask, result: ReaderData) -> None: + if self.allow_empty: + return + if (result is None) or (isinstance(result, pd.DataFrame) and result.empty) or ( + isinstance(result, pa.Table) and result.num_rows == 0 + ): + msg = f"No data read from files in task {task.task_id}" + raise ValueError(msg) + # Subclass responsibilities ------------------------------------------------- - def read_data( + def read_task( self, - file_paths: list[str], + task: ReaderTask, read_kwargs: dict[str, Any] | None, fields: list[str] | None, - ) -> pd.DataFrame | None: # pragma: no cover - abstract + ) -> ReaderOutput: # pragma: no cover - abstract raise NotImplementedError # ID helpers ---------------------------------------------------------------- @@ -136,3 +153,24 @@ def _generate_ids_func(self, filepath: str | list[str], df: pd.DataFrame) -> pd. def ray_stage_spec(self) -> dict[str, Any]: return {RayStageSpecKeys.IS_ACTOR_STAGE: self._generate_ids or self._assign_ids} + + +@dataclass +class BaseFileReader(BaseReader[FileGroupTask]): + """Base reader for file-group readers that consume lists of paths.""" + + def read_task( + self, + task: FileGroupTask, + read_kwargs: dict[str, Any] | None, + fields: list[str] | None, + ) -> ReaderOutput: + return ReaderOutput(self.read_data(task.data, read_kwargs, fields)) + + def read_data( + self, + file_paths: list[str], + read_kwargs: dict[str, Any] | None, + fields: list[str] | None, + ) -> ReaderData: # pragma: no cover - abstract + raise NotImplementedError diff --git a/nemo_curator/stages/text/io/reader/jsonl.py b/nemo_curator/stages/text/io/reader/jsonl.py index 7ae4c81003..94c2c8ab37 100644 --- a/nemo_curator/stages/text/io/reader/jsonl.py +++ b/nemo_curator/stages/text/io/reader/jsonl.py @@ -23,11 +23,11 @@ from nemo_curator.tasks import DocumentBatch, EmptyTask from nemo_curator.utils.file_utils import FILETYPE_TO_DEFAULT_EXTENSIONS, pandas_select_columns -from .base import BaseReader +from .base import BaseFileReader @dataclass -class JsonlReaderStage(BaseReader): +class JsonlReaderStage(BaseFileReader): """ Stage that processes a group of JSONL files into a DocumentBatch. This stage accepts FileGroupTasks created by FilePartitioningStage @@ -53,7 +53,7 @@ def read_data( paths: list[str], read_kwargs: dict[str, Any] | None = None, fields: list[str] | None = None, - ) -> pd.DataFrame | None: + ) -> pd.DataFrame: """Read JSONL files using Pandas.""" # Normalize read_kwargs to a dict to avoid TypeError when None diff --git a/nemo_curator/stages/text/io/reader/lance.py b/nemo_curator/stages/text/io/reader/lance.py new file mode 100644 index 0000000000..f6bfe62e6f --- /dev/null +++ b/nemo_curator/stages/text/io/reader/lance.py @@ -0,0 +1,311 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Literal + +if TYPE_CHECKING: + import pyarrow as pa + +from nemo_curator.backends.utils import RayStageSpecKeys +from nemo_curator.stages.base import CompositeStage, ProcessingStage +from nemo_curator.tasks import DocumentBatch, EmptyTask +from nemo_curator.tasks.tasks import Task +from nemo_curator.utils.hash_utils import get_deterministic_hash +from nemo_curator.utils.lance import ( + LANCE_FRAGID_COLUMN, + LANCE_ROWADDR_COLUMN, + add_lance_metadata_columns, +) + +from .base import BaseReader, ReaderOutput + + +@dataclass +class LanceReadTask(Task[list[int]]): + """Task containing Lance fragment ids assigned to one read partition. + + This is created by ``LancePartitioningStage`` and consumed by + ``LanceReaderStage``. + + Args: + data: Lance fragment ids to read. + """ + + data: list[int] = field(default_factory=list) + + @property + def num_items(self) -> int: + return len(self.data) + + def validate(self) -> bool: + return bool(self.data) + + def get_deterministic_id(self) -> str: + lance_metadata = self._metadata.get("lance") or {} + parts = [ + str(lance_metadata.get("path", self.dataset_name)), + str(lance_metadata.get("version", "")), + *(str(fragment_id) for fragment_id in self.data), + ] + return get_deterministic_hash(parts) + + +@dataclass +class LancePartitioningStage(ProcessingStage[EmptyTask, LanceReadTask]): + """Stage that partitions a Lance dataset into fragment-id read tasks. + + The stage opens the dataset once, records the resolved Lance version in + each task, and emits fragment groups for ``LanceReaderStage``. + + Args: + path: Path or URI of the Lance dataset. + fragments_per_partition: Number of Lance fragments assigned to each read task. + fragment_ids: Optional explicit fragment ids to read. Defaults to all fragments. Duplicates are ignored. + read_kwargs: Keyword arguments for opening the Lance dataset. + """ + + path: str + fragments_per_partition: int = 32 + fragment_ids: list[int] | None = None + read_kwargs: dict[str, Any] = field(default_factory=dict) + name: str = "lance_partitioning" + + def __post_init__(self) -> None: + if self.fragments_per_partition <= 0: + msg = "fragments_per_partition must be greater than 0" + raise ValueError(msg) + self.read_kwargs = dict(self.read_kwargs or {}) + + def ray_stage_spec(self) -> dict[str, Any]: + return {RayStageSpecKeys.IS_FANOUT_STAGE: True} + + def _dataset_kwargs(self) -> dict[str, Any]: + read_kwargs = dict(self.read_kwargs) + dataset_kwargs = dict(read_kwargs.pop("dataset_options", {}) or {}) + version = dataset_kwargs.pop("version", None) + version = read_kwargs.pop("version", version) + if version is not None: + dataset_kwargs["version"] = version + storage_options = read_kwargs.pop("storage_options", None) + if storage_options is not None: + dataset_kwargs["storage_options"] = storage_options + return dataset_kwargs + + def process(self, _: EmptyTask) -> list[LanceReadTask]: + import lance + + dataset = lance.dataset(self.path, **self._dataset_kwargs()) + available_fragments = sorted(fragment.fragment_id for fragment in dataset.get_fragments()) + if self.fragment_ids is None: + fragment_ids = available_fragments + else: + fragment_ids = sorted(set(self.fragment_ids)) + missing = sorted(set(fragment_ids) - set(available_fragments)) + if missing: + msg = f"Lance dataset does not contain requested fragment ids: {missing[:10]}" + raise ValueError(msg) + + tasks = [] + for start in range(0, len(fragment_ids), self.fragments_per_partition): + owned_fragments = fragment_ids[start : start + self.fragments_per_partition] + tasks.append( + LanceReadTask( + dataset_name=self.path, + data=owned_fragments, + _metadata={ + "source_files": [self.path], + "lance": { + "path": self.path, + "version": dataset.version, + "fragment_ids": owned_fragments, + }, + }, + ) + ) + return tasks + + +@dataclass +class LanceReaderStage(BaseReader[LanceReadTask]): + """Stage that reads Lance fragment groups into ``DocumentBatch`` objects. + + This stage consumes ``LanceReadTask`` objects from ``LancePartitioningStage`` + and reads the pinned dataset version stored in each task. + + Args: + path: Path or URI of the Lance dataset. + fields: Optional columns to read. Overrides ``columns`` in ``read_kwargs``. + read_kwargs: Keyword arguments for Lance dataset and scanner construction. + include_lance_metadata: Whether to include row-address and fragment-id metadata columns. + allow_empty: Whether filtered reads may return empty tables without raising. + """ + + path: str = "" + fields: list[str] | None = None + read_kwargs: dict[str, Any] = field(default_factory=dict) + include_lance_metadata: bool = True + allow_empty: bool = True + name: str = "lance_reader" + + def __post_init__(self) -> None: + super().__post_init__() + if not self.path: + msg = "path is required" + raise ValueError(msg) + self.read_kwargs = dict(self.read_kwargs or {}) + + def outputs(self) -> tuple[list[str], list[str]]: + scanner_options = self.read_kwargs.get("scanner_options") or {} + columns = self.fields if self.fields is not None else self.read_kwargs.get("columns") + if columns is None: + columns = scanner_options.get("columns") + output_fields = list(columns or []) + if self.include_lance_metadata: + output_fields.extend([LANCE_ROWADDR_COLUMN, LANCE_FRAGID_COLUMN]) + return ["data"], output_fields + + def _output_metadata(self, task: LanceReadTask, output: ReaderOutput) -> dict[str, Any]: + return output.metadata if output.metadata is not None else task._metadata + + def _restore_blob_v2_columns(self, dataset: object, table: pa.Table, blob_columns: list[str]) -> pa.Table: + import lance + + rowaddrs = [int(value) for value in table["_rowaddr"].combine_chunks().to_pylist()] + for column in blob_columns: + payloads = [ + payload + for _, payload in dataset.read_blobs(column, addresses=rowaddrs, preserve_order=True) # type: ignore[attr-defined] + ] + table = table.set_column(table.schema.get_field_index(column), column, lance.blob_array(payloads)) + return table + + def _task_version(self, task: LanceReadTask) -> int: + version = (task._metadata.get("lance") or {}).get("version") + if version is None: + msg = f"Lance read task {task.task_id} is missing a pinned Lance version" + raise ValueError(msg) + return version + + def _dataset_kwargs(self, read_kwargs: dict[str, Any], version: int) -> dict[str, Any]: + dataset_kwargs = dict(read_kwargs.pop("dataset_options", {}) or {}) + requested_version = dataset_kwargs.pop("version", None) + requested_version = read_kwargs.pop("version", requested_version) + if requested_version is not None and requested_version != version: + msg = f"Lance read version mismatch: task version={version}, requested version={requested_version}" + raise ValueError(msg) + dataset_kwargs["version"] = version + storage_options = read_kwargs.pop("storage_options", None) + if storage_options is not None: + dataset_kwargs["storage_options"] = storage_options + return dataset_kwargs + + def _scanner_kwargs(self, read_kwargs: dict[str, Any], fields: list[str] | None) -> dict[str, Any]: + scanner_kwargs = dict(read_kwargs.pop("scanner_options", {}) or {}) + scanner_kwargs.update(read_kwargs) + if fields is not None: + scanner_kwargs["columns"] = fields + return scanner_kwargs + + def read_task( + self, + task: LanceReadTask, + read_kwargs: dict[str, Any] | None, + fields: list[str] | None, + ) -> ReaderOutput: + import lance + from lance.schema import schema_to_json + + read_kwargs = dict(read_kwargs or {}) + dataset_kwargs = self._dataset_kwargs(read_kwargs, self._task_version(task)) + scanner_kwargs = self._scanner_kwargs(read_kwargs, fields) + dataset = lance.dataset(self.path, **dataset_kwargs) + fragments = [dataset.get_fragment(fragment_id) for fragment_id in task.data] + requested_columns = scanner_kwargs.get("columns") + blob_columns = [ + field.name + for field in dataset.schema + if getattr(field.type, "extension_name", None) == "lance.blob.v2" + and (requested_columns is None or field.name in requested_columns) + ] + if self.include_lance_metadata or blob_columns: + scanner_kwargs["with_row_address"] = True + scanner_kwargs["fragments"] = fragments + table = dataset.scanner(**scanner_kwargs).to_table() + if blob_columns: + table = self._restore_blob_v2_columns(dataset, table, blob_columns) + if self.include_lance_metadata: + table = add_lance_metadata_columns(table) + elif blob_columns and "_rowaddr" in table.column_names: + table = table.drop_columns(["_rowaddr"]) + + metadata = dict(task._metadata) + lance_metadata = dict(metadata.get("lance") or {}) + lance_metadata["schema"] = schema_to_json(dataset.schema) + metadata["lance"] = lance_metadata + return ReaderOutput(table, metadata) + + +@dataclass +class LanceReader(CompositeStage[EmptyTask, DocumentBatch]): + """Composite stage for reading Lance datasets. + + This high-level stage decomposes into: + 1. ``LancePartitioningStage`` - partitions Lance fragments into read tasks. + 2. ``LanceReaderStage`` - reads fragment groups into ``DocumentBatch`` objects. + + Args: + path: Path or URI of the Lance dataset. + fragments_per_partition: Number of Lance fragments assigned to each read task. + fields: Optional columns to read. + read_kwargs: Keyword arguments for Lance dataset and scanner construction. + include_lance_metadata: Whether to include row-address and fragment-id metadata columns. + fragment_ids: Optional explicit fragment ids to read. Defaults to all fragments. Duplicates are ignored. + task_type: Output task type. Only ``"document"`` is currently supported. + """ + + path: str + fragments_per_partition: int = 32 + fields: list[str] | None = None + read_kwargs: dict[str, Any] | None = None + include_lance_metadata: bool = True + fragment_ids: list[int] | None = None + task_type: Literal["document"] = "document" + name: str = "lance_reader" + + def __post_init__(self) -> None: + super().__init__() + self.read_kwargs = {} if self.read_kwargs is None else dict(self.read_kwargs) + + def decompose(self) -> list[ProcessingStage]: + if self.task_type != "document": + msg = f"Converting DocumentBatch to {self.task_type} is not supported yet." + raise NotImplementedError(msg) + + return [ + LancePartitioningStage( + path=self.path, + fragments_per_partition=self.fragments_per_partition, + fragment_ids=self.fragment_ids, + read_kwargs=self.read_kwargs, + ), + LanceReaderStage( + path=self.path, + fields=self.fields, + read_kwargs=self.read_kwargs, + include_lance_metadata=self.include_lance_metadata, + ), + ] diff --git a/nemo_curator/stages/text/io/reader/parquet.py b/nemo_curator/stages/text/io/reader/parquet.py index 0654b0f62c..6b6247060a 100644 --- a/nemo_curator/stages/text/io/reader/parquet.py +++ b/nemo_curator/stages/text/io/reader/parquet.py @@ -22,11 +22,11 @@ from nemo_curator.tasks import DocumentBatch, EmptyTask from nemo_curator.utils.file_utils import FILETYPE_TO_DEFAULT_EXTENSIONS -from .base import BaseReader +from .base import BaseFileReader @dataclass -class ParquetReaderStage(BaseReader): +class ParquetReaderStage(BaseFileReader): """ Stage that processes a group of Parquet files into a DocumentBatch. This stage accepts FileGroupTasks created by FilePartitioningStage diff --git a/nemo_curator/stages/text/io/writer/__init__.py b/nemo_curator/stages/text/io/writer/__init__.py index 4b0362b21d..b00e78edef 100644 --- a/nemo_curator/stages/text/io/writer/__init__.py +++ b/nemo_curator/stages/text/io/writer/__init__.py @@ -13,9 +13,15 @@ # limitations under the License. from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter +from nemo_curator.stages.text.io.writer.lance import ( + LanceWriter, + commit_lance_checkpoint, +) from nemo_curator.stages.text.io.writer.parquet import ParquetWriter __all__ = [ "JsonlWriter", + "LanceWriter", "ParquetWriter", + "commit_lance_checkpoint", ] diff --git a/nemo_curator/stages/text/io/writer/lance.py b/nemo_curator/stages/text/io/writer/lance.py new file mode 100644 index 0000000000..15f82dc0fb --- /dev/null +++ b/nemo_curator/stages/text/io/writer/lance.py @@ -0,0 +1,186 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import base64 +import pickle +from dataclasses import dataclass, field +from typing import Any, Literal + +import pyarrow as pa + +from nemo_curator.stages.base import ProcessingStage +from nemo_curator.tasks import DocumentBatch, FileGroupTask +from nemo_curator.utils.lance import ( + lance_checkpoint_record_id, + read_lance_checkpoint, + write_lance_checkpoint_marker, + write_lance_checkpoint_record, +) + +_RESERVED_LANCE_PREFIX = "__lance_" + + +def _drop_reserved_lance_columns(table: pa.Table) -> pa.Table: + columns = [name for name in table.column_names if not name.startswith(_RESERVED_LANCE_PREFIX)] + return table.select(columns) + + +def _metadata_lance_schema(task: DocumentBatch) -> pa.Schema | None: + schema = (task._metadata.get("lance") or {}).get("schema") + if not isinstance(schema, dict): + return None + from lance.schema import json_to_schema + + return json_to_schema(schema) + + +def _schema_for_table(lance_schema: pa.Schema, table: pa.Table) -> pa.Schema: + fields = [] + for table_field in table.schema: + if table_field.name in lance_schema.names: + fields.append(lance_schema.field(table_field.name)) + else: + fields.append(table_field) + return pa.schema(fields) + + +@dataclass +class LanceWriter(ProcessingStage[DocumentBatch, FileGroupTask]): + """Write ``DocumentBatch`` tables to Lance fragments and checkpoint the commit.""" + + path: str + commit_path: str + schema: pa.Schema | None = None + write_kwargs: dict[str, Any] = field(default_factory=dict) + fields: list[str] | None = None + name: str = "lance_writer" + mode: Literal["create", "append", "overwrite"] = "create" + + def __post_init__(self) -> None: + self.write_kwargs = dict(self.write_kwargs or {}) + + def inputs(self) -> tuple[list[str], list[str]]: + return ["data"], [] + + def outputs(self) -> tuple[list[str], list[str]]: + return ["data"], [] + + def _output_table_and_schema(self, task: DocumentBatch) -> tuple[pa.Table, pa.Schema | None]: + table = task.to_pyarrow() + schema = self.schema or _metadata_lance_schema(task) + if self.schema is not None: + table = table.select(self.schema.names) + return table, self.schema + table = table.select(self.fields) if self.fields is not None else _drop_reserved_lance_columns(table) + return table, _schema_for_table(schema, table) if schema is not None else None + + def process(self, task: DocumentBatch) -> FileGroupTask: + from lance.schema import schema_to_json + from lance_ray.fragment import write_fragment + + write_kwargs = dict(self.write_kwargs) + checkpoint_storage_options = write_kwargs.pop("checkpoint_storage_options", None) + write_kwargs.setdefault("max_rows_per_file", 500_000) + table, schema = self._output_table_and_schema(task) + results = write_fragment( + [table], + self.path, + schema=schema, + **write_kwargs, + ) + + record_paths = [] + for index, (fragment, schema) in enumerate(results): + record = { + "kind": "lance_write", + "dataset_path": self.path, + "mode": self.mode, + "task_id": task.task_id, + "fragment_index": index, + "schema": schema_to_json(schema), + "fragment": base64.b64encode(pickle.dumps(fragment)).decode("ascii"), + } + record_paths.append( + write_lance_checkpoint_record( + self.commit_path, + record, + lance_checkpoint_record_id("lance_write", task.task_id, index), + checkpoint_storage_options, + ) + ) + + return FileGroupTask( + dataset_name=task.dataset_name, + data=record_paths, + _metadata=task._metadata, + _stage_perf=task._stage_perf, + ) + + +def _validate_checkpoint_path(records: list[dict[str, Any]], path: str) -> None: + dataset_paths = {record["dataset_path"] for record in records} + if dataset_paths != {path}: + msg = f"Checkpoint records are for {sorted(dataset_paths)}, not {path}" + raise ValueError(msg) + + +def _single_checkpoint_value(records: list[dict[str, Any]], key: str, label: str) -> object: + values = {record[key] for record in records} + if len(values) != 1: + msg = f"Expected one {label}; got {sorted(values)}" + raise ValueError(msg) + return next(iter(values)) + + +def _decode_write_fragments(records: list[dict[str, Any]]) -> list[tuple[object, pa.Schema]]: + from lance.schema import json_to_schema + + return [ + (pickle.loads(base64.b64decode(record["fragment"])), json_to_schema(record["schema"])) # noqa: S301 + for record in sorted( + records, key=lambda record: (str(record.get("task_id", "")), record.get("fragment_index", 0)) + ) + ] + + +def commit_lance_checkpoint( + path: str, + commit_path: str, + *, + storage_options: dict[str, Any] | None = None, + checkpoint_storage_options: dict[str, Any] | None = None, +) -> int: + """Commit records written by ``LanceWriter`` and return the Lance version.""" + import lance + from lance_ray import LanceFragmentCommitter + + records, committed_version = read_lance_checkpoint(commit_path, "lance_write", checkpoint_storage_options) + if committed_version is not None: + return committed_version + + _validate_checkpoint_path(records, path) + mode = str(_single_checkpoint_value(records, "mode", "write mode")) + fragments = _decode_write_fragments(records) + schema = fragments[0][1] + + committer = LanceFragmentCommitter(path, schema=schema, mode=mode, storage_options=storage_options) + if mode == "append": + committer.on_write_start(schema) + fragment_payloads = [(pickle.dumps(fragment), pickle.dumps(schema)) for fragment, schema in fragments] + committer.on_write_complete([fragment_payloads]) + version = lance.dataset(path, storage_options=storage_options).version + write_lance_checkpoint_marker(commit_path, version, checkpoint_storage_options) + return version diff --git a/nemo_curator/utils/lance.py b/nemo_curator/utils/lance.py new file mode 100644 index 0000000000..c955ed82fb --- /dev/null +++ b/nemo_curator/utils/lance.py @@ -0,0 +1,105 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import posixpath +from typing import Any + +import pyarrow as pa + +from nemo_curator.utils.file_utils import get_fs +from nemo_curator.utils.hash_utils import get_deterministic_hash + +LANCE_ROWADDR_COLUMN = "__lance_rowaddr" +LANCE_FRAGID_COLUMN = "__lance_fragid" +_COMMITTED_MARKER = "_COMMITTED" +_RECORDS_DIR = "records" + + +def lance_checkpoint_record_id(kind: str, *parts: object) -> str: + values = [str(part) for part in parts if part not in {None, ""}] + return f"{kind}-{get_deterministic_hash(values or [kind])}" + + +def lance_fragment_ids_from_row_addresses(rowaddr_column: pa.ChunkedArray) -> pa.Array: + rowaddrs = rowaddr_column.combine_chunks().cast(pa.uint64()) + return pa.array([int(value) >> 32 for value in rowaddrs.to_pylist()], type=pa.uint64()) + + +def add_lance_metadata_columns(table: pa.Table) -> pa.Table: + if "_rowaddr" not in table.column_names: + msg = "Lance scanner did not return _rowaddr; include_lance_metadata requires row addresses" + raise ValueError(msg) + + table = table.rename_columns([LANCE_ROWADDR_COLUMN if name == "_rowaddr" else name for name in table.column_names]) + return table.append_column(LANCE_FRAGID_COLUMN, lance_fragment_ids_from_row_addresses(table[LANCE_ROWADDR_COLUMN])) + + +def _checkpoint_fs_path(commit_path: str, storage_options: dict[str, Any] | None = None) -> tuple[Any, str]: + fs = get_fs(commit_path, storage_options) + return fs, fs._strip_protocol(commit_path) + + +def _checkpoint_path(fs_path: str, *parts: str) -> str: + return posixpath.join(fs_path.rstrip("/"), *parts) + + +def write_lance_checkpoint_record( + commit_path: str, + record: dict[str, Any], + record_id: str, + storage_options: dict[str, Any] | None = None, +) -> str: + fs, fs_path = _checkpoint_fs_path(commit_path, storage_options) + records_dir = _checkpoint_path(fs_path, _RECORDS_DIR) + fs.makedirs(records_dir, exist_ok=True) + record_path = _checkpoint_path(records_dir, f"{record_id}.json") + with fs.open(record_path, "w") as stream: + stream.write(json.dumps(record, sort_keys=True) + "\n") + return fs.unstrip_protocol(record_path) + + +def read_lance_checkpoint( + commit_path: str, + kind: str, + storage_options: dict[str, Any] | None = None, +) -> tuple[list[dict[str, Any]], int | None]: + fs, fs_path = _checkpoint_fs_path(commit_path, storage_options) + marker_path = _checkpoint_path(fs_path, _COMMITTED_MARKER) + if fs.exists(marker_path): + with fs.open(marker_path) as stream: + return [], int(json.loads(stream.read())["version"]) + + records = [] + for record_path in sorted(fs.glob(_checkpoint_path(fs_path, _RECORDS_DIR, "*.json"))): + with fs.open(record_path) as stream: + record = json.loads(stream.read()) + if record.get("kind") == kind: + records.append(record) + if not records: + msg = f"No {kind} checkpoint records found under {commit_path}" + raise ValueError(msg) + return records, None + + +def write_lance_checkpoint_marker( + commit_path: str, + version: int, + storage_options: dict[str, Any] | None = None, +) -> None: + fs, fs_path = _checkpoint_fs_path(commit_path, storage_options) + marker_path = _checkpoint_path(fs_path, _COMMITTED_MARKER) + fs.makedirs(posixpath.dirname(marker_path), exist_ok=True) + with fs.open(marker_path, "w") as stream: + stream.write(json.dumps({"version": version}, sort_keys=True, indent=2) + "\n") diff --git a/pyproject.toml b/pyproject.toml index cedc8c48bf..340a3b86e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -206,6 +206,8 @@ text_cpu = [ "sentence-transformers", ] +lance = ["lance-ray>=0.4"] + text_cuda12 = [ "nemo_curator[cuda12]", "nemo_curator[deduplication_cuda12]", @@ -281,6 +283,7 @@ all = [ "nemo_curator[image_cuda12]", "nemo_curator[inference_server]", "nemo_curator[interleaved_cuda12]", + "nemo_curator[lance]", "nemo_curator[math_cuda12]", "nemo_curator[sdg_cuda12]", "nemo_curator[text_cuda12]", diff --git a/tests/L0_Unit_Test_CPU.sh b/tests/L0_Unit_Test_CPU.sh index 0a498beb78..032838e5b2 100644 --- a/tests/L0_Unit_Test_CPU.sh +++ b/tests/L0_Unit_Test_CPU.sh @@ -23,6 +23,6 @@ export UV_NO_CACHE=1 rm -rf .venv uv venv --seed --python "${PY_VERSION}" -uv sync --no-progress --link-mode copy --locked --extra audio_cpu --extra sdg_cpu --extra text_cpu --extra video_cpu --group test +uv sync --no-progress --link-mode copy --locked --extra audio_cpu --extra sdg_cpu --extra text_cpu --extra video_cpu --extra lance --group test source .venv/bin/activate coverage run -a --branch --source=nemo_curator -m pytest -v "tests/$FOLDER" -m "not gpu" diff --git a/tests/stages/text/io/reader/test_lance.py b/tests/stages/text/io/reader/test_lance.py new file mode 100644 index 0000000000..54407b79b4 --- /dev/null +++ b/tests/stages/text/io/reader/test_lance.py @@ -0,0 +1,154 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path + +import pyarrow as pa +import pytest + +from nemo_curator.stages.text.io.reader.base import BaseReader +from nemo_curator.stages.text.io.reader.lance import ( + LANCE_FRAGID_COLUMN, + LANCE_ROWADDR_COLUMN, + LancePartitioningStage, + LanceReader, + LanceReaderStage, +) +from nemo_curator.tasks import EmptyTask + +pytest.importorskip("lance") + + +def _write_lance_dataset(path: Path) -> None: + import lance + + table = pa.table( + { + "snapshot_id": ["CC-MAIN-2025-26", "CC-MAIN-2025-18", "CC-MAIN-2025-26", "CC-MAIN-2025-26"], + "url": ["https://a.example", "https://b.example", "https://c.example", "https://d.example"], + "text": ["alpha one", "beta two", "gamma three", "delta four"], + "content_zlib": lance.blob_array([b"html-a", b"html-b", b"html-c", b"html-d"]), + }, + schema=pa.schema( + [ + pa.field("snapshot_id", pa.string()), + pa.field("url", pa.string()), + pa.field("text", pa.string()), + lance.blob_field("content_zlib"), + ] + ), + ) + lance.write_dataset( + table, str(path), mode="create", max_rows_per_file=2, max_rows_per_group=2, data_storage_version="2.2" + ) + + +def test_lance_reader_partitions_filters_blobs_and_metadata(tmp_path: Path): + dataset_path = tmp_path / "docs.lance" + _write_lance_dataset(dataset_path) + read_tasks = LancePartitioningStage(path=str(dataset_path), fragments_per_partition=1).process(EmptyTask) + + assert issubclass(LanceReaderStage, BaseReader) + assert len(read_tasks) == 2 + assert read_tasks[0].dataset_name == str(dataset_path) + assert {fragment_id for task in read_tasks for fragment_id in task.data} == {0, 1} + assert read_tasks[0].get_deterministic_id() != read_tasks[1].get_deterministic_id() + + reader = LanceReaderStage( + path=str(dataset_path), + fields=["snapshot_id", "url", "content_zlib"], + read_kwargs={"filter": "snapshot_id = 'CC-MAIN-2025-26'", "scanner_options": {"batch_size": 2}}, + ) + batches = [batch for task in read_tasks if (batch := reader.process(task))] + + seen_fragments: set[int] = set() + for batch in batches: + table = batch.to_pyarrow() + assert "schema" in batch._metadata["lance"] + assert LANCE_ROWADDR_COLUMN in table.column_names + assert LANCE_FRAGID_COLUMN in table.column_names + assert table.schema.field("content_zlib").type.extension_name == "lance.blob.v2" + fragids = {int(value) for value in table[LANCE_FRAGID_COLUMN].combine_chunks().to_pylist()} + assert seen_fragments.isdisjoint(fragids) + seen_fragments.update(fragids) + assert seen_fragments == {0, 1} + + +def test_lance_reader_validates_requested_fragments(tmp_path: Path): + dataset_path = tmp_path / "docs.lance" + _write_lance_dataset(dataset_path) + + tasks = LancePartitioningStage(path=str(dataset_path), fragments_per_partition=1, fragment_ids=[1, 0, 1]).process( + EmptyTask + ) + assert [task.data for task in tasks] == [[0], [1]] + + with pytest.raises(ValueError, match="requested fragment ids"): + LancePartitioningStage(path=str(dataset_path), fragment_ids=[999]).process(EmptyTask) + + +def test_lance_reader_columns_empty_filters_and_fields_override(tmp_path: Path): + dataset_path = tmp_path / "docs.lance" + _write_lance_dataset(dataset_path) + task = LancePartitioningStage(path=str(dataset_path)).process(EmptyTask)[0] + + batch = LanceReaderStage( + path=str(dataset_path), read_kwargs={"columns": ["url"]}, include_lance_metadata=False + ).process(task) + assert batch.to_pyarrow().column_names == ["url"] + + empty_batch = LanceReaderStage(path=str(dataset_path), read_kwargs={"filter": "snapshot_id = 'missing'"}).process( + task + ) + empty_table = empty_batch.to_pyarrow() + assert empty_table.num_rows == 0 + assert LANCE_ROWADDR_COLUMN in empty_table.column_names + assert LANCE_FRAGID_COLUMN in empty_table.column_names + + _, reader_stage = LanceReader( + path="example.lance", fields=["a", "b"], read_kwargs={"columns": ["ignored"]} + ).decompose() + assert reader_stage.fields == ["a", "b"] + assert reader_stage.include_lance_metadata is True + + +def test_lance_reader_uses_partition_version(tmp_path: Path): + import lance + + dataset_path = tmp_path / "docs.lance" + lance.write_dataset(pa.table({"text": ["old"]}), str(dataset_path), mode="create", max_rows_per_file=1) + task = LancePartitioningStage(path=str(dataset_path)).process(EmptyTask)[0] + lance.write_dataset(pa.table({"text": ["new"]}), str(dataset_path), mode="overwrite", max_rows_per_file=1) + + batch = LanceReaderStage(path=str(dataset_path), fields=["text"], include_lance_metadata=False).process(task) + + assert batch.to_pyarrow()["text"].to_pylist() == ["old"] + + +def test_lance_reader_rejects_conflicting_version(tmp_path: Path): + import lance + + dataset_path = tmp_path / "docs.lance" + lance.write_dataset(pa.table({"text": ["old"]}), str(dataset_path), mode="create", max_rows_per_file=1) + task = LancePartitioningStage(path=str(dataset_path)).process(EmptyTask)[0] + lance.write_dataset(pa.table({"text": ["new"]}), str(dataset_path), mode="overwrite", max_rows_per_file=1) + latest_version = lance.dataset(str(dataset_path)).version + + with pytest.raises(ValueError, match="version mismatch"): + LanceReaderStage( + path=str(dataset_path), + fields=["text"], + read_kwargs={"version": latest_version}, + include_lance_metadata=False, + ).process(task) diff --git a/tests/stages/text/io/reader/test_parquet.py b/tests/stages/text/io/reader/test_parquet.py index 4bb93e72bf..6f7cc7be55 100644 --- a/tests/stages/text/io/reader/test_parquet.py +++ b/tests/stages/text/io/reader/test_parquet.py @@ -86,6 +86,7 @@ def test_parquet_reader_stage_pandas_reads_and_concatenates(sample_parquet_files ): out = stage.process(task) assert isinstance(out, DocumentBatch) + assert out._metadata == {"source_files": sample_parquet_files[:2]} df = out.to_pandas() assert isinstance(df, pd.DataFrame) @@ -157,6 +158,23 @@ def test_parquet_reader_stage_pandas_raises_when_all_columns_missing(tmp_path: P _ = stage.process(task) +def test_parquet_reader_stage_empty_file_uses_base_reader_policy(tmp_path: Path): + f = tmp_path / "empty.parquet" + pd.DataFrame({"text": pd.Series(dtype="string"), "score": pd.Series(dtype="float64")}).to_parquet( + f, + index=False, + ) + task = _make_file_group_task([str(f)]) + + with pytest.raises(ValueError, match="No data read from files"): + ParquetReaderStage().process(task) + + out = ParquetReaderStage(allow_empty=True).process(task) + assert isinstance(out, DocumentBatch) + assert out.num_items == 0 + assert out.to_pandas().columns.tolist() == ["text", "score"] + + def test_parquet_reader_stage_pyarrow_reads_and_concatenates(tmp_path: Path): f1 = tmp_path / "a.parquet" f2 = tmp_path / "b.parquet" diff --git a/tests/stages/text/io/writer/test_lance.py b/tests/stages/text/io/writer/test_lance.py new file mode 100644 index 0000000000..45ef6daf2f --- /dev/null +++ b/tests/stages/text/io/writer/test_lance.py @@ -0,0 +1,131 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from pathlib import Path + +import pyarrow as pa +import pytest + +from nemo_curator.stages.text.io.reader.lance import ( + LancePartitioningStage, + LanceReaderStage, +) +from nemo_curator.stages.text.io.writer import ( + LanceWriter, + commit_lance_checkpoint, +) +from nemo_curator.tasks import DocumentBatch, EmptyTask + +pytest.importorskip("lance") +pytest.importorskip("lance_ray") + + +def _blob_schema(extra_fields: list[pa.Field] | None = None) -> pa.Schema: + import lance + + fields = [ + pa.field("id", pa.int64()), + pa.field("url", pa.string()), + pa.field("text", pa.string()), + lance.blob_field("content_zlib"), + ] + fields.extend(extra_fields or []) + return pa.schema(fields) + + +def _blob_table() -> pa.Table: + import lance + + return pa.table( + { + "id": [1, 2, 3, 4], + "url": ["https://a.example", "https://b.example", "https://c.example", "https://d.example"], + "text": ["alpha one", "beta two", "gamma three", "delta four"], + "content_zlib": lance.blob_array([b"html-a", b"html-b", b"html-c", b"html-d"]), + }, + schema=_blob_schema(), + ) + + +def _write_source_dataset(path: Path) -> None: + import lance + + lance.write_dataset( + _blob_table(), str(path), mode="create", max_rows_per_file=2, max_rows_per_group=2, data_storage_version="2.2" + ) + + +def _assert_blob_dataset(path: Path, version: int) -> None: + import lance + + dataset = lance.dataset(str(path), version=version) + assert dataset.count_rows() == 4 + assert dataset.schema.field("content_zlib").type.extension_name == "lance.blob.v2" + blobs = dataset.read_blobs("content_zlib", indices=[0, 1, 2, 3], preserve_order=True) + assert sorted(payload for _, payload in blobs) == [b"html-a", b"html-b", b"html-c", b"html-d"] + + +def test_lance_writer_checkpoint_commit_retry_and_blobs(tmp_path: Path): + output_path = tmp_path / "out.lance" + commit_path = tmp_path / "writer_commit" + batch = DocumentBatch(dataset_name="docs", data=_blob_table()) + batch._set_task_id("0", "task") + writer = LanceWriter( + path=str(output_path), + commit_path=str(commit_path), + schema=_blob_schema(), + mode="overwrite", + write_kwargs={"max_rows_per_file": 2, "max_rows_per_group": 2, "data_storage_version": "2.2"}, + ) + + writer.process(batch) + writer.process(batch) + + records = [json.loads(path.read_text()) for path in (commit_path / "records").glob("*.json")] + assert {(record["task_id"], record["fragment_index"]) for record in records} == {("0_task", 0), ("0_task", 1)} + version = commit_lance_checkpoint(str(output_path), str(commit_path)) + assert commit_lance_checkpoint(str(output_path), str(commit_path)) == version + _assert_blob_dataset(output_path, version) + + +def test_lance_writer_preserves_reader_blob_columns_without_explicit_schema(tmp_path: Path): + source_path = tmp_path / "source.lance" + output_path = tmp_path / "out.lance" + commit_path = tmp_path / "writer_commit" + _write_source_dataset(source_path) + read_task = LancePartitioningStage(path=str(source_path), fragments_per_partition=2).process(EmptyTask)[0] + batch = LanceReaderStage(path=str(source_path), fields=["id", "url", "text", "content_zlib"]).process(read_task) + + LanceWriter( + path=str(output_path), + commit_path=str(commit_path), + mode="overwrite", + write_kwargs={"data_storage_version": "2.2"}, + ).process(batch) + + _assert_blob_dataset(output_path, commit_lance_checkpoint(str(output_path), str(commit_path))) + + +def test_lance_writer_allows_empty_batches(tmp_path: Path): + batch = DocumentBatch(dataset_name="docs", data=pa.table({"text": pa.array([], type=pa.string())})) + batch._set_task_id("0", "empty") + + result = LanceWriter( + path=str(tmp_path / "out.lance"), + commit_path=str(tmp_path / "writer_commit"), + schema=batch.to_pyarrow().schema, + ).process(batch) + + assert result.data == [] diff --git a/uv.lock b/uv.lock index 4c0bfd823f..bd1e2d6497 100644 --- a/uv.lock +++ b/uv.lock @@ -4009,6 +4009,52 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/10/9f8af3e6f569685ce3af7faab51c8dd9d93b9c38eba339ca31c746119447/kubernetes-32.0.1-py2.py3-none-any.whl", hash = "sha256:35282ab8493b938b08ab5526c7ce66588232df00ef5e1dbe88a419107dc10998", size = 1988070, upload-time = "2025-02-18T21:06:31.391Z" }, ] +[[package]] +name = "lance-namespace" +version = "0.7.7" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "lance-namespace-urllib3-client" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/5c/9822af615fc1bd3ee1073994696c739aecde377be32435ec3303aed1bc5d/lance_namespace-0.7.7.tar.gz", hash = "sha256:d00b525f2e26993a6c61668e798bca6c808605ab8a79f29f86a1a1af92d91ae2", size = 10754, upload-time = "2026-05-20T17:32:59.45Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/11/43/186acc1156da20c351db196e2b6241b2453b16dc1b4cc8e0a626667ca471/lance_namespace-0.7.7-py3-none-any.whl", hash = "sha256:477a7ca6b5e1f673a2c9ba52f42d6e8e3ff7c27a601392a21eb90fba98d0309b", size = 12581, upload-time = "2026-05-20T17:32:57.389Z" }, +] + +[[package]] +name = "lance-namespace-urllib3-client" +version = "0.7.7" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pydantic" }, + { name = "python-dateutil" }, + { name = "typing-extensions" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/07/95/38ab81ccc1e09beeecd8ddfc61b8bc73831dc5053db1e3f9021f64a4896b/lance_namespace_urllib3_client-0.7.7.tar.gz", hash = "sha256:4d8c066628c17c6a10cf643b51a7f7ae1bfb8a614d9cc54a5af38a4ba2b4b102", size = 202930, upload-time = "2026-05-20T17:32:58.308Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/35/96/5483e48e40433b1d078183c15a92c99e59a156041b0260e7f18ee34e7c08/lance_namespace_urllib3_client-0.7.7-py3-none-any.whl", hash = "sha256:9221c3e00fd89f0c811953d94b32d2ea527765280460a174f5872dc8a74c0ed6", size = 334767, upload-time = "2026-05-20T17:32:55.883Z" }, +] + +[[package]] +name = "lance-ray" +version = "0.4.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "lance-namespace" }, + { name = "more-itertools", marker = "python_full_version < '3.12'" }, + { name = "packaging" }, + { name = "pyarrow" }, + { name = "pylance" }, + { name = "pytest" }, + { name = "pytest-cov" }, + { name = "ray", extra = ["data"] }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3f/6d/5d7047d6680bb71e5f1d83342fb58c6b7b376ea8ba30142fe9771d2d657f/lance_ray-0.4.2.tar.gz", hash = "sha256:c1d32ca787b29e1efc41d68d5802ead6229371e680d7135fa52f2f4f2f011d0c", size = 60459, upload-time = "2026-05-12T03:37:32.619Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2c/b8/4b993529f2d5d90a30ab9373254081723b9ad9e2caa9bd359b78a34f7243/lance_ray-0.4.2-py3-none-any.whl", hash = "sha256:aed853718177783a371788fba4a0bfd84621ae577665f1fc9c359fc4a541863e", size = 42869, upload-time = "2026-05-12T03:37:31.502Z" }, +] + [[package]] name = "lark" version = "1.2.2" @@ -5087,6 +5133,7 @@ all = [ { name = "gpustat" }, { name = "iso639-lang" }, { name = "justext" }, + { name = "lance-ray" }, { name = "librosa" }, { name = "lxml" }, { name = "matplotlib" }, @@ -5265,6 +5312,9 @@ interleaved-cuda12 = [ { name = "timm" }, { name = "vllm", marker = "platform_machine == 'x86_64' and sys_platform != 'darwin'" }, ] +lance = [ + { name = "lance-ray" }, +] math-cpu = [ { name = "beautifulsoup4" }, { name = "boto3" }, @@ -5490,6 +5540,7 @@ requires-dist = [ { name = "iso639-lang", marker = "extra == 'translation-common'", specifier = ">=2.6.0" }, { name = "jieba", specifier = "==0.42.1" }, { name = "justext", marker = "extra == 'text-cpu'" }, + { name = "lance-ray", marker = "extra == 'lance'", specifier = ">=0.4" }, { name = "librosa", marker = "extra == 'audio-common'" }, { name = "loguru" }, { name = "lxml", marker = "extra == 'text-cpu'" }, @@ -5516,6 +5567,7 @@ requires-dist = [ { name = "nemo-curator", extras = ["inference-server"], marker = "extra == 'sdg-cuda12'" }, { name = "nemo-curator", extras = ["interleaved-cpu"], marker = "extra == 'interleaved-cuda12'" }, { name = "nemo-curator", extras = ["interleaved-cuda12"], marker = "extra == 'all'" }, + { name = "nemo-curator", extras = ["lance"], marker = "extra == 'all'" }, { name = "nemo-curator", extras = ["math-cpu"], marker = "extra == 'math-cuda12'" }, { name = "nemo-curator", extras = ["math-cuda12"], marker = "extra == 'all'" }, { name = "nemo-curator", extras = ["sdg-cpu"], marker = "extra == 'sdg-cuda12'" }, @@ -5613,7 +5665,7 @@ requires-dist = [ { name = "warcio", marker = "extra == 'text-cpu'" }, { name = "whisperx", marker = "platform_machine == 'x86_64' and sys_platform != 'darwin' and extra == 'audio-common'", specifier = ">=3.8.4" }, ] -provides-extras = ["cuda12", "vllm", "inference-server", "deduplication-cuda12", "audio-common", "audio-cpu", "audio-cuda12", "image-cpu", "image-cuda12", "translation-common", "translation-metrics", "translation-segmentation", "translation-aws", "translation-google", "translation-nmt", "translation-all", "text-cpu", "text-cuda12", "video-cpu", "video-cuda12", "math-cpu", "math-cuda12", "interleaved-cpu", "interleaved-cuda12", "sdg-cpu", "sdg-cuda12", "all"] +provides-extras = ["cuda12", "vllm", "inference-server", "deduplication-cuda12", "audio-common", "audio-cpu", "audio-cuda12", "image-cpu", "image-cuda12", "translation-common", "translation-metrics", "translation-segmentation", "translation-aws", "translation-google", "translation-nmt", "translation-all", "text-cpu", "lance", "text-cuda12", "video-cpu", "video-cuda12", "math-cpu", "math-cuda12", "interleaved-cpu", "interleaved-cuda12", "sdg-cpu", "sdg-cuda12", "all"] [package.metadata.requires-dev] build = [ @@ -8213,6 +8265,24 @@ crypto = [ { name = "cryptography" }, ] +[[package]] +name = "pylance" +version = "7.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "lance-namespace" }, + { name = "numpy" }, + { name = "pyarrow" }, +] +wheels = [ + { url = "https://files.pythonhosted.org/packages/ac/ad/2f64921bf346e7075aef24a72595db44821724a3d89a9a92dd24e79632aa/pylance-7.0.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:98422021975be76e72b1572f41b8c9abb3bee5bdc9bfa5e9ce731110a65ed4d1", size = 62134146, upload-time = "2026-05-27T21:59:37.459Z" }, + { url = "https://files.pythonhosted.org/packages/73/1c/c5a01bee0160b55d9a98895cbd33091d038f0a0995b121ab72e629008d02/pylance-7.0.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4bec86ee5b6fbd8bfc493e653f0a1fba0303cfe5492b9b46fc25ab908edc7183", size = 65373684, upload-time = "2026-05-27T22:04:01.584Z" }, + { url = "https://files.pythonhosted.org/packages/eb/da/1fe8b8f7dbfe734d76af76acc994fc360a0d0c79a4874ef69f5a72a58fe3/pylance-7.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:881491432c53184e52f8d1db8d5f872f39a03f36fb104bec77b33d379519d8b5", size = 69458555, upload-time = "2026-05-27T22:16:50.567Z" }, + { url = "https://files.pythonhosted.org/packages/76/f0/dd505cf3fd0226ab9d94759acd713125af1d3bfacfd80bbd52e3b9f89509/pylance-7.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:18453999e7fff4f76b16d6b7882c9df0628bd142ff95e2461bd7dd5ee3fe0af3", size = 65394430, upload-time = "2026-05-27T22:05:30.923Z" }, + { url = "https://files.pythonhosted.org/packages/17/ba/2357b81034f28eb00790e258ed140289a6a887a7468ca9df6349fd186b27/pylance-7.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:04a58051d408c60fe76d41a220dcaf8fea8fb6d1aa0ca78a709b60bc3cc8d19a", size = 69473470, upload-time = "2026-05-27T22:17:18.935Z" }, + { url = "https://files.pythonhosted.org/packages/1f/ec/5c00b6303a67d787f9475141832cbdc513d674ac3dcaeef8a7b169905e65/pylance-7.0.0-cp39-abi3-win_amd64.whl", hash = "sha256:467d4864af047eaab4e1370e2f1e88e2c6f507c079874421116cb41d78bc3629", size = 74792863, upload-time = "2026-05-27T22:19:23.875Z" }, +] + [[package]] name = "pylibcudf-cu12" version = "25.10.0"