diff --git a/pyproject.toml b/pyproject.toml index f5136f334..970c0666a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,10 @@ dependencies = [ "pandas==2.3.2", "isort==6.0.1", "pre-commit>=4", - "psycopg2-binary==2.9.10" + "psycopg2-binary==2.9.10", + "Scrapy>=2.14.2", + "playwright>=1.49.0,<2", + "markitdown>=0.1.0" ] [project.scripts] diff --git a/scrapy.cfg b/scrapy.cfg new file mode 100644 index 000000000..124bc2c4b --- /dev/null +++ b/scrapy.cfg @@ -0,0 +1,2 @@ +[settings] +default = src.data_manager.collectors.scrapers.settings diff --git a/src/data_manager/collectors/scrapers/adapters.py b/src/data_manager/collectors/scrapers/adapters.py new file mode 100644 index 000000000..bbdc79e9b --- /dev/null +++ b/src/data_manager/collectors/scrapers/adapters.py @@ -0,0 +1,64 @@ +""" +Single-dispatch adapter: converts Scrapy Items into ScrapedResource. + +Design principles: +- Items are dumb data bags. They know nothing about ScrapedResource. +- This is the ONLY place that knows about both schemas. +- New sources: add a @to_scraped_resource.register block here. Touch nothing else. +- Do NOT reconstruct ResourceMetadata — ScrapedResource.get_metadata() already + derives display_name, url, suffix, source_type from raw fields. Pass raw values only. + +Constraint: ~50 LOC of logic. + +Adding a new source (e.g. TwikiPageItem): + @to_scraped_resource.register(TwikiPageItem) + def _twiki(item) -> ScrapedResource: + ... + +If two sources share identical mapping logic, stack decorators: + @to_scraped_resource.register(WebPageItem) + @to_scraped_resource.register(TwikiPageItem) + def _html_page(item) -> ScrapedResource: + ... + Note: do NOT use union type hints (WebPageItem | TwikiPageItem) — + singledispatch ignores annotations, it dispatches on runtime type only. 
+""" +from __future__ import annotations + +from functools import singledispatch + +from src.data_manager.collectors.scrapers.scraped_resource import ScrapedResource +from src.data_manager.collectors.scrapers.items import WebPageItem + + +@singledispatch +def to_scraped_resource(item) -> ScrapedResource: + """Raises for unregistered types — fail loudly, never silently skip.""" + raise TypeError( + f"No adapter registered for item type {type(item).__name__!r}. " + "Add @to_scraped_resource.register(YourItemClass) in this module." + ) + + +@to_scraped_resource.register(WebPageItem) +def _html_page(item) -> ScrapedResource: + """ + Handles all HTML-family pages regardless of auth method. + + PDFs scraped from the web also route here — the parser sets + suffix="pdf" and content=bytes in the item, so no branch needed. + The adapter passes suffix and source_type through without inspection. + """ + return ScrapedResource( + url=item["url"], + content=item["content"], + suffix=item.get("suffix", "html"), + source_type=item["source_type"], + metadata={ + "content_type": item.get("content_type"), + "encoding": item.get("encoding"), + "title": item.get("title"), + }, + ) + + diff --git a/src/data_manager/collectors/scrapers/integrations/__init__.py b/src/data_manager/collectors/scrapers/auth/__init__.py similarity index 100% rename from src/data_manager/collectors/scrapers/integrations/__init__.py rename to src/data_manager/collectors/scrapers/auth/__init__.py diff --git a/src/data_manager/collectors/scrapers/integrations/git_scraper.py b/src/data_manager/collectors/scrapers/integrations/git_scraper.py deleted file mode 100644 index 7d73fd37a..000000000 --- a/src/data_manager/collectors/scrapers/integrations/git_scraper.py +++ /dev/null @@ -1,353 +0,0 @@ -import os -import re -import shutil -from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple - -from git import Repo -from mkdocs.utils.yaml import yaml_load - -from 
src.utils.config_access import get_global_config -from src.data_manager.collectors.scrapers.scraped_resource import ScrapedResource -from src.utils.env import read_secret -from src.utils.logging import get_logger - -logger = get_logger(__name__) - -if TYPE_CHECKING: - from src.data_manager.collectors.scrapers.scraper_manager import \ - ScraperManager - -global_config = get_global_config() - -class GitScraper: - """Scraper integration that clones Git repositories and indexes MkDocs sites and code files.""" - - def __init__(self, manager: "ScraperManager", git_config: Optional[Dict[str, Any]] = None) -> None: - self.manager = manager - self.config = git_config or {} - - # where we clone our repos to - self.data_path = global_config["DATA_PATH"] - self.git_dir = Path(self.data_path) / "raw_git_repos" - self.git_dir.mkdir(parents=True, exist_ok=True) - - self.code_suffixes = { - suffix.lower() - for suffix in ( - self.config.get( - "code_suffixes", - [ - ".py", - ".js", - ".ts", - ".tsx", - ".jsx", - ".java", - ".go", - ".rs", - ".c", - ".cpp", - ".h", - ".hpp", - ".sh", - ".sql", - ".json", - ".yaml", - ".yml", - ".toml", - ".md", - ".txt", - ], - ) - or [] - ) - } - self.exclude_dirs = { - dir_name - for dir_name in ( - self.config.get( - "exclude_dirs", - [ - ".git", - "node_modules", - ".venv", - "venv", - "__pycache__", - ".idea", - ".vscode", - "dist", - "build", - ], - ) - or [] - ) - } - self.max_file_size_bytes = int(self.config.get("max_file_size_bytes", 1_000_000)) - - self.git_username = read_secret("GIT_USERNAME") - self.git_token = read_secret("GIT_TOKEN") - self._credentials_available = bool(self.git_username and self.git_token) - if not self._credentials_available: - logger.info("No git credentials supplied; will attempt public repo cloning.") - - def collect(self, git_urls: List[str]) -> List[ScrapedResource]: - if not git_urls: - logger.warning("No git URLs provided for scraping; skipping git scraper.") - return [] - - harvested: List[ScrapedResource] 
= [] - - for url in git_urls: - try: - repo_info = self._prepare_repository(url) - except ValueError as exc: - logger.info(f"{exc}") - continue - except Exception as exc: - logger.error(f"Failed to clone {url}: {exc}") - continue - - try: - harvested.extend(self._harvest_repository(repo_info)) - finally: - shutil.rmtree(repo_info["repo_path"], ignore_errors=True) - - if harvested: - logger.info("Git scraping was completed successfully") - - return harvested - - def _prepare_repository(self, url: str) -> Dict[str, Any]: - url_dict = self._parse_url(url) - repo_path = self._clone_repo(url_dict) - mkdocs_site_url = self._read_mkdocs_site_url(repo_path) - ref = self._determine_ref(repo_path, url_dict["branch"]) - web_base_url = self._compute_web_base_url(url_dict["original_url"]) - - return { - "repo_path": repo_path, - "repo_name": url_dict["repo_name"], - "mkdocs_site_url": mkdocs_site_url, - "ref": ref, - "web_base_url": web_base_url, - } - - def _harvest_repository(self, repo_info: Dict[str, Any]) -> List[ScrapedResource]: - resources: List[ScrapedResource] = [] - resources.extend(self._harvest_mkdocs(repo_info)) - resources.extend(self._harvest_code(repo_info)) - return resources - - def _harvest_mkdocs(self, repo_info: Dict[str, Any]) -> List[ScrapedResource]: - repo_path = repo_info["repo_path"] - mkdocs_site_url = repo_info["mkdocs_site_url"] - base_url = repo_info["web_base_url"] - ref = repo_info["ref"] - docs_dir = repo_path / "docs" - if not docs_dir.exists(): - logger.info(f"Skipping MkDocs harvesting for {repo_path}; missing docs directory") - return [] - - resources: List[ScrapedResource] = [] - parent_repo = repo_info["repo_name"] - used_blob_links = False - for markdown_path in docs_dir.rglob("*.md"): - if mkdocs_site_url: - current_url = mkdocs_site_url + markdown_path.relative_to(docs_dir).with_suffix("").as_posix() - else: - current_url = self._build_blob_url(base_url, ref, markdown_path.relative_to(repo_path)) - used_blob_links = True - 
logger.info(f"Indexing Git doc: {current_url}") - text_content = markdown_path.read_text(encoding="utf-8") - relative_path = Path(parent_repo) / markdown_path.relative_to(repo_path) - resource = ScrapedResource( - url=current_url, - content=text_content, - suffix=markdown_path.suffix.lstrip(".") or "txt", - source_type="git", - metadata={ - "repo_path": str(markdown_path.relative_to(repo_path)), - "title": markdown_path.stem.replace("_", " ").replace("-", " ").title(), - "parent": parent_repo, - }, - file_name=markdown_path.name, - relative_path=str(relative_path), - ) - if resource.content: - resources.append(resource) - else: - logger.info(f"Resource {current_url} is empty. Skipping...") - - if used_blob_links and not mkdocs_site_url: - logger.info(f"Used repository blob URLs for MkDocs content in {repo_path} (site_url missing)") - - return resources - - def _harvest_code(self, repo_info: Dict[str, Any]) -> List[ScrapedResource]: - repo_path = repo_info["repo_path"] - ref = repo_info["ref"] - base_url = repo_info["web_base_url"] - repo_name = repo_info["repo_name"] - - resources: List[ScrapedResource] = [] - for file_path in self._iter_code_files(repo_path): - logger.debug(file_path) - rel_path = file_path.relative_to(repo_path) - - # avoid overlap wtih _harvest_mkdocs - if rel_path.parts and rel_path.parts[0] == "docs" and file_path.suffix.lower() == ".md": - continue - - try: - if file_path.stat().st_size > self.max_file_size_bytes: - logger.warning(f"Skipping {file_path} due to file size") - continue - except OSError: - continue - - if not self._is_allowed_suffix(file_path): - logger.warning(f"Skipping {file_path} due to disallowed suffix") - continue - - if self._looks_binary(file_path): - logger.warning(f"Skipping {file_path} due to likely binary content") - continue - - try: - text_content = file_path.read_text(encoding="utf-8", errors="ignore") - except Exception: - continue - - if not text_content.strip(): - continue - - resource_url = 
self._build_blob_url(base_url, ref, rel_path) - relative_path = Path(repo_name) / rel_path - resource = ScrapedResource( - url=resource_url, - content=text_content, - suffix=file_path.suffix.lstrip("."), - source_type="git", - metadata={ - "repo_path": str(rel_path), - "parent": repo_name, - "ref": ref, - }, - file_name=file_path.name, - relative_path=str(relative_path), - ) - resources.append(resource) - - return resources - - def _parse_url(self, url: str) -> dict: - branch_name = None - - regex_repo_name = r"(?:github|gitlab)\.[\w.]+\/[^\/]+\/([\w.-]+)(?:\.git|\/|$)" - match = re.search(regex_repo_name, url, re.IGNORECASE) - if not match: - raise ValueError(f"The git url {url} does not match the expected format.") - - repo_name = match.group(1) - - # Only inject credentials if available (for private repos) - if self._credentials_available: - if "gitlab" in url: - clone_from_url = url.replace("gitlab", f"{self.git_username}:{self.git_token}@gitlab") - elif "github" in url: - clone_from_url = url.replace("github", f"{self.git_username}:{self.git_token}@github") - else: - # For other hosts, try without credentials - clone_from_url = url - else: - # No credentials - use URL as-is (for public repos) - clone_from_url = url - - branch_split = re.split(r"/(?:-/)?tree/", clone_from_url, maxsplit=1) - if len(branch_split) > 1: - branch_name = branch_split[1].strip("/") or None - clone_from_url = branch_split[0].rstrip("/") - - return { - "original_url": url, - "clone_url": clone_from_url, - "repo_name": repo_name, - "branch": branch_name, - } - - def _clone_repo(self, url_dict: dict) -> Path: - clone_url = url_dict["clone_url"] - branch = url_dict["branch"] - repo_name = url_dict["repo_name"] - - logger.info(f"Cloning repository {repo_name}...") - - repo_path = self.git_dir / repo_name - if branch is None: - Repo.clone_from(clone_url, repo_path) - else: - Repo.clone_from(clone_url, repo_path, branch=branch) - - return repo_path - - def _read_mkdocs_site_url(self, 
repo_path: Path) -> Optional[str]: - mkdocs_file = repo_path / "mkdocs.yml" - if not mkdocs_file.exists(): - return None - try: - with mkdocs_file.open("r") as file: - data = yaml_load(file) - site_url = data.get("site_url") - if not site_url: - return None - return site_url if site_url.endswith("/") else site_url + "/" - except Exception: - logger.info(f"Could not read mkdocs.yml in {repo_path}") - return None - - def _compute_web_base_url(self, original_url: str) -> str: - sanitized = re.sub(r"//[^@/]+@", "//", original_url) - sanitized = re.split(r"/(?:-/)?tree/", sanitized, maxsplit=1)[0] - if sanitized.endswith(".git"): - sanitized = sanitized[:-4] - return sanitized.rstrip("/") - - def _determine_ref(self, repo_path: Path, requested_branch: Optional[str]) -> str: - if requested_branch: - return requested_branch - repo: Optional[Repo] = None - try: - repo = Repo(repo_path) - return repo.active_branch.name - except Exception: - try: - repo = repo or Repo(repo_path) - return repo.head.commit.hexsha[:7] - except Exception: - return "main" - - def _iter_code_files(self, repo_path: Path): - for root, dirs, files in os.walk(repo_path): - dirs[:] = [d for d in dirs if d not in self.exclude_dirs] - for filename in files: - file_path = Path(root) / filename - yield file_path - - def _is_allowed_suffix(self, file_path: Path) -> bool: - return file_path.suffix.lower() in self.code_suffixes - - def _looks_binary(self, file_path: Path) -> bool: - try: - with file_path.open("rb") as file: - sample = file.read(8000) - return b"\0" in sample - except Exception: - return True - - def _build_blob_url(self, base_url: str, ref: str, rel_path: Path) -> str: - base = base_url.rstrip("/") - rel = rel_path.as_posix() - if "gitlab" in base: - return f"{base}/-/blob/{ref}/{rel}" - return f"{base}/blob/{ref}/{rel}" diff --git a/src/data_manager/collectors/scrapers/integrations/sso_scraper.py b/src/data_manager/collectors/scrapers/integrations/sso_scraper.py deleted file mode 100644 
index d03877bfb..000000000 --- a/src/data_manager/collectors/scrapers/integrations/sso_scraper.py +++ /dev/null @@ -1,466 +0,0 @@ -import hashlib -import importlib -import json -import os -import re -import time -import urllib.parse -from abc import ABC, abstractmethod -from typing import Dict, List, Tuple - -from selenium import webdriver -from selenium.webdriver.common.by import By -from selenium.webdriver.firefox.options import Options as FirefoxOptions -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.ui import WebDriverWait -from selenium.common.exceptions import TimeoutException - -from src.data_manager.collectors.scrapers.scraped_resource import \ - ScrapedResource, BrowserIntermediaryResult -from src.utils.env import read_secret -from src.utils.logging import get_logger - -logger = get_logger(__name__) - -class SSOScraper(ABC): - """Generic base class for SSO-authenticated web scrapers.""" - - def __init__(self, username=None, password=None, headless=True, site_type="generic", max_depth=2, selenium_url=None): - """Initialize the SSO scraper with credentials and browser settings. - - Args: - username (str, optional): SSO username. If None, will try to get from env vars. - password (str, optional): SSO password. If None, will try to get from env vars. - headless (bool): Whether to run the browser in headless mode. - site_type (str): Type of site to scrape ('generic' or 'mkdocs') - max_depth (int): Maximum number of levels to crawl per page. 
- """ - self.username = username or self.get_username_from_env() - self.password = password or self.get_password_from_env() - self.headless = headless - self.max_depth = max_depth - self.site_type = site_type - self.driver = None - self.visited_urls = set() - self.selenium_url = selenium_url - - if self.username: - logger.info(f"Using username: {self.username}") - - def _is_image_url(self, url: str) -> bool: - """Check if URL points to an image file.""" - image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.ico', '.webp') - parsed_url = urllib.parse.urlparse(url) - path = parsed_url.path.lower() - return any(path.endswith(ext) for ext in image_extensions) - - @abstractmethod - def get_username_from_env(self): - """Get username from environment variables. Override in subclasses.""" - pass - - @abstractmethod - def get_password_from_env(self): - """Get password from environment variables. Override in subclasses.""" - pass - - @abstractmethod - def login(self): - """Login to SSO with the provided credentials. 
Override in subclasses.""" - pass - - def setup_driver(self): - """Configure and initialize the Firefox WebDriver.""" - firefox_options = FirefoxOptions() - if self.headless: - firefox_options.add_argument("--headless") - - # Additional options for better performance in containers - firefox_options.add_argument("--no-sandbox") - firefox_options.add_argument("--disable-dev-shm-usage") - firefox_options.add_argument("--disable-gpu") - firefox_options.add_argument("--window-size=1920,1080") - - # Create Firefox profile with preferences - firefox_profile = webdriver.FirefoxProfile() - firefox_profile.set_preference("dom.disable_open_during_load", False) - firefox_profile.set_preference("browser.download.folderList", 2) - firefox_profile.set_preference("browser.download.manager.showWhenStarting", False) - firefox_profile.set_preference("browser.helperApps.neverAsk.saveToDisk", "application/pdf") - - # Initialize the driver with options - if self.selenium_url: - self.driver = webdriver.Remote(command_executor=self.selenium_url,options=firefox_options) - else: - self.driver = webdriver.Firefox(options=firefox_options) - self.driver.set_page_load_timeout(30) - logger.info(f"Starting Firefox browser in {'headless' if self.headless else 'visible'} mode...") - return self.driver - - def navigate_to(self, url, wait_time=1): - """Navigate to specified URL and wait for page to load.""" - if not self.driver: - raise RuntimeError("WebDriver not initialized. 
Call setup_driver() first.") - - self.driver.get(url) - time.sleep(wait_time) # Enable wait time for page loading - logger.info(f"Navigated to {url}") - logger.info(f"Page title: {self.driver.title}") - return self.driver.title - - def get_links_with_same_hostname(self, base_url): - """Extract all links from the current page that have the same hostname as base_url.""" - base_hostname = urllib.parse.urlparse(base_url).netloc - links = [] - - # Find all anchor tags - if self.site_type == "mkdocs": - # For MkDocs, prioritize navigation links - anchors = self.driver.find_elements(By.CSS_SELECTOR, ".md-nav__link, .md-content a") - else: - anchors = self.driver.find_elements(By.TAG_NAME, "a") - - for anchor in anchors: - try: - href = anchor.get_attribute("href") - if href and href.strip(): - parsed_url = urllib.parse.urlparse(href) - # Check if the link has the same hostname and is not a fragment - if parsed_url.netloc == base_hostname and parsed_url.scheme in ('http', 'https'): - # Normalize the URL to prevent duplicates - normalized_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}" - if parsed_url.query: - normalized_url += f"?{parsed_url.query}" - - # this works for CMS twiki but should be generalized - normalized_url = normalized_url.split("?")[0] - if 'bin/rdiff' in normalized_url or 'bin/edit' in normalized_url or 'bin/oops' in normalized_url or 'bin/attach' in normalized_url or 'bin/genpdf' in normalized_url or '/WebIndex' in normalized_url: - continue - - if not self._clear_url(normalized_url): - continue - - # Skip image files - if self._is_image_url(normalized_url): - logger.debug(f"Skipping image URL: {normalized_url}") - continue - - links.append(normalized_url) - - except Exception as e: - logger.error(f"Error extracting link: {e}") - - return list(set(links)) # Remove duplicates - - def extract_page_data(self, current_url): - """Return the raw HTML payload for the current page.""" - if not self.driver: - raise RuntimeError("WebDriver not 
initialized. Call setup_driver() first.") - - title = self.driver.title or "" - content = self.driver.page_source or "" - - return { - "url": current_url, - "title": title, - "content": content, - "suffix": "html", - } - - def crawl(self, start_url): - """Crawl pages starting from the given URL, storing title and content of each page. - - Args: - start_url (str): The URL to start crawling from - - Returns: - List[Dict]: A list of dictionaries describing each visited page. - """ - max_depth = self.max_depth - depth = 0 - - if not self.driver: - self.setup_driver() - - # Reset crawling state - self.visited_urls = set() - self.page_data = [] - to_visit = [start_url] - level_links = [] - - # First authenticate through the start URL - self.authenticate_and_navigate(start_url) - - base_hostname = urllib.parse.urlparse(start_url).netloc - logger.info(f"Base hostname for crawling: {base_hostname}") - logger.info(f"Site type: {self.site_type}") - - # History record - pages_visited = 0 - self.visited_urls = set() - - while to_visit and depth < max_depth: - current_url = to_visit.pop(0) - - # Skip if we've already visited this URL - if current_url in self.visited_urls: - continue - - # Skip image files - if self._is_image_url(current_url): - logger.debug(f"Skipping image URL: {current_url}") - self.visited_urls.add(current_url) - continue - - logger.info(f"Crawling page {depth + 1}/{max_depth}: {current_url}") - - try: - # Navigate to the page - self.navigate_to(current_url, wait_time=2) - - # Mark as visited - self.visited_urls.add(current_url) - pages_visited += 1 - - # Extract and store page data - page_data = self.extract_page_data(current_url) - self.page_data.append(page_data) - logger.info(f"Extracted data from {current_url} ({len(page_data['content'])} chars)") - - # Get links to follow - new_links = self.get_links_with_same_hostname(current_url) - logger.info(f"Found {len(new_links)} links on the page (nv: {pages_visited})") - - # Add new links to visit - for link in 
new_links: - if link not in self.visited_urls and link not in to_visit and link not in level_links: - logger.info(f"Found new link: {link} (nv: {pages_visited})") - level_links.append(link) - - # Scan next level if to_visit is empty - if not to_visit: - to_visit.extend(level_links) - level_links = [] - depth += 1 - - except Exception as e: - logger.info(f"Error crawling {current_url}: {e}", exc_info=True) - self.visited_urls.add(current_url) # Mark as visited to avoid retrying - - logger.info(f"Crawling complete. Visited {pages_visited} pages.") - return list(self.page_data) - - def _clear_url(self, url: str) -> bool: - """Basic filtering for duplicate or fragment-only URLs.""" - if not url: - return False - - # Ignore pure fragments or JavaScript links - if url.startswith("javascript:"): - return False - - return True - - def close(self): - """Close the browser and clean up resources.""" - if self.driver: - logger.info("Closing browser...") - self.driver.quit() - self.driver = None - - def authenticate_and_navigate(self, url): - """Complete authentication flow and navigate to target URL.""" - - if not self.driver: - self.setup_driver() - - try: - # First navigate to trigger SSO - self.driver.get(url) - - # Login - if self.login(): - # Navigate back to target page - title = self.navigate_to(url) - return title - else: - return None - except Exception as e: - logger.warning(f"Error during authentication: {e}", exc_info=True) - return None - - def authenticate(self, url): - """Complete authentication flow and navigate to target URL.""" - try: - if not self.driver: - self.setup_driver() - - # First navigate to trigger SSO - self.driver.get(url) - - # Login - if self.login(): - # Navigate back to target page - return self.driver.get_cookies() - else: - return None - except Exception as e: - logger.warning(f"Error during authentication: {e}", exc_info=True) - return None - - def __enter__(self): - """Context manager entry point.""" - self.setup_driver() - return self - 
- def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit point.""" - self.close() - - -class CERNSSOScraper(SSOScraper): - """A scraper to handle CERN SSO authentication and page navigation.""" - - def get_username_from_env(self): - """Get CERN SSO username from environment variables.""" - return read_secret("SSO_USERNAME") - - def get_password_from_env(self): - """Get CERN SSO password from environment variables.""" - return read_secret("SSO_PASSWORD") - - def login(self): - """Login to CERN SSO with the provided credentials.""" - if not self.username or not self.password: - raise ValueError("Missing credentials for CERN SSO") - - try: - wait = WebDriverWait(self.driver, 20) - - # Wait for login form to appear - username_input = wait.until( - EC.presence_of_element_located((By.ID, "username")) - ) - username_input.send_keys(self.username) - # time.sleep(1) # Optional sleep to ensure the input is registered - - password_input = wait.until(EC.presence_of_element_located((By.ID, "password"))) - password_input.send_keys(self.password) - # time.sleep(1) # Optional sleep to ensure the input is registered - - sign_in = wait.until(EC.presence_of_element_located((By.ID, "kc-login"))) - sign_in.click() - - logger.info("Login credentials submitted") - return True - except TimeoutException as e: - logger.error(f"Could not find username or password fields in due time: {e}", exc_info=True) - except Exception as e: - logger.error(f"Error during login: {e}",exc_info=True) - return False - - -class SSOCollector: - """Collects resources behind SSO-protected URLs using configured scrapers.""" - - def __init__(self, selenium_config: Dict[str, Dict]) -> None: - self._config = selenium_config or {} - self._enabled = self._config.get("enabled", False) - self._class_name = self._config.get("selenium_class", "") - self._class_map = self._config.get("selenium_class_map", {}) - - def collect(self, url: str) -> List[ScrapedResource]: - if not self._enabled: - 
logger.error("SSO is disabled or not configured") - return [] - - scraper_class, scraper_kwargs = self._resolve_scraper() - if scraper_class is None: - return [] - - try: - with scraper_class(**scraper_kwargs) as scraper: - payload = scraper.crawl(url) - resources = self._extract_resources(scraper, payload) - if not resources: - logger.warning(f"No content extracted from SSO crawl for {url}") - return resources - except Exception as exc: # pragma: no cover - defensive catch - logger.error(f"SSO scraping failed for {url}: {exc}") - return [] - - def _resolve_scraper(self): - entry = self._class_map.get(self._class_name) - if not entry: - logger.error(f"SSO class {self._class_name} not configured") - return None, {} - - scraper_class = entry.get("class") - if isinstance(scraper_class, str): - module_name = entry.get( - "module", - "src.data_manager.collectors.scrapers.integrations.sso_scraper", - ) - module = importlib.import_module(module_name) - scraper_class = getattr(module, scraper_class) - - scraper_kwargs = entry.get("kwargs", {}) - return scraper_class, scraper_kwargs - - def _extract_resources(self, scraper, payload) -> List[ScrapedResource]: - resources: List[ScrapedResource] = [] - - page_data = getattr(scraper, "page_data", None) - if isinstance(page_data, list): - for page in page_data: - if not isinstance(page, dict): - continue - page_url = page.get("url") - content = page.get("content") - if not page_url or content is None: - continue - - resources.append( - ScrapedResource( - url=page_url, - content=content, - suffix=page.get("suffix", "html"), - source_type="sso", - metadata={ - "title": page.get("title"), - }, - ) - ) - - elif isinstance(payload, list): - for item in payload: - if not isinstance(item, dict): - continue - page_url = item.get("url") - content = item.get("content") - if not page_url or content is None: - continue - resources.append( - ScrapedResource( - url=page_url, - content=content, - suffix=item.get("suffix", "html"), - 
source_type="sso", - metadata={ - "visible": str(self._visible).lower(), - }, - ) - ) - - elif isinstance(payload, dict): - for page_url in payload.values(): - logger.warning( - f"SSO scraper returned mapping without page content; skipping {page_url}" - ) - - elif payload is not None: - logger.warning( - f"Unsupported SSO payload type {type(payload).__name__}" - ) - - return resources diff --git a/src/data_manager/collectors/scrapers/items.py b/src/data_manager/collectors/scrapers/items.py new file mode 100644 index 000000000..f72d91b5a --- /dev/null +++ b/src/data_manager/collectors/scrapers/items.py @@ -0,0 +1,62 @@ +""" +Scrapy intuition — Items as the data contract (FR-7a): + + Items sit between Parser and Adapter. + Their field schema must be driven by what the Adapter needs + to construct a ScrapedResource — not by what's convenient + to inspect during development. + + Wrong mental model: "what fields help me debug?" + Right mental model: "what fields does ScrapedResource.__init__ need?" + + ScrapedResource fields (from scraped_resource.py): + url — required + content — required (str or bytes) + suffix — required + source_type — required ("web", "sso", "git") + metadata — dict, optional (title, content_type, encoding, etc.) + file_name — optional + relative_path — optional + + So items carry exactly those fields. + Debug fields (body_preview, body_length) belong in logger calls, + not in the item schema — otherwise the adapter becomes a translation + layer for data that should never have been structured in the first place. + +SOLID note — Open/Closed: + Add new Item subclasses for new source types. + Do not add source-specific fields to the base class. + The adapter is the extension point, not the Item. +""" + +import scrapy + + +class BasePageItem(scrapy.Item): + """ + Common fields shared across all scraped source types. + Maps directly to ScrapedResource constructor arguments. 
+    """
+    url = scrapy.Field()
+    content = scrapy.Field()  # Full text or bytes — NOT a preview
+    suffix = scrapy.Field()  # "html", "pdf", "md" etc.
+    source_type = scrapy.Field()  # "web" | "sso" | "git" — must match ScrapedResource source types
+
+    # Metadata fields — become ScrapedResource.metadata dict
+    title = scrapy.Field()
+    content_type = scrapy.Field()  # HTTP Content-Type header value
+    encoding = scrapy.Field()  # HTTP response encoding
+
+    # Optional — used by git/SSO scrapers for filesystem layout
+    file_name = scrapy.Field()
+    relative_path = scrapy.Field()
+
+
+class WebPageItem(BasePageItem):
+    """
+    Generic page item, works for SSO-*, ordinary web page.
+    No extra fields needed beyond BasePageItem.
+    Subclassing is the extension point (OCP) — Twiki quirks
+    belong in parse_twiki_page(), not in a bloated base class.
+    """
+    pass
diff --git a/src/data_manager/collectors/scrapers/middlewares/__init__.py b/src/data_manager/collectors/scrapers/middlewares/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/data_manager/collectors/scrapers/parsers/__init__.py b/src/data_manager/collectors/scrapers/parsers/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/data_manager/collectors/scrapers/parsers/link.py b/src/data_manager/collectors/scrapers/parsers/link.py
new file mode 100644
index 000000000..c61e46d4e
--- /dev/null
+++ b/src/data_manager/collectors/scrapers/parsers/link.py
@@ -0,0 +1,74 @@
+from typing import Iterator, List
+from scrapy.http import Response, TextResponse
+from urllib.parse import urlparse
+from src.data_manager.collectors.scrapers.items import WebPageItem
+from src.data_manager.collectors.scrapers.utils import get_content_type
+# Tried in order — first non-empty match wins.
+# Covers: HTML5 semantic, ARIA landmark, common CMS patterns, final fallback.
_CONTENT_SELECTORS = [
    "main",
    "article",
    '[role="main"]',
    "#content",
    "#main",
    "#main-content",
    ".main-content",    # MIT.edu Drupal wrapper
    ".region-content",  # Drupal generic region
    ".content",
    ".post-content",
    ".entry-content",
    "body",             # last resort — always present in valid HTML
]


