Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 48 additions & 7 deletions evaluations/research-agent-team-eval/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import argparse
import asyncio
import aiohttp
import ipaddress
from typing import Any, Dict, List, Tuple, Optional
from collections import defaultdict
from dataclasses import dataclass, field
Expand Down Expand Up @@ -985,8 +986,46 @@ async def evaluate_url_validity_async(

urls = re.findall(r"https?://[^\s\)]+", report)

def is_safe_public_url(url: str) -> bool:
"""Block private/internal URL targets to reduce SSRF risk."""
try:
parsed = urlparse(url)
if parsed.scheme not in {"http", "https"}:
return False
if not parsed.hostname:
return False

hostname = parsed.hostname.strip("[]").lower()
if hostname in {"localhost", "localhost.localdomain"}:
return False
if hostname.endswith(".local"):
return False

try:
ip = ipaddress.ip_address(hostname)
if (
ip.is_private
or ip.is_loopback
or ip.is_link_local
or ip.is_multicast
or ip.is_reserved
or ip.is_unspecified
):
return False
except ValueError:
# Hostname is not a direct IP literal; keep it eligible.
pass
Comment on lines +1015 to +1017

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Resolve hostnames before classifying URLs as safe

The new SSRF guard only blocks IP literals: any hostname that cannot be parsed directly as an IP address is marked eligible (the `except ValueError: ... pass` branch). That means attacker-controlled domains (for example, DNS names that resolve to `169.254.169.254` or to RFC 1918 private addresses) will still be fetched by `session.head`, so internal-network probing remains possible when evaluating untrusted reports. Resolve hostnames and reject any non-global resolved address before making outbound requests.

Useful? React with 👍 / 👎.


return True
except Exception:
return False

safe_urls = [url for url in urls if is_safe_public_url(url)]
blocked_urls = [url for url in urls if not is_safe_public_url(url)]

results = {
"total_urls": len(urls),
"blocked_urls": len(blocked_urls),
"valid_urls": 0,
"invalid_urls": 0,
"timeout_urls": 0,
Expand All @@ -995,19 +1034,19 @@ async def evaluate_url_validity_async(
}

# Decide which URLs to check
if len(urls) <= 20:
if len(safe_urls) <= 20:
# Check all URLs if 20 or fewer
urls_to_check = urls
results["sample_size"] = len(urls)
urls_to_check = safe_urls
results["sample_size"] = len(safe_urls)
else:
# Random sample of 20 if more than 20
urls_to_check = random.sample(urls, 20)
urls_to_check = random.sample(safe_urls, 20)
results["sample_size"] = 20

async def check_url(session, url):
try:
async with session.head(
url, timeout=5, allow_redirects=True
url, timeout=5, allow_redirects=False
) as response:
if response.status < 400:
return url, "valid"
Expand Down Expand Up @@ -1039,13 +1078,15 @@ async def check_url(session, url):
score = 0

# Update feedback to indicate sampling
if len(urls) > 20:
feedback = f"{results['valid_urls']}/{results['sample_size']} URLs valid (random sample from {len(urls)} total)"
if len(safe_urls) > 20:
feedback = f"{results['valid_urls']}/{results['sample_size']} URLs valid (random sample from {len(safe_urls)} eligible)"
else:
feedback = f"{results['valid_urls']}/{results['sample_size']} URLs valid"

if results["broken_links"]:
feedback += f", {len(results['broken_links'])} broken"
if blocked_urls:
feedback += f", skipped {len(blocked_urls)} private/internal URL(s)"

return MetricResult(score=score, details=results, feedback=feedback)

Expand Down
Loading