diff --git a/evaluations/research-agent-team-eval/evaluator.py b/evaluations/research-agent-team-eval/evaluator.py index d00a4d0..ef90599 100755 --- a/evaluations/research-agent-team-eval/evaluator.py +++ b/evaluations/research-agent-team-eval/evaluator.py @@ -36,6 +36,7 @@ import argparse import asyncio import aiohttp +import ipaddress from typing import Any, Dict, List, Tuple, Optional from collections import defaultdict from dataclasses import dataclass, field @@ -985,8 +986,46 @@ async def evaluate_url_validity_async( urls = re.findall(r"https?://[^\s\)]+", report) + def is_safe_public_url(url: str) -> bool: + """Block private/internal URL targets to reduce SSRF risk.""" + try: + parsed = urlparse(url) + if parsed.scheme not in {"http", "https"}: + return False + if not parsed.hostname: + return False + + hostname = parsed.hostname.strip("[]").lower() + if hostname in {"localhost", "localhost.localdomain"}: + return False + if hostname.endswith(".local"): + return False + + try: + ip = ipaddress.ip_address(hostname) + if ( + ip.is_private + or ip.is_loopback + or ip.is_link_local + or ip.is_multicast + or ip.is_reserved + or ip.is_unspecified + ): + return False + except ValueError: + # Hostname is not a direct IP literal; keep it eligible. + pass + + return True + except Exception: + return False + + safe_urls = [url for url in urls if is_safe_public_url(url)] + blocked_urls = [url for url in urls if not is_safe_public_url(url)] + results = { "total_urls": len(urls), + "blocked_urls": len(blocked_urls), "valid_urls": 0, "invalid_urls": 0, "timeout_urls": 0, @@ -995,19 +1034,19 @@ async def evaluate_url_validity_async( } # Decide which URLs to check - if len(urls) <= 20: + if len(safe_urls) <= 20: # Check all URLs if 20 or fewer - urls_to_check = urls - results["sample_size"] = len(urls) + urls_to_check = safe_urls + results["sample_size"] = len(safe_urls) else: # Random sample of 20 if more than 20 - urls_to_check = random.sample(urls, 20) + urls_to_check = random.sample(safe_urls, 20) results["sample_size"] = 20 async def check_url(session, url): try: async with session.head( - url, timeout=5, allow_redirects=True + url, timeout=5, allow_redirects=False ) as response: if response.status < 400: return url, "valid" @@ -1039,13 +1078,15 @@ async def check_url(session, url): score = 0 # Update feedback to indicate sampling - if len(urls) > 20: - feedback = f"{results['valid_urls']}/{results['sample_size']} URLs valid (random sample from {len(urls)} total)" + if len(safe_urls) > 20: + feedback = f"{results['valid_urls']}/{results['sample_size']} URLs valid (random sample from {len(safe_urls)} eligible)" else: feedback = f"{results['valid_urls']}/{results['sample_size']} URLs valid" if results["broken_links"]: feedback += f", {len(results['broken_links'])} broken" + if blocked_urls: + feedback += f", skipped {len(blocked_urls)} private/internal URL(s)" return MetricResult(score=score, details=results, feedback=feedback)