def _first_outer_html(response: Response, selectors: List[str]) -> str:
    """Return the outer HTML of the first selector with non-blank markup.

    Selectors are tried in order; the first one matching a node whose
    markup is not pure whitespace wins. Returns "" when nothing matches.
    """
    for selector in selectors:
        nodes = response.css(selector)
        if not nodes:
            continue
        html = nodes[0].get()
        if html and html.strip():
            return html.strip()
    return ""


def parse_link_page(response: Response) -> Iterator[WebPageItem]:
    """
    Generic page parser — works for any HTML page with no site-specific selectors.

    Strategy:
    - PDFs: yield raw bytes, suffix="pdf".
    - HTML: yield the outer HTML of the first matching content container,
      falling back through _CONTENT_SELECTORS down to <body>. The full
      document is never stored — only the selected container's markup
      reaches the item (MarkitdownPipeline converts it to Markdown
      downstream).

    Suitable as the default parse_item for LinkSpider subclasses that have
    no meaningful site-specific structure to exploit.
    """
    ct = get_content_type(response)
    # ── PDF ──────────────────────────────────────────────────────────────
    if response.url.lower().endswith(".pdf") or "application/pdf" in ct:
        file_name = urlparse(response.url).path.split("/")[-1]
        yield WebPageItem(
            url=response.url,
            content=response.body,
            suffix="pdf",
            source_type="web",
            # removesuffix (not replace): strips only a trailing ".pdf",
            # never a ".pdf" occurring mid-name.
            title=file_name.removesuffix(".pdf").strip(),
            content_type=ct,
        )
        return
    # ── HTML ─────────────────────────────────────────────────────────────
    title = (
        response.css("h1::text").get()
        or response.css("title::text").get()
        or ""
    ).strip()
    body_text = _first_outer_html(response, _CONTENT_SELECTORS)
    encoding = response.encoding if isinstance(response, TextResponse) else "utf-8"
    if not body_text:
        return  # empty page — don't yield a blank item
    yield WebPageItem(
        url=response.url,
        content=body_text,
        suffix="html",
        source_type="web",
        title=title,
        content_type=ct,
        encoding=encoding,
    )
