diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 4636bc5c..319cd7f0 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -64,7 +64,7 @@ jobs: - name: Install dependencies run: | - uv sync --frozen --python python --no-dev --no-managed-python --group tests + uv sync --frozen --python python --no-dev --no-python-downloads --group tests - name: Run api tests env: diff --git a/aymurai/api/endpoints/routers/anonymizer/anonymizer.py b/aymurai/api/endpoints/routers/anonymizer/anonymizer.py index 65a36131..6a48c33b 100644 --- a/aymurai/api/endpoints/routers/anonymizer/anonymizer.py +++ b/aymurai/api/endpoints/routers/anonymizer/anonymizer.py @@ -5,7 +5,7 @@ from threading import Lock import torch -from fastapi import Body, Depends, Form, Query, UploadFile +from fastapi import Body, Depends, Form, HTTPException, Query, UploadFile from fastapi.responses import FileResponse from fastapi.routing import APIRouter from sqlmodel import Session @@ -31,7 +31,10 @@ TextRequest, ) from aymurai.settings import settings -from aymurai.text.anonymization import DocAnonymizer, replace_labels_in_text +from aymurai.text.anonymization import ( + InvalidDocumentAnonymizer, + get_anonymizer, +) from aymurai.text.extraction import MIMETYPE_EXTENSION_MAPPER from aymurai.utils.entity_disambiguation import ( build_canonical_entities, @@ -514,11 +517,21 @@ async def anonymizer_compile_document( """ logger.info(f"receiving => {file.filename}") extension = MIMETYPE_EXTENSION_MAPPER.get(file.content_type) - logger.info(f"detection extension: {extension} ({file.content_type})") + file_suffix = os.path.splitext(file.filename or "")[1].lower() + + if extension is None and file_suffix: + extension = file_suffix.lstrip(".") + + if extension not in {"docx", "pdf"}: + raise HTTPException( + status_code=400, + detail=f"Unsupported format for anonymization: {extension or 'unknown'}", + ) + + logger.info(f"detected extension: {extension} 
({file.content_type})") # Create a temporary file - _, suffix = os.path.splitext(file.filename) - suffix = suffix if suffix == ".docx" else ".txt" + suffix = f".{extension}" tmp_dir = tempfile.gettempdir() # Use delete=False to avoid the file being deleted when the NamedTemporaryFile object is closed @@ -537,7 +550,7 @@ async def anonymizer_compile_document( annots_json = json.loads(annotations) annots = DocumentAnnotations.model_validate(annots_json) - logger.info(f"processing annotations => {annots}") + effective_label_policies = _merge_label_policies(annots.label_policies) effective_render_policy = _merge_render_policy(annots.render_policy) @@ -562,9 +575,6 @@ async def anonymizer_compile_document( override=False, ) - # Anonymize the document - doc_anonymizer = DocAnonymizer() - filtered_annotations = [] for paragraph in annots.data: filtered_labels = [ @@ -583,39 +593,36 @@ async def anonymizer_compile_document( filtered_annotations, effective_render_policy, effective_label_policies ) - if suffix == ".docx": - item = {"path": tmp_filename} - doc_anonymizer.render_context = render_context - doc_anonymizer( - item, - [ - document_information.model_dump() - for document_information in filtered_annotations - ], + preds = [ + document_information.model_dump(mode="json", exclude_none=True) + for document_information in filtered_annotations + ] + + try: + anonymizer = get_anonymizer(extension) + anonymized_path = anonymizer( + {"path": tmp_filename}, + preds, tmp_dir, + render_context=render_context, + ) + except (ValueError, InvalidDocumentAnonymizer) as exc: + if os.path.exists(tmp_filename): + os.remove(tmp_filename) + raise HTTPException(status_code=400, detail=str(exc)) from exc + + if extension == "pdf": + if os.path.exists(tmp_filename): + os.remove(tmp_filename) + + return FileResponse( + anonymized_path, + background=BackgroundTask(os.remove, anonymized_path), + media_type="application/pdf", + filename=f"{os.path.splitext(file.filename)[0]}.pdf", ) - 
logger.info(f"saved temp file on local storage => {tmp_filename}") - - else: - # Export as raw document - anonymized_doc = [ - replace_labels_in_text( - document_information.model_dump(), - render_context=render_context, - ) - .replace("<", "<") - .replace(">", ">") - for document_information in filtered_annotations - ] - with open(tmp_filename, "w") as f: - f.write("\n".join(anonymized_doc)) - - # Add watermark to the end of the document - f.write( - "\n\nDocumento anonimizado por AymurAI\n\nhttps://www.aymurai.info/" - ) - # Convert to ODT + # DOCX flow keeps ODT output cmd = [ settings.LIBREOFFICE_BIN, "--headless", @@ -623,9 +630,8 @@ async def anonymizer_compile_document( "odt", "--outdir", tmp_dir, - tmp_filename, + anonymized_path, ] - logger.info(f"Executing: {' '.join(cmd)}") try: @@ -633,20 +639,20 @@ async def anonymizer_compile_document( cmd, shell=False, encoding="utf-8", errors="ignore" ) logger.info(f"LibreOffice output: {output}") - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError as exc: raise RuntimeError( - f"LibreOffice conversion failed: {e.output.decode('utf-8', errors='ignore')}" - ) + f"LibreOffice conversion failed: {exc.output.decode('utf-8', errors='ignore')}" + ) from exc + finally: + if os.path.exists(tmp_filename): + os.remove(tmp_filename) - odt = tmp_filename.replace(suffix, ".odt") + odt = f"{os.path.splitext(anonymized_path)[0]}.odt" logger.info(f"Expected output file path: {odt}") if not os.path.exists(odt): raise RuntimeError(f"File at path {odt} does not exist.") - # Ensure the temporary file is deleted - os.remove(tmp_filename) - return FileResponse( odt, background=BackgroundTask(os.remove, odt), diff --git a/aymurai/api/endpoints/routers/misc/document_extract.py b/aymurai/api/endpoints/routers/misc/document_extract.py index 37b7d0a4..ba315b74 100644 --- a/aymurai/api/endpoints/routers/misc/document_extract.py +++ b/aymurai/api/endpoints/routers/misc/document_extract.py @@ -31,7 +31,7 @@ def 
extraction(path: str) -> str: str: Extracted text from the document. """ text = extract_document(path) - return document_normalize(text) if text else "" + return document_normalize(text, preserve_paragraphs=True) if text else "" def run_safe_text_extraction( @@ -63,6 +63,20 @@ def run_safe_text_extraction( raise +def _split_document_paragraphs(document: str) -> list[str]: + if re.search(r"\n\s*\n+", document): + raw_paragraphs = re.split(r"\n\s*\n+", document) + else: + raw_paragraphs = document.splitlines() + + paragraphs = [ + re.sub(r"[ \t]{2,}", " ", paragraph.strip()) + for paragraph in raw_paragraphs + if paragraph.strip() + ] + return list(unique_justseen(paragraphs)) + + @router.post("/document-extract", response_model=Document) def plain_text_extractor(file: UploadFile) -> Document: """ @@ -111,9 +125,6 @@ def plain_text_extractor(file: UploadFile) -> Document: logger.info(f"removed temp file from local storage => {tmp_filename}") document_id = data_to_uuid(data) - - paragraphs = [line.strip() for line in document.split("\n") if line.strip()] - paragraphs = [re.sub(r"\s{2,}", " ", line) for line in paragraphs] - paragraphs = list(unique_justseen(paragraphs)) + paragraphs = _split_document_paragraphs(document) return Document(document=paragraphs, document_id=document_id) diff --git a/aymurai/database/crud/anonymization/paragraph.py b/aymurai/database/crud/anonymization/paragraph.py index 1d169036..17f826b8 100644 --- a/aymurai/database/crud/anonymization/paragraph.py +++ b/aymurai/database/crud/anonymization/paragraph.py @@ -27,7 +27,7 @@ def _serialize_doclabels(value: list[DocLabel] | None): """ if value is None: return None - return _DOC_LABELS_ADAPTER.dump_python(value, mode="json") + return _DOC_LABELS_ADAPTER.dump_python(value, mode="json", exclude_none=True) def _normalize_paragraph_payload(payload: dict) -> dict: @@ -63,7 +63,7 @@ def anonymization_paragraph_create( Returns: AnonymizationParagraph: The persisted paragraph record. 
""" - payload = _normalize_paragraph_payload(paragraph_in.model_dump()) + payload = _normalize_paragraph_payload(paragraph_in.model_dump(exclude_none=True)) new_paragraph = AnonymizationParagraph(**payload) if override: @@ -171,14 +171,14 @@ def anonymization_paragraph_batch_create_update( paragraph = session.get(AnonymizationParagraph, paragraph_id) if paragraph: - payload = _normalize_paragraph_payload(p_in.model_dump()) + payload = _normalize_paragraph_payload(p_in.model_dump(exclude_none=True)) payload.pop("id", None) for field, value in payload.items(): if value is not None: setattr(paragraph, field, value) else: - payload = _normalize_paragraph_payload(p_in.model_dump()) + payload = _normalize_paragraph_payload(p_in.model_dump(exclude_none=True)) paragraph = AnonymizationParagraph(**payload) session.add(paragraph) diff --git a/aymurai/settings.py b/aymurai/settings.py index 3844d767..2079f561 100644 --- a/aymurai/settings.py +++ b/aymurai/settings.py @@ -65,6 +65,10 @@ def assemble_cors_origins(cls, v) -> list[str]: MEMORY_CACHE_TTL: int = 60 LIBREOFFICE_BIN: str = "libreoffice" + PDF_WATERMARK_FONT_REGULAR: str | None = None + PDF_WATERMARK_FONT_BOLD: str | None = None + ANONYMIZATION_METADATA_CREATOR: str = "AymurAI" + ANONYMIZATION_METADATA_PRODUCER: str = "AymurAI" # Disambiguation Config diff --git a/aymurai/text/anonymization/__init__.py b/aymurai/text/anonymization/__init__.py index 7f839a95..51f3a65b 100644 --- a/aymurai/text/anonymization/__init__.py +++ b/aymurai/text/anonymization/__init__.py @@ -1,7 +1,21 @@ from aymurai.text.anonymization.alignment import replace_labels_in_text -from aymurai.text.anonymization.doc_anonymizer import DocAnonymizer +from aymurai.text.anonymization.base import ( + BaseAnonymizer, + InvalidDocumentAnonymizer, + get_anonymizer, + register_anonymizer, + supported_extensions, +) +from aymurai.text.anonymization.docx import DocxAnonymizer +from aymurai.text.anonymization.pdf import PdfAnonymizer __all__ = [ - 
"DocAnonymizer", + "BaseAnonymizer", + "DocxAnonymizer", + "PdfAnonymizer", + "InvalidDocumentAnonymizer", + "get_anonymizer", + "register_anonymizer", + "supported_extensions", "replace_labels_in_text", ] diff --git a/aymurai/text/anonymization/alignment.py b/aymurai/text/anonymization/alignment.py index 3a6386b3..e4f2547e 100644 --- a/aymurai/text/anonymization/alignment.py +++ b/aymurai/text/anonymization/alignment.py @@ -9,9 +9,9 @@ from joblib import hash from more_itertools import flatten +from aymurai.meta.api_interfaces import LabelPolicy from aymurai.models.flair.utils import FlairTextNormalize from aymurai.utils.alignment.core import align_text, tokenize -from aymurai.meta.api_interfaces import LabelPolicy REGEX_PARAGRAPH = r"((?.*?)(\/w:p\b)" REGEX_FRAGMENT = r"(?(?P.*?)(<.*?\/w:t)" @@ -61,6 +61,71 @@ def resolve_render_token(label: dict, render_context: dict | None = None) -> str return f"{base}_{index}" +def _label_replacement_start(label: dict) -> int: + """ + Determines the start character index for a label, considering possible alternative attributes. + + Args: + label (dict): Label dictionary which may contain alternative start character attributes. + + Returns: + int: The start character index for the label. + """ + attrs = label.get("attrs") or {} + alt_start = attrs.get("aymurai_alt_start_char") + start_char = label.get("start_char") + return int(alt_start if alt_start is not None else (start_char or 0)) + + +def _label_replacement_end(label: dict) -> int: + """ + Determines the end character index for a label, considering possible alternative attributes. + + Args: + label (dict): Label dictionary which may contain alternative end character attributes. + + Returns: + int: The end character index for the label. 
+ """ + attrs = label.get("attrs") or {} + alt_end = attrs.get("aymurai_alt_end_char") + end_char = label.get("end_char") + return int(alt_end if alt_end is not None else (end_char or 0)) + + +def _label_replacement_text(label: dict, document: str) -> str: + """ + Determines the replacement text for a label, considering possible alternative attributes. + + Args: + label (dict): Label dictionary which may contain alternative text attributes. + document (str): The document text from which to extract the label text. + + Returns: + str: The text for the label, considering possible alternative attributes. + """ + attrs = label.get("attrs") or {} + + alt_text = attrs.get("aymurai_alt_text") + if alt_text is not None: + return str(alt_text) if alt_text else "" + + alt_start = attrs.get("aymurai_alt_start_char") + alt_end = attrs.get("aymurai_alt_end_char") + if alt_start is not None and alt_end is not None: + start_char, end_char = int(alt_start), int(alt_end) + if 0 <= start_char < end_char <= len(document): + return document[start_char:end_char] + + start_char = int(label.get("start_char") or 0) + end_char = int(label.get("end_char") or 0) + if 0 <= start_char < end_char <= len(document): + return document[start_char:end_char] + + text = label.get("text") + return str(text) if text else "" + + def unify_consecutive_labels( sample: dict, text_key: str = "document", @@ -93,9 +158,11 @@ def unify_consecutive_labels( # Iterate over labels for label in labels: # Get attributes - text = label["attrs"]["aymurai_alt_text"] or label["text"] - start_char = label["attrs"]["aymurai_alt_start_char"] or label["start_char"] - end_char = label["attrs"]["aymurai_alt_end_char"] or label["end_char"] + text = _label_replacement_text(label, document) + start_char = _label_replacement_start(label) + end_char = _label_replacement_end(label) + if not text or end_char <= start_char: + continue aymurai_label = resolve_render_token(label, render_context) if current_group is None: @@ -115,7 +182,7 
@@ def unify_consecutive_labels( else: # Finish the current group and start a new one current_group["text"] = document[ - current_group["start_char"] : current_group["end_char"] + 1 + current_group["start_char"] : current_group["end_char"] ] unified_labels.append(current_group) current_group = { @@ -128,7 +195,7 @@ def unify_consecutive_labels( # Finish the last group if current_group is not None: current_group["text"] = document[ - current_group["start_char"] : current_group["end_char"] + 1 + current_group["start_char"] : current_group["end_char"] ] unified_labels.append(current_group) @@ -271,7 +338,7 @@ def index_paragraphs(file: str) -> list[dict]: list[dict]: A list of dictionaries representing the indexed paragraphs. """ # Read the XML file - with open(file) as f: + with open(file, encoding="utf-8-sig") as f: xml = f.read() paragraphs = [] diff --git a/aymurai/text/anonymization/base.py b/aymurai/text/anonymization/base.py new file mode 100644 index 00000000..a1631159 --- /dev/null +++ b/aymurai/text/anonymization/base.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any + + +class InvalidDocumentAnonymizer(Exception): + """Raised when an anonymizer receives an invalid or unsupported document.""" + + +class BaseAnonymizer(ABC): + """Common interface shared by all document anonymizers.""" + + extension: str + + @property + def __name__(self) -> str: + return self.__class__.__name__ + + def ensure_file(self, path: Path) -> Path: + if not path.exists(): + raise InvalidDocumentAnonymizer(f"Invalid path: {path}") + return path + + def __call__( + self, + item: dict, + preds: list[dict], + output_dir: str = ".", + render_context: dict[str, Any] | None = None, + ) -> str: + return self.anonymize(item, preds, output_dir, render_context=render_context) + + @abstractmethod + def anonymize( + self, + item: dict, + preds: list[dict], + output_dir: str = ".", + render_context: 
dict[str, Any] | None = None, + ) -> str: + """Anonymize a document and return the output path.""" + + +_REGISTRY: dict[str, type[BaseAnonymizer]] = {} + + +def register_anonymizer(cls: type[BaseAnonymizer]) -> type[BaseAnonymizer]: + extension = getattr(cls, "extension", None) + if not extension: + raise ValueError( + f"Anonymizer {cls.__name__} must define an 'extension' attribute" + ) + + _REGISTRY[extension.lower()] = cls + return cls + + +def get_anonymizer(extension: str) -> BaseAnonymizer: + normalized = extension.lower() + try: + anonymizer_cls = _REGISTRY[normalized] + except KeyError as exc: + raise ValueError(f"Unsupported extension: {extension}") from exc + return anonymizer_cls() + + +def supported_extensions() -> set[str]: + return set(_REGISTRY.keys()) + + +__all__ = [ + "BaseAnonymizer", + "InvalidDocumentAnonymizer", + "get_anonymizer", + "register_anonymizer", + "supported_extensions", +] diff --git a/aymurai/text/anonymization/doc_anonymizer.py b/aymurai/text/anonymization/doc_anonymizer.py deleted file mode 100644 index 7feb6f3a..00000000 --- a/aymurai/text/anonymization/doc_anonymizer.py +++ /dev/null @@ -1,88 +0,0 @@ -import os -import tempfile -from glob import glob - -from more_itertools import flatten - -from aymurai.meta.pipeline_interfaces import Transform -from aymurai.text.anonymization.alignment import ( - index_paragraphs, - match_paragraphs_with_predictions, -) -from aymurai.text.anonymization.watermarks import add_footer_watermark -from aymurai.text.anonymization.xml_docx import ( - create_docx, - replace_text_in_xml, - unzip_document, -) -from aymurai.utils.cache import cache_load, cache_save, get_cache_key - - -class DocAnonymizer(Transform): - """ - Anonymize document by replacing sensitive data with label tokens - """ - - def __init__(self, use_cache: bool = False): - self.use_cache = use_cache - self.render_context = None - - def __call__(self, item: dict, preds: list[dict], output_dir: str = ".") -> None: - """ - Performs the 
anonymization process on a document. - - Args: - item (dict): The document item to be anonymized. - preds (list[dict]): The list of predictions for the document. - output_dir (str, optional): The directory to save the anonymized document. - Defaults to ".". - - Raises: - ValueError: If the document has an extension other than `.docx`. - """ - item_path = item["path"] - - if not os.path.splitext(item_path)[-1] == ".docx": - raise ValueError("Only `.docx` extension is allowed.") - - if not item.get("data"): - item["data"] = {} - - cache_key = get_cache_key(item_path, self.__name__) - if self.use_cache and (cache_data := cache_load(key=cache_key)): - paragraphs = cache_data - else: - # Unzip document into a temporary directory - with tempfile.TemporaryDirectory() as tempdir: - unzip_document(item_path, tempdir) - - # Parse XML files - xml_files = glob(f"{tempdir}/**/*.xml", recursive=True) - paragraphs = (index_paragraphs(file) for file in xml_files) - paragraphs = list(flatten(paragraphs)) - - # Filter out empty paragraphs - paragraphs = [ - paragraph - for paragraph in paragraphs - if paragraph["plain_text"].strip() - ] - - # Matching - paragraphs = match_paragraphs_with_predictions(paragraphs, preds) - - # Edit XML filess - replace_text_in_xml(paragraphs, tempdir, self.render_context) - - # Recreate anonymized document - os.makedirs(output_dir, exist_ok=True) - create_docx( - tempdir, - f"{output_dir}/{os.path.basename(item_path)}", - ) - - # Add watermark to the footer - add_footer_watermark(f"{output_dir}/{os.path.basename(item_path)}") - - if self.use_cache: - cache_save(paragraphs, key=cache_key) diff --git a/aymurai/text/anonymization/docx/__init__.py b/aymurai/text/anonymization/docx/__init__.py new file mode 100644 index 00000000..5d5d0aca --- /dev/null +++ b/aymurai/text/anonymization/docx/__init__.py @@ -0,0 +1,3 @@ +from aymurai.text.anonymization.docx.anonymizer import DocxAnonymizer + +__all__ = ["DocxAnonymizer"] diff --git 
a/aymurai/text/anonymization/docx/anonymizer.py b/aymurai/text/anonymization/docx/anonymizer.py new file mode 100644 index 00000000..73c43487 --- /dev/null +++ b/aymurai/text/anonymization/docx/anonymizer.py @@ -0,0 +1,122 @@ +import os +import tempfile +from datetime import datetime, timezone +from glob import glob +from pathlib import Path +from typing import Any + +from docx import Document +from more_itertools import flatten + +from aymurai.text.anonymization.alignment import ( + index_paragraphs, + match_paragraphs_with_predictions, +) +from aymurai.text.anonymization.base import ( + BaseAnonymizer, + InvalidDocumentAnonymizer, + register_anonymizer, +) +from aymurai.text.anonymization.docx.watermark import add_footer_watermark +from aymurai.settings import settings +from aymurai.text.anonymization.docx.xml import ( + create_docx, + replace_text_in_xml, + unzip_document, +) +from aymurai.utils.cache import cache_load, cache_save, get_cache_key + + +def _set_aymurai_core_properties(doc_path: str) -> None: + """ + Applies the configured AymurAI tooling metadata fields to the DOCX core properties. + + Args: + doc_path (str): The path to the DOCX document to update. + """ + document = Document(doc_path) + core_properties = document.core_properties + core_properties.author = "" + core_properties.last_modified_by = settings.ANONYMIZATION_METADATA_CREATOR + core_properties.modified = datetime.now(timezone.utc) + document.save(doc_path) + + +@register_anonymizer +class DocxAnonymizer(BaseAnonymizer): + """ + Anonymize DOCX documents by replacing sensitive data with label tokens. + """ + + extension = "docx" + + def __init__(self, use_cache: bool = False): + self.use_cache = use_cache + + def anonymize( + self, + item: dict, + preds: list[dict], + output_dir: str = ".", + render_context: dict[str, Any] | None = None, + ) -> str: + """ + Anonymizes a DOCX document using the matched paragraph predictions. 
+ + Args: + item (dict): The item dictionary containing the input DOCX path. + preds (list[dict]): The predictions to apply to the document. + output_dir (str, optional): The directory where the anonymized document should be written. Defaults to '.'. + render_context (dict[str, Any] | None, optional): The rendering context used to resolve replacement tokens. + Defaults to None. + + Returns: + str: The path to the anonymized DOCX output file. + """ + item_path = Path(item["path"]) + file_path = self.ensure_file(item_path) + + if file_path.suffix.lower() != ".docx": + raise InvalidDocumentAnonymizer("Only `.docx` extension is allowed.") + + if not item.get("data"): + item["data"] = {} + + cache_key = get_cache_key(str(file_path), self.__name__) + if self.use_cache and (cache_data := cache_load(key=cache_key)): + paragraphs = cache_data + else: + # Unzip document into a temporary directory + with tempfile.TemporaryDirectory() as tempdir: + unzip_document(str(file_path), tempdir) + + # Parse XML files + xml_files = glob(f"{tempdir}/**/*.xml", recursive=True) + paragraphs = (index_paragraphs(file) for file in xml_files) + paragraphs = list(flatten(paragraphs)) + + # Filter out empty paragraphs + paragraphs = [ + paragraph + for paragraph in paragraphs + if paragraph["plain_text"].strip() + ] + # Matching + paragraphs = match_paragraphs_with_predictions(paragraphs, preds) + + # Edit XML files + replace_text_in_xml(paragraphs, tempdir, render_context) + + # Recreate anonymized document + os.makedirs(output_dir, exist_ok=True) + output_path = f"{output_dir}/{os.path.basename(str(file_path))}" + create_docx(tempdir, output_path) + + # Add metadata branding and the footer watermark + _set_aymurai_core_properties(output_path) + add_footer_watermark(output_path) + + if self.use_cache: + cache_save(paragraphs, key=cache_key) + + return f"{output_dir}/{os.path.basename(str(file_path))}" diff --git a/aymurai/text/anonymization/watermarks.py 
b/aymurai/text/anonymization/docx/watermark.py similarity index 100% rename from aymurai/text/anonymization/watermarks.py rename to aymurai/text/anonymization/docx/watermark.py diff --git a/aymurai/text/anonymization/xml_docx.py b/aymurai/text/anonymization/docx/xml.py similarity index 100% rename from aymurai/text/anonymization/xml_docx.py rename to aymurai/text/anonymization/docx/xml.py diff --git a/aymurai/text/anonymization/pdf/__init__.py b/aymurai/text/anonymization/pdf/__init__.py new file mode 100644 index 00000000..21271aae --- /dev/null +++ b/aymurai/text/anonymization/pdf/__init__.py @@ -0,0 +1,3 @@ +from aymurai.text.anonymization.pdf.anonymizer import PdfAnonymizer + +__all__ = ["PdfAnonymizer"] diff --git a/aymurai/text/anonymization/pdf/anonymizer.py b/aymurai/text/anonymization/pdf/anonymizer.py new file mode 100644 index 00000000..0030c24b --- /dev/null +++ b/aymurai/text/anonymization/pdf/anonymizer.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import os +from pathlib import Path +from typing import Any + +import pymupdf +import pymupdf.layout # noqa: F401 # activates layout support +from pymupdf4llm.helpers import document_layout as pymupdf4llm_document_layout + +from aymurai.text.anonymization.base import ( + BaseAnonymizer, + InvalidDocumentAnonymizer, + register_anonymizer, +) +from aymurai.text.anonymization.pdf.layout import ( + _apply_minimal_boundary_merge, + _build_layout_paragraphs, + _match_predictions_to_layout, +) +from aymurai.text.anonymization.pdf.ops import ( + _apply_redactions, + _collect_page_redactions, +) +from aymurai.text.anonymization.pdf.sanitize import ( + _collect_link_cleanup_rects, + _sanitize_document, +) +from aymurai.text.anonymization.pdf.watermark import add_pdf_footer_watermark + + +@register_anonymizer +class PdfAnonymizer(BaseAnonymizer): + """ + Anonymize PDF documents by replacing sensitive data with label tokens. 
+ """ + + extension = "pdf" + + def anonymize( + self, + item: dict, + preds: list[dict], + output_dir: str = ".", + render_context: dict[str, Any] | None = None, + ) -> str: + """ + Anonymizes a PDF document using the matched paragraph predictions. + + Args: + item (dict): The item dictionary containing the input PDF path. + preds (list[dict]): The predictions to apply to the document. + output_dir (str, optional): The directory where the anonymized document should be written. Defaults to '.'. + render_context (dict[str, Any] | None, optional): The rendering context used to resolve replacement tokens. Defaults to None. + + Returns: + str: The path to the anonymized PDF output file. + """ + item_path = Path(item["path"]) + file_path = self.ensure_file(item_path) + + if file_path.suffix.lower() != ".pdf": + raise InvalidDocumentAnonymizer("Only `.pdf` extension is allowed.") + + with pymupdf.open(str(file_path)) as doc: + parsed_doc = pymupdf4llm_document_layout.parse_document( + doc, + filename=str(file_path), + show_progress=False, + force_text=True, + use_ocr=False, + force_ocr=False, + ) + + layout_paragraphs = _build_layout_paragraphs(parsed_doc) + matched_paragraphs = _match_predictions_to_layout( + layout_paragraphs, + preds, + ) + + _apply_minimal_boundary_merge(matched_paragraphs, render_context) + page_ops, widget_ops, signature_widget_ops = _collect_page_redactions( + doc, + matched_paragraphs, + render_context, + ) + _apply_redactions(doc, page_ops, widget_ops, signature_widget_ops) + cleanup_rects = _collect_link_cleanup_rects( + page_ops, + widget_ops, + signature_widget_ops, + ) + _sanitize_document(doc, cleanup_rects) + add_pdf_footer_watermark(doc) + + os.makedirs(output_dir, exist_ok=True) + output_path = Path(output_dir) / f"{file_path.stem}.anonymized.pdf" + doc.save(str(output_path), garbage=4, clean=1, deflate=1) + + return str(output_path) diff --git a/aymurai/text/anonymization/pdf/common.py b/aymurai/text/anonymization/pdf/common.py new file 
mode 100644 index 00000000..91f42927 --- /dev/null +++ b/aymurai/text/anonymization/pdf/common.py @@ -0,0 +1,620 @@ +from __future__ import annotations + +import re +from functools import lru_cache +from typing import Any +from unicodedata import normalize + +import pymupdf + +TEXT_FLAG_ITALIC = 2 +TEXT_FLAG_SERIF = 4 +TEXT_FLAG_MONOSPACED = 8 +TEXT_FLAG_BOLD = 16 +PDF_TAG_MIN_FONT_SIZE = 7.0 +PDF_TAG_FONT_STEP = 0.5 +PDF_TAG_MAX_ABBREVIATION = 3 +PDF_TOKEN_ALIAS_MAP: dict[str, tuple[str, str]] = { + "CORREO_ELECTRONICO": ("CORREO", "MAIL"), + "CUIT_CUIL": ("CUIT", "CUIL"), + "DIRECCION": ("DIREC", "DIR"), + "ESTUDIOS": ("ESTUD", "EDU"), + "MARCA_AUTOMOVIL": ("MARCA_AUTO", "AUTO"), + "NACIONALIDAD": ("NACIONAL", "NAC"), + "NOMBRE_ARCHIVO": ("NOM_ARCH", "ARCH"), + "NUM_ACTUACION": ("NUM_ACT", "ACT"), + "NUM_CAJA_AHORRO": ("NUM_CAJA", "CAJA"), + "NUM_EXPEDIENTE": ("NUM_EXP", "EXPTE"), + "NUM_MATRICULA": ("NUM_MAT", "MAT"), + "PATENTE_DOMINIO": ("PAT_DOM", "PAT"), + "TELEFONO": ("TELEF", "TEL"), + "TEXTO_ANONIMIZAR": ("TEXTO_ANON", "ANON"), + "USUARIX": ("USUAR", "USR"), +} +PDF_TAG_RECT_X_PADDING = 0.5 +PDF_TAG_RECT_Y_PADDING = 0.0 +PDF_TAG_RECT_INSET = 0.5 +PDF_TAG_RECT_GAP_FACTOR = 0.5 +PDF_TAG_RECT_GAP_MIN = 3.0 +PDF_TAG_RECT_GAP_MAX = 8.0 + + +def _line_text(line: dict) -> str: + """ + Builds the plain text content for a parsed PDF line. + + Args: + line (dict): The parsed line metadata being processed. + + Returns: + str: The concatenated text content for the line. + """ + return "".join(span.get("text", "") for span in line.get("spans", [])) + + +def _rect_tuple(value: Any) -> tuple[float, float, float, float]: + """ + Normalizes a rectangle-like value into a coordinate tuple. + + Args: + value (Any): The rectangle-like value to normalize. + + Returns: + tuple[float, float, float, float]: The normalized rectangle coordinates. 
+ """ + if isinstance(value, pymupdf.Rect): + return (float(value.x0), float(value.y0), float(value.x1), float(value.y1)) + if isinstance(value, (list, tuple)) and len(value) == 4: + return (float(value[0]), float(value[1]), float(value[2]), float(value[3])) + raise ValueError(f"Invalid rectangle value: {value}") + + +def _default_style(fallback_size: float = 10.0) -> dict[str, Any]: + """ + Builds a default text style dictionary for PDF rendering helpers. + + Args: + fallback_size (float, optional): The fallback font size used when no style data is available. Defaults to 10.0. + + Returns: + dict[str, Any]: The default style dictionary. + """ + return { + "font": "", + "flags": 0, + "color": (0.0, 0.0, 0.0), + "size": fallback_size, + "ascender": 0.8, + "descender": -0.2, + } + + +def _span_text_weight(span: dict) -> tuple[int, float]: + """ + Computes a sorting weight for a span based on text length and size. + + Args: + span (dict): The span metadata being evaluated. + + Returns: + tuple[int, float]: The text-length and size weight for the span. + """ + text = str(span.get("text") or "").strip() + return (len(text), float(span.get("size") or 0.0)) + + +def _pdf_color_from_span(span: dict) -> tuple[float, float, float]: + """ + Converts a span color value into PDF RGB components. + + Args: + span (dict): The span metadata being evaluated. + + Returns: + tuple[float, float, float]: The PDF RGB color components for the span. + """ + try: + return tuple( + float(value) for value in pymupdf.sRGB_to_pdf(int(span.get("color") or 0)) + ) + except Exception: + return (0.0, 0.0, 0.0) + + +def _line_style(line: dict, fallback_size: float = 10.0) -> dict[str, Any]: + """ + Determines the dominant text style for a parsed PDF line. + + Args: + line (dict): The parsed line metadata being processed. + fallback_size (float, optional): The fallback font size used when no style data is available. Defaults to 10.0. 
+ + Returns: + dict[str, Any]: The dominant style dictionary for the line. + """ + spans = [ + span for span in line.get("spans") or [] if str(span.get("text") or "").strip() + ] + if not spans: + return _default_style(fallback_size) + + dominant = max(spans, key=_span_text_weight) + return { + "font": str(dominant.get("font") or ""), + "flags": int(dominant.get("flags") or 0), + "color": _pdf_color_from_span(dominant), + "size": float(dominant.get("size") or fallback_size), + "ascender": float(dominant.get("ascender") or 0.8), + "descender": float(dominant.get("descender") or -0.2), + } + + +def _build_spans_detail(line: dict) -> tuple[list[dict], int]: + """ + Builds per-span style metadata and character offsets for a line. + + Args: + line (dict): The parsed line metadata being processed. + + Returns: + tuple[list[dict], int]: The span detail list and left-strip offset. + """ + raw_text = normalize("NFKC", _line_text(line)) + strip_offset = len(raw_text) - len(raw_text.lstrip()) + + spans_detail: list[dict] = [] + cursor = 0 + for span in line.get("spans", []): + span_text = normalize("NFKC", span.get("text", "")) + span_start = cursor + cursor += len(span_text) + spans_detail.append( + { + "start": span_start, + "end": cursor, + "style": { + "font": str(span.get("font") or ""), + "flags": int(span.get("flags") or 0), + "color": _pdf_color_from_span(span), + "size": float(span.get("size") or 10.0), + "ascender": float(span.get("ascender") or 0.8), + "descender": float(span.get("descender") or -0.2), + }, + } + ) + return spans_detail, strip_offset + + +def _entity_style_from_spans( + line_entry: dict, + offset_in_stripped_text: int, +) -> dict[str, Any]: + """ + Resolves the style for the entity offset inside a line entry. + + Args: + line_entry (dict): The `line_entry` value used by this helper. + offset_in_stripped_text (int): The entity offset inside the stripped line text. + + Returns: + dict[str, Any]: The resolved style dictionary for the entity offset. 
def _font_size(line: dict, fallback: float = 10.0) -> float:
    """
    Derives the font size used when rendering tags on a parsed line.

    The sizes of all spans that report a truthy ``size`` are averaged, scaled
    to 90 % and floored at ``PDF_TAG_MIN_FONT_SIZE``. When no span reports a
    size the fallback is returned as given, without scaling or flooring.

    Args:
        line (dict): The parsed line metadata being processed.
        fallback (float, optional): The value returned when the line has no span sizes. Defaults to 10.0.

    Returns:
        float: The representative font size for the line.
    """
    collected: list[float] = []
    for span in line.get("spans") or []:
        raw_size = span.get("size")
        if raw_size:
            collected.append(float(raw_size))

    if collected:
        mean_size = sum(collected) / len(collected)
        return max(mean_size * 0.9, PDF_TAG_MIN_FONT_SIZE)
    return fallback
def _base14_fontname_for_style(style: dict[str, Any]) -> str:
    """
    Picks the Base-14 font face that is closest to a style dictionary.

    The family is Courier for monospaced styles, Times for serif styles and
    Helvetica otherwise (monospace takes precedence over serif). The face
    within the family follows the bold and italic flags.

    Args:
        style (dict[str, Any]): The style dictionary being analyzed.

    Returns:
        str: The Base-14 font name that best matches the style.
    """
    bold, italic, mono, serif = _style_flags(style)

    # Each row is ordered: regular, bold, italic, bold + italic.
    if mono:
        faces = (
            "Courier",
            "Courier-Bold",
            "Courier-Oblique",
            "Courier-BoldOblique",
        )
    elif serif:
        faces = (
            "Times-Roman",
            "Times-Bold",
            "Times-Italic",
            "Times-BoldItalic",
        )
    else:
        faces = (
            "Helvetica",
            "Helvetica-Bold",
            "Helvetica-Oblique",
            "Helvetica-BoldOblique",
        )

    regular_face, bold_face, italic_face, bold_italic_face = faces
    if bold and italic:
        return bold_italic_face
    if bold:
        return bold_face
    if italic:
        return italic_face
    return regular_face
def _token_parts(token: str) -> tuple[str, str | None]:
    """
    Separates a logical token into its label and its trailing ``_<digits>`` index.

    Only a final underscore followed exclusively by digits counts as an index;
    any other underscore stays part of the label. An empty or whitespace-only
    label is replaced by ``"ENT"``.

    Args:
        token (str): The logical replacement token being processed.

    Returns:
        tuple[str, str | None]: The token base and the numeric suffix, or None when there is no suffix.
    """
    parsed = re.match(r"^(.*?)(?:_(\d+))?$", token)
    if parsed is None:
        # Happens e.g. for tokens with an embedded line break, which "." does
        # not cross; such tokens are kept whole and carry no suffix.
        return (token.strip() or "ENT"), None

    raw_base, suffix = parsed.groups()
    return (raw_base.strip() or "ENT"), suffix
def _token_aliases(base: str) -> tuple[str, ...]:
    """
    Looks up the configured alias labels for a token base.

    Aliases are read from ``PDF_TOKEN_ALIAS_MAP`` under the upper-cased base.
    Each alias is upper-cased and reduced to ``A-Z``, ``0-9`` and ``_``;
    aliases that end up empty, equal to the base, or already collected are
    dropped. The configured order is preserved.

    Args:
        base (str): The token base label to abbreviate or alias.

    Returns:
        tuple[str, ...]: The configured aliases for the token base.
    """
    canonical_base = base.upper()
    cleaned: list[str] = []

    for raw_alias in PDF_TOKEN_ALIAS_MAP.get(canonical_base, ()):
        candidate = re.sub(r"[^A-Z0-9_]", "", str(raw_alias).upper())
        if not candidate or candidate == canonical_base or candidate in cleaned:
            continue
        cleaned.append(candidate)

    return tuple(cleaned)
def _iter_font_sizes(start_size: float) -> list[float]:
    """
    Lists the font sizes to try, from the starting size downwards.

    The first entry is always the starting size itself. Further entries are
    produced by repeatedly subtracting ``PDF_TAG_FONT_STEP`` (rounded to two
    decimals) for as long as the next size does not drop below
    ``PDF_TAG_MIN_FONT_SIZE``, allowing a 1e-6 tolerance.

    Args:
        start_size (float): The largest font size to try; non-positive values yield no sizes.

    Returns:
        list[float]: The font sizes to try in descending order.
    """
    if start_size <= 0:
        return []

    ladder: list[float] = [start_size]
    size = start_size
    lowest_allowed = PDF_TAG_MIN_FONT_SIZE - 1e-6
    while size - PDF_TAG_FONT_STEP >= lowest_allowed:
        size = round(size - PDF_TAG_FONT_STEP, 2)
        # Rounding can reproduce a size that is already listed.
        if size not in ladder:
            ladder.append(size)

    return ladder
def _rect_vertical_overlap(left: pymupdf.Rect, right: pymupdf.Rect) -> float:
    """
    Measures how much two rectangles overlap vertically.

    The shared vertical extent is divided by the height of the shorter
    rectangle, so a rectangle lying fully inside the other's vertical range
    scores 1.0 and vertically disjoint rectangles score 0.0. Heights below
    1e-6 are treated as 1e-6 to avoid dividing by zero.

    Args:
        left (pymupdf.Rect): The first rectangle to compare.
        right (pymupdf.Rect): The second rectangle to compare.

    Returns:
        float: The vertical overlap ratio between the rectangles.
    """
    shared_top = max(left.y0, right.y0)
    shared_bottom = min(left.y1, right.y1)
    shared_extent = max(0.0, shared_bottom - shared_top)

    shorter_height = min(left.height, right.height)
    return shared_extent / max(shorter_height, 1e-6)
def _same_boundary_candidate(left: dict, right: dict) -> bool:
    """
    Decides whether two labels may be rendered with one shared boundary token.

    The labels qualify when they carry the same ``aymurai_label``, their
    canonical entity ids do not contradict each other (a missing id on either
    side is not a contradiction), and both have non-blank text.

    Args:
        left (dict): The label on the left side of the boundary.
        right (dict): The label on the right side of the boundary.

    Returns:
        bool: Whether the labels can share a boundary token.
    """
    attrs_l = left.get("attrs") or {}
    attrs_r = right.get("attrs") or {}

    labels_match = attrs_l.get("aymurai_label") == attrs_r.get("aymurai_label")
    if not labels_match:
        return False

    cid_l = attrs_l.get("canonical_entity_id")
    cid_r = attrs_r.get("canonical_entity_id")
    both_known = bool(cid_l and cid_r)
    if both_known and str(cid_l) != str(cid_r):
        return False

    return all(str(side.get("text") or "").strip() for side in (left, right))
+ """ + for left_par, right_par in zip(paragraphs, paragraphs[1:]): + left_doc = left_par.get("document") or "" + right_doc = right_par.get("document") or "" + left_labels = left_par.get("labels") or [] + right_labels = right_par.get("labels") or [] + + if not left_doc or not right_doc or not left_labels or not right_labels: + continue + + left_candidates = [ + label + for label in left_labels + if _label_end(label) >= max(0, len(left_doc) - 2) + ] + right_candidates = [label for label in right_labels if _label_start(label) <= 2] + + if not left_candidates or not right_candidates: + continue + + for left_label in left_candidates: + for right_label in right_candidates: + if not _same_boundary_candidate(left_label, right_label): + continue + + shared_token = _resolve_token(left_label, render_context) + if not shared_token: + shared_token = _resolve_token(right_label, render_context) + if shared_token: + left_label["_boundary_token"] = shared_token + right_label["_boundary_token"] = shared_token + break + + +def _build_layout_paragraphs(parsed_doc: Any) -> list[dict]: + """ + Builds normalized paragraph metadata from the parsed PDF layout. + + Args: + parsed_doc (Any): The parsed PDF layout document. + + Returns: + list[dict]: The normalized layout paragraphs extracted from the parsed document. 
def _match_predictions_to_layout(
    layout_paragraphs: list[dict],
    preds: list[dict],
) -> list[dict]:
    """
    Pairs each model prediction with the layout paragraph it most likely describes.

    Texts are compared after NFKC normalization and stripping. A prediction
    takes the first still-unused layout paragraph with identical text; if
    there is none, it takes the unused paragraph with the lowest character
    error rate. Once every layout paragraph has been used, later predictions
    may reuse any of them. Predictions with blank text are skipped.

    Args:
        layout_paragraphs (list[dict]): The layout paragraphs, each with ``plain_text`` and ``metadata``.
        preds (list[dict]): The predictions to apply to the document.

    Returns:
        list[dict]: Deep copies of the matched layout paragraphs, extended with ``document``, ``labels`` and ``pred_index`` and ordered by ``layout_index``.
    """
    if not layout_paragraphs or not preds:
        return []

    every_index = range(len(layout_paragraphs))
    # A dict is used as an insertion-ordered set of not-yet-claimed indices.
    unclaimed = dict.fromkeys(every_index)

    layout_texts = [
        normalize("NFKC", paragraph["plain_text"]).strip()
        for paragraph in layout_paragraphs
    ]

    results: list[dict] = []
    for pred_idx, pred in enumerate(preds):
        pred_text = normalize("NFKC", str(pred.get("document") or "")).strip()
        if not pred_text:
            continue

        pool = unclaimed if unclaimed else every_index

        chosen = None
        for idx in pool:
            if layout_texts[idx] == pred_text:
                chosen = idx
                break
        if chosen is None:
            # No identical text: fall back to the closest paragraph by CER.
            chosen = min(
                pool,
                key=lambda idx: cer(pred_text, layout_texts[idx]),
            )

        entry = deepcopy(layout_paragraphs[chosen])
        entry["document"] = pred.get("document") or ""
        entry["labels"] = pred.get("labels") or []
        entry["pred_index"] = pred_idx
        results.append(entry)

        unclaimed.pop(chosen, None)

    results.sort(key=lambda paragraph: paragraph["metadata"]["layout_index"])
    return results
def _normalize_line_chars(spans: list[dict]) -> list[dict[str, Any]]:
    """
    Flattens per-character span data into one searchable list of characters.

    Every source character is NFKC-normalized. When normalization expands it
    into several characters, each of them gets its own entry and all of those
    entries share the rectangle of the source character. Characters whose
    normalized text is empty are dropped.

    Args:
        spans (list[dict]): The span collection to normalize into character entries.

    Returns:
        list[dict[str, Any]]: The normalized character entries (``char`` and ``bbox``) for the line.
    """
    entries: list[dict[str, Any]] = []
    for span in spans:
        for raw_char in span.get("chars") or []:
            expanded = normalize("NFKC", str(raw_char.get("c") or ""))
            if expanded:
                shared_box = pymupdf.Rect(raw_char["bbox"])
                entries.extend(
                    {"char": piece, "bbox": shared_box} for piece in expanded
                )
    return entries
def _line_chars_text(chars: list[dict[str, Any]]) -> str:
    """
    Joins the characters of a character entry list into a single string.

    Entries whose ``char`` is missing or falsy contribute nothing.

    Args:
        chars (list[dict[str, Any]]): The character entry list being processed.

    Returns:
        str: The concatenated character text.
    """
    pieces: list[str] = []
    for entry in chars:
        glyph = entry.get("char")
        pieces.append(str(glyph) if glyph else "")
    return "".join(pieces)
def _rect_from_char_slice(
    chars: list[dict[str, Any]],
    start: int,
    end: int,
) -> pymupdf.Rect | None:
    """
    Builds a rectangle covering the requested character slice.

    The slice bounds are clamped to the list, and an empty or inverted slice
    yields None. The rectangle is the union of the boxes of the non-whitespace
    characters in the slice; if the slice is whitespace only, the boxes of all
    its characters are used instead.

    Args:
        chars (list[dict[str, Any]]): The character entry list being processed.
        start (int): The index of the first character to cover.
        end (int): The index one past the last character to cover.

    Returns:
        pymupdf.Rect | None: The rectangle covering the requested character slice, or None when the slice is empty.
    """
    if not chars:
        return None

    slice_start = max(int(start), 0)
    slice_end = min(int(end), len(chars))
    if slice_end <= slice_start:
        return None

    # Here 0 <= slice_start < slice_end <= len(chars), so the slice always
    # holds at least one entry and no further emptiness checks are needed.
    segment = chars[slice_start:slice_end]

    # Prefer visible glyphs so surrounding spaces do not widen the rectangle.
    boxes = [entry["bbox"] for entry in segment if str(entry["char"]).strip()]
    if not boxes:
        boxes = [entry["bbox"] for entry in segment]

    rect = pymupdf.Rect(boxes[0])
    for bbox in boxes[1:]:
        rect.include_rect(bbox)
    return rect
def _text_redact_rect(rect: pymupdf.Rect) -> pymupdf.Rect:
    """
    Derives the rectangle used to redact the original text.

    A copy of the input is narrowed on its left and right edges by 1 % of its
    width, limited to the range 0.05 to 0.25. Rectangles that are not wider
    than twice that inset are returned unshrunk. The input is never modified.

    Args:
        rect (pymupdf.Rect): The rectangle that encloses the text to remove.

    Returns:
        pymupdf.Rect: The rectangle used for text redaction.
    """
    shrunk = pymupdf.Rect(rect)
    width = shrunk.width
    inset = min(0.25, max(width * 0.01, 0.05))

    if width > (2 * inset):
        shrunk.x0 += inset
        shrunk.x1 -= inset
    return shrunk
def _image_rects_for_clip(
    page: pymupdf.Page,
    clip: pymupdf.Rect,
) -> list[pymupdf.Rect]:
    """
    Gathers the rectangles of the page images that touch a region.

    Images are taken from ``page.get_image_info()``. Entries without a
    bounding box, with a zero-area box, or whose box does not intersect the
    region are left out. The order reported by the page is preserved.

    Args:
        page (pymupdf.Page): The PDF page being processed.
        clip (pymupdf.Rect): The region that the images must intersect.

    Returns:
        list[pymupdf.Rect]: The image rectangles that overlap the clip region.
    """
    raw_boxes = (info.get("bbox") for info in page.get_image_info())
    image_rects = (pymupdf.Rect(box) for box in raw_boxes if box is not None)
    return [
        image_rect
        for image_rect in image_rects
        if image_rect.intersects(clip) and image_rect.get_area() > 0
    ]
+ """ + page_ops: dict[int, list[dict]] = {} + widget_ops: dict[int, list[dict]] = {} + signature_widget_ops: dict[int, list[dict]] = {} + line_x_cursor: dict[tuple[int, int, int], float] = {} + line_char_cache: dict[tuple[int, int, int], list[dict[str, Any]]] = {} + line_char_text_cache: dict[tuple[int, int, int], str] = {} + line_char_cursor: dict[tuple[int, int, int], int] = {} + + # Pre-compute image rects and widgets per page + page_image_rects: dict[int, list[pymupdf.Rect]] = {} + page_widgets: dict[int, list[dict[str, Any]]] = {} + + for paragraph in paragraphs: + metadata = paragraph.get("metadata") or {} + lines = metadata.get("lines") or [] + if not lines: + continue + + page_index = int(metadata["page_index"]) + page = doc[page_index] + line_text = metadata.get("line_text") or "" + box_clip = pymupdf.Rect(metadata.get("box_bbox") or page.rect) + document = paragraph.get("document") or "" + labels = sorted(paragraph.get("labels") or [], key=_label_start) + search_cursor = 0 + + # Lazy-load image rects and widget infos for this page + if page_index not in page_image_rects: + page_image_rects[page_index] = _image_rects_for_clip(page, page.rect) + if page_index not in page_widgets: + page_widgets[page_index] = _page_widget_infos(page) + + for label in labels: + entity_text = _label_surface_text(label, document).strip() + if not entity_text: + continue + + token = _resolve_token(label, render_context) + + span = _find_flexible(line_text, entity_text, start=search_cursor) + if span is None: + span = _find_flexible(line_text, entity_text, start=0) + if span is None: + # -- Fallback: direct page search -- + fallback_rects = [ + rect + for rect in page.search_for(entity_text, clip=box_clip) + if rect.intersects(box_clip) + ] + + # Check if this is a widget-backed entity before falling back to images + if fallback_rects: + fallback_widget = _entity_overlaps_widget( + fallback_rects[0], + page_widgets[page_index], + ) + if fallback_widget is not None: + if ( + 
fallback_widget["field_type"] + == pymupdf.PDF_WIDGET_TYPE_TEXT + ): + widget_ops.setdefault(page_index, []).append( + { + "widget_xref": fallback_widget["xref"], + "field_name": fallback_widget["field_name"], + "widget_info": fallback_widget, + "entity_text": entity_text, + "logical_token": token, + } + ) + continue + if ( + fallback_widget["field_type"] + == pymupdf.PDF_WIDGET_TYPE_SIGNATURE + ): + op = _build_page_op( + fallback_rects[0], + lines[0] if lines else None, + token, + entity_style=fallback_widget.get("style") or None, + ) + op["widget_xref"] = fallback_widget["xref"] + op["widget_rect"] = fallback_widget["rect"] + signature_widget_ops.setdefault(page_index, []).append(op) + continue + + # Check if this is an image-based entity + if not fallback_rects: + img_match = _try_image_entity( + page, + entity_text, + box_clip, + page_image_rects[page_index], + ) + if img_match is not None: + op = _build_page_op( + img_match, + lines[0] if lines else None, + token, + is_image=True, + ) + op["image_rect"] = img_match + page_ops.setdefault(page_index, []).append(op) + continue + + if fallback_rects: + grouped_rects = _group_adjacent_rects( + fallback_rects, max_gap=PDF_TAG_RECT_GAP_MAX + ) + fallback_line = lines[0] if lines else None + + # Check if any of these rects overlap an image + for rect in grouped_rects: + img_rect = _entity_overlaps_image( + page, + rect, + page_image_rects[page_index], + ) + op = _build_page_op( + rect, + fallback_line, + token, + is_image=(img_rect is not None), + ) + if img_rect is not None: + op["image_rect"] = img_rect + page_ops.setdefault(page_index, []).append(op) + continue + + logger.warning( + "Could not map label '%s' on page=%s box=%s", + entity_text, + metadata.get("page_number"), + metadata.get("box_index"), + ) + continue + + search_cursor = span[1] + + # Collect line segments this entity spans + segments: list[ + tuple[ + dict, + str, + pymupdf.Rect, + pymupdf.Rect | None, + dict, + dict[str, Any] | None, + ] + ] = [] 
+ for line in lines: + overlap_start = max(span[0], line["start"]) + overlap_end = min(span[1], line["end"]) + if overlap_end <= overlap_start: + continue + + segment_text = line_text[overlap_start:overlap_end].strip() + if not segment_text: + continue + + line_key = ( + line["page_index"], + line["box_index"], + line["line_index"], + ) + line_chars = line_char_cache.get(line_key) + if line_chars is None: + line_chars = _line_chars_from_page(page, line) + line_char_cache[line_key] = line_chars + + line_char_text = line_char_text_cache.get(line_key) + if line_char_text is None: + line_char_text = _line_chars_text(line_chars) + line_char_text_cache[line_key] = line_char_text + + raw_span = _find_line_char_span( + line_chars, + segment_text, + start=line_char_cursor.get(line_key, 0), + raw_text=line_char_text, + ) + rect = None + if raw_span is not None: + line_char_cursor[line_key] = raw_span[1] + rect = _rect_from_char_slice(line_chars, raw_span[0], raw_span[1]) + + if rect is None: + raw_start = ( + overlap_start - line["start"] + int(line.get("strip_offset", 0)) + ) + raw_end = ( + overlap_end - line["start"] + int(line.get("strip_offset", 0)) + ) + rect = _rect_from_char_slice(line_chars, raw_start, raw_end) + if rect is None: + rect = _pick_rect_group_for_segment( + page, + line, + segment_text, + line_x_cursor, + ) + + widget_info = _entity_overlaps_widget( + rect, + page_widgets[page_index], + ) + + # Check for image overlap + img_rect = _entity_overlaps_image( + page, + rect, + page_image_rects[page_index], + ) + + # Determine entity-specific style from the span that + # actually contains this text (not the line's dominant style) + offset_in_line = overlap_start - line["start"] + ent_style = _entity_style_from_spans(line, offset_in_line) + + segments.append( + (line, segment_text, rect, img_rect, ent_style, widget_info) + ) + + if not segments: + continue + + if len(segments) == 1: + # Single-line entity: route widget-backed content through the widget path. 
+ line, _seg_text, rect, img_rect, ent_style, widget_info = segments[0] + if widget_info is not None: + if widget_info["field_type"] == pymupdf.PDF_WIDGET_TYPE_TEXT: + widget_ops.setdefault(page_index, []).append( + { + "widget_xref": widget_info["xref"], + "field_name": widget_info["field_name"], + "widget_info": widget_info, + "entity_text": entity_text, + "logical_token": token, + } + ) + continue + if widget_info["field_type"] == pymupdf.PDF_WIDGET_TYPE_SIGNATURE: + op = _build_page_op( + rect, + line, + token, + entity_style=ent_style, + ) + op["widget_xref"] = widget_info["xref"] + op["widget_rect"] = widget_info["rect"] + signature_widget_ops.setdefault(page_index, []).append(op) + continue + + op = _build_page_op( + rect, + line, + token, + is_image=(img_rect is not None), + entity_style=ent_style, + ) + if img_rect is not None: + op["image_rect"] = img_rect + page_ops.setdefault(page_index, []).append(op) + else: + # Multi-line entity: write the token on the widest segment only; blank the others. 
+ widest_idx = max( + range(len(segments)), + key=lambda i: segments[i][2].width, + ) + any_image = any(seg[3] is not None for seg in segments) + shared_image_rect = next( + (seg[3] for seg in segments if seg[3] is not None), + None, + ) + + signature_widget = None + if all(seg[5] is not None for seg in segments): + widget_xrefs = {int(seg[5]["xref"]) for seg in segments} + widget_types = {int(seg[5]["field_type"]) for seg in segments} + if len(widget_xrefs) == 1 and widget_types == { + pymupdf.PDF_WIDGET_TYPE_SIGNATURE + }: + signature_widget = segments[0][5] + + for seg_idx, ( + seg_line, + _seg_text, + seg_rect, + seg_img, + seg_style, + seg_widget, + ) in enumerate(segments): + if seg_idx == widest_idx: + op = _build_page_op( + seg_rect, + seg_line, + token, + is_image=(any_image and signature_widget is None), + entity_style=seg_style, + ) + if signature_widget is None and shared_image_rect is not None: + op["image_rect"] = shared_image_rect + else: + op = _build_page_op( + seg_rect, + seg_line, + token, + is_image=( + (seg_img is not None) and signature_widget is None + ), + entity_style=seg_style, + ) + op["text"] = None + op["fontsize"] = None + if seg_img is not None and signature_widget is None: + op["image_rect"] = seg_img + + if signature_widget is not None: + op["widget_xref"] = signature_widget["xref"] + op["widget_rect"] = signature_widget["rect"] + signature_widget_ops.setdefault(page_index, []).append(op) + else: + page_ops.setdefault(page_index, []).append(op) + + return page_ops, widget_ops, signature_widget_ops + + +def _try_image_entity( + page: pymupdf.Page, + entity_text: str, + clip: pymupdf.Rect, + image_rects: list[pymupdf.Rect], +) -> pymupdf.Rect | None: + """ + Finds the best image rectangle for an entity when text search fails. + + Args: + page (pymupdf.Page): The PDF page being processed. + entity_text (str): The entity text being mapped. + clip (pymupdf.Rect): The clipping rectangle to constrain the operation. 
def _render_text_op(page: pymupdf.Page, op: dict) -> None:
    """
    Paints the replacement token of one operation onto a page.

    The operation's background rectangle is filled white unless the operation
    sets ``skip_background_fill``. When the operation carries both a text and a
    font size, three insertion strategies are tried in order and the first one
    that does not raise wins.

    Args:
        page (pymupdf.Page): The PDF page being processed.
        op (dict): The operation dictionary being processed.
    """
    background = pymupdf.Rect(op.get("background_rect") or op["canvas_rect"])
    if not op.get("skip_background_fill"):
        page.draw_rect(
            background,
            color=(1, 1, 1),
            fill=(1, 1, 1),
            width=0,
            overlay=True,
        )

    # Blanked segments carry no text or font size: nothing left to draw.
    if not (op.get("text") and op.get("fontsize")):
        return

    render = op["render_rect"]
    line_rect = pymupdf.Rect(op.get("line_rect") or render)
    style = op.get("style") or {}
    base14_name = _base14_fontname_for_style(style)
    font_obj = _get_base14_font(style)

    fontsize = float(op["fontsize"])
    descender = float(style.get("descender") or -0.2)

    # Baseline sits above the line bottom by the descender, clamped so it stays
    # inside the line rectangle.
    lowest_allowed = line_rect.y1 - 0.1
    highest_allowed = line_rect.y0 + (fontsize * 0.65)
    baseline_y = line_rect.y1 + (descender * fontsize)
    baseline_y = min(max(baseline_y, highest_allowed), lowest_allowed)

    # Centre the text horizontally; never start left of the render rectangle.
    text_width = font_obj.text_length(op["text"], fontsize=fontsize)
    slack = (render.width - text_width) / 2.0
    x_start = render.x0 + max(slack, 0.0)

    def _with_insert_text() -> None:
        page.insert_text(
            (x_start, baseline_y),
            op["text"],
            fontname=base14_name,
            fontsize=fontsize,
            color=op["text_color"],
            overlay=True,
        )

    def _with_text_writer() -> None:
        writer = pymupdf.TextWriter(page.rect, color=op["text_color"])
        writer.fill_textbox(
            render,
            op["text"],
            font=font_obj,
            fontsize=fontsize,
            align=op.get("text_align", pymupdf.TEXT_ALIGN_CENTER),
        )
        writer.write_text(page, overlay=True)

    attempts = (
        ("insert_text failed for '%s': %s", _with_insert_text),
        ("TextWriter failed for '%s': %s", _with_text_writer),
    )
    for failure_message, attempt in attempts:
        try:
            attempt()
        except Exception as exc:
            logger.debug(failure_message, op["text"], exc)
        else:
            return

    # Last resort: textbox insertion; a failure here is reported as a warning.
    try:
        page.insert_textbox(
            render,
            op["text"],
            fontname=base14_name,
            fontsize=fontsize,
            color=op["text_color"],
            align=op.get("text_align", pymupdf.TEXT_ALIGN_CENTER),
            overlay=True,
        )
    except Exception as exc:
        logger.warning(
            "All text insertion methods failed for '%s': %s",
            op["text"],
            exc,
        )
def _partition_page_ops(
    page_ops: dict[int, list[dict]],
) -> tuple[dict[int, list[dict]], dict[int, list[dict]]]:
    """
    Separates page operations by whether they carry an asset rectangle.

    Args:
        page_ops (dict[int, list[dict]]): The collected page operations grouped by page index.

    Returns:
        tuple[dict[int, list[dict]], dict[int, list[dict]]]: The text-only and asset-backed operations.
    """
    text_ops: dict[int, list[dict]] = {}
    asset_ops: dict[int, list[dict]] = {}

    for page_idx, ops in page_ops.items():
        for op in ops:
            # An operation without an asset rectangle is text-only.
            target = text_ops if _page_asset_rect(op) is None else asset_ops
            target.setdefault(page_idx, []).append(op)

    return text_ops, asset_ops


def _apply_text_redactions(
    doc: pymupdf.Document,
    text_page_ops: dict[int, list[dict]],
) -> None:
    """
    Redacts the text under each text-only operation and draws its token.

    Args:
        doc (pymupdf.Document): The PDF document being processed.
        text_page_ops (dict[int, list[dict]]): The text-only page operations grouped by page index.
    """
    for page_idx, ops in text_page_ops.items():
        if ops:
            page = doc[page_idx]

            # Register every redaction first, then apply them in one pass.
            for pending in ops:
                page.add_redact_annot(
                    pending["redact_rect"],
                    text=None,
                    fill=(1, 1, 1),
                    cross_out=False,
                )

            # Only text is removed here; images and line art are left alone.
            page.apply_redactions(
                images=pymupdf.PDF_REDACT_IMAGE_NONE,
                graphics=pymupdf.PDF_REDACT_LINE_ART_NONE,
                text=pymupdf.PDF_REDACT_TEXT_REMOVE,
            )

            for pending in ops:
                _render_text_op(page, pending)
+ + Args: + doc (pymupdf.Document): The PDF document being processed. + asset_page_ops (dict[int, list[dict]]): The asset-backed page operations grouped by page index. + """ + for page_idx, ops in asset_page_ops.items(): + if not ops: + continue + + page = doc[page_idx] + graphics_mode = pymupdf.PDF_REDACT_LINE_ART_NONE + + for op in ops: + asset_rect = _page_asset_rect(op) + if asset_rect is None: + continue + + page.add_redact_annot( + asset_rect, + text=None, + fill=(1, 1, 1), + cross_out=False, + ) + graphics_mode = max( + graphics_mode, + int(op.get("graphics_mode") or pymupdf.PDF_REDACT_LINE_ART_NONE), + ) + + page.apply_redactions( + images=pymupdf.PDF_REDACT_IMAGE_REMOVE, + graphics=graphics_mode, + text=pymupdf.PDF_REDACT_TEXT_REMOVE, + ) + + for op in ops: + _render_text_op(page, op) + + +def _apply_redactions( + doc: pymupdf.Document, + page_ops: dict[int, list[dict]], + widget_ops: dict[int, list[dict]], + signature_widget_ops: dict[int, list[dict]], +) -> None: + """ + Applies all collected PDF redactions in the correct order. + + Args: + doc (pymupdf.Document): The PDF document being processed. + page_ops (dict[int, list[dict]]): The collected page operations grouped by page index. + widget_ops (dict[int, list[dict]]): The collected text widget operations grouped by page index. + signature_widget_ops (dict[int, list[dict]]): The collected signature widget operations grouped by page index. 
def _pdf_metadata_mod_date() -> str:
    """
    Formats the current UTC time as a PDF metadata date string.

    Returns:
        str: The PDF-formatted UTC modification timestamp.
    """
    return datetime.now(timezone.utc).strftime("D:%Y%m%d%H%M%S+00'00'")


def _append_cleanup_rect(
    cleanup_rects: dict[int, list[pymupdf.Rect]],
    page_idx: int,
    rect: pymupdf.Rect | tuple[float, float, float, float] | None,
) -> None:
    """
    Records a rectangle for later sanitization, ignoring unusable ones.

    Args:
        cleanup_rects (dict[int, list[pymupdf.Rect]]): The cleanup rectangles grouped by page index.
        page_idx (int): The page index associated with the operation.
        rect (pymupdf.Rect | tuple[float, float, float, float] | None): The rectangle to record; ``None`` and rectangles without positive area are skipped.
    """
    if rect is not None:
        candidate = pymupdf.Rect(rect)
        if candidate.get_area() > 0:
            cleanup_rects.setdefault(page_idx, []).append(candidate)
def _cleanup_rect_for_widget_op(op: dict[str, Any]) -> pymupdf.Rect | None:
    """
    Returns the widget rectangle stored on a text widget operation.

    Args:
        op (dict[str, Any]): The operation dictionary being processed.

    Returns:
        pymupdf.Rect | None: The cleanup rectangle for the widget operation, if available.
    """
    stored_rect = (op.get("widget_info") or {}).get("rect")
    return None if stored_rect is None else pymupdf.Rect(stored_rect)


def _cleanup_rect_for_signature_widget_op(op: dict[str, Any]) -> pymupdf.Rect | None:
    """
    Returns the rectangle to clean up for a signature widget operation.

    The widget rectangle is preferred; otherwise the background rectangle or,
    failing that, the canvas rectangle is used.

    Args:
        op (dict[str, Any]): The operation dictionary being processed.

    Returns:
        pymupdf.Rect | None: The cleanup rectangle for the signature widget operation, if available.
    """
    source = op.get("widget_rect")
    if source is None:
        source = op.get("background_rect") or op.get("canvas_rect")
    if source is None:
        return None
    return pymupdf.Rect(source)
def _remove_overlapping_page_links(
    doc: pymupdf.Document,
    cleanup_rects: dict[int, list[pymupdf.Rect]],
) -> None:
    """
    Removes every page link whose source rectangle touches a cleanup rectangle.

    A link that cannot be deleted is logged as a warning and left in place.

    Args:
        doc (pymupdf.Document): The PDF document being processed.
        cleanup_rects (dict[int, list[pymupdf.Rect]]): The cleanup rectangles grouped by page index.
    """
    for page_idx, page_rects in cleanup_rects.items():
        if not page_rects:
            continue

        page = doc[page_idx]
        # Snapshot the links so deletions do not disturb the iteration.
        for link in list(page.get_links()):
            source = link.get("from")
            if source is None:
                continue

            link_rect = pymupdf.Rect(source)
            if all(not link_rect.intersects(rect) for rect in page_rects):
                continue

            try:
                page.delete_link(link)
            except Exception as exc:
                logger.warning(
                    "Failed to delete PDF link on page=%s rect=%s: %s",
                    page_idx,
                    tuple(round(value, 2) for value in link_rect),
                    exc,
                )
def _clear_standard_metadata(doc: pymupdf.Document) -> None:
    """
    Sets every standard PDF metadata field on the document to an empty string.

    Args:
        doc (pymupdf.Document): The PDF document being processed.
    """
    standard_fields = (
        "title",
        "author",
        "subject",
        "keywords",
        "creator",
        "producer",
        "creationDate",
        "modDate",
        "trapped",
    )
    doc.set_metadata(dict.fromkeys(standard_fields, ""))


def _apply_aymurai_metadata(doc: pymupdf.Document) -> None:
    """
    Writes the configured AymurAI creator and producer into the PDF metadata.

    The author is blanked, the modification date is set to the current UTC
    time, and the remaining standard fields keep their current value (or an
    empty string when unset).

    Args:
        doc (pymupdf.Document): The PDF document being processed.
    """
    metadata = dict(doc.metadata or {})

    def _kept(field: str) -> str:
        return metadata.get(field) or ""

    overrides = {
        "title": _kept("title"),
        "author": "",
        "subject": _kept("subject"),
        "keywords": _kept("keywords"),
        "creator": settings.ANONYMIZATION_METADATA_CREATOR,
        "producer": settings.ANONYMIZATION_METADATA_PRODUCER,
        "creationDate": _kept("creationDate"),
        "modDate": _pdf_metadata_mod_date(),
        "trapped": _kept("trapped"),
    }
    metadata.update(overrides)
    doc.set_metadata(metadata)
+ """ + _remove_overlapping_page_links(doc, cleanup_rects) + doc.scrub( + metadata=True, + xml_metadata=True, + javascript=True, + attached_files=True, + embedded_files=True, + thumbnails=True, + reset_responses=True, + hidden_text=True, + clean_pages=True, + remove_links=False, + reset_fields=False, + redactions=False, + ) + _remove_remaining_annotations(doc) + _clear_standard_metadata(doc) + _apply_aymurai_metadata(doc) + + get_xml_metadata = getattr(doc, "get_xml_metadata", None) + del_xml_metadata = getattr(doc, "del_xml_metadata", None) + if callable(get_xml_metadata) and callable(del_xml_metadata): + try: + xml_metadata = get_xml_metadata() + except Exception as exc: + logger.warning("Failed to read PDF XML metadata after scrub: %s", exc) + else: + if xml_metadata: + try: + del_xml_metadata() + except Exception as exc: + logger.warning( + "Failed to delete residual PDF XML metadata: %s", + exc, + ) diff --git a/aymurai/text/anonymization/pdf/watermark.py b/aymurai/text/anonymization/pdf/watermark.py new file mode 100644 index 00000000..c15d9aef --- /dev/null +++ b/aymurai/text/anonymization/pdf/watermark.py @@ -0,0 +1,522 @@ +from __future__ import annotations + +import os +from functools import lru_cache +from pathlib import Path +from typing import Any + +import pymupdf + +from aymurai.logger import get_logger +from aymurai.settings import settings + +logger = get_logger(__name__) + +WATERMARK_PREFIX_TEXT = "Documento anonimizado por " +WATERMARK_LINK_TEXT = "AymurAI" +WATERMARK_TEXT = f"{WATERMARK_PREFIX_TEXT}{WATERMARK_LINK_TEXT}" +WATERMARK_URL = "https://www.aymurai.info/" +WATERMARK_FONT_SIZE = 10.0 +WATERMARK_MARGIN_X = 24.0 +WATERMARK_BASELINE_MARGIN = 12.0 +WATERMARK_TOP_BASELINE = 22.0 +WATERMARK_RECT_PADDING_X = 4.0 +WATERMARK_RECT_PADDING_Y = 4.0 +WATERMARK_COLLISION_PADDING = 12.0 +WATERMARK_TEXT_COLOR = tuple(channel / 255 for channel in (192, 192, 192)) +WATERMARK_LINK_COLOR = tuple(channel / 255 for channel in (115, 190, 250)) + + +def 
_candidate_font_paths() -> tuple[list[Path], list[Path]]: + """ + Builds the ordered list of candidate font paths for the PDF watermark. + + Returns: + tuple[list[Path], list[Path]]: The regular and bold watermark font candidates. + """ + override_regular = ( + os.getenv("PDF_WATERMARK_FONT_REGULAR") or settings.PDF_WATERMARK_FONT_REGULAR + ) + override_bold = ( + os.getenv("PDF_WATERMARK_FONT_BOLD") or settings.PDF_WATERMARK_FONT_BOLD + ) + + regular_candidates: list[Path] = [] + bold_candidates: list[Path] = [] + + if override_regular: + regular_candidates.append(Path(override_regular).expanduser()) + if override_bold: + bold_candidates.append(Path(override_bold).expanduser()) + + resource_roots: list[Path] = [] + resources_base = Path(settings.RESOURCES_BASEPATH) + if resources_base.is_absolute(): + resource_roots.append(resources_base) + else: + resource_roots.append((Path("/workspace") / resources_base).resolve()) + resource_roots.append(resources_base) + + font_roots: list[Path] = [] + for root in resource_roots: + font_roots.extend([root / "fonts", root / "fonts" / "archivo"]) + + for root in font_roots: + regular_candidates.extend( + [ + root / "Archivo-Regular.ttf", + root / "Archivo-Regular.otf", + root / "Archivo[wdth,wght].ttf", + root / "Archivo-VariableFont_wdth,wght.ttf", + ] + ) + bold_candidates.extend( + [ + root / "Archivo-Bold.ttf", + root / "Archivo-Bold.otf", + root / "Archivo-BoldItalic.ttf", + root / "Archivo-VariableFont_wdth,wght.ttf", + root / "Archivo[wdth,wght].ttf", + ] + ) + + system_roots = [ + Path("/usr/share/fonts/truetype/archivo"), + Path("/usr/share/fonts/opentype/archivo"), + Path("/usr/local/share/fonts/archivo"), + Path.home() / ".local/share/fonts", + Path.home() / ".local/share/fonts/archivo", + ] + for root in system_roots: + regular_candidates.extend( + [ + root / "Archivo-Regular.ttf", + root / "Archivo-Regular.otf", + root / "Archivo[wdth,wght].ttf", + root / "Archivo-VariableFont_wdth,wght.ttf", + ] + ) + 
def _first_existing_path(paths: list[Path]) -> str | None:
    """
    Finds the first candidate that points at an existing regular file.

    Candidates are user-expanded before the check, and a path that was already
    checked is not checked again.

    Args:
        paths (list[Path]): The candidate paths to inspect.

    Returns:
        str | None: The first existing file path, if one is found.
    """
    visited: set[str] = set()
    for candidate in paths:
        location = candidate.expanduser()
        key = str(location)
        if key in visited:
            continue
        visited.add(key)
        # is_file() is already False for a missing path.
        if location.is_file():
            return key
    return None


@lru_cache(maxsize=1)
def _watermark_font_paths() -> tuple[str | None, str | None]:
    """
    Resolves the regular and bold font files for the PDF watermark.

    When only one of the two weights is found, it stands in for the other.
    The result is cached after the first call.

    Returns:
        tuple[str | None, str | None]: The resolved regular and bold watermark font paths.
    """
    regular_candidates, bold_candidates = _candidate_font_paths()
    regular = _first_existing_path(regular_candidates)
    bold = _first_existing_path(bold_candidates)

    regular = bold if regular is None else regular
    bold = regular if bold is None else bold
    return regular, bold
def _watermark_text_length(
    text: str,
    *,
    font_obj: pymupdf.Font,
    fontname: str,
    fontsize: float,
) -> float:
    """
    Computes how wide a piece of watermark text renders.

    The font object is asked first; if that raises, the width is computed
    from the font name instead.

    Args:
        text (str): The text to measure.
        font_obj (pymupdf.Font): The font object used for measurement.
        fontname (str): The font name used when the font object cannot measure the text.
        fontsize (float): The font size used for measurement.

    Returns:
        float: The rendered width of the watermark text.
    """
    try:
        width = float(font_obj.text_length(text, fontsize=fontsize))
    except Exception:
        width = float(
            pymupdf.get_text_length(text, fontname=fontname, fontsize=fontsize)
        )
    return width
def _expanded_rect(rect: pymupdf.Rect, padding: float) -> pymupdf.Rect:
    """
    Grows a rectangle outward by the same padding on all four sides.

    Args:
        rect (pymupdf.Rect): The rectangle to expand.
        padding (float): The amount of padding to apply on every side.

    Returns:
        pymupdf.Rect: The expanded rectangle.
    """
    left = rect.x0 - padding
    top = rect.y0 - padding
    right = rect.x1 + padding
    bottom = rect.y1 + padding
    return pymupdf.Rect(left, top, right, bottom)


def _watermark_corner_order(page_index: int) -> list[str]:
    """
    Returns the corners to try for the watermark, most preferred first.

    Even page indexes start at the bottom-right corner, odd ones at the
    bottom-left corner.

    Args:
        page_index (int): The page index being processed.

    Returns:
        list[str]: The ordered watermark corner candidates for the page.
    """
    if page_index % 2:
        return ["bottom-left", "top-left", "top-right", "bottom-right"]
    return ["bottom-right", "bottom-left", "top-left", "top-right"]
+ """ + if corner.endswith("right"): + x_start = max( + WATERMARK_MARGIN_X, + page.rect.width - total_width - WATERMARK_MARGIN_X, + ) + else: + x_start = WATERMARK_MARGIN_X + + if corner.startswith("bottom"): + baseline_y = page.rect.height - WATERMARK_BASELINE_MARGIN + else: + baseline_y = WATERMARK_TOP_BASELINE + + link_x = x_start + prefix_width + text_top = baseline_y - WATERMARK_FONT_SIZE + banner_rect = pymupdf.Rect( + x_start - WATERMARK_RECT_PADDING_X, + text_top - WATERMARK_RECT_PADDING_Y, + x_start + total_width + WATERMARK_RECT_PADDING_X, + baseline_y + WATERMARK_RECT_PADDING_Y, + ) + link_rect = pymupdf.Rect( + link_x, + text_top, + link_x + link_width, + baseline_y + 2.0, + ) + + return { + "corner": corner, + "x_start": x_start, + "baseline_y": baseline_y, + "link_x": link_x, + "banner_rect": banner_rect, + "link_rect": link_rect, + } + + +def _occupied_page_rects(page: pymupdf.Page) -> list[pymupdf.Rect]: + """ + Collects page rectangles already occupied by visible content. + + Args: + page (pymupdf.Page): The PDF page being processed. + + Returns: + list[pymupdf.Rect]: The occupied rectangles found on the page. + """ + occupied: list[pymupdf.Rect] = [] + + text_data = page.get_text("dict") + for block in text_data.get("blocks", []): + bbox = block.get("bbox") + if bbox is None: + continue + rect = pymupdf.Rect(bbox) + if rect.get_area() <= 0: + continue + occupied.append(_expanded_rect(rect, WATERMARK_COLLISION_PADDING)) + + for drawing in page.get_drawings(): + rect = drawing.get("rect") + if rect is None: + continue + rect = pymupdf.Rect(rect) + if rect.get_area() <= 0: + continue + occupied.append(_expanded_rect(rect, WATERMARK_COLLISION_PADDING)) + + return occupied + + +def _watermark_overlap_score( + banner_rect: pymupdf.Rect, + occupied_rects: list[pymupdf.Rect], +) -> tuple[float, float, int]: + """ + Scores a watermark placement by the amount of page content it overlaps. 
+ + Args: + banner_rect (pymupdf.Rect): The watermark banner rectangle being scored. + occupied_rects (list[pymupdf.Rect]): The occupied page rectangles used for overlap checks. + + Returns: + tuple[float, float, int]: The overlap ratio, overlap area, and overlap count for the placement. + """ + overlap_area = 0.0 + overlap_count = 0 + banner_area = max(banner_rect.get_area(), 1.0) + + for rect in occupied_rects: + if not banner_rect.intersects(rect): + continue + intersection = banner_rect & rect + area = intersection.get_area() + if area <= 0: + continue + overlap_area += area + overlap_count += 1 + + return overlap_area / banner_area, overlap_area, overlap_count + + +def _choose_watermark_layout( + page: pymupdf.Page, + page_index: int, + *, + prefix_width: float, + link_width: float, + total_width: float, +) -> dict[str, Any]: + """ + Selects the watermark placement with the least overlap on a page. + + Args: + page (pymupdf.Page): The PDF page being processed. + page_index (int): The page index being processed. + prefix_width (float): The rendered width of the watermark prefix text. + link_width (float): The rendered width of the watermark link text. + total_width (float): The total rendered width of the watermark text. + + Returns: + dict[str, Any]: The chosen watermark layout data. 
+ """ + occupied_rects = _occupied_page_rects(page) + candidate_layouts = [ + _watermark_layout_for_corner( + page, + corner, + prefix_width=prefix_width, + link_width=link_width, + total_width=total_width, + ) + for corner in _watermark_corner_order(page_index) + ] + + best_layout = candidate_layouts[0] + best_score: tuple[float, float, int] | None = None + + for layout in candidate_layouts: + score = _watermark_overlap_score(layout["banner_rect"], occupied_rects) + if score[0] == 0.0 and score[1] == 0.0: + return layout + if best_score is None or score < best_score: + best_layout = layout + best_score = score + + return best_layout + + +def add_pdf_footer_watermark(doc: pymupdf.Document) -> None: + """ + Adds the anonymization watermark to the least crowded corner of each PDF page. + + Args: + doc (pymupdf.Document): The PDF document being processed. + """ + font_config = _watermark_font_config() + prefix_width = _watermark_text_length( + WATERMARK_PREFIX_TEXT, + font_obj=font_config["text_font"], + fontname=font_config["text_fontname"], + fontsize=WATERMARK_FONT_SIZE, + ) + link_width = _watermark_text_length( + WATERMARK_LINK_TEXT, + font_obj=font_config["link_font"], + fontname=font_config["link_fontname"], + fontsize=WATERMARK_FONT_SIZE, + ) + total_width = prefix_width + link_width + + for page_index, page in enumerate(doc): + layout = _choose_watermark_layout( + page, + page_index, + prefix_width=prefix_width, + link_width=link_width, + total_width=total_width, + ) + baseline_y = layout["baseline_y"] + x_start = layout["x_start"] + link_x = layout["link_x"] + + _insert_watermark_text( + page, + (x_start, baseline_y), + WATERMARK_PREFIX_TEXT, + fontname=font_config["text_fontname"], + fontsize=WATERMARK_FONT_SIZE, + color=WATERMARK_TEXT_COLOR, + fontfile=font_config["text_fontfile"], + ) + _insert_watermark_text( + page, + (link_x, baseline_y), + WATERMARK_LINK_TEXT, + fontname=font_config["link_fontname"], + fontsize=WATERMARK_FONT_SIZE, + 
color=WATERMARK_LINK_COLOR, + fontfile=font_config["link_fontfile"], + ) + + if layout["corner"].startswith("bottom"): + underline_y = min(page.rect.height - 1.0, baseline_y + 1.0) + else: + underline_y = baseline_y + 1.0 + page.draw_line( + (link_x, underline_y), + (link_x + link_width, underline_y), + color=WATERMARK_LINK_COLOR, + width=0.8, + overlay=True, + ) + page.insert_link( + { + "kind": pymupdf.LINK_URI, + "from": layout["link_rect"], + "uri": WATERMARK_URL, + } + ) diff --git a/aymurai/text/anonymization/pdf/widgets.py b/aymurai/text/anonymization/pdf/widgets.py new file mode 100644 index 00000000..3ea97d7e --- /dev/null +++ b/aymurai/text/anonymization/pdf/widgets.py @@ -0,0 +1,323 @@ +from __future__ import annotations + +from typing import Any + +import pymupdf + +from aymurai.logger import get_logger +from aymurai.text.anonymization.pdf.common import ( + _build_display_token_candidates, + _default_style, + _find_flexible, + _get_base14_font, +) + +logger = get_logger(__name__) + + +def _signature_background_rect( + op: dict[str, Any], + widget_rect: pymupdf.Rect, +) -> pymupdf.Rect: + """ + Builds the background rectangle used for a signature replacement. + + Args: + op (dict[str, Any]): The operation dictionary being processed. + widget_rect (pymupdf.Rect): The rectangle occupied by the widget. + + Returns: + pymupdf.Rect: The background rectangle for the signature replacement. 
+ """ + background = pymupdf.Rect( + op.get("line_rect") or op.get("canvas_rect") or widget_rect + ) + canvas_rect = op.get("canvas_rect") + if canvas_rect is not None: + background.include_rect(pymupdf.Rect(canvas_rect)) + + pad_x = max(background.height * 0.75, 2.0) + pad_y = max(background.height * 0.25, 0.75) + widget_clip = pymupdf.Rect(widget_rect) + + background.x0 = max(widget_clip.x0, background.x0 - pad_x) + background.y0 = max(widget_clip.y0, background.y0 - pad_y) + background.x1 = min(widget_clip.x1, background.x1 + pad_x) + background.y1 = min(widget_clip.y1, background.y1 + pad_y) + return background + + +def _widget_text_color(widget: pymupdf.Widget) -> tuple[float, float, float]: + """ + Extracts the text color configured on a PDF widget. + + Args: + widget (pymupdf.Widget): The widget being processed. + + Returns: + tuple[float, float, float]: The widget text color in PDF RGB components. + """ + values = list(widget.text_color or []) + if not values: + return (0.0, 0.0, 0.0) + if len(values) == 1: + shade = float(values[0]) + return (shade, shade, shade) + if len(values) >= 3: + return tuple(float(value) for value in values[:3]) + return (0.0, 0.0, 0.0) + + +def _style_from_widget(widget: pymupdf.Widget) -> dict[str, Any]: + """ + Builds a text style dictionary from a widget definition. + + Args: + widget (pymupdf.Widget): The widget being processed. + + Returns: + dict[str, Any]: The style dictionary derived from the widget. + """ + return { + "font": str(widget.text_font or ""), + "flags": 0, + "color": _widget_text_color(widget), + "size": float(widget.text_fontsize or 10.0), + "ascender": 0.8, + "descender": -0.2, + } + + +def _page_widget_infos(page: pymupdf.Page) -> list[dict[str, Any]]: + """ + Collects text and signature widget metadata for a page. + + Args: + page (pymupdf.Page): The PDF page being processed. + + Returns: + list[dict[str, Any]]: The widget metadata collected for the page. 
+ """ + infos: list[dict[str, Any]] = [] + for widget in page.widgets() or []: + if widget.field_type not in ( + pymupdf.PDF_WIDGET_TYPE_TEXT, + pymupdf.PDF_WIDGET_TYPE_SIGNATURE, + ): + continue + infos.append( + { + "xref": int(widget.xref), + "field_type": int(widget.field_type), + "field_name": str(widget.field_name or ""), + "field_value": str(widget.field_value or ""), + "rect": pymupdf.Rect(widget.rect), + "style": _style_from_widget(widget), + } + ) + return infos + + +def _entity_overlaps_widget( + entity_rect: pymupdf.Rect, + widget_infos: list[dict[str, Any]], +) -> dict[str, Any] | None: + """ + Finds the widget that most overlaps the given entity rectangle. + + Args: + entity_rect (pymupdf.Rect): The rectangle representing the entity on the page. + widget_infos (list[dict[str, Any]]): The widget metadata available for overlap checks. + + Returns: + dict[str, Any] | None: The best overlapping widget info, if one exists. + """ + best_widget: dict[str, Any] | None = None + best_area = 0.0 + for widget_info in widget_infos: + widget_rect = widget_info["rect"] + if not entity_rect.intersects(widget_rect): + continue + area = (entity_rect & widget_rect).get_area() + if area > best_area: + best_area = area + best_widget = widget_info + return best_widget + + +def _fit_widget_token( + widget_info: dict[str, Any], + current_text: str, + entity_span: tuple[int, int], + token: str, +) -> str: + """ + Finds a token variant that fits inside a widget value. + + Args: + widget_info (dict[str, Any]): The widget metadata being processed. + current_text (str): The current widget text value. + entity_span (tuple[int, int]): The span of the entity inside the widget text. + token (str): The logical replacement token being processed. + + Returns: + str: The token variant that fits in the widget value. 
+ """ + style = widget_info.get("style") or _default_style() + rect = pymupdf.Rect(widget_info["rect"]) + font_obj = _get_base14_font(style) + max_width = max(rect.width - 1.0, 1.0) + + prefix = current_text[: entity_span[0]] + suffix = current_text[entity_span[1] :] + + for candidate in _build_display_token_candidates(token): + candidate_text = f"{prefix}{candidate}{suffix}" + if ( + font_obj.text_length( + candidate_text, fontsize=float(style.get("size") or 10.0) + ) + <= max_width + 0.1 + ): + return candidate + + candidates = _build_display_token_candidates(token) + return candidates[0] if candidates else f"<{token}>" + + +def _apply_widget_ops( + doc: pymupdf.Document, + widget_ops: dict[int, list[dict]], +) -> None: + """ + Applies collected replacements to editable text widgets. + + Args: + doc (pymupdf.Document): The PDF document being processed. + widget_ops (dict[int, list[dict]]): The collected text widget operations grouped by page index. + """ + for page_idx, ops in widget_ops.items(): + if not ops: + continue + + page = doc[page_idx] + widgets = { + int(widget.xref): widget + for widget in (page.widgets() or []) + if widget.field_type == pymupdf.PDF_WIDGET_TYPE_TEXT + } + grouped: dict[int, list[dict]] = {} + for op in ops: + grouped.setdefault(int(op["widget_xref"]), []).append(op) + + for widget_xref, replacements in grouped.items(): + widget = widgets.get(widget_xref) + if widget is None: + logger.warning( + "Could not resolve PDF widget xref=%s on page=%s", + widget_xref, + page_idx, + ) + continue + + current_text = str(widget.field_value or "") + if not current_text: + continue + + search_cursor = 0 + changed = False + for replacement in replacements: + entity_text = replacement["entity_text"] + span = _find_flexible(current_text, entity_text, start=search_cursor) + if span is None: + span = _find_flexible(current_text, entity_text, start=0) + if span is None: + logger.warning( + "Could not map widget label '%s' in widget '%s' on page=%s", + 
entity_text, + replacement.get("field_name") or widget.field_name, + page_idx, + ) + continue + + token_text = _fit_widget_token( + replacement["widget_info"], + current_text, + span, + replacement["logical_token"], + ) + current_text = ( + f"{current_text[: span[0]]}{token_text}{current_text[span[1] :]}" + ) + search_cursor = span[0] + len(token_text) + changed = True + + if not changed: + continue + + try: + widget.field_value = current_text + widget.update() + except Exception as exc: + logger.warning( + "Failed to update PDF widget '%s' on page=%s: %s", + widget.field_name, + page_idx, + exc, + ) + + +def _prepare_signature_widget_ops( + doc: pymupdf.Document, + signature_widget_ops: dict[int, list[dict]], +) -> None: + """ + Deletes signature widgets and prepares their replacement operations. + + Args: + doc (pymupdf.Document): The PDF document being processed. + signature_widget_ops (dict[int, list[dict]]): The collected signature widget operations grouped by page index. + """ + for page_idx, ops in signature_widget_ops.items(): + if not ops: + continue + + page = doc[page_idx] + widgets = { + int(widget.xref): widget + for widget in (page.widgets() or []) + if widget.field_type == pymupdf.PDF_WIDGET_TYPE_SIGNATURE + } + grouped: dict[int, list[dict]] = {} + for op in ops: + grouped.setdefault(int(op["widget_xref"]), []).append(op) + + for widget_xref, widget_group_ops in grouped.items(): + widget = widgets.get(widget_xref) + widget_rect = pymupdf.Rect( + widget_group_ops[0].get("widget_rect") or (0, 0, 0, 0) + ) + + if widget is not None: + widget_rect = pymupdf.Rect(widget.rect) + try: + page.delete_widget(widget) + except Exception as exc: + logger.warning( + "Failed to delete signature widget xref=%s on page=%s: %s", + widget_xref, + page_idx, + exc, + ) + else: + logger.warning( + "Could not resolve PDF signature widget xref=%s on page=%s", + widget_xref, + page_idx, + ) + + for op in widget_group_ops: + op["widget_rect"] = pymupdf.Rect(widget_rect) + 
op["asset_rect"] = pymupdf.Rect(widget_rect) + op["graphics_mode"] = pymupdf.PDF_REDACT_LINE_ART_REMOVE_IF_COVERED + op["background_rect"] = _signature_background_rect(op, widget_rect) diff --git a/aymurai/text/extractors/pdf.py b/aymurai/text/extractors/pdf.py index 0e83c30d..c672dfe7 100644 --- a/aymurai/text/extractors/pdf.py +++ b/aymurai/text/extractors/pdf.py @@ -9,27 +9,11 @@ class PdfExtractor(BaseExtractor): extension = "pdf" - def extract(self, path: Path, y_tolerance: float | None = None, **_: Any) -> str: - """ - Extract normalized text from a PDF document. - - Args: - path (Path): Input document path. - y_tolerance (float | None, optional): Maximum vertical gap used to - merge nearby text blocks. If None, it is estimated from the - document. Defaults to None. - **_ (Any): Ignored extra keyword arguments for backward compatibility. - - Returns: - str: Cleaned textual content. - - Raises: - InvalidFile: If the file is unreadable or extraction fails. - """ + def extract(self, path: Path, **_: Any) -> str: file_path = self.ensure_file(path) try: - return pdf_to_text(file_path, y_tolerance=y_tolerance) + return pdf_to_text(file_path) except (OSError, ValueError) as exc: raise InvalidFile(str(exc)) from exc except Exception as exc: diff --git a/aymurai/text/extractors/utils.py b/aymurai/text/extractors/utils.py index 009b562d..8db4c661 100644 --- a/aymurai/text/extractors/utils.py +++ b/aymurai/text/extractors/utils.py @@ -1,12 +1,13 @@ -import statistics +import re import unicodedata import xml.etree.ElementTree as ET import zipfile from pathlib import Path -from typing import Any +from typing import AbstractSet, Any -import numpy as np import pymupdf +import pymupdf.layout # noqa: F401 # activates layout support +import pymupdf4llm import xmltodict from lxml import etree from more_itertools import flatten @@ -18,6 +19,7 @@ ODT_NS = {"text": "urn:oasis:names:tc:opendocument:xmlns:text:1.0"} +PDF_SKIP_BOX_CLASSES = frozenset({"picture", "formula", "table"}) 
def normalize_text(text: str) -> str: @@ -33,109 +35,83 @@ def normalize_text(text: str) -> str: return unicodedata.normalize("NFKC", text) -def _compute_median_margin_between_blocks(pdf_path: str) -> float: +def _clean_pdf_box_text(text: str, box_class: str) -> str: """ - Computes the median vertical margin between text blocks in a PDF. + Clean box-level PDF text while preserving the original layout content. + Args: - pdf_path (str): Path to the PDF file. + text (str): Raw text sliced from a page box. + box_class (str): Box class emitted by ``pymupdf4llm``. + Returns: - float: Median margin between text blocks (in points). + str: Cleaned, normalized box text. """ - margins = [] - - with pymupdf.open(pdf_path) as doc: - for page in doc: - # Extract all text blocks from the page - blocks = page.get_text("blocks") - - # Sort blocks by their top y-coordinate (y0) - blocks_sorted = sorted(blocks, key=lambda b: b[1]) - - # Compute vertical margins between consecutive blocks - for i in range(1, len(blocks_sorted)): - previous_block = blocks_sorted[i - 1] - current_block = blocks_sorted[i] - - # Calculate the vertical margin - previous_y1 = previous_block[3] # Bottom of the previous block - current_y0 = current_block[1] # Top of the current block - margin = current_y0 - previous_y1 + text = normalize_text(text).strip() + if box_class == "footnote": + text = re.sub(r"(?m)^>\s?", "", text) + return text - if margin > 0: # Ignore overlapping blocks - margins.append(margin) - # Compute and return the median margin - if margins: - return statistics.median(margins) - else: - return 0.0 # Return 0 if no margins were found - - -def _extract_and_merge_paragraphs(pdf_path: str, y_tolerance: float = 5) -> list[str]: +def pdf_to_paragraphs( + file_path: Path | str, + *, + include_headers: bool = True, + include_footers: bool = True, + skip_box_classes: AbstractSet[str] = PDF_SKIP_BOX_CLASSES, +) -> list[str]: """ - Extracts and merges paragraphs from a PDF by grouping close text 
blocks. + Extract paragraph-like layout units from a PDF using PyMuPDF layout parsing. + Args: - pdf_path (str): Path to the PDF file. - y_tolerance (float, optional): Maximum vertical gap (in points) to consider blocks part of the same paragraph. - Defaults to 5. + file_path (Path | str): Path to the PDF document. + include_headers (bool): Whether to keep header boxes. Defaults to True. + include_footers (bool): Whether to keep footer boxes. Defaults to True. + skip_box_classes (AbstractSet[str]): Layout box classes to ignore. Defaults to PDF_SKIP_BOX_CLASSES. + Returns: - list[str]: A list of merged paragraphs as strings. + list[str]: Normalized paragraph strings extracted from the PDF. """ - paragraphs = [] - current_paragraph = [] - last_y1 = None - - with pymupdf.open(pdf_path) as doc: - for page in doc: - # Extract all text blocks from the page - blocks = page.get_text("blocks") - - # Sort blocks by their top y-coordinate (y0) - blocks_sorted = sorted(blocks, key=lambda b: b[1]) - - for block in blocks_sorted: - x0, y0, x1, y1, text, *_ = block - - if last_y1 is not None and (y0 - last_y1) > y_tolerance: - # If the gap between blocks is too large, start a new paragraph - if current_paragraph: - paragraphs.append(" ".join(current_paragraph)) - current_paragraph = [] - - current_paragraph.append(text) - last_y1 = y1 - - if current_paragraph: - paragraphs.append(" ".join(current_paragraph)) - current_paragraph = [] + logger.debug("Extracting layout paragraphs from PDF: %s", file_path) + + with pymupdf.open(str(file_path)) as doc: + chunks = pymupdf4llm.to_text( + doc, + filename=str(file_path), + page_chunks=True, + header=include_headers, + footer=include_footers, + show_progress=False, + force_text=True, + use_ocr=False, + force_ocr=False, + ) + + paragraphs: list[str] = [] + for chunk in chunks: + page_text = chunk.get("text") or "" + for box in chunk.get("page_boxes") or []: + if box.get("class") in skip_box_classes: + continue + + start, stop = 
box.get("pos", (0, 0)) + text = _clean_pdf_box_text(page_text[start:stop], box.get("class") or "") + if text: + paragraphs.append(text) return paragraphs -def pdf_to_text( - file_path: Path | str, - y_tolerance: float | None = None, -) -> str: +def pdf_to_text(file_path: Path | str) -> str: """ - Extract text from a PDF file and return normalized plain text. + Extract normalized plain text from a PDF using filtered layout boxes. Args: - file_path (Path): Path to the PDF document. - y_tolerance (float, optional): Maximum vertical gap (in points) to consider blocks part of the same paragraph. - If None, it will be computed as the median margin between blocks. Defaults to None. + file_path (Path | str): Path to the PDF document. Returns: str: Cleaned textual content extracted from the PDF. """ - logger.info("Extracting text from PDF: %s", file_path) - - if y_tolerance is None: - y_tolerance = _compute_median_margin_between_blocks(file_path) - - paragraphs = _extract_and_merge_paragraphs(file_path, np.ceil(y_tolerance)) - docu = "\n\n".join(paragraphs) - - return normalize_text(docu) + return "\n\n".join(pdf_to_paragraphs(file_path)) def load_xml_from_docx(path: Path, xmlfile: str = "word/footnotes.xml") -> Any | None: diff --git a/aymurai/text/normalize.py b/aymurai/text/normalize.py index 9027a0d8..4154533b 100644 --- a/aymurai/text/normalize.py +++ b/aymurai/text/normalize.py @@ -2,45 +2,72 @@ import unicodedata -def document_normalize(text: str) -> str: - """Normalize extracted text from documents - * join invalid newlines - * remove continous whitespaces +def _normalize_document_characters(text: str) -> str: + """ + Apply character-level normalization without changing document structure. Args: - text (str): document + text (str): Raw extracted document text. Returns: - str: normalized + str: Character-normalized text. 
""" - - # normalize character encodings - # text = unicodedata.normalize("NFKD", text) + text = text.replace("\r\n", "\n").replace("\r", "\n") text = unicodedata.normalize("NFKC", text) + text = re.sub(r"(“|”)", '"', text) + text = text.replace("\\/", "/") + text = re.sub(r"[ \t]{2,}", " ", text) + return text + - # remove continous whitespace - text = re.sub(r" {2,}", r" ", text) +def _normalize_paragraph_text(text: str) -> str: + """ + Normalize text inside a single paragraph while preserving paragraph borders. + + Args: + text (str): Paragraph text. + + Returns: + str: Normalized paragraph content. + """ + text = re.sub(r"[ \t]*\n[ \t]*", "\n", text.strip()) # delete newline if NEXT char is: # - lower character or a number - # - punctuanion + # - punctuation text = re.sub(r"\n([a-z0-9;:,\.])", r" \g<1>", text) # delete newline if PREVIOUS char is: # - quote mark - # - punctuanions (except '.' because possible ambiguity) + # - punctuations (except '.' because possible ambiguity) text = re.sub(r"([\w,\"-])\n", r"\g<1> ", text) # cleanup some junk - # - multiple newlines, hyphens - text = re.sub(r"\n{2,}", "\n", text) text = re.sub(r"[-]{2,}", "-", text) text = re.sub(r"\.-", ".", text) + text = re.sub(r" {2,}", " ", text) + return text.strip() - # quotation marks - text = re.sub(r"(“|”)", '"', text) - # scaped slashes - text = text.replace("\/", "/") +def document_normalize(text: str, *, preserve_paragraphs: bool = False) -> str: + """Normalize extracted text from documents. - return text + Args: + text (str): Document text. + preserve_paragraphs (bool): Preserve blank-line paragraph boundaries. Defaults to False. + + Returns: + str: Normalized document text. 
+ """ + text = _normalize_document_characters(text) + + if preserve_paragraphs: + paragraphs = [ + _normalize_paragraph_text(paragraph) + for paragraph in re.split(r"\n\s*\n+", text) + if paragraph.strip() + ] + return "\n\n".join(paragraphs) + + text = _normalize_paragraph_text(text) + return re.sub(r"\n{2,}", "\n", text) diff --git a/docs/es/pipelines/anonymizer/README.md b/docs/es/pipelines/anonymizer/README.md index 2d241616..f7f71857 100644 --- a/docs/es/pipelines/anonymizer/README.md +++ b/docs/es/pipelines/anonymizer/README.md @@ -47,7 +47,7 @@ Fuente editable: [../../../pipelines/anonymizer/pipeline.excalidraw](../../../pi ### Módulos backend relevantes - Router: `aymurai/api/endpoints/routers/anonymizer/anonymizer.py` -- Render/anonymize: `aymurai/text/anonymization/doc_anonymizer.py` +- Render/anonymize: `aymurai/text/anonymization/docx.py` y `aymurai/text/anonymization/pdf/` - Desambiguación canónica: `aymurai/utils/entity_disambiguation/` ## Persistencia (DB) diff --git a/docs/pipelines/anonymizer/README.md b/docs/pipelines/anonymizer/README.md index 11e864e7..67880ba0 100644 --- a/docs/pipelines/anonymizer/README.md +++ b/docs/pipelines/anonymizer/README.md @@ -47,7 +47,7 @@ Editable source: [pipeline.excalidraw](pipeline.excalidraw) ### Core backend modules - Router: `aymurai/api/endpoints/routers/anonymizer/anonymizer.py` -- Rendering: `aymurai/text/anonymization/doc_anonymizer.py` +- Rendering: `aymurai/text/anonymization/docx.py` and `aymurai/text/anonymization/pdf/` - Canonical entity mapping: `aymurai/utils/entity_disambiguation/` ## Persistence (DB) diff --git a/notebooks/experiments/pdf-support/06-pymupdf-layout.ipynb b/notebooks/experiments/pdf-support/06-pymupdf-layout.ipynb new file mode 100644 index 00000000..803c8d22 --- /dev/null +++ b/notebooks/experiments/pdf-support/06-pymupdf-layout.ipynb @@ -0,0 +1,253 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "1098eca1", + "metadata": {}, + "outputs": [],
+ "source": [ + "%load_ext rich\n", + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "markdown", + "id": "7e81fbe5", + "metadata": {}, + "source": [ + "# End-to-End PDF Anonymization (PyMuPDF Layout + AymurAI API)\n", + "This notebook builds layout-based paragraphs from the source PDF, runs `/anonymizer/predict` + `/anonymizer/disambiguate`, and compiles an anonymized PDF.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "258fbd18", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import time\n", + "from pathlib import Path\n", + "\n", + "import pymupdf\n", + "import requests\n", + "from tqdm.auto import tqdm" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fcfd985e", + "metadata": {}, + "outputs": [], + "source": [ + "# Change these values to test different documents/environments.\n", + "API_URL = \"http://localhost:8999\"\n", + "SOURCE_PDF = Path(\"./document.pdf\")\n", + "\n", + "OUTPUT_DIR = Path(\"./output\")\n", + "USE_CACHE = False\n", + "\n", + "# Optional: keep as None to rely on backend default policies.\n", + "LABEL_POLICIES = None\n", + "\n", + "# Keep aligned with current anonymizer defaults.\n", + "RENDER_POLICY = {\"suffix_mode\": \"auto\", \"suffix_threshold\": 1}\n", + "\n", + "SOURCE_PDF" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f3860b71", + "metadata": {}, + "outputs": [], + "source": [ + "def extract_document_via_api(pdf_path: Path) -> dict:\n", + " with pdf_path.open(\"rb\") as handle:\n", + " response = requests.post(\n", + " f\"{API_URL}/document-extract\",\n", + " files={\"file\": (pdf_path.name, handle, \"application/pdf\")},\n", + " timeout=600,\n", + " )\n", + "\n", + " response.raise_for_status()\n", + " return response.json()\n", + "\n", + "\n", + "def predict_paragraph(text: str, retries: int = 2) -> dict:\n", + " last_error = None\n", + " for attempt in range(retries + 1):\n", + " try:\n", + " response = 
requests.post(\n", + " f\"{API_URL}/anonymizer/predict\",\n", + " json={\"text\": text},\n", + " params={\"use_cache\": USE_CACHE},\n", + " timeout=600,\n", + " )\n", + " response.raise_for_status()\n", + " return response.json()\n", + " except Exception as exc:\n", + " last_error = exc\n", + " if attempt < retries:\n", + " time.sleep(2)\n", + " else:\n", + " raise last_error\n", + "\n", + " raise RuntimeError(\"Predict request exhausted retries\")\n", + "\n", + "\n", + "def disambiguate(predictions: list[dict]) -> dict:\n", + " payload = {\"paragraphs\": predictions}\n", + " if LABEL_POLICIES is not None:\n", + " payload[\"label_policies\"] = LABEL_POLICIES\n", + "\n", + " response = requests.post(\n", + " f\"{API_URL}/anonymizer/disambiguate\",\n", + " json=payload,\n", + " timeout=600,\n", + " )\n", + " response.raise_for_status()\n", + " return response.json()\n", + "\n", + "\n", + "def compile_pdf(pdf_path: Path, annotations: dict) -> Path:\n", + " payload = {\n", + " \"data\": annotations[\"data\"],\n", + " \"render_policy\": RENDER_POLICY,\n", + " }\n", + " if annotations.get(\"label_policies\") is not None:\n", + " payload[\"label_policies\"] = annotations[\"label_policies\"]\n", + "\n", + " with pdf_path.open(\"rb\") as handle:\n", + " response = requests.post(\n", + " f\"{API_URL}/anonymizer/anonymize-document\",\n", + " data={\"annotations\": json.dumps(payload, ensure_ascii=False)},\n", + " files={\"file\": (pdf_path.name, handle, \"application/pdf\")},\n", + " timeout=1200,\n", + " )\n", + "\n", + " response.raise_for_status()\n", + "\n", + " OUTPUT_DIR.mkdir(parents=True, exist_ok=True)\n", + " output_path = OUTPUT_DIR / f\"{pdf_path.stem}.anonymized.pdf\"\n", + " output_path.write_bytes(response.content)\n", + " return output_path" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b0a54485", + "metadata": {}, + "outputs": [], + "source": [ + "document_extract_payload = extract_document_via_api(SOURCE_PDF)\n", + "paragraphs = 
document_extract_payload[\"document\"]\n", + "\n", + "print(f\"Document ID: {document_extract_payload['document_id']}\")\n", + "print(f\"Paragraphs extracted: {len(paragraphs)}\")\n", + "\n", + "paragraphs[:5]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3beaadee", + "metadata": {}, + "outputs": [], + "source": [ + "predictions = [\n", + " predict_paragraph(paragraph)\n", + " for paragraph in tqdm(paragraphs, desc=\"Predicting paragraphs\")\n", + "]\n", + "total_labels = sum(len(pred.get(\"labels\") or []) for pred in predictions)\n", + "print(f\"Predictions: {len(predictions)} paragraphs, {total_labels} labels\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "682760e0", + "metadata": {}, + "outputs": [], + "source": [ + "disambiguated = disambiguate(predictions)\n", + "total_labels = sum(len(pred.get(\"labels\") or []) for pred in disambiguated[\"data\"])\n", + "print(f\"Disambiguated labels: {total_labels}\")\n", + "disambiguated.keys()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "eae3f2c9", + "metadata": {}, + "outputs": [], + "source": [ + "[data for data in disambiguated[\"data\"] if data[\"labels\"]]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "665dde4a", + "metadata": {}, + "outputs": [], + "source": [ + "output_pdf = compile_pdf(SOURCE_PDF, disambiguated)\n", + "print(output_pdf.resolve())\n", + "output_pdf" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "715a782a", + "metadata": {}, + "outputs": [], + "source": [ + "with pymupdf.open(str(output_pdf)) as doc:\n", + " watermark_hits = sum(\n", + " len(page.search_for(\"Documento anonimizado por AymurAI\")) for page in doc\n", + " )\n", + " print(f\"Pages: {doc.page_count}\")\n", + " print(f\"Watermark hits: {watermark_hits}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2a274809", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + 
"metadata": { + "kernelspec": { + "display_name": "aymurai", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.20" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tests/api/routers/anonymizer/test_anonymizer.py b/tests/api/routers/anonymizer/test_anonymizer.py index 54a627e1..e003ad33 100644 --- a/tests/api/routers/anonymizer/test_anonymizer.py +++ b/tests/api/routers/anonymizer/test_anonymizer.py @@ -1,14 +1,304 @@ +import base64 import json +import re import subprocess -from unittest.mock import patch +import sys +from datetime import datetime, timedelta, timezone +from pathlib import Path +from unittest.mock import MagicMock, patch +import pymupdf import pytest +from docx import Document from aymurai.database.schema import AnonymizationParagraph from aymurai.database.utils import text_to_uuid +from aymurai.text.anonymization import DocxAnonymizer, PdfAnonymizer, get_anonymizer +from aymurai.text.anonymization.alignment import index_paragraphs from tests.api.conftest import build_label from tests.api.routers.conftest import build_mock_pipeline +PNG_1X1 = base64.b64decode( + "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+a6R8AAAAASUVORK5CYII=" +) +WATERMARK_URL = "https://www.aymurai.info/" + +WINDOWS_PYMUPDF_LAYOUT_XFAIL = pytest.mark.xfail( + sys.platform == "win32", + reason="pymupdf4llm ONNX layout model receives int32 tensors on Windows (expects int64)", + strict=False, +) + + +def _write_pdf(path: Path, configure) -> Path: + doc = pymupdf.open() + page = doc.new_page() + configure(doc, page) + doc.save(path) + doc.close() + return path + + +def _label_dict(text: str, label: str = "PER", **attrs) -> dict: + payload = build_label(label, text).model_dump(mode="json") + 
payload["attrs"].update(attrs) + return payload + + +def _run_pdf_anonymizer( + tmp_path: Path, + source_path: Path, + document: str, + labels: list[dict], +) -> Path: + output_dir = tmp_path / "out" + output_dir.mkdir(exist_ok=True) + output_path = PdfAnonymizer().anonymize( + {"path": str(source_path)}, + [{"document": document, "labels": labels}], + str(output_dir), + ) + return Path(output_path) + + +@pytest.mark.integration +def test_anonymization_package_exports_and_registry_are_stable(): + assert PdfAnonymizer.__name__ == "PdfAnonymizer" + assert DocxAnonymizer.__name__ == "DocxAnonymizer" + assert isinstance(get_anonymizer("pdf"), PdfAnonymizer) + assert isinstance(get_anonymizer("docx"), DocxAnonymizer) + + +@pytest.mark.integration +@WINDOWS_PYMUPDF_LAYOUT_XFAIL +def test_pdf_anonymizer_falls_back_from_invalid_alt_offsets(tmp_path): + document = "Ana Perez firmo el escrito" + source_path = _write_pdf( + tmp_path / "invalid-alt.pdf", + lambda _doc, page: page.insert_text((72, 72), document), + ) + labels = [ + _label_dict( + "Ana Perez", + aymurai_alt_start_char=999, + aymurai_alt_end_char=1000, + ) + ] + + output_path = _run_pdf_anonymizer(tmp_path, source_path, document, labels) + + with pymupdf.open(output_path) as output_doc: + page_text = output_doc[0].get_text() + + assert "Ana Perez" not in page_text + assert "" in page_text + + +@pytest.mark.integration +@WINDOWS_PYMUPDF_LAYOUT_XFAIL +def test_pdf_anonymizer_scrubs_pdf_payloads_and_preserves_safe_links(tmp_path): + document = "Ana Perez presento el escrito" + + def configure(doc: pymupdf.Document, page: pymupdf.Page) -> None: + page.insert_text((72, 72), document) + sensitive_rect = page.search_for("Ana Perez")[0] + page.insert_link( + { + "kind": pymupdf.LINK_URI, + "from": sensitive_rect, + "uri": "https://secret.example", + } + ) + safe_rect = pymupdf.Rect(72, 140, 180, 155) + page.insert_text((72, 150), "Portal publico") + page.insert_link( + { + "kind": pymupdf.LINK_URI, + "from": safe_rect, + 
"uri": "https://safe.example", + } + ) + page.add_file_annot((220, 72), b"attached secret", "attached.txt") + doc.set_metadata( + { + "title": "Secret title", + "author": "Secret author", + "subject": "Secret subject", + "keywords": "alpha,beta", + "creator": "Secret creator", + "producer": "Secret producer", + } + ) + doc.set_xml_metadata("top-secret") + doc.embfile_add("secret.txt", b"secret bytes", filename="secret.txt") + + source_path = _write_pdf(tmp_path / "metadata.pdf", configure) + labels = [_label_dict("Ana Perez")] + + output_path = _run_pdf_anonymizer(tmp_path, source_path, document, labels) + + with pymupdf.open(output_path) as output_doc: + page = output_doc[0] + link_uris = {link.get("uri") for link in page.get_links()} + + assert output_doc.metadata.get("title") == "" + assert output_doc.metadata.get("subject") == "" + assert output_doc.metadata.get("keywords") == "" + assert output_doc.metadata.get("creationDate") == "" + assert re.fullmatch( + r"D:\d{14}\+00'00'", + output_doc.metadata.get("modDate") or "", + ) + assert output_doc.metadata.get("trapped") == "" + assert output_doc.metadata.get("author") == "" + assert output_doc.metadata.get("creator") == "AymurAI" + assert output_doc.metadata.get("producer") == "AymurAI" + assert not output_doc.get_xml_metadata() + assert output_doc.embfile_names() == [] + assert list(page.annots() or []) == [] + assert "https://secret.example" not in link_uris + assert "https://safe.example" in link_uris + assert WATERMARK_URL in link_uris + + +@pytest.mark.integration +@WINDOWS_PYMUPDF_LAYOUT_XFAIL +def test_pdf_anonymizer_moves_watermark_away_from_footer_content(tmp_path): + document = "Ana Perez presento el escrito" + footer_rect = pymupdf.Rect(360, 760, 575, 815) + + def configure(_doc: pymupdf.Document, page: pymupdf.Page) -> None: + page.insert_text((72, 72), document) + page.draw_rect(footer_rect, color=(0, 0, 0), fill=(0, 0, 0), overlay=True) + + source_path = _write_pdf(tmp_path / 
"footer-watermark.pdf", configure) + output_path = _run_pdf_anonymizer( + tmp_path, + source_path, + document, + [_label_dict("Ana Perez")], + ) + + with pymupdf.open(output_path) as output_doc: + page = output_doc[0] + watermark_links = [ + link for link in page.get_links() if link.get("uri") == WATERMARK_URL + ] + + assert len(watermark_links) == 1 + watermark_rect = pymupdf.Rect(watermark_links[0]["from"]) + assert not watermark_rect.intersects(footer_rect) + assert watermark_rect.x1 < footer_rect.x0 + + +@pytest.mark.integration +@WINDOWS_PYMUPDF_LAYOUT_XFAIL +def test_pdf_anonymizer_removes_image_backed_entities(tmp_path): + source_path = _write_pdf( + tmp_path / "image.pdf", + lambda _doc, page: ( + page.insert_image(pymupdf.Rect(60, 60, 220, 110), stream=PNG_1X1), + page.insert_text((80, 90), "Ana Perez"), + ), + ) + + output_path = _run_pdf_anonymizer( + tmp_path, + source_path, + "Ana Perez", + [_label_dict("Ana Perez")], + ) + + with pymupdf.open(output_path) as output_doc: + page = output_doc[0] + page_text = page.get_text() + + assert page.get_image_info() == [] + assert "Ana Perez" not in page_text + assert "" in page_text + + +@pytest.mark.integration +@WINDOWS_PYMUPDF_LAYOUT_XFAIL +def test_pdf_anonymizer_removes_signature_widgets_without_restoring_appearance( + tmp_path, +): + def configure(_doc: pymupdf.Document, page: pymupdf.Page) -> None: + page.insert_text((80, 90), "Ana Perez") + widget = pymupdf.Widget() + widget.field_name = "sig_1" + widget.field_type = pymupdf.PDF_WIDGET_TYPE_SIGNATURE + widget.rect = pymupdf.Rect(60, 60, 220, 110) + page.add_widget(widget) + + source_path = _write_pdf(tmp_path / "signature.pdf", configure) + output_path = _run_pdf_anonymizer( + tmp_path, + source_path, + "Ana Perez", + [_label_dict("Ana Perez")], + ) + + with pymupdf.open(output_path) as output_doc: + page = output_doc[0] + page_text = page.get_text() + + assert list(page.widgets() or []) == [] + assert page.get_image_info() == [] + assert "Ana Perez" not 
in page_text + assert "" in page_text + + +def test_index_paragraphs_reads_docx_xml_as_utf8(tmp_path): + xml_path = tmp_path / "document.xml" + xml_path.write_bytes( + """ + + + Señora — resolución + + +""".encode("utf-8") + ) + + paragraphs = index_paragraphs(str(xml_path)) + + assert len(paragraphs) == 1 + assert paragraphs[0]["plain_text"] == "Señora — resolución" + + +@pytest.mark.integration +def test_docx_anonymizer_sets_aymurai_core_properties(tmp_path): + source_path = tmp_path / "source.docx" + document = Document() + document.add_paragraph("Ana Perez firmo el escrito") + document.core_properties.author = "Sensitive Author" + document.core_properties.last_modified_by = "Sensitive Modifier" + document.save(source_path) + + started_at = datetime.now(timezone.utc).replace(microsecond=0) + + output_path = DocxAnonymizer().anonymize( + {"path": str(source_path)}, + [ + { + "document": "Ana Perez firmo el escrito", + "labels": [_label_dict("Ana Perez")], + } + ], + str(tmp_path / "out"), + ) + + output_document = Document(output_path) + core_properties = output_document.core_properties + assert core_properties.author == "" + assert core_properties.last_modified_by == "AymurAI" + assert core_properties.modified is not None + modified = core_properties.modified + if modified.tzinfo is None: + modified = modified.replace(tzinfo=timezone.utc) + assert started_at <= modified <= datetime.now(timezone.utc) + timedelta(seconds=5) + @pytest.mark.integration @patch("aymurai.api.endpoints.routers.anonymizer.anonymizer.load_pipeline") @@ -230,8 +520,8 @@ def test_should_disambiguate_and_persist_paragraphs( ): mock_build_canonical_entities.return_value = [] mock_get_canonical_dates.return_value = [] - mock_map_canonical_entities.side_effect = ( - lambda predictions, canonical_entities: predictions + mock_map_canonical_entities.side_effect = lambda predictions, canonical_entities: ( + predictions ) text = "Ana Pérez denunció en el juzgado." 
@@ -293,10 +583,53 @@ def test_should_return_validation_when_paragraph_exists(client, db_session): @pytest.mark.integration +@patch("aymurai.api.endpoints.routers.anonymizer.anonymizer.get_anonymizer") +def test_should_return_application_pdf_when_pdf_document_is_anonymized( + mock_get_anonymizer, + client, + tmp_path, +): + anonymized_path = _write_pdf( + tmp_path / "output.pdf", + lambda _doc, page: page.insert_text((72, 72), "Anonymized PDF output"), + ) + mock_get_anonymizer.return_value = MagicMock(return_value=str(anonymized_path)) + + annotations = { + "data": [ + { + "document": "Ana Perez presento el escrito", + "labels": [build_label("PER", "Ana Perez").model_dump(mode="json")], + } + ], + "label_policies": {"PER": {"anonymize": True, "disambiguation": "none"}}, + "render_policy": {"suffix_mode": "auto", "suffix_threshold": 1}, + } + + response = client.post( + "/anonymizer/anonymize-document", + data={"annotations": json.dumps(annotations)}, + files={"file": ("sample.pdf", b"%PDF-1.4 fake", "application/pdf")}, + ) + + assert response.status_code == 200 + assert response.headers["content-type"] == "application/pdf" + assert len(response.content) > 0 + + @patch("aymurai.api.endpoints.routers.anonymizer.anonymizer.subprocess.check_output") +@patch("aymurai.api.endpoints.routers.anonymizer.anonymizer.get_anonymizer") def test_should_anonymize_document_when_annotations_are_valid( - mock_check_output, client + mock_get_anonymizer, mock_check_output, client, tmp_path ): + # Fake anonymizer that writes a dummy docx output + anonymized_path = str(tmp_path / "output.docx") + with open(anonymized_path, "wb") as f: + f.write(b"fake-docx-content") + + mock_anonymizer = MagicMock(return_value=anonymized_path) + mock_get_anonymizer.return_value = mock_anonymizer + def fake_convert(*args, **kwargs): cmd = args[0] source_path = cmd[-1] @@ -320,7 +653,13 @@ def fake_convert(*args, **kwargs): response = client.post( "/anonymizer/anonymize-document", data={"annotations": 
json.dumps(annotations)}, - files={"file": ("sample.txt", b"input-document", "text/plain")}, + files={ + "file": ( + "sample.docx", + b"input-document", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + }, ) assert response.status_code == 200 @@ -330,9 +669,73 @@ def fake_convert(*args, **kwargs): @pytest.mark.integration @patch("aymurai.api.endpoints.routers.anonymizer.anonymizer.subprocess.check_output") +@patch("aymurai.api.endpoints.routers.anonymizer.anonymizer.get_anonymizer") +def test_should_exclude_null_alt_attrs_from_anonymize_document_preds( + mock_get_anonymizer, mock_check_output, client, tmp_path +): + anonymized_path = str(tmp_path / "output.docx") + with open(anonymized_path, "wb") as f: + f.write(b"fake-docx-content") + + mock_anonymizer = MagicMock(return_value=anonymized_path) + mock_get_anonymizer.return_value = mock_anonymizer + + def fake_convert(*args, **kwargs): + cmd = args[0] + source_path = cmd[-1] + output_path = source_path.rsplit(".", 1)[0] + ".odt" + with open(output_path, "wb") as output_file: + output_file.write(b"odt-content") + return "ok" + + mock_check_output.side_effect = fake_convert + annotations = { + "data": [ + { + "document": "Ana Perez denuncio en el juzgado.", + "labels": [build_label("PER", "Ana Perez").model_dump(mode="json")], + } + ], + "label_policies": {"PER": {"anonymize": True, "disambiguation": "fuzzy"}}, + "render_policy": {"suffix_mode": "auto", "suffix_threshold": 1}, + } + + response = client.post( + "/anonymizer/anonymize-document", + data={"annotations": json.dumps(annotations)}, + files={ + "file": ( + "sample.docx", + b"input-document", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + }, + ) + + assert response.status_code == 200 + preds = mock_anonymizer.call_args[0][1] + assert preds[0]["labels"][0]["text"] == "Ana Perez" + + attrs = preds[0]["labels"][0]["attrs"] + assert "aymurai_alt_text" not in attrs + assert 
"aymurai_alt_start_char" not in attrs + assert "aymurai_alt_end_char" not in attrs + + +@pytest.mark.integration +@patch("aymurai.api.endpoints.routers.anonymizer.anonymizer.subprocess.check_output") +@patch("aymurai.api.endpoints.routers.anonymizer.anonymizer.get_anonymizer") def test_should_return_500_when_anonymize_document_conversion_fails( - mock_check_output, client + mock_get_anonymizer, mock_check_output, client, tmp_path ): + # Fake anonymizer that writes a dummy output + anonymized_path = str(tmp_path / "output.docx") + with open(anonymized_path, "wb") as f: + f.write(b"fake-docx-content") + + mock_anonymizer = MagicMock(return_value=anonymized_path) + mock_get_anonymizer.return_value = mock_anonymizer + mock_check_output.side_effect = subprocess.CalledProcessError( 1, ["libreoffice"], @@ -347,7 +750,13 @@ def test_should_return_500_when_anonymize_document_conversion_fails( response = client.post( "/anonymizer/anonymize-document", data={"annotations": json.dumps(annotations)}, - files={"file": ("sample.txt", b"input-document", "text/plain")}, + files={ + "file": ( + "sample.docx", + b"input-document", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + }, ) assert response.status_code == 500 diff --git a/tests/api/routers/misc/test_document_extract.py b/tests/api/routers/misc/test_document_extract.py index 6a67fdd6..124c4b9c 100644 --- a/tests/api/routers/misc/test_document_extract.py +++ b/tests/api/routers/misc/test_document_extract.py @@ -1,5 +1,6 @@ import concurrent.futures import io +import sys from unittest.mock import patch import pytest @@ -74,6 +75,11 @@ def test_should_extract_real_text_from_sample_docx_without_mocking(client): @pytest.mark.integration @pytest.mark.slow +@pytest.mark.xfail( + sys.platform == "win32", + reason="pymupdf4llm ONNX layout model receives int32 tensors on Windows (expects int64)", + strict=False, +) def test_should_extract_real_text_from_pdf_without_mocking(client): """Test that a real 
PDF upload is extracted without mocking.""" expected_paragraphs = [ diff --git a/tests/api/routers/test_pipeline_flows.py b/tests/api/routers/test_pipeline_flows.py index 3df22aaa..8d53952d 100644 --- a/tests/api/routers/test_pipeline_flows.py +++ b/tests/api/routers/test_pipeline_flows.py @@ -1,9 +1,11 @@ +import io import json import shutil import uuid -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest +from docx import Document as DocxDocument from aymurai.database.schema import DataPublicDocumentParagraph from tests.api.routers.conftest import build_mock_pipeline @@ -20,6 +22,7 @@ def _fake_libreoffice_convert(*args, **kwargs): @pytest.mark.integration @patch("aymurai.api.endpoints.routers.anonymizer.anonymizer.subprocess.check_output") +@patch("aymurai.api.endpoints.routers.anonymizer.anonymizer.get_anonymizer") @patch( "aymurai.api.endpoints.routers.anonymizer.anonymizer.map_canonical_entities_ner_preds" ) @@ -33,8 +36,10 @@ def test_should_run_anonymizer_flow_end_to_end( mock_build_canonical_entities, mock_get_canonical_dates, mock_map_canonical_entities, + mock_get_anonymizer, mock_check_output, client, + tmp_path, ): mock_extract.return_value = "Ana Pérez denunció.\nJuan Soto declaró." 
mock_load_pipeline.return_value = build_mock_pipeline() @@ -43,6 +48,12 @@ def test_should_run_anonymizer_flow_end_to_end( mock_map_canonical_entities.side_effect = lambda predictions, canonical_entities: ( predictions ) + + anonymized_path = str(tmp_path / "output.docx") + with open(anonymized_path, "wb") as f: + f.write(b"fake-docx-content") + mock_anonymizer = MagicMock(return_value=anonymized_path) + mock_get_anonymizer.return_value = mock_anonymizer mock_check_output.side_effect = _fake_libreoffice_convert extract_response = client.post( @@ -81,7 +92,13 @@ def test_should_run_anonymizer_flow_end_to_end( compile_response = client.post( "/anonymizer/anonymize-document", data={"annotations": json.dumps(annotations)}, - files={"file": ("sample.txt", b"doc-bytes", "text/plain")}, + files={ + "file": ( + "sample.docx", + b"doc-bytes", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + }, ) assert compile_response.status_code == 200 assert compile_response.headers["content-type"] == "application/octet-stream" @@ -162,10 +179,22 @@ def test_should_compile_anonymized_document_with_real_libreoffice_when_available "render_policy": {"suffix_mode": "auto", "suffix_threshold": 1}, } + doc = DocxDocument() + doc.add_paragraph("Texto base para anonimizar.") + buf = io.BytesIO() + doc.save(buf) + docx_bytes = buf.getvalue() + response = client.post( "/anonymizer/anonymize-document", data={"annotations": json.dumps(annotations)}, - files={"file": ("sample.txt", b"input-document", "text/plain")}, + files={ + "file": ( + "sample.docx", + docx_bytes, + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + }, ) assert response.status_code == 200