Skip to content

[WIP] Feat/cc lancedb pipeline#2101

Draft
VibhuJawa wants to merge 37 commits into
NVIDIA-NeMo:mainfrom
VibhuJawa:feat/cc-lancedb-pipeline
Draft

[WIP] Feat/cc lancedb pipeline#2101
VibhuJawa wants to merge 37 commits into
NVIDIA-NeMo:mainfrom
VibhuJawa:feat/cc-lancedb-pipeline

Conversation

@VibhuJawa

Copy link
Copy Markdown
Contributor

Description

Usage

# Add snippet demonstrating usage

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

VibhuJawa and others added 30 commits June 17, 2026 11:30
… and LanceDB on PBSS

Adds a 7-stage NeMo Curator pipeline that reads Common Crawl index shards
from PBSS S3, byte-range fetches WARC records, runs three independent HTML
extractor stages pipelined for 3x extraction throughput, and writes to a
LanceDB table on PBSS S3.

New stages (nemo_curator/stages/):
- CCIndexShardListStage     -- fan-out: list CC index Parquet shards per snapshot
- CCIndexParquetReaderStage -- fan-out: stream Parquet via pafs.S3FileSystem (no full download)
- CCWarcByteRangeFetchStage -- actor: S3/HTTP byte-range WARC fetch (16 threads)
- HtmlExtractStage          -- parameterized actor: one extractor per instance
- LanceDBWriter             -- actor: append to LanceDB table, batch_size=1 for streaming

Pipeline entry point: tutorials/text/cc-lancedb/build_url_index.py
LanceDB schema:       tutorials/text/cc-lancedb/lancedb_writer.py

Key design decisions:
- 3 sequential HtmlExtractStage actors pipeline different blocks simultaneously
  (Ray streaming), giving ~3x CPU utilization vs a single combined stage
- CCIndexParquetReaderStage uses pyarrow.fs.S3FileSystem for row-group streaming
  (no full 898MB shard download before first batch; S3FileSystem cached in setup())
- warc_filename bucket prefix stripped at read time in CCIndexParquetReaderStage
- batch_size=1 on LanceDBWriter keeps it active throughout; no compaction needed
- LanceDBWriter.resources=Resources(cpus=2.0) for PyArrow + LanceDB io_threads
- fetch_stage.max_workers drives the Ray CPU cap to stay under PBSS ~400
  concurrent connection throttle (384 / max_workers + reserved_cpus)
- CC PBSS credentials passed explicitly via s3_key_id/s3_secret to avoid
  env-var pickling issues across Ray worker processes

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
batch_size=1 in Ray Data means 1 ROW per process() call, causing
90K separate tbl.add() calls which never complete. batch_size=None
passes one full block (chunk_size rows) per call.

cpus=4.0 on LanceDBWriter gives PyArrow blob serialisation more
CPU headroom; reserved_cpus updated to 7 (3 extract + 4 writer).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…ipeline

CCWarcByteRangeFetcher was a DocumentExtractor subclass with extract(),
_run_extractor(), and three embedded extractor instances. The 3-stage
HtmlExtractStage pipeline made all extraction code in this class dead.
Stripped to a plain fetch-only helper with no inheritance.

Removes: LANCEDB_CC_SCHEMA, pipeline.py, cc_html_bytes_extractor.py,
compact_and_index.py, slurm/submit.sh, slurm/compact_and_index.sh.
LanceDBWriter.batch_size=5_000 fixes batch_size=None (falsy) bug where
Ray Data buffered all blocks before calling process().

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…DownloadExtractStage

Removes custom cc_index.py (CCIndexShardListStage, CCIndexParquetReaderStage)
and warc_byte_range.py (CCWarcByteRangeFetcher, CCWarcByteRangeFetchStage).
These duplicated the upstream DocumentDownloadExtractStage which already
handles URL generation -> WARC download -> record iteration correctly.

Changes:
- CommonCrawlWARCDownloader gains s3_bucket + s3_endpoint_url params so it
  can point to PBSS mirror without subclassing. Existing callers unaffected.
- HtmlExtractStage gains input_column param (default "content", the upstream
  CommonCrawlWarcIterator field name).
- build_url_index.py -> build_cc_lancedb.py: rewritten to use
  DocumentDownloadExtractStage(extractor=None) + 3x HtmlExtractStage.
- __init__.py: exports only HtmlExtractStage + CommonCrawlDownloadExtractStage.

