Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 180 additions & 70 deletions nemo_curator/stages/text/download/common_crawl/extract.py
Original file line number Diff line number Diff line change
@@ -1,104 +1,214 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any

from bs4 import BeautifulSoup
from loguru import logger

from nemo_curator.stages.resources import Resources
from nemo_curator.stages.text.download import DocumentExtractor
from nemo_curator.stages.text.download.html_extractors import HTMLExtractorAlgorithm
from nemo_curator.stages.text.download.html_extractors.justext import JusTextExtractor
from nemo_curator.stages.text.download.html_extractors.model_based import (
CANDIDATE_ATTRIBUTES_FIELD,
CANDIDATE_HTML_FIELD,
CANDIDATE_INDEX_FIELD,
CANDIDATE_TAG_NAME_FIELD,
CANDIDATE_TEXT_FIELD,
HTML_FIELD,
MODEL_INPUT_FIELD,
PLACEHOLDER_CANDIDATE_INDEX,
ModelBasedHTMLExtractionStage,
extract_candidate_elements,
serialize_html_element,
)
from nemo_curator.stages.text.download.html_extractors.resiliparse import ResiliparseExtractor
from nemo_curator.stages.text.download.html_extractors.trafilatura import TrafilaturaExtractor
from nemo_curator.stages.text.download.html_extractors.utils import get_stop_list_dict
from nemo_curator.stages.text.download.utils import decode_html, lang_detect


class CommonCrawlHTMLExtractor(DocumentExtractor):
def __init__(
    self,
    algorithm: HTMLExtractorAlgorithm | str | None = None,
    algorithm_kwargs: dict | None = None,
    stop_lists: dict[str, frozenset[str]] | None = None,
):
    """Configure which HTML-to-text algorithm this extractor delegates to.

    Args:
        algorithm: Either an ``HTMLExtractorAlgorithm`` instance, one of the
            names ``"justext"``, ``"resiliparse"`` or ``"trafilatura"``, or
            ``None`` to fall back to jusText with default parameters.
        algorithm_kwargs: Keyword arguments forwarded to the extractor's
            constructor when ``algorithm`` is given by name. They are ignored
            (with a warning) when an instance is passed, and are not applied
            to the ``None`` fallback.
        stop_lists: Mapping keyed by detected language. When ``None`` the
            mapping returned by ``get_stop_list_dict()`` is used.

    Raises:
        ValueError: If ``algorithm`` is ``"model"``/``"model_based"``, an
            unknown name, or neither a string nor an
            ``HTMLExtractorAlgorithm``.
    """
    super().__init__()
    algorithm_kwargs = algorithm_kwargs or {}
    if algorithm is None:
        logger.warning("No algorithm provided, using justext with default parameters")
        algorithm = JusTextExtractor()
    elif isinstance(algorithm, str):
        if algorithm == "justext":
            algorithm = JusTextExtractor(**algorithm_kwargs)
        elif algorithm == "resiliparse":
            algorithm = ResiliparseExtractor(**algorithm_kwargs)
        elif algorithm == "trafilatura":
            algorithm = TrafilaturaExtractor(**algorithm_kwargs)
        elif algorithm in {"model", "model_based"}:
            # Model-based extraction is rejected here on purpose; the message
            # points callers at the stage-level entry point instead.
            msg = (
                "Model-based HTML extraction is only supported through "
                "CommonCrawlDownloadExtractStage with html_extraction='model' or 'model_based'."
            )
            raise ValueError(msg)
        else:
            msg = f"Invalid algorithm: {algorithm}"
            raise ValueError(msg)
    elif isinstance(algorithm, HTMLExtractorAlgorithm):
        if algorithm_kwargs:
            logger.warning("Algorithm kwargs provided are ignored when an HTMLExtractorAlgorithm is provided")
    else:
        msg = f"Invalid algorithm: {algorithm}"
        raise ValueError(msg)

    # An explicitly supplied mapping (even an empty one) is kept as-is; only
    # ``None`` triggers the default stop lists.
    if stop_lists is not None:
        self._stop_lists = stop_lists
    else:
        self._stop_lists = get_stop_list_dict()

    self.algorithm = algorithm
    # Use the algorithm's own resource request when it declares one.
    self.resources = getattr(self.algorithm, "resources", Resources(cpus=1.0))