(300).""" + + _DEFAULT_ANONYMIZER_CONFIG = { + "utils": { + "anonymizer": { + "nlp_model": "en_core_web_sm", + "excluded_words": ["John", "Jane", "Doe"], + "greeting_patterns": [ + r"^(hi|hello|hey|greetings|dear)\b", + r"^\w+,\s*", + ], + "signoff_patterns": [ + r"\b(regards|sincerely|best regards|cheers|thank you)\b", + r"^\s*[-~]+\s*$", + ], + "email_pattern": r"[\w\.-]+@[\w\.-]+\.\w+", + "username_pattern": r"\[~[^\]]+\]", + } + } + } + + def __init__(self, anonymizer: Anonymizer) -> None: + self._anonymizer = anonymizer + + @classmethod + def from_crawler(cls, crawler): + enabled = crawler.settings.getbool("ANONYMIZE_DATA", True) + anonymizer = crawler.settings.get("ANONYMIZER_SERVICE") + if not enabled: + raise NotConfigured("Anonymization is disabled") + if anonymizer is None: + # when we use scrapy cmd, we don't have the anonymizer service provided + dm_config = cls._DEFAULT_ANONYMIZER_CONFIG + return cls(anonymizer=Anonymizer(dm_config)) + return cls(anonymizer=anonymizer) + + def process_item(self, item: BasePageItem, spider: Spider) -> BasePageItem: + if isinstance(item.get("content"), str): + logger.debug(f"Anonymizing content: {item['content']}") + item["content"] = self._anonymizer.anonymize_markup(item["content"]) + logger.debug(f"Anonymized content: {item['content']}") + if isinstance(item.get("title"), str): + item["title"] = self._anonymizer.anonymize(item["title"]) + return item diff --git a/src/data_manager/collectors/scrapers/pipelines/markitdown.py b/src/data_manager/collectors/scrapers/pipelines/markitdown.py new file mode 100644 index 000000000..b8defdfc0 --- /dev/null +++ b/src/data_manager/collectors/scrapers/pipelines/markitdown.py @@ -0,0 +1,41 @@ +from scrapy import Spider +from src.utils.logging import get_logger +from src.data_manager.collectors.utils.markitdown_convertor import MarkitdownConvertor +from src.data_manager.collectors.utils.anonymizer import Anonymizer +from src.data_manager.collectors.scrapers.pipelines.anonymization 
class MarkitdownPipeline:
    """Convert item content to Markdown, then optionally re-anonymize.

    Registered at priority 260 in ITEM_PIPELINES, i.e. AFTER
    AnonymizationPipeline (250): conversion can surface text (e.g. from
    HTML attributes) the first pass never saw, hence the second
    anonymization pass here.
    """

    def __init__(self, markitdown: MarkitdownConvertor, anonymizer: Anonymizer, anonymize_data: bool):
        self._markitdown = markitdown
        self._anonymizer = anonymizer
        self._anonymize_data = anonymize_data

    @classmethod
    def from_crawler(cls, crawler):
        """Build from crawler settings; raises NotConfigured when disabled."""
        enabled = crawler.settings.getbool("MARKITDOWN_ENABLED", True)
        markitdown_convertor = crawler.settings.get("MARKITDOWN_SERVICE")
        anonymizer = crawler.settings.get("ANONYMIZER_SERVICE")
        anonymize_data = crawler.settings.getbool("ANONYMIZE_DATA", True)
        if not enabled:
            raise NotConfigured("Markitdown is disabled")
        if markitdown_convertor is None:
            # when we use scrapy cmd, we don't have the markitdown service provided
            markitdown_convertor = MarkitdownConvertor()
        if anonymizer is None:
            # when we use scrapy cmd, we don't have the anonymizer service
            # provided either — reuse AnonymizationPipeline's construction.
            anonymizer = AnonymizationPipeline.from_crawler(crawler)._anonymizer
        return cls(markitdown=markitdown_convertor, anonymizer=anonymizer, anonymize_data=anonymize_data)

    def process_item(self, item: BasePageItem, spider: Spider) -> BasePageItem:
        """Convert textual content to Markdown in place.

        Bytes payloads (e.g. PDFs) pass through untouched — they are
        neither converted nor re-anonymized here.
        """
        if isinstance(item.get("content"), str):
            # Default "html" mirrors the adapter's item.get("suffix", "html")
            # instead of raising KeyError on items that never set a suffix.
            suffix = item.get("suffix", "html")
            item["content"] = self._markitdown.convert(item["content"], file_extension=suffix)
            if self._anonymize_data:
                item["content"] = self._anonymizer.anonymize(item["content"])
            # Lazy %-args keep this free when DEBUG logging is off.
            logger.debug(
                "Markitdown result (%s): %s",
                "anonymized" if self._anonymize_data else "not second pass anonymized",
                item["content"],
            )
        return item
b/src/data_manager/collectors/scrapers/scraped_resource.py @@ -74,14 +74,3 @@ def _safe_relative_path(self) -> Optional[Path]: if rel_path.is_absolute() or ".." in rel_path.parts: return None return rel_path - -@dataclass -class BrowserIntermediaryResult: - """ - this class is meant to provide a layer of abstraction for browser based scrapers (i.e selenium) - it will format everything into a single class so that more complicated scraping results which may hit - multiple tabs or pages at once can be handled in a uniform way by the LinkScraper class. - """ - - artifacts: List[Dict] # list of scraper results for each page produced by a seelnium navigation - links: List[str] # links reached diff --git a/src/data_manager/collectors/scrapers/scraper.py b/src/data_manager/collectors/scrapers/scraper.py deleted file mode 100644 index 7fe1ef0e3..000000000 --- a/src/data_manager/collectors/scrapers/scraper.py +++ /dev/null @@ -1,314 +0,0 @@ -import requests -import re - -from typing import Dict, Iterator, List, Optional -from bs4 import BeautifulSoup -from urllib.parse import urlparse, urljoin, urldefrag - -from src.data_manager.collectors.scrapers.scraped_resource import \ - ScrapedResource -from src.utils.logging import get_logger - -logger = get_logger(__name__) - -class LinkScraper: - """ - Single scraper for all our link needs that handles Selenium and requests. - This class explicitly handles requests, but if selenium scraping is enabled for a link - everything is passed through to the driver including how the page data is collected and - how the next level of links are found. This class DOESNT own the selenium driver, that is - owned by the scraper manager class. - """ - - def __init__(self, verify_urls: bool = True, enable_warnings: bool = True) -> None: - self.verify_urls = verify_urls - self.enable_warnings = enable_warnings - # seen_urls tracks anything queued/visited; visited_urls tracks pages actually crawled. 
- self.visited_urls = set() - self.seen_urls = set() - - def _is_image_url(self, url: str) -> bool: - """Check if URL points to an image file.""" - image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.svg', '.ico', '.webp') - parsed_url = urlparse(url) - path = parsed_url.path.lower() - return any(path.endswith(ext) for ext in image_extensions) - - def reap(self, response, current_url: str, selenium_scrape: bool = False, authenticator = None): - """ - probably the most complicated method here and most volatile in terms of maybe later needing a rewrite - - this method is here to deal with any result that it gets back. for a selenium resource it expects results as a - BrowserIntermediaryResult, otherwhise it will handle it as a normal http response. it handles getting the next set - of links and updating the page data gathered - - Args: - response (BrowserIntermediaryResult | requests.response): whatever has been collected for the current_url by the scraper - selenium_scrape (bool): whether or not selenium was used to scrape this content - authenticator (SSOAuthenticator | None): client being used to crawl websites or just for auth - - Return (tuple[list[str], list[ScrapedResource]]): next links to crawl and resources collected - """ - - # mark as visited - self._mark_visited(current_url) - - source_type = "web" if (authenticator is None) else "sso" - - resources = [] - - if selenium_scrape: # deals with a selenium response (should work for both non authenitcated and authenticated sites in principle) - assert(authenticator is not None) ## this shouldnt be tripped - - # For selenium scraping, we expect a simple dict from extract_page_data - # containing url, title, content, suffix - content = response.get("content", "") - title = response.get("title", "") - suffix = response.get("suffix", "html") - - resource = ScrapedResource( - url=current_url, - content=content, - suffix=suffix, - source_type=source_type, - metadata={ - "title": title, - "content_type": 
"rendered_html", - "renderer": "selenium", - }, - ) - res = authenticator.get_links_with_same_hostname(current_url) - resources.append(resource) - - else: # deals with http response - content_type = response.headers.get("Content-type") - - if current_url.lower().endswith(".pdf"): - resource = ScrapedResource( - url=current_url, - content=response.content, - suffix="pdf", - source_type=source_type, - metadata={"content_type": content_type}, - ) - else: - resource = ScrapedResource( - url=current_url, - content=response.text, - suffix="html", - source_type=source_type, - metadata={ - "content_type": content_type, - "encoding": response.encoding, - }, - ) - res = self.get_links_with_same_hostname(current_url, resource) - resources.append(resource) - - return res, resources # either collected via http or via authenticators method - - - def crawl( - self, - start_url: str, - browserclient = None, - max_depth: int = 1, - selenium_scrape: bool = False, - max_pages: Optional[int] = None, - ): - """ - crawl pages from a given starting url up to a given depth either using basic http or a provided browser client - - Args : - start_url (str): Url to start crawling from - authenticator (SSOAuthenticator): class used for handling authenticatoin for web resources - max_depth (int): max depth of links to descend from the start url - selenium_scrape (bool): tracks whether or not the page should be scraped through selenium or not - max_pages (int | None): cap on total pages to visit before stopping - - Returns: List[ScrapedResource] - - """ - # Consume the iterator so page_data is populated for callers of crawl(). 
- for _ in self.crawl_iter( - start_url, - browserclient=browserclient, - max_depth=max_depth, - selenium_scrape=selenium_scrape, - max_pages=max_pages, - collect_page_data=True, - ): - pass - return list(self.page_data) - - def crawl_iter( - self, - start_url: str, - browserclient = None, - max_depth: int = 1, - selenium_scrape: bool = False, - max_pages: Optional[int] = None, - collect_page_data: bool = False, - ) -> Iterator[ScrapedResource]: - """ - crawl pages from a given starting url up to a given depth either using basic http or a provided browser client - - Args : - start_url (str): Url to start crawling from - authenticator (SSOAuthenticator): class used for handling authenticatoin for web resources - max_depth (int): max depth of links to descend from the start url - selenium_scrape (bool): tracks whether or not the page should be scraped through selenium or not - max_pages (int | None): cap on total pages to visit before stopping - collect_page_data (bool): whether to store resources on the scraper instance - - Returns: Iterator[ScrapedResource] - - """ - - if not self.enable_warnings: - import urllib3 - urllib3.disable_warnings() - - depth = 0 - self.visited_urls = set() - self.seen_urls = set() - self.page_data = [] - normalized_start_url = self._normalize_url(start_url) - if not normalized_start_url: - logger.error(f"Failed to crawl: {start_url}, could not normalize URL") - return - to_visit = [normalized_start_url] - self.seen_urls.add(normalized_start_url) - level_links = [] - pages_visited = 0 - - base_hostname = urlparse(normalized_start_url).netloc - logger.info(f"Base hostname for crawling: {base_hostname}") - - # session either stays none or becomes a requests.Session object if not selenium scraping - session = None - - if selenium_scrape: # scrape page with pure selenium - if browserclient is None: - logger.error(f"Failed to crawl: {start_url}, auth is needed but no browser clilent was passed through") - return [] - 
browserclient.authenticate_and_navigate(normalized_start_url) - - elif not selenium_scrape and browserclient is not None: # use browser client for auth but scrape with http request - session = requests.Session() - cookies = browserclient.authenticate(normalized_start_url) - if cookies is not None: - for cookie_args in cookies: - cookie = requests.cookies.create_cookie(name=cookie_args['name'], - value=cookie_args['value'], - domain=cookie_args.get('domain'), - path=cookie_args.get('path', '/'), - expires=cookie_args.get('expires'), - secure=cookie_args.get('secure', False)) - session.cookies.set_cookie(cookie) - - else: # pure html no browser client needed - session = requests.Session() - - while to_visit and depth < max_depth: - if max_pages is not None and pages_visited >= max_pages: - logger.info(f"Reached max_pages={max_pages}; stopping crawl early.") - break - current_url = to_visit.pop(0) - - # Skip if we've already visited this URL - if current_url in self.visited_urls: - continue - - # Skip image files - if self._is_image_url(current_url): - logger.debug(f"Skipping image URL: {current_url}") - self._mark_visited(current_url) - continue - - logger.info(f"Crawling depth {depth + 1}/{max_depth}: {current_url}") - - try: - - # grab the page content - if not selenium_scrape: - assert (session is not None) # REMOVELATER - response = session.get(current_url, verify = self.verify_urls) - response.raise_for_status() - else: - assert (browserclient is not None) # REMOVELATER - browserclient.navigate_to(current_url, wait_time = 2) - response = browserclient.extract_page_data(current_url) # see the BrowserIntermediaryResult class to see what comes back here - - - # Mark as visited and store content - pages_visited += 1 - new_links, resources = self.reap(response, current_url, selenium_scrape, browserclient) - for resource in resources: - if collect_page_data: - self.page_data.append(resource) - yield resource - - for link in new_links: - normalized_link = 
self._normalize_url(link) - if not normalized_link: - continue - if normalized_link in self.seen_urls: - continue - logger.info(f"Found new link: {normalized_link} (nv: {pages_visited})") - self.seen_urls.add(normalized_link) - level_links.append(normalized_link) - - except Exception as e: - logger.info(f"Error crawling {current_url}: {e}") - self._mark_visited(current_url) # Mark as visited to avoid retrying - - if not to_visit: - to_visit.extend(level_links) - level_links = [] - depth += 1 - - logger.info(f"Crawling complete. Visited {pages_visited} pages.") - return - - def _normalize_url(self, url: str) -> Optional[str]: - if not url: - return None - - normalized, _ = urldefrag(url) - parsed = urlparse(normalized) - if not parsed.scheme: - return normalized - return parsed._replace( - scheme=parsed.scheme.lower(), - netloc=parsed.netloc.lower(), - ).geturl() - - def _mark_visited(self, url: str) -> None: - normalized = self._normalize_url(url) - if not normalized: - return - self.visited_urls.add(normalized) - self.seen_urls.add(normalized) - - def get_links_with_same_hostname(self, url: str, page_data: ScrapedResource): - """Return all links on the page that share the same hostname as `url`. 
For now does not support PDFs""" - - base_url = self._normalize_url(url) or url - base_hostname = urlparse(base_url).netloc - links = set() - a_tags = [] - - if (page_data.suffix == "html"): - soup = BeautifulSoup(page_data.content, "html.parser") - a_tags = soup.find_all("a", href=True) - - # how many links found on the first level - for tag in a_tags: - full = urljoin(base_url, tag["href"]) - normalized = self._normalize_url(full) - if not normalized: - continue - if urlparse(normalized).netloc == base_hostname: - links.add(normalized) - return list(links) diff --git a/src/data_manager/collectors/scrapers/scraper_manager.py b/src/data_manager/collectors/scrapers/scraper_manager.py deleted file mode 100644 index 1904f7f11..000000000 --- a/src/data_manager/collectors/scrapers/scraper_manager.py +++ /dev/null @@ -1,366 +0,0 @@ -import os -import importlib -from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, List, Optional - -from src.data_manager.collectors.persistence import PersistenceService -from src.data_manager.collectors.scrapers.scraped_resource import \ - ScrapedResource -from src.data_manager.collectors.scrapers.scraper import LinkScraper -from src.utils.config_access import get_global_config -from src.utils.env import read_secret -from src.utils.logging import get_logger - -logger = get_logger(__name__) - -if TYPE_CHECKING: - from src.data_manager.collectors.scrapers.integrations.git_scraper import \ - GitScraper - - -class ScraperManager: - """Coordinates scraper integrations and centralises persistence logic.""" - - def __init__(self, dm_config: Optional[Dict[str, Any]] = None) -> None: - global_config = get_global_config() - - sources_config = (dm_config or {}).get("sources", {}) or {} - links_config = sources_config.get("links", {}) if isinstance(sources_config, dict) else {} - selenium_config = links_config.get("selenium_scraper", {}) if isinstance(sources_config, dict) else {} - - git_config = sources_config.get("git", {}) if 
isinstance(sources_config, dict) else {} - sso_config = sources_config.get("sso", {}) if isinstance(sources_config, dict) else {} - self.base_depth = links_config.get('base_source_depth', 5) - logger.debug(f"Using base depth of {self.base_depth} for weblist URLs") - - scraper_config = {} - if isinstance(links_config, dict): - scraper_config = links_config.get("html_scraper", {}) or {} - self.config = scraper_config - raw_max_pages = links_config.get("max_pages") - self.max_pages = None - if raw_max_pages not in (None, ""): - try: - self.max_pages = int(raw_max_pages) - except (TypeError, ValueError): - logger.warning(f"Invalid max_pages value {raw_max_pages}; ignoring.") - - self.links_enabled = True - self.git_enabled = git_config.get("enabled", False) if isinstance(git_config, dict) else True - self.git_config = git_config if isinstance(git_config, dict) else {} - self.selenium_config = selenium_config or {} - self.selenium_enabled = self.selenium_config.get("enabled", False) - self.scrape_with_selenium = self.selenium_config.get("use_for_scraping", False) - - self.sso_enabled = bool(sso_config.get("enabled", False)) - - self.data_path = Path(global_config["DATA_PATH"]) - self.input_lists = links_config.get("input_lists", []) - self.git_dir = self.data_path / "git" - - self.data_path.mkdir(parents=True, exist_ok=True) - - self.web_scraper = LinkScraper( - verify_urls=self.config.get("verify_urls", False), # Default to False for broader compatibility - enable_warnings=self.config.get("enable_warnings", False), - ) - self._git_scraper: Optional["GitScraper"] = None - - def collect_all_from_config( - self, persistence: PersistenceService - ) -> None: - """Run the configured scrapers and persist their output.""" - link_urls, git_urls, sso_urls = self._collect_urls_from_lists_by_type(self.input_lists) - - if git_urls: - self.git_enabled = True - if sso_urls: - self.sso_enabled = True - self._ensure_sso_defaults() - - self.collect_links(persistence, 
link_urls=link_urls) - self.collect_sso(persistence, sso_urls=sso_urls) - self.collect_git(persistence, git_urls=git_urls) - - logger.info("Web scraping was completed successfully") - - def collect_links( - self, - persistence: PersistenceService, - link_urls: List[str] = [], - max_depth: Optional[int] = None, - ) -> int: - """Collect only standard link sources. Returns count of resources scraped.""" - if not self.links_enabled: - logger.info("Links disabled, skipping link scraping") - return 0 - if not link_urls: - return 0 - websites_dir = persistence.data_path / "websites" - if not os.path.exists(websites_dir): - os.makedirs(websites_dir, exist_ok=True) - return self._collect_links_from_urls(link_urls, persistence, websites_dir, max_depth=max_depth) - - def collect_git( - self, - persistence: PersistenceService, - git_urls: Optional[List[str]] = None, - ) -> None: - """Collect only git sources.""" - if not self.git_enabled: - logger.info("Git disabled, skipping git scraping") - return - if not git_urls: - return - git_dir = persistence.data_path / "git" - if not os.path.exists(git_dir): - os.makedirs(git_dir, exist_ok=True) - self._collect_git_resources(git_urls, persistence, git_dir) - - def collect_sso( - self, - persistence: PersistenceService, - sso_urls: Optional[List[str]] = None, - ) -> None: - """Collect only SSO sources.""" - if not self.sso_enabled: - logger.info("SSO disabled, skipping SSO scraping") - return - self._ensure_sso_defaults() - if not sso_urls: - return - sso_dir = persistence.data_path / "sso" - if not os.path.exists(sso_dir): - os.makedirs(sso_dir, exist_ok=True) - self._collect_sso_from_urls(sso_urls, persistence, sso_dir) - - def schedule_collect_links(self, persistence: PersistenceService, last_run: Optional[str] = None) -> None: - """ - Scheduled collection of link sources. - For now, this behaves the same as a full collection, overriding last_run depending on the persistence layer. 
- """ - metadata = persistence.catalog.get_metadata_by_filter("source_type", source_type="web", metadata_keys=["url"]) - catalog_urls = [m[1].get("url", "").strip() for m in metadata] - catalog_urls = [u for u in catalog_urls if u] - logger.info("Scheduled links collection found %d URL(s) in catalog", len(catalog_urls)) - self.collect_links(persistence, link_urls=catalog_urls) - - def schedule_collect_git(self, persistence: PersistenceService, last_run: Optional[str] = None) -> None: - metadata = persistence.catalog.get_metadata_by_filter("source_type", source_type="git", metadata_keys=["url"]) - catalog_urls = [m[1].get("url", "") for m in metadata] - self.collect_git(persistence, git_urls=catalog_urls) - - def schedule_collect_sso(self, persistence: PersistenceService, last_run: Optional[str] = None) -> None: - metadata = persistence.catalog.get_metadata_by_filter("source_type", source_type="sso", metadata_keys=["url"]) - catalog_urls = [m[1].get("url", "") for m in metadata] - self.collect_sso(persistence, sso_urls=catalog_urls) - - def _collect_links_from_urls( - self, - urls: List[str], - persistence: PersistenceService, - output_dir: Path, - max_depth: Optional[int] = None, - ) -> int: - """Collect links from URLs and return total count of resources scraped.""" - # Initialize authenticator if selenium is enabled - authenticator = None - if self.selenium_enabled: - authenticator_class, kwargs = self._resolve_scraper() - if authenticator_class is not None: - authenticator = authenticator_class(**kwargs) - - total_count = 0 - try: - for url in urls: - # For standard link collection, don't use selenium for scraping - # (SSO urls are handled separately via collect_sso) - count = self._handle_standard_url( - url, - persistence, - output_dir, - max_depth=max_depth if max_depth is not None else self.base_depth, - client=None, - use_client_for_scraping=False - ) - total_count += count - finally: - if authenticator is not None: - authenticator.close() # Close the 
authenticator properly and free the resources - return total_count - - def _collect_sso_from_urls( - self, - urls: List[str], - persistence: PersistenceService, - output_dir: Path, - ) -> None: - """Collect SSO-protected URLs using selenium for authentication.""" - if not self.selenium_enabled: - logger.error("SSO scraping requires data_manager.sources.links.selenium_scraper.enabled") - return - if not read_secret("SSO_USERNAME") or not read_secret("SSO_PASSWORD"): - logger.error("SSO scraping requires SSO_USERNAME and SSO_PASSWORD secrets") - return - authenticator = None - if self.selenium_enabled: - authenticator_class, kwargs = self._resolve_scraper() - if authenticator_class is not None: - authenticator = authenticator_class(**kwargs) - - if authenticator is None: - logger.error("SSO collection requires a valid selenium scraper configuration") - return - - try: - for url in urls: - # For SSO URLs, use selenium client for authentication - # scrape_with_selenium determines if we use selenium for scraping too - self._handle_standard_url( - url, - persistence, - output_dir, - max_depth=self.base_depth, - client=authenticator, - use_client_for_scraping=self.scrape_with_selenium - ) - finally: - if authenticator is not None: - authenticator.close() - - def _ensure_sso_defaults(self) -> None: - if not self.selenium_config: - self.selenium_config = {} - - if not self.selenium_enabled: - self.selenium_config["enabled"] = True - self.selenium_enabled = True - - if not self.selenium_config.get("selenium_class"): - self.selenium_config["selenium_class"] = "CERNSSOScraper" - - class_map = self.selenium_config.setdefault("selenium_class_map", {}) - if "CERNSSOScraper" not in class_map: - class_map["CERNSSOScraper"] = { - "class": "CERNSSOScraper", - "kwargs": { - "headless": True, - "max_depth": 2, - }, - } - - def _collect_urls_from_lists(self, input_lists) -> List[str]: - """Collect URLs from the configured weblists.""" - # Handle case where input_lists might be None - 
urls: List[str] = [] - if not input_lists: - return urls - for list_name in input_lists: - list_path = Path("weblists") / Path(list_name).name - if not list_path.exists(): - logger.warning(f"Input list {list_path} not found.") - continue - - urls.extend(self._extract_urls_from_file(list_path)) - - return urls - - def _collect_urls_from_lists_by_type(self, input_lists: List[str]) -> tuple[List[str], List[str], List[str]]: - """All types of URLs are in the same input lists, separate them via prefixes""" - link_urls: List[str] = [] - git_urls: List[str] = [] - sso_urls: List[str] = [] - for raw_url in self._collect_urls_from_lists(input_lists): - if raw_url.startswith("git-"): - git_urls.append(raw_url.split("git-", 1)[1]) - continue - if raw_url.startswith("sso-"): - sso_urls.append(raw_url.split("sso-", 1)[1]) - continue - link_urls.append(raw_url) - return link_urls, git_urls, sso_urls - def _resolve_scraper(self): - class_name = self.selenium_config.get("selenium_class") - class_map = self.selenium_config.get("selenium_class_map", {}) - selenium_url = self.selenium_config.get("selenium_url",None) - - entry = class_map.get(class_name) - - if not entry: - logger.error(f"Selenium class {class_name} is not defined in the configuration") - return None, {} - - scraper_class = entry.get("class") - if isinstance(scraper_class, str): - module_name = entry.get( - "module", - "src.data_manager.collectors.scrapers.integrations.sso_scraper", - ) - module = importlib.import_module(module_name) - scraper_class = getattr(module, scraper_class) - scraper_kwargs = entry.get("kwargs", {}) - scraper_kwargs["selenium_url"] = selenium_url - return scraper_class, scraper_kwargs - - - def _handle_standard_url( - self, - url: str, - persistence: PersistenceService, - output_dir: Path, - max_depth: int, - client=None, - use_client_for_scraping: bool = False, - ) -> int: - """Scrape a URL and persist resources. 
Returns count of resources scraped.""" - count = 0 - try: - for resource in self.web_scraper.crawl_iter( - url, - browserclient=client, - max_depth=max_depth, - selenium_scrape=use_client_for_scraping, - max_pages=self.max_pages, - ): - persistence.persist_resource( - resource, output_dir - ) - count += 1 - logger.info(f"Scraped {count} resources from {url}") - except Exception as exc: - logger.error(f"Failed to scrape {url}: {exc}", exc_info=exc) - return count - - def _extract_urls_from_file(self, path: Path) -> List[str]: - """Extract URLs from file, ignoring depth specifications for now.""" - urls: List[str] = [] - with path.open("r") as file: - for line in file: - stripped = line.strip() - if not stripped or stripped.startswith("#"): - continue - # Extract just the URL part, ignoring depth specification if present - url_depth = stripped.split(",") - url = url_depth[0].strip() - urls.append(url) - return urls - - def _collect_git_resources( - self, - git_urls: List[str], - persistence: PersistenceService, - git_dir: Path, - ) -> List[ScrapedResource]: - git_scraper = self._get_git_scraper() - resources = git_scraper.collect(git_urls) - for resource in resources: - persistence.persist_resource(resource, git_dir) - return resources - - def _get_git_scraper(self) -> "GitScraper": - if self._git_scraper is None: - from src.data_manager.collectors.scrapers.integrations.git_scraper import \ - GitScraper - - self._git_scraper = GitScraper(manager=self, git_config=self.git_config) - return self._git_scraper diff --git a/src/data_manager/collectors/scrapers/settings.py b/src/data_manager/collectors/scrapers/settings.py new file mode 100644 index 000000000..2a447c2e4 --- /dev/null +++ b/src/data_manager/collectors/scrapers/settings.py @@ -0,0 +1,97 @@ +BOT_NAME = "archi_scrapers" + +SPIDER_MODULES = ["src.data_manager.collectors.scrapers.spiders"] + +NEWSPIDER_MODULE = "src.data_manager.collectors.scrapers.spiders" + +# Browser-like UA to avoid bot-blocking (e.g. 
# Twiki ConnectionLost issue): some servers drop requests carrying the
# default Scrapy UA, so advertise a mainstream browser — with our bot
# token appended (space-separated) for transparency.
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36 "
    "archi_scrapers/1.0 (+https://github.com/archi-physics/archi)"
)

# Default RETRY_TIMES is 2. We bump to 3 for transient failures.
# ConnectionLost is a connection-level exception, not an HTTP code:
# RetryMiddleware retries it via its exception list automatically,
# independent of RETRY_HTTP_CODES below.
RETRY_ENABLED = True
RETRY_TIMES = 3  # max retries per request (transport + server errors only)
RETRY_HTTP_CODES = [
    500,  # Internal Server Error — transient server fault
    502,  # Bad Gateway — upstream not reachable
    503,  # Service Unavailable — server overloaded
    504,  # Gateway Timeout
    408,  # Request Timeout — network-level timeout
    # 429 (Too Many Requests) omitted: AutoThrottle should prevent hitting it
]

# Conservative floor delay for all sources.
# AutoThrottle will increase this dynamically but never go below it.
# Indico's robots.txt mandates Crawl-delay: 10 — Indico spiders must override
# this to 10 via custom_settings = {"DOWNLOAD_DELAY": 10}.
DOWNLOAD_DELAY = 2  # seconds
# Per-request timeout — prevents indefinite hangs
DOWNLOAD_TIMEOUT = 30  # seconds

# Keep a single concurrent request per domain.
# AutoThrottle adjusts throughput dynamically; starting at 1 is safe.
CONCURRENT_REQUESTS = 1
CONCURRENT_REQUESTS_PER_DOMAIN = 1

# Robots.txt: obey by default.
# Override this per-spider: custom_settings = {"ROBOTSTXT_OBEY": False}.
# Never disable globally — it would affect all spiders.
ROBOTSTXT_OBEY = True

# AutoThrottle
# Enabled as a second politeness layer on top of DOWNLOAD_DELAY.
# AutoThrottle treats DOWNLOAD_DELAY as a minimum — it will never go lower.
# Target concurrency of 1.0 keeps us single-threaded per domain by default.
+AUTOTHROTTLE_ENABLED = True +AUTOTHROTTLE_START_DELAY = DOWNLOAD_DELAY # initial delay before AT calibrates +AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0 +AUTOTHROTTLE_MAX_DELAY = 60 # cap: never wait more than 60s +# Log every AutoThrottle adjustment — useful during development, can be +# set False in production if log volume is too high. +AUTOTHROTTLE_DEBUG = False + +# ------------------------------------------------------------------ # +# Depth limiting — safety cap; spiders can narrow via custom_settings. +# ------------------------------------------------------------------ # +DEPTH_LIMIT = 2 # hard cap so a misconfigured crawl can't run forever + +# --------------------------------------------------------------------------- +# Safety: fail loudly on spider import errors +# --------------------------------------------------------------------------- +SPIDER_LOADER_WARN_ONLY = False + +# Maximum error count before the spider is closed automatically. +# 25 gives enough room to diagnose intermittent failures without letting +# a completely broken crawl run for hours. 
+CLOSESPIDER_ERRORCOUNT = 25 + +LOG_LEVEL = "INFO" + +# The class used to detect and filter duplicate requests +DUPEFILTER_CLASS = "scrapy.dupefilters.RFPDupeFilter" + +# --------------------------------------------------------------------------- +# Middlewares, Pipelines and Extensions Priorities +# --------------------------------------------------------------------------- +DOWNLOADER_MIDDLEWARES = { + "scrapy.downloadermiddlewares.retry.RetryMiddleware": 550, + # RedirectMiddleware stays at its default 600 — no entry needed +} + +SPIDER_AUTH_PROVIDERS = { +} + +ITEM_PIPELINES = { + "src.data_manager.collectors.scrapers.pipelines.anonymization.AnonymizationPipeline": 250, + "src.data_manager.collectors.scrapers.pipelines.markitdown.MarkitdownPipeline": 260, +} + +EXTENSIONS = { + "scrapy.extensions.closespider.CloseSpider": 500, +} diff --git a/src/data_manager/collectors/scrapers/spiders/__init__.py b/src/data_manager/collectors/scrapers/spiders/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/data_manager/collectors/scrapers/spiders/link.py b/src/data_manager/collectors/scrapers/spiders/link.py new file mode 100644 index 000000000..c51c0826d --- /dev/null +++ b/src/data_manager/collectors/scrapers/spiders/link.py @@ -0,0 +1,109 @@ +from typing import Iterator, Callable +from urllib.parse import urlparse +from scrapy import Spider +from scrapy.http import Response, Request +from scrapy.linkextractors import LinkExtractor +from scrapy.link import Link +from src.data_manager.collectors.scrapers.utils import IMAGE_EXTENSIONS, IGNORED_DOCUMENT_EXTENSIONS +from src.data_manager.collectors.scrapers.items import WebPageItem +from src.data_manager.collectors.scrapers.parsers.link import parse_link_page + +class LinkSpider(Spider): + """ + Generic link-following spider for unauthenticated pages. + Stays within the hostnames of all start_urls, up to max_depth. 
+ """ + + name = "link" + + _DEFAULT_START_URLS = ["https://quotes.toscrape.com/"] + + custom_settings = { + "DEPTH_LIMIT": 1, # Default max depth + "DOWNLOAD_DELAY": 2, # Default (download) delay + "CLOSESPIDER_PAGECOUNT": 500 # Default max pages + } + + @classmethod + def from_crawler(cls, crawler, *args, **kwargs): + max_depth = kwargs.get("max_depth") + max_pages = kwargs.get("max_pages") + delay = kwargs.get("delay") + if max_depth: + crawler.settings.set("DEPTH_LIMIT", max_depth, priority="spider") + if max_pages: + crawler.settings.set("CLOSESPIDER_PAGECOUNT", max_pages, priority="spider") + if delay: + crawler.settings.set("DOWNLOAD_DELAY", delay, priority="spider") + return super().from_crawler(crawler, *args, **kwargs) + + def __init__(self, start_urls: list[str] = None, max_depth: int = None, max_pages: int = None, allow: list[str] = None, deny: list[str] = None, delay: int = None, canonicalize: bool = False, process_value: Callable[[str], str] = None, *args, **kwargs): + super().__init__(*args, **kwargs) + self._start_urls = start_urls or getattr(self, "_DEFAULT_START_URLS", []) + self._allowed_domains: set[str] = { + urlparse(u).netloc + for u in self._start_urls + if urlparse(u).netloc + } + default_deny = getattr(self, "_DEFAULT_DENY", []) + default_process_value = getattr(self, "_DEFAULT_PROCESS_VALUE", None) + self._le = LinkExtractor( + allow=allow or [], + deny=(deny or []) + default_deny, + allow_domains=list(self._allowed_domains), + deny_extensions=(IMAGE_EXTENSIONS + IGNORED_DOCUMENT_EXTENSIONS), + canonicalize=canonicalize, + process_value=process_value or default_process_value, + unique=True, + ) + + async def start(self): + """ + Seed requests — validates start_urls at crawl time, not import time. + Building the habit: always attach errback here, never rely on + start_urls shortcut in production spiders. 
+ """ + if not self._start_urls: + raise ValueError("LinkSpider requires start_urls to be set") + for url in self._start_urls: + yield Request(url=url, callback=self.parse, errback=self.errback, meta={"depth": 0}) + + def parse(self, response: Response) -> Iterator[WebPageItem | Request]: + """ + Extract one item per response, then yield follow Requests up to max_depth. + @url https://quotes.toscrape.com/ + @returns items 1 + @returns requests 1 + @scrapes url title + """ + yield from self.parse_item(response) # Yield Item + yield from self.follow_links(response) # Yield Requests + + + def follow_links(self, response: Response) -> Iterator[Request]: + current_depth = response.meta.get("depth", 0) + if current_depth >= self.settings.get("DEPTH_LIMIT"): + self.logger.info("Reached max depth %d", self.settings.get("DEPTH_LIMIT")) + return + for link in self.parse_follow_links(response): + self.logger.info("Following %s at depth %d", link.url, current_depth) + yield Request(link.url, callback=self.parse, errback=self.errback, meta={"depth": current_depth + 1}) + + def errback(self, failure): + self.logger.error( + "Request failed: %s — %s", + failure.request.url, + repr(failure.value), + ) + + # ------------------------------------------------------------------ # + # Extension points — pure, unit-testable/checkable without a reactor + # ------------------------------------------------------------------ # + + def parse_item(self, response: Response) -> Iterator[WebPageItem]: + yield from parse_link_page(response) + + def parse_follow_links(self, response: Response) -> Iterator[Link]: + links = self._le.extract_links(response) + self.logger.info("Extracted %d links from %s", len(links), response.url) + yield from links diff --git a/src/data_manager/collectors/scrapers/utils.py b/src/data_manager/collectors/scrapers/utils.py new file mode 100644 index 000000000..003003d77 --- /dev/null +++ b/src/data_manager/collectors/scrapers/utils.py @@ -0,0 +1,19 @@ +from scrapy.http 
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # type-only import: no runtime scrapy dependency for utils
    from scrapy.http import Response