Pipeline: DocumentDownloadExtractStage -> Trafilatura -> JusText ->
          Resiliparse -> LanceDBWriter

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…it rm in prev commit)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
- Remove `is_s3_compatible` flag in LanceDBWriter; `if endpoint:` is equivalent
- Add `aws_region` to `_build_lancedb_storage_options` in build_cc_lancedb.py
- Extract `_FETCH_CONCURRENCY = 24` named constant; replace magic number in ray.init
- Remove `_BLOB_META` / `metadata=_BLOB_META` from tutorial schema; LanceDBWriter.__post_init__ auto-injects blob encoding metadata
- Wire HtmlExtractStage.inputs() and process() to use self.input_column (was hardcoded "cc_html_bytes")
- Remove all Slurm shell scripts and benchmark_pbss.py from tutorials/text/cc-lancedb

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Add s3_key_id/s3_secret params to CommonCrawlWARCDownloader.__init__.
When set, they are injected only into the s5cmd subprocess environment,
leaving the ambient AWS_ACCESS_KEY_ID/SECRET free for LanceDB writes.

build_cc_lancedb.py reads CC_PBSS_ACCESS_KEY_ID/CC_PBSS_SECRET_ACCESS_KEY
for WARC download and AWS_ACCESS_KEY_ID/SECRET for LanceDB write,
falling back to the write creds if the read-specific vars are absent.

Also fix misleading ray.init comment re: PBSS connection throttle.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Compaction is a separate operational concern; keep the writer focused
on streaming appends only. Users can run lancedb tbl.compact_files()
independently after a data load if needed.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…le in setup

- ray.init uses RAY_TMPDIR env var (set to /raid/$USER in Slurm script for spill-to-disk)
- Strip CC-MAIN- / CC-NEWS- prefix from --snapshot so both 'CC-MAIN-2025-26' and '2025-26' work
- LanceDBWriter.setup() tries open_table() first (cheaper for already-existing tables),
  falls back to create_table(exist_ok=True) — avoids redundant create overhead in distributed runs

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…tasink