def extract(self, record: dict[str, Any]) -> dict[str, Any] | None:
"""Extract text from HTML content in the record.

Takes a record dict containing "content" field with HTML and returns
a new dict with only the output columns: url, warc_id, source_id, language, text.
"""
# Extract the HTML content from the record
html_content = record.get("content")
if not html_content:
return None

# Content from WARC records is bytes, even though type annotation suggests str
html = decode_html(html_content)

if html is not None:
# Language detection and HTML extraction
lang = lang_detect(html)

text = None
Comment on lines +99 to +104

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 algorithm="model" entry point is unreachable from CommonCrawlDownloadExtractStage

The error message tells users to call CommonCrawlDownloadExtractStage(html_extraction='model'), but stage.py was not updated in this PR — it still passes html_extraction directly to CommonCrawlHTMLExtractor.__init__, where it hits this same ValueError. Any call to CommonCrawlDownloadExtractStage(html_extraction="model") will fail at construction with a self-referential error message. The stage.py file needs to detect html_extraction in {"model", "model_based"} and compose the new CommonCrawlModelBasedCandidateExtractor → TokenizerStage → ModelBasedHTMLInferenceStage → AssembleModelBasedHTMLExtractionStage pipeline instead of delegating to CommonCrawlHTMLExtractor.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I think the PR is not usable as is.

fyi the PR #2075 is going to take over this work I think, thank you!

if lang in self._stop_lists:
text = self.algorithm.extract_text(html, self._stop_lists[lang], lang)

if text is not None:
if len(text) > 0:
text = "\n\n".join(text)
return {
"url": record["url"],
"warc_id": record["warc_id"],
"source_id": record["source_id"],
"language": lang,
"text": text,
}
else:
return None
return None

def input_columns(self) -> list[str]:
    """Return the record fields this extractor declares as its input."""
    return ["url", "warc_id", "source_id", "content"]

def output_columns(self) -> list[str]:
    """Return the fields present in each record produced by ``extract``."""
    return ["url", "warc_id", "source_id", "language", "text"]

def setup_on_node(self, *args, **kwargs) -> None:
    """Forward node-level setup to the wrapped algorithm when it defines one.

    All positional and keyword arguments are passed through unchanged. If
    ``self.algorithm`` has no callable ``setup_on_node`` attribute, this is a
    no-op.
    """
    hook = getattr(self.algorithm, "setup_on_node", None)
    if callable(hook):
        hook(*args, **kwargs)

def setup(self, *args, **kwargs) -> None:
setup = getattr(self.algorithm, "setup", None)
if callable(setup):
setup(*args, **kwargs)

def extract(self, record: dict[str, Any]) -> dict[str, Any] | None:
"""Extract text from HTML content in the record.
def teardown(self) -> None:
teardown = getattr(self.algorithm, "teardown", None)
if callable(teardown):
teardown()

Takes a record dict containing "content" field with HTML and returns
a new dict with only the output columns: url, warc_id, source_id, language, text.
"""
# Extract the HTML content from the record
def ray_stage_spec(self) -> dict[str, Any]:
    """Return the wrapped algorithm's Ray stage spec, or ``{}`` if it has none.

    The result of ``self.algorithm.ray_stage_spec()`` is returned unchanged
    when that attribute exists and is callable; otherwise an empty dict is
    returned.
    """
    provider = getattr(self.algorithm, "ray_stage_spec", None)
    if callable(provider):
        return provider()
    return {}