# Extensions are listed without a leading dot (LinkExtractor convention).
IMAGE_EXTENSIONS = [
    "png", "jpg", "jpeg", "gif", "bmp", "svg", "ico", "webp"
]

# pdf, docx, xlsx, pptx are first-class supported by MarkItDown, so they are
# deliberately NOT listed here; only legacy/binary formats we can't convert.
IGNORED_DOCUMENT_EXTENSIONS = [
    "doc",
    "xls",
    "ppt",
    "zip",
    "rar",
]

def get_content_type(response: "Response") -> str:
    """Return the response's Content-Type header decoded to str.

    Args:
        response: a scrapy Response (any object with a ``headers`` mapping
            whose ``.get`` returns bytes works too).

    Returns:
        The decoded header value, or "" when the header is absent.
    """
    # Scrapy stores header values as bytes. HTTP/1.1 header fields are
    # latin-1 encoded (RFC 9110), so decode accordingly — not UTF-8, which
    # could mangle high bytes. The default covers a missing header.
    raw = response.headers.get("Content-Type", b"") or b""
    if isinstance(raw, str):  # tolerate plain-dict headers (e.g. in tests)
        return raw
    return raw.decode("latin-1", errors="replace")
→ (removed) +#
John Doe
\s*(?:
)?\s*[A-Z][\w.]*(?:\s+[A-Z][\w.]*){0,2}\s*
) + for chunk in self._extract_text_chunks(markup): + names |= self._discover_names(chunk) + return names def anonymize(self, text: str) -> str: """ Anonymize names, emails, usernames, greetings, and sign-offs from the text. """ - doc = self.nlp(text) - names_to_replace = { - ent.text for ent in doc.ents - if ent.label_ == "PERSON" and ent.text not in self.EXCLUDED_WORDS - } + names_to_replace = self._discover_names(text) # Remove email addresses and usernames text = self.EMAIL_PATTERN.sub("", text) text = self.USERNAME_PATTERN.sub("", text) - # Remove greetings and sign-offs + text = self._strip_greetings_signoffs(text) + return self._replace_names(text, names_to_replace) + + def anonymize_markup(self, markup: str) -> str: + """ + Anonymize names, emails, usernames, greetings, and sign-offs from the markup. + including html, rss, and other markup formats. (especially twiki and discourse markup) + """ + names_to_replace = self._discover_names_markup(markup) + # Remove email addresses and usernames + markup = self.EMAIL_PATTERN.sub("", markup) + markup = self.USERNAME_PATTERN.sub("", markup) + markup = _DC_CREATOR_RE.sub(r'\1\2', markup) + markup = _DEFAULT_GENERIC_MARKUP_AUTHOR_ELEMENT_RE.sub("", markup) + markup = _DEFAULT_GENERIC_MARKUP_USER_LINK_RE.sub("", markup) + markup = _DEFAULT_MARKUP_SIGNOFF_TAG_RE.sub("", markup) + markup = _DEFAULT_MARKUP_TRAILING_SIGNOFF_TAG_RE.sub("", markup) + markup = _DEFAULT_MARKUP_TWIKI_USER_LINK_RE.sub("", markup) + markup = self._strip_greetings_signoffs(markup) + return self._replace_names(markup, names_to_replace) + + def _strip_greetings_signoffs(self, text: str) -> str: lines = text.splitlines() - filtered_lines: List[str] = [] + filtered = [] for line in lines: - stripped_line = line.strip() - if any(p.match(stripped_line) for p in self.GREETING_PATTERNS): + stripped = line.strip() + if any(p.match(stripped) for p in self.GREETING_PATTERNS): continue - if any(p.match(stripped_line) for p in self.SIGNOFF_PATTERNS): + if 
any(p.match(stripped) for p in self.SIGNOFF_PATTERNS): continue - filtered_lines.append(line) - text = "\n".join(filtered_lines) - - # Remove names (case-insensitive) - for name in sorted(names_to_replace, key=len, reverse=True): - pattern = re.compile(r'\b' + re.escape(name) + r'\b', re.IGNORECASE) - text = pattern.sub("", text) - - # Remove extra whitespace - text = "\n".join(line for line in text.splitlines() if line.strip()) - - return text + filtered.append(line) + return "\n".join(filtered) + + def _replace_names(self, text: str, names: set) -> str: + for name in sorted(names, key=len, reverse=True): + text = re.compile(r'\b' + re.escape(name) + r'\b', re.IGNORECASE).sub("", text) + return "\n".join(line for line in text.splitlines() if line.strip()) + + def _extract_text(self, markup: str) -> str: + """Strip markup to plain text for NER. Format-agnostic.""" + attrs = " ".join(_ATTR_TEXT_RE.findall(markup)) + clean = _CDATA_RE.sub(" ", markup) + clean = _TAG_RE.sub(" ", clean) + clean = unescape(clean) + return re.sub(r"\s+", " ", f"{clean} {attrs}").strip() + + def _extract_text_chunks(self, markup: str) -> list: + chunks = [] + # Text content from tags + for match in _CONTENT_TAG_RE.finditer(markup): + inner = _CDATA_RE.sub(" ", match.group(1)) + clean = _TAG_RE.sub(" ", inner) + clean = unescape(clean).strip() + if clean: + chunks.append(clean) + # Text from attributes + attr_text = " ".join(_ATTR_TEXT_RE.findall(markup)) + if attr_text.strip(): + chunks.append(attr_text.strip()) + return chunks diff --git a/src/data_manager/collectors/utils/markitdown_convertor.py b/src/data_manager/collectors/utils/markitdown_convertor.py new file mode 100644 index 000000000..5cbadd603 --- /dev/null +++ b/src/data_manager/collectors/utils/markitdown_convertor.py @@ -0,0 +1,38 @@ +import io +from markitdown import MarkItDown +from src.utils.logging import get_logger +# from src.interfaces.llm.llm_client import LLMClient + +logger = get_logger(__name__) + +def 
def to_valid_file_extension(file_extension: str) -> str:
    """
    Normalise a file extension to MarkItDown's expected ".ext" form.
    Idempotent: ".html" -> ".html", "html" -> ".html".
    """
    return "." + file_extension.lstrip(".")

class MarkitdownConvertor:
    """Thin wrapper around MarkItDown for in-memory content conversion."""

    def __init__(self):
        self.markitdown = MarkItDown(
            enable_plugins=True,
            # llm_client / llm_model can be wired in later for image captions.
        )

    def convert(self, content, file_extension: str = ".html") -> str:
        """
        Convert the content to markdown using MarkItDown.
        Args:
            content: The content to convert — ``str`` (HTML, text) or raw
                ``bytes`` (e.g. PDFs scraped from the web, whose items carry
                binary content per the adapter contract).
            file_extension: The file extension of the content.
        Returns:
            The converted content.
        """
        # Fix: binary payloads must pass through unencoded — bytes has no
        # .encode(), so the previous str-only path crashed on PDF content.
        data = content if isinstance(content, bytes) else content.encode("utf-8")
        # Log size, not the payload — content can be megabytes of binary.
        logger.debug(f"Converting {len(data)} bytes to markdown")
        result = self.markitdown.convert_stream(
            io.BytesIO(data),
            file_extension=to_valid_file_extension(file_extension),
        )
        return result.text_content if hasattr(result, 'text_content') else str(result)