Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion executor_manager/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class TimeoutConfig:

# Sandbox timeouts
sandbox_default: int = 1800 # 30 minutes
sandbox_inactive_timeout: int = field(
default_factory=lambda: int(os.getenv("SANDBOX_INACTIVE_TIMEOUT", "7200"))
)
execution_default: int = 600 # 10 minutes
container_ready: int = field(
default_factory=lambda: int(os.getenv("CONTAINER_READY_TIMEOUT", "20"))
Expand All @@ -66,7 +69,8 @@ class TimeoutConfig:
http_execution_request: float = 30.0
http_container_wait: float = 5.0

# Redis TTL
# Redis data retention window for sandbox state
# This is not the sandbox execution timeout; sandbox expiry uses expires_at.
redis_ttl: int = 86400 # 24 hours
# Session hash TTL must be longer than redis_ttl to ensure GC can load sandbox data
# Formula: redis_ttl + GC_INTERVAL + buffer (default: 86400 + 3600 + 3600 = 93600)
Expand Down
9 changes: 5 additions & 4 deletions executor_manager/routers/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ async def create_sandbox(request: CreateSandboxRequest, http_request: Request):
"""Create a new sandbox.

Creates an isolated execution environment (Docker container) that can
run multiple executions. The sandbox will automatically terminate
after the specified timeout unless kept alive.
run multiple executions. The sandbox remains available for new executions
until its active timeout expires, and background cleanup removes idle
sandboxes after the inactivity window.

Args:
request: Sandbox creation parameters
Expand Down Expand Up @@ -205,8 +206,8 @@ async def keep_alive(
):
"""Extend sandbox timeout.

Adds additional time to the sandbox expiration. The sandbox will
automatically terminate after the new timeout unless kept alive again.
Adds additional time to the sandbox active timeout. Idle sandbox cleanup
is still controlled by the background inactivity window.

Args:
sandbox_id: Unique sandbox identifier (internally uses task_id)
Expand Down
14 changes: 8 additions & 6 deletions executor_manager/schemas/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class CreateSandboxResponse(BaseModel):
container_name: Docker container name
shell_type: Execution environment type
created_at: Timestamp when sandbox was created
expires_at: Timestamp when sandbox will auto-terminate
expires_at: Timestamp when the active timeout window ends
message: Optional status message
"""

Expand All @@ -79,7 +79,9 @@ class CreateSandboxResponse(BaseModel):
description="Executor namespace when available",
)
created_at: float = Field(..., description="Creation timestamp")
expires_at: Optional[float] = Field(None, description="Expiration timestamp")
expires_at: Optional[float] = Field(
None, description="Active-timeout expiration timestamp"
)
message: Optional[str] = Field(None, description="Status message")


Expand All @@ -97,9 +99,9 @@ class SandboxStatusResponse(BaseModel):
created_at: Creation timestamp
started_at: When sandbox became running
last_activity_at: Last activity timestamp
expires_at: Expiration timestamp
expires_at: Active-timeout expiration timestamp
uptime: Sandbox uptime in seconds
time_remaining: Seconds until expiration
time_remaining: Seconds until active-timeout expiration
execution_count: Number of executions in this sandbox
error_message: Error message if failed
metadata: Additional metadata
Expand Down Expand Up @@ -157,8 +159,8 @@ class KeepAliveResponse(BaseModel):

Attributes:
sandbox_id: Unique sandbox identifier
expires_at: New expiration timestamp
time_remaining: Seconds until expiration
expires_at: New active-timeout expiration timestamp
time_remaining: Seconds until active-timeout expiration
message: Status message
"""

Expand Down
27 changes: 22 additions & 5 deletions executor_manager/services/sandbox/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,8 +977,8 @@ async def _terminate_expired_sandbox(self, task_id_str: str) -> None:
async def _collect_expired_sandboxes(self) -> None:
"""Terminate expired sandboxes.

Uses repository to efficiently find sandboxes whose last_activity_timestamp
is older than the configured TTL.
Scans active sandboxes and terminates those that have been idle longer
than the configured inactivity timeout.
"""
lock = get_distributed_lock()
if not lock.acquire("sandbox_gc", expire_seconds=300):
Expand All @@ -989,9 +989,26 @@ async def _collect_expired_sandboxes(self) -> None:

try:
logger.info("[SandboxManager] Running sandbox GC...")
expired_task_ids = self._repository.get_expired_sandbox_ids(
self._config.timeout.redis_ttl
)
active_task_ids = self._repository.get_active_sandbox_ids()
if not active_task_ids:
logger.info("[SandboxManager] No active sandboxes found")
return

now = time.time()
inactivity_timeout = self._config.timeout.sandbox_inactive_timeout
expired_task_ids = []
for task_id_str in active_task_ids:
sandbox = self._repository.load_sandbox(task_id_str)
if sandbox is None:
Comment on lines +1000 to +1002
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Use async repository reads in the GC scan loop

_collect_expired_sandboxes is an async scheduled job, but it now iterates every active sandbox and calls synchronous load_sandbox for each ID. With many active sandboxes this introduces N blocking Redis round-trips on the event loop, which can delay other async jobs (like heartbeat checks) and API responsiveness during GC runs.

Useful? React with 👍 / 👎.

self._repository.remove_from_active_set(task_id_str)
logger.debug(
f"[SandboxManager] Cleaned orphaned ZSet entry: {task_id_str}"
)
continue

idle_seconds = now - sandbox.last_activity_at
if idle_seconds >= inactivity_timeout:
expired_task_ids.append(task_id_str)
Comment on lines +999 to +1011
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add a legacy fallback for sandboxes without expires_at.

Records saved before this change can load with expires_at=None; sandbox.is_expired() then returns False, so GC will skip those running containers until the Redis hash expires and the metadata needed to terminate them is gone.

🛠️ Suggested fallback for legacy records
             expired_task_ids = []
+            legacy_cutoff = time.time() - self._config.timeout.redis_ttl
             for task_id_str in active_task_ids:
                 sandbox = self._repository.load_sandbox(task_id_str)
                 if sandbox is None:
                     self._repository.remove_from_active_set(task_id_str)
                     logger.debug(
                         f"[SandboxManager] Cleaned orphaned ZSet entry: {task_id_str}"
                     )
                     continue
 
-                if sandbox.is_expired():
+                if sandbox.expires_at is None:
+                    if sandbox.last_activity_at <= legacy_cutoff:
+                        expired_task_ids.append(task_id_str)
+                    continue
+
+                if sandbox.is_expired():
                     expired_task_ids.append(task_id_str)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@executor_manager/services/sandbox/manager.py` around lines 998 - 1009, The GC
loop must handle legacy sandboxes with sandbox.expires_at == None so they aren't
skipped; in the loop that loads sandboxes (use _repository.load_sandbox and the
sandbox variable) change the expiry check to first detect expires_at is None and
then apply a fallback TTL (e.g. compute fallback_expired = sandbox.created_at +
DEFAULT_SANDBOX_TTL < now) or treat None as expired immediately depending on
intended policy, otherwise call sandbox.is_expired(); add or reuse a
DEFAULT_SANDBOX_TTL constant (or Sandbox.default_ttl) and update the
expired_task_ids append logic to use this fallback branch.


if not expired_task_ids:
logger.info("[SandboxManager] No expired sandboxes found")
Expand Down
20 changes: 12 additions & 8 deletions executor_manager/services/sandbox/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
- {subtask_id} fields: Execution data JSON
- TTL: session_hash_ttl (longer than redis_ttl to ensure GC can load data)
- Active Sandboxes ZSet: wegent-sandbox:active (score = last_activity timestamp)
- GC uses redis_ttl to determine which sandboxes are expired
- Stores recent activity timestamps and supports orphan cleanup helpers

GC Design:
- GC runs every GC_INTERVAL (default 3600s) to clean up expired sandboxes
- Sandboxes are considered expired if last_activity > redis_ttl (default 86400s)
- Sandbox expiration is determined by sandbox.last_activity_at in SandboxManager
- Session hash has TTL = session_hash_ttl = redis_ttl + GC_INTERVAL + buffer
- This ensures GC can still load sandbox data even when it's marked as expired
- This ensures GC can still load sandbox data long enough to terminate expired sandboxes
"""

import json
Expand Down Expand Up @@ -109,6 +109,9 @@ def save_sandbox(self, sandbox: Sandbox) -> bool:
"status": sandbox.status.value,
"error_message": sandbox.error_message,
"created_at": sandbox.created_at,
"started_at": sandbox.started_at,
"last_activity_at": sandbox.last_activity_at,
"expires_at": sandbox.expires_at,
"shell_type": sandbox.shell_type,
"user_id": sandbox.user_id,
"user_name": sandbox.user_name,
Expand Down Expand Up @@ -333,16 +336,17 @@ async def get_active_sandbox_ids_async(self) -> List[str]:
return []

def get_expired_sandbox_ids(self, max_age_seconds: int) -> List[str]:
"""Get sandbox IDs that have been inactive for longer than max_age.
"""Get sandbox IDs whose last-activity score is older than max_age.

Uses ZRANGEBYSCORE to efficiently find sandboxes whose last_activity_timestamp
is older than the cutoff time.
This helper is based on ZSet activity timestamps. SandboxManager GC now
expires sandboxes using sandbox.expires_at, but this method remains useful
for diagnostics or legacy maintenance paths that need activity-age queries.

Args:
max_age_seconds: Maximum age in seconds (e.g., 86400 for 24 hours)
max_age_seconds: Maximum activity age in seconds

Returns:
List of expired sandbox IDs
List of sandbox IDs whose activity score is older than the cutoff
"""
if self.redis_client is None:
return []
Expand Down
69 changes: 64 additions & 5 deletions executor_manager/tests/services/test_sandbox_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,36 @@ def test_save_sandbox_success(
mock_redis_client.expire.assert_called()
mock_redis_client.zadd.assert_called()

def test_save_sandbox_round_trip_preserves_timing_fields(
self, sandbox_manager_with_mock_redis, mock_redis_client, sample_sandbox
):
"""Test save/load preserves started_at, last_activity_at, and expires_at."""
manager = sandbox_manager_with_mock_redis

result = manager._repository.save_sandbox(sample_sandbox)

assert result is True

hset_args = mock_redis_client.hset.call_args[0]
saved_hash_key = hset_args[0]
saved_field = hset_args[1]
saved_payload = hset_args[2]
saved_data = json.loads(saved_payload)

assert saved_data["started_at"] == sample_sandbox.started_at
assert saved_data["last_activity_at"] == sample_sandbox.last_activity_at
assert saved_data["expires_at"] == sample_sandbox.expires_at

mock_redis_client.hget.return_value = saved_payload
loaded_sandbox = manager._repository.load_sandbox(sample_sandbox.sandbox_id)

assert loaded_sandbox is not None
assert saved_hash_key.endswith(sample_sandbox.sandbox_id)
assert saved_field == "__sandbox__"
assert loaded_sandbox.started_at == sample_sandbox.started_at
assert loaded_sandbox.last_activity_at == sample_sandbox.last_activity_at
assert loaded_sandbox.expires_at == sample_sandbox.expires_at

def test_save_sandbox_missing_task_id(
self, sandbox_manager_with_mock_redis, mock_redis_client
):
Expand Down Expand Up @@ -961,13 +991,14 @@ async def test_collect_expired_sandboxes_terminates_old(
self,
sandbox_manager_with_mock_redis,
mock_redis_client,
sample_sandbox_redis_data,
mocker,
sample_sandbox,
):
"""Test terminates sandboxes older than 24 hours."""
"""Test terminates sandboxes idle for more than two hours."""
manager = sandbox_manager_with_mock_redis
mock_redis_client.zrangebyscore.return_value = ["12345"]
mock_redis_client.zrange.return_value = ["12345"]
sample_sandbox.last_activity_at = time.time() - (2 * 3600) - 60
sample_sandbox.expires_at = time.time() + 3600

# Mock repository.load_sandbox to return a sandbox
mocker.patch.object(
Expand All @@ -990,9 +1021,9 @@ async def test_collect_expired_sandboxes_terminates_old(
async def test_collect_expired_sandboxes_cleans_orphaned(
self, sandbox_manager_with_mock_redis, mock_redis_client, mocker
):
"""Test cleans orphaned ZSet entries."""
"""Test cleans orphaned active set entries."""
manager = sandbox_manager_with_mock_redis
mock_redis_client.zrangebyscore.return_value = ["orphaned-id"]
mock_redis_client.zrange.return_value = ["orphaned-id"]
mock_redis_client.hget.return_value = None # No sandbox data

await manager._collect_expired_sandboxes()
Expand All @@ -1001,6 +1032,34 @@ async def test_collect_expired_sandboxes_cleans_orphaned(
"wegent-sandbox:active", "orphaned-id"
)

@pytest.mark.asyncio
async def test_collect_expired_sandboxes_skips_unexpired(
self,
sandbox_manager_with_mock_redis,
mock_redis_client,
mocker,
sample_sandbox,
):
"""Test keeps sandboxes with recent activity even if expires_at is in the past."""
manager = sandbox_manager_with_mock_redis
mock_redis_client.zrange.return_value = ["12345"]
sample_sandbox.last_activity_at = time.time() - 300
sample_sandbox.expires_at = time.time() - 60

mocker.patch.object(
manager._repository, "load_sandbox", return_value=sample_sandbox
)
mock_terminate = mocker.patch.object(
manager,
"terminate_sandbox",
new_callable=AsyncMock,
return_value=(True, "Terminated"),
)

await manager._collect_expired_sandboxes()

mock_terminate.assert_not_called()

# ----- Scheduler Integration Tests -----

@pytest.mark.asyncio
Expand Down
Loading