Workers write fragments in parallel with no manifest contention; a single
LanceDataset.commit() closes the dataset atomically — the correct two-phase
distributed write pattern from lance-ray (https://github.com/lance-format/lance-ray).

Changes:
- Add run_stages_to_lance() to nemo_curator/stages/text/io/writer/lancedb.py:
  thin glue that chains Curator ProcessingStages with RayDataStageAdapter,
  converts the DocumentBatch stream to pandas via _batches_to_pandas(), and
  calls ray_dataset.write_datasink(datasink) — no changes to RayDataExecutor
- Export run_stages_to_lance from the writer __init__
- build_cc_lancedb.py: use LanceDatasink + run_stages_to_lance; remove
  LanceDBWriter stage, Pipeline, RayDataExecutor, and separate ray.init()
- reserved_cpus drops from 7 to 3 (writer actor removed; datasink handles writes)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…o executor changes

Uses lance_ray.LanceFragmentWriter as a proper ProcessingStage:
- Workers write lance fragment files in parallel (no manifest contention)
- Returns fragment metadata rows (small pickled bytes, not actual data)
- pipeline.run() collects fragment metadata via _dataset_to_tasks() as normal
- Driver calls lance_commit_fragments() for a single atomic LanceDataset.commit()

No changes to RayDataExecutor or Pipeline — both used unchanged.

Files changed:
- writer/utils.py: add s3_storage_options_from_env() + df_to_typed_arrow()
- writer/lancedb.py: LanceFragmentWriterStage + lance_commit_fragments() + LanceDBWriter
- writer/__init__.py: export LanceFragmentWriterStage + lance_commit_fragments
- build_cc_lancedb.py: uses Pipeline + RayDataExecutor cleanly with new stage
- pyproject.toml: lancedb extra now pulls lance-ray>=0.4 instead of lancedb

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
LanceDBWriter was unused — LanceFragmentWriterStage + lance_commit_fragments
is the correct distributed write path.  Also remove build_lance_index.py and
lancedb_writer.py which were orphaned after the lance-ray refactor.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…_lance_index.py

- LanceFragmentWriterStage.process(): use task.to_pyarrow() instead of
  to_pandas() — avoids an Arrow→pandas→Arrow round-trip since
  LanceFragmentWriter accepts pa.Table directly
- build_lance_index.py: replace 150 lines of custom Ray remote code with a
  direct call to lance_ray.index.create_scalar_index() which handles fragment
  distribution, parallel segment building, merge, and atomic commit

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…ent per WARC

DocumentIterateExtractStage produces one DocumentBatch per WARC file (~50-200K
records). Setting max_rows_per_file=500_000 ensures LanceFragmentWriter never
splits a single batch across multiple fragment files — one WARC = one fragment.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…sts commit

lance_commit_fragments:
- Create-if-not-exists + append: opens existing dataset and appends, creates on
  FileNotFoundError — safe for concurrent Slurm array jobs writing to same URI
- Uses retry_with_backoff() from writer/utils.py on manifest version conflict
- Extract schema once (iloc[0]) instead of unpickling on every row
- Use task.get_columns() before to_pandas() to skip non-fragment tasks cheaply

writer/utils.py:
- Add sync retry_with_backoff() — mirrors async version in translation backends
- Delete dead df_to_typed_arrow (no callers after LanceDBWriter removal)

lancedb.py:
- _add_blob_encoding_metadata: early-return when schema has no large_binary fields
- Remove WARC-domain comment from max_rows_per_file
- Remove inline retry / time import (now via retry_with_backoff)

build_cc_lancedb.py:
- Replace manual lance_storage_options dict with s3_storage_options_from_env()
  (fixes silent "aws_endpoint" key bug; keys now match object_store expectations)
- Strip infra arithmetic from _FETCH_CONCURRENCY comment

build_lance_index.py:
- Move lance_ray import to top level
- Remove point-in-time cardinality annotation from BITMAP comment

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…iterStage

LanceFragmentTask(Task[list[FragmentMetadata]]) lives in lancedb.py alongside
the stage that produces it. process() unpickles fragment metadata once and
returns a typed task instead of a DocumentBatch with opaque pickled bytes.

lance_commit_fragments() now:
- Filters tasks with isinstance(t, LanceFragmentTask) — no column-name magic
- Reads fragments/schema from typed fields — no pickle in the commit path
- Exported from nemo_curator.stages.text.io.writer for isinstance checks

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
LanceFragmentWriter.__call__() pickling FragmentMetadata into pa.Table bytes
was the only reason pickle appeared in process(). By calling write_fragment()
directly it returns list[tuple[FragmentMetadata, pa.Schema]] — no pickle,
no pandas conversion, no _writer state in the actor.

setup() still imports write_fragment as an early availability check.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…commit

Fixes:
- download.py: strip bucket name prefix from URL path when building S3 key —
  prevents s3://crawl-data/crawl-data/CC-MAIN-... double-prefix bug on PBSS
- download.py: build credential-override env dict once in __init__ (_run_env)
  instead of os.environ.copy() on every _download_to_path() call

Simplify:
- lancedb.py: single-pass fragment/schema collection replaces 3 list iterations
- lancedb.py: setup() uses plain module import instead of discard-alias
- lancedb.py: add workaround note on _add_blob_encoding_metadata
- lancedb.py: remove misleading Any import comment
- build_cc_lancedb.py: delete dead _FETCH_CONCURRENCY constant and stale comment

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
lance 7.0.0 raises ValueError (not FileNotFoundError) when the dataset
_versions/ directory is absent on a first-ever commit. The except clause
now catches both so Overwrite is used correctly for new datasets.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
- build_cc_lancedb.py: --split/--total-splits/--stage-only/--snapshot-id,
  AddSnapshotIdStage, fix schema import from local schema.py
- schema.py: CC_LANCE_SCHEMA with snapshot_id blob annotation
- commit_snapshot.py: single LanceDataset.commit() per snapshot
- check_progress.py: manifest JSONL status + resubmit commands
- slurm/: process_snapshot_array, commit_snapshot, submit_snapshot, run_all_cc

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…HEMA

- add_filename_column=False: source_id already carries the WARC filename;
  file_name was an extra duplicate column that doesn't belong in the schema
- CC_LANCE_SCHEMA is already wired to LanceFragmentWriterStage so columns
  are cast to large_binary/large_string with blob encoding on content

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Slurm scripts are infrastructure-specific and kept locally/on Nebius.
Only Python files and README belong in the PR.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Moving snapshot_id injection into the iterator (yielded per record)
preserves Ray Data's operator fusion of DocumentIterateExtractStage →
HtmlExtractActor. A separate AddSnapshotIdStage between them broke
that fusion, causing ~3.7 GB per WARC to materialise in the Ray object
store before Trafilatura could start.

- CommonCrawlWarcIterator: optional snapshot_id param; included in
  every yielded record and in output_columns() when set
- build_cc_lancedb.py: pass snapshot_id to iterator, remove
  AddSnapshotIdStage and its unused imports
- LanceFragmentWriterStage: reverted — stays generic, no CC concerns

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…column order

Lance schema validation is strict about column order. CC_LANCE_SCHEMA has
snapshot_id as the first column but the iterator was appending it last.
Now snapshot_id is the first key in the yielded dict when set.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
lance validates exact column order. Rather than requiring every upstream
stage to yield columns in the right order, the writer reorders the Arrow
table to match the schema using tbl.select(schema.names).

This makes LanceFragmentWriterStage resilient to any upstream column
ordering — the schema is the single source of truth for order.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…eues

HtmlExtractStage now accepts a list of (factory, output_column) pairs.
When multiple extractors are given, all run in one actor pool's process()
call — no intermediate object-store materialisation between extractors.

Ray Data cannot fuse adjacent actor-pool operators automatically; running
all extractors in one actor is the only way to avoid the backpressure
cascade that stalls JusText/Resiliparse waiting for space in the store.

Pipeline goes from:
  [fused: Iterate+Trafilatura] → [8GB queue] → JusText → [8GB queue] → Resiliparse
to:
  [fused: Iterate + all 3 extractors] → [8GB queue once] → LanceWriter

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Extractors run sequentially per row — 1 CPU handles all 3.
Setting cpus=3.0 would reduce actor concurrency with no throughput gain.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Matches HtmlExtractStage default so all actor stages have the same
resource footprint — enables Ray to potentially fuse them and maximises
actor pool concurrency (32 actors on a 32-CPU node vs 16 with cpus=2.0).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…urvive Ray pickling

Non-field attributes set in __post_init__ are lost when Ray pickles the
stage for actor initialization. Use @Property to derive _factories and
_output_columns from the actual pickled fields (extractor_factory,
output_column) — zero extra state, always consistent.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
VibhuJawa and others added 6 commits June 17, 2026 18:26
… compute nodes

Nebius compute nodes cannot reach index.commoncrawl.org. Pre-generate
the URL list on a node with internet access (or Mac) and pass it via
--url-file. Falls back to the live HTTP fetch if not provided.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…t call

MainCommonCrawlUrlGenerator.__post_init__ fetches collinfo.json from
index.commoncrawl.org even at construction time. When --url-file is
provided, use a minimal URLGenerator subclass that returns the
pre-loaded list directly, bypassing all network calls.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
cc_html_extract.py: single row pass (decode+lang_detect 1x not 3x per row)
warc_iterator.py: stable output_columns, always emit snapshot_id
utils.py: split s3_storage_options_from_env into generic+lance layers
build_cc_lancedb.py: lazy CC URL imports, single _StaticURLGenerator, json.load
schema.py: delete dead CC_LANCE_INDEXES
commit_snapshot.py: use lance_commit_fragments instead of reimplementing
build_lance_index.py: use s3_credentials_from_env (no lance-specific keys)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
cc_html_extract.py:
  - Add __post_init__ guard: fail at construction if extractor_factory
    and output_column list lengths are mismatched (not at first batch)

lancedb.py:
  - Expand _add_blob_encoding_metadata docstring with lance-ray version
    context and removal criterion
  - Document batch_size=1 constraint in LanceFragmentWriterStage docstring
  - Log caught (FileNotFoundError, ValueError) in lance_commit_fragments
    so non-absence errors are visible rather than silently triggering Overwrite

writer/__init__.py:
  - Export s3_credentials_from_env and s3_storage_options_from_env so
    callers can discover both helpers from the package public surface

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
lancedb.py:
  - storage_options field: repr=False to prevent credential leak in logs
  - write_fragment: raise RuntimeError on empty result (data loss prevention)
  - lance_commit_fragments: log caught ValueError/FileNotFoundError for visibility

schema.py:
  - snapshot_id: nullable=True — iterators without snapshot_id emit None,
    lance needs the field marked nullable to accept it

build_cc_lancedb.py:
  - _StaticURLGenerator.generate_urls: return list(split_urls) copy so
    callers cannot mutate the shared slice

tests/test_warc_iterator.py:
  - Update test_output_columns assertion to include snapshot_id (always emitted)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
…rce failure

Without --exclusive, multiple jobs share a node. Without capping num_cpus,
Ray auto-detects 64 CPUs but the job only owns 32, causing
'Failed to get allocated resources after 11 consecutive failures'.

ray.init(num_cpus=SLURM_CPUS_PER_TASK) is a belt-and-suspenders fix
alongside --exclusive in the Slurm script.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
@copy-pr-bot

copy-pr-bot Bot commented Jun 22, 2026

Copy link
Copy Markdown

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant