@@ -444,33 +444,57 @@ class EncodedCandidate:
444444def _try_decode_base64 (text : str ) -> str | None :
445445 """Attempt to decode Base64 string.
446446
447+ Limits decoded output to 10KB to prevent DoS attacks via memory exhaustion.
448+ Fails closed: raises error if decoded content exceeds limit to prevent PII leaks.
449+
447450 Args:
448451 text: String that looks like Base64.
449452
450453 Returns:
451- Decoded string if valid, None otherwise.
454+ Decoded string if valid and under size limit, None if invalid encoding.
455+
456+ Raises:
457+ ValueError: If decoded content exceeds 10KB (security limit).
452458 """
453459 try :
454460 decoded_bytes = base64 .b64decode (text , validate = True )
461+ # Security: Fail closed - reject content > 10KB to prevent memory DoS and PII bypass
462+ if len (decoded_bytes ) > 10_000 :
463+ msg = (
464+ f"Base64 decoded content too large ({ len (decoded_bytes ):,} bytes). "
465+ )
466+ raise ValueError (msg )
455467 # Check if result is valid UTF-8
456468 return decoded_bytes .decode ("utf-8" , errors = "strict" )
457- except (binascii .Error , UnicodeDecodeError , ValueError ):
469+ except (binascii .Error , UnicodeDecodeError ):
458470 return None
459471
460472
461473def _try_decode_hex (text : str ) -> str | None :
462474 """Attempt to decode hex string.
463475
476+ Limits decoded output to 10KB to prevent DoS attacks via memory exhaustion.
477+ Fails closed: raises error if decoded content exceeds limit to prevent PII leaks.
478+
464479 Args:
465480 text: String that looks like hex.
466481
467482 Returns:
468- Decoded string if valid, None otherwise.
483+ Decoded string if valid and under size limit, None if invalid encoding.
484+
485+ Raises:
486+ ValueError: If decoded content exceeds 10KB (security limit).
469487 """
470488 try :
471489 decoded_bytes = bytes .fromhex (text )
490+ # Security: Fail closed - reject content > 10KB to prevent memory DoS and PII bypass
491+ if len (decoded_bytes ) > 10_000 :
492+ msg = (
493+ f"Hex decoded content too large ({ len (decoded_bytes ):,} bytes). "
494+ )
495+ raise ValueError (msg )
472496 return decoded_bytes .decode ("utf-8" , errors = "strict" )
473- except ( ValueError , UnicodeDecodeError ) :
497+ except UnicodeDecodeError :
474498 return None
475499
476500
@@ -591,8 +615,13 @@ def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tu
591615 if not detection .analyzer_results :
592616 # Check encoded content even if no direct PII found
593617 if config .detect_encoded_pii :
594- return _mask_encoded_pii (normalized_text , config )
595- return normalized_text , {}
618+ masked_text , encoded_detections = _mask_encoded_pii (normalized_text , config , original_text = text )
619+ # If no encoded PII found, return original text to preserve special characters
620+ if not encoded_detections :
621+ return text , {}
622+ return masked_text , encoded_detections
623+ # No PII detected - return original text to preserve special characters
624+ return text , {}
596625
597626 # Use Presidio's optimized anonymizer with replace operator
598627 anonymizer = _get_anonymizer_engine ()
@@ -616,7 +645,7 @@ def _mask_pii(text: str, detection: PiiDetectionResult, config: PIIConfig) -> tu
616645 return masked_text , encoded_detections
617646
618647
619- def _mask_encoded_pii (text : str , config : PIIConfig ) -> tuple [str , dict [str , list [str ]]]:
648+ def _mask_encoded_pii (text : str , config : PIIConfig , original_text : str | None = None ) -> tuple [str , dict [str , list [str ]]]:
620649 """Detect and mask PII in encoded content (Base64, URL-encoded, hex).
621650
622651 Strategy:
@@ -625,24 +654,26 @@ def _mask_encoded_pii(text: str, config: PIIConfig) -> tuple[str, dict[str, list
625654 3. Map detections back to mask the encoded versions in original text
626655
627656 Args:
628- text: Text potentially containing encoded PII.
657+ text: Normalized text potentially containing encoded PII.
629658 config: PII configuration specifying which entities to detect.
659+ original_text: Original (non-normalized) text to return if no PII found.
630660
631661 Returns:
632662 Tuple of (masked_text, encoded_detections_mapping).
663+ Returns original_text if provided and no PII found, otherwise text.
633664 """
634665 # Build fully decoded text and get candidate mappings
635666 decoded_text , candidates = _build_decoded_text (text )
636667
637668 if not candidates :
638- return text , {}
669+ return original_text or text , {}
639670
640671 # Pass fully decoded text to Presidio ONCE
641672 engine = _get_analyzer_engine ()
642673 analyzer_results = engine .analyze (decoded_text , entities = [e .value for e in config .entities ], language = "en" )
643674
644675 if not analyzer_results :
645- return text , {}
676+ return original_text or text , {}
646677
647678 # Map detections back to encoded chunks in original text
648679 # Strategy: Check if the decoded chunk contributed to any PII detection
0 commit comments