class CommonCrawlModelBasedCandidateExtractor(DocumentExtractor):
def __init__(
    self,
    algorithm: ModelBasedHTMLExtractionStage,
    stop_lists: dict[str, frozenset[str]] | None = None,
):
    """Store the model-based stage and the language filter for candidates.

    Args:
        algorithm: The model-based extraction stage this extractor is paired
            with; it is stored on ``self.algorithm``.
        stop_lists: Mapping keyed by detected language; ``extract`` skips
            documents whose language is not a key. When ``None`` the mapping
            returned by ``get_stop_list_dict()`` is used.
    """
    super().__init__()
    self.algorithm = algorithm
    # Fall back to the defaults only when nothing was passed, so an explicit
    # (even empty) mapping is honoured, as in CommonCrawlHTMLExtractor.
    self._stop_lists = stop_lists if stop_lists is not None else get_stop_list_dict()
    self.resources = Resources(cpus=1.0)

def extract(self, record: dict[str, Any]) -> list[dict[str, Any]] | None:
html_content = record.get("content")
if not html_content:
return None

# Content from WARC records is bytes, even though type annotation suggests str
html = decode_html(html_content)
if html is None:
return None

language = lang_detect(html)
if language not in self._stop_lists:
return None

if html is not None:
# Language detection and HTML extraction
lang = lang_detect(html)

text = None
if lang in self._stop_lists:
text = self.algorithm.extract_text(html, self._stop_lists[lang], lang)

if text is not None:
if len(text) > 0:
text = "\n\n".join(text)
return {
"url": record["url"],
"warc_id": record["warc_id"],
"source_id": record["source_id"],
"language": lang,
"text": text,
}
else:
return None
return None
elements = extract_candidate_elements(BeautifulSoup(html, "lxml"))
base_record = {
"url": record["url"],
"warc_id": record["warc_id"],
"source_id": record["source_id"],
"language": language,
HTML_FIELD: html,
}

if not elements:
return [
{
**base_record,
CANDIDATE_INDEX_FIELD: PLACEHOLDER_CANDIDATE_INDEX,
CANDIDATE_TAG_NAME_FIELD: None,
CANDIDATE_TEXT_FIELD: None,
CANDIDATE_HTML_FIELD: None,
CANDIDATE_ATTRIBUTES_FIELD: {},
MODEL_INPUT_FIELD: "",
}
]

return [{**base_record, **serialize_html_element(element)} for element in elements]

def input_columns(self) -> list[str]:
    """Return the record fields this extractor declares as its input."""
    return ["url", "warc_id", "source_id", "content"]

def output_columns(self) -> list[str]:
return ["url", "warc_id", "source_id", "language", "text"]
return [
"url",
"warc_id",
"source_id",
"language",
HTML_FIELD,
CANDIDATE_INDEX_FIELD,
CANDIDATE_TAG_NAME_FIELD,
CANDIDATE_TEXT_FIELD,
CANDIDATE_HTML_FIELD,
CANDIDATE_ATTRIBUTES_FIELD,
MODEL_INPUT_FIELD,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Return type incompatible with DocumentExtractor base class

CommonCrawlModelBasedCandidateExtractor.extract returns list[dict[str, Any]] | None, but DocumentExtractor.extract is declared dict[str, Any] | None. When this extractor is eventually plugged into DocumentIterateExtractStage, the stage calls extracted[self.filename_col] = record_dict[self.filename_col] — a string-keyed assignment on what is a list, raising a TypeError. The interface contract needs to be resolved: either DocumentIterateExtractStage must be taught to flatten list returns, or CommonCrawlModelBasedCandidateExtractor should not inherit DocumentExtractor and instead be used only in the custom multi-stage pipeline path.

]
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@

from .base import HTMLExtractorAlgorithm
from .justext import JusTextExtractor
from .model_based import HTMLElement, HTMLElementClassifier, HTMLElementPrediction, ModelBasedHTMLExtractionStage
from .resiliparse import ResiliparseExtractor
from .trafilatura import TrafilaturaExtractor

# Names exposed by a wildcard import of this module; every entry is imported
# above from one of the sibling modules.
__all__ = [
"HTMLElement",
"HTMLElementClassifier",
"HTMLElementPrediction",
"HTMLExtractorAlgorithm",
"JusTextExtractor",
"ModelBasedHTMLExtractionStage",
"ResiliparseExtractor",
"TrafilaturaExtractor",
]
Loading