diff --git a/executor_manager/common/config.py b/executor_manager/common/config.py index 3c13d7337..71651fa6e 100644 --- a/executor_manager/common/config.py +++ b/executor_manager/common/config.py @@ -47,6 +47,9 @@ class TimeoutConfig: # Sandbox timeouts sandbox_default: int = 1800 # 30 minutes + sandbox_inactive_timeout: int = field( + default_factory=lambda: int(os.getenv("SANDBOX_INACTIVE_TIMEOUT", "7200")) + ) execution_default: int = 600 # 10 minutes container_ready: int = field( default_factory=lambda: int(os.getenv("CONTAINER_READY_TIMEOUT", "20")) @@ -66,7 +69,8 @@ class TimeoutConfig: http_execution_request: float = 30.0 http_container_wait: float = 5.0 - # Redis TTL + # Redis data retention window for sandbox state + # This is not the sandbox execution timeout; sandbox expiry uses expires_at. redis_ttl: int = 86400 # 24 hours # Session hash TTL must be longer than redis_ttl to ensure GC can load sandbox data # Formula: redis_ttl + GC_INTERVAL + buffer (default: 86400 + 3600 + 3600 = 93600) diff --git a/executor_manager/routers/sandbox.py b/executor_manager/routers/sandbox.py index 50f00abf5..504e37605 100644 --- a/executor_manager/routers/sandbox.py +++ b/executor_manager/routers/sandbox.py @@ -57,8 +57,9 @@ async def create_sandbox(request: CreateSandboxRequest, http_request: Request): """Create a new sandbox. Creates an isolated execution environment (Docker container) that can - run multiple executions. The sandbox will automatically terminate - after the specified timeout unless kept alive. + run multiple executions. The sandbox remains available for new executions + until its active timeout expires, and background cleanup removes idle + sandboxes after the inactivity window. Args: request: Sandbox creation parameters @@ -205,8 +206,8 @@ async def keep_alive( ): """Extend sandbox timeout. - Adds additional time to the sandbox expiration. The sandbox will - automatically terminate after the new timeout unless kept alive again. + Adds additional time to the sandbox active timeout. Idle sandbox cleanup + is still controlled by the background inactivity window. Args: sandbox_id: Unique sandbox identifier (internally uses task_id) diff --git a/executor_manager/schemas/sandbox.py b/executor_manager/schemas/sandbox.py index 64970ce48..c5faed82c 100644 --- a/executor_manager/schemas/sandbox.py +++ b/executor_manager/schemas/sandbox.py @@ -66,7 +66,7 @@ class CreateSandboxResponse(BaseModel): container_name: Docker container name shell_type: Execution environment type created_at: Timestamp when sandbox was created - expires_at: Timestamp when sandbox will auto-terminate + expires_at: Timestamp when the active timeout window ends message: Optional status message """ @@ -79,7 +79,9 @@ class CreateSandboxResponse(BaseModel): description="Executor namespace when available", ) created_at: float = Field(..., description="Creation timestamp") - expires_at: Optional[float] = Field(None, description="Expiration timestamp") + expires_at: Optional[float] = Field( + None, description="Active-timeout expiration timestamp" + ) message: Optional[str] = Field(None, description="Status message") @@ -97,9 +99,9 @@ class SandboxStatusResponse(BaseModel): created_at: Creation timestamp started_at: When sandbox became running last_activity_at: Last activity timestamp - expires_at: Expiration timestamp + expires_at: Active-timeout expiration timestamp uptime: Sandbox uptime in seconds - time_remaining: Seconds until expiration + time_remaining: Seconds until active-timeout expiration execution_count: Number of executions in this sandbox error_message: Error message if failed metadata: Additional metadata @@ -157,8 +159,8 @@ class KeepAliveResponse(BaseModel): Attributes: sandbox_id: Unique sandbox identifier - expires_at: New expiration timestamp - time_remaining: Seconds until expiration + expires_at: New active-timeout expiration timestamp + time_remaining: Seconds until active-timeout expiration message: Status message """ diff --git a/executor_manager/services/sandbox/manager.py b/executor_manager/services/sandbox/manager.py index 58f18a283..15c21e8a8 100644 --- a/executor_manager/services/sandbox/manager.py +++ b/executor_manager/services/sandbox/manager.py @@ -977,8 +977,8 @@ async def _terminate_expired_sandbox(self, task_id_str: str) -> None: async def _collect_expired_sandboxes(self) -> None: """Terminate expired sandboxes. - Uses repository to efficiently find sandboxes whose last_activity_timestamp - is older than the configured TTL. + Scans active sandboxes and terminates those that have been idle longer + than the configured inactivity timeout. """ lock = get_distributed_lock() if not lock.acquire("sandbox_gc", expire_seconds=300): @@ -989,9 +989,26 @@ async def _collect_expired_sandboxes(self) -> None: try: logger.info("[SandboxManager] Running sandbox GC...") - expired_task_ids = self._repository.get_expired_sandbox_ids( - self._config.timeout.redis_ttl - ) + active_task_ids = self._repository.get_active_sandbox_ids() + if not active_task_ids: + logger.info("[SandboxManager] No active sandboxes found") + return + + now = time.time() + inactivity_timeout = self._config.timeout.sandbox_inactive_timeout + expired_task_ids = [] + for task_id_str in active_task_ids: + sandbox = self._repository.load_sandbox(task_id_str) + if sandbox is None: + self._repository.remove_from_active_set(task_id_str) + logger.debug( + f"[SandboxManager] Cleaned orphaned ZSet entry: {task_id_str}" + ) + continue + + idle_seconds = now - sandbox.last_activity_at + if idle_seconds >= inactivity_timeout: + expired_task_ids.append(task_id_str) if not expired_task_ids: logger.info("[SandboxManager] No expired sandboxes found") diff --git a/executor_manager/services/sandbox/repository.py b/executor_manager/services/sandbox/repository.py index ae503c92d..02f5f2455 100644 --- a/executor_manager/services/sandbox/repository.py +++ b/executor_manager/services/sandbox/repository.py @@ -13,13 +13,13 @@ - {subtask_id} fields: Execution data JSON - TTL: session_hash_ttl (longer than redis_ttl to ensure GC can load data) - Active Sandboxes ZSet: wegent-sandbox:active (score = last_activity timestamp) - - GC uses redis_ttl to determine which sandboxes are expired + - Stores recent activity timestamps and supports orphan cleanup helpers GC Design: - GC runs every GC_INTERVAL (default 3600s) to clean up expired sandboxes -- Sandboxes are considered expired if last_activity > redis_ttl (default 86400s) +- Sandbox expiration is determined by sandbox.last_activity_at in SandboxManager - Session hash has TTL = session_hash_ttl = redis_ttl + GC_INTERVAL + buffer -- This ensures GC can still load sandbox data even when it's marked as expired +- This ensures GC can still load sandbox data long enough to terminate expired sandboxes """ import json @@ -109,6 +109,9 @@ def save_sandbox(self, sandbox: Sandbox) -> bool: "status": sandbox.status.value, "error_message": sandbox.error_message, "created_at": sandbox.created_at, + "started_at": sandbox.started_at, + "last_activity_at": sandbox.last_activity_at, + "expires_at": sandbox.expires_at, "shell_type": sandbox.shell_type, "user_id": sandbox.user_id, "user_name": sandbox.user_name, @@ -333,16 +336,17 @@ async def get_active_sandbox_ids_async(self) -> List[str]: return [] def get_expired_sandbox_ids(self, max_age_seconds: int) -> List[str]: - """Get sandbox IDs that have been inactive for longer than max_age. + """Get sandbox IDs whose last-activity score is older than max_age. - Uses ZRANGEBYSCORE to efficiently find sandboxes whose last_activity_timestamp - is older than the cutoff time. + This helper is based on ZSet activity timestamps. SandboxManager GC now + expires sandboxes using sandbox.expires_at, but this method remains useful + for diagnostics or legacy maintenance paths that need activity-age queries. Args: - max_age_seconds: Maximum age in seconds (e.g., 86400 for 24 hours) + max_age_seconds: Maximum activity age in seconds Returns: - List of expired sandbox IDs + List of sandbox IDs whose activity score is older than the cutoff """ if self.redis_client is None: return [] diff --git a/executor_manager/tests/services/test_sandbox_manager.py b/executor_manager/tests/services/test_sandbox_manager.py index ff0f03ae9..7234f80de 100644 --- a/executor_manager/tests/services/test_sandbox_manager.py +++ b/executor_manager/tests/services/test_sandbox_manager.py @@ -580,6 +580,36 @@ def test_save_sandbox_success( mock_redis_client.expire.assert_called() mock_redis_client.zadd.assert_called() + def test_save_sandbox_round_trip_preserves_timing_fields( + self, sandbox_manager_with_mock_redis, mock_redis_client, sample_sandbox + ): + """Test save/load preserves started_at, last_activity_at, and expires_at.""" + manager = sandbox_manager_with_mock_redis + + result = manager._repository.save_sandbox(sample_sandbox) + + assert result is True + + hset_args = mock_redis_client.hset.call_args[0] + saved_hash_key = hset_args[0] + saved_field = hset_args[1] + saved_payload = hset_args[2] + saved_data = json.loads(saved_payload) + + assert saved_data["started_at"] == sample_sandbox.started_at + assert saved_data["last_activity_at"] == sample_sandbox.last_activity_at + assert saved_data["expires_at"] == sample_sandbox.expires_at + + mock_redis_client.hget.return_value = saved_payload + loaded_sandbox = manager._repository.load_sandbox(sample_sandbox.sandbox_id) + + assert loaded_sandbox is not None + assert saved_hash_key.endswith(sample_sandbox.sandbox_id) + assert saved_field == "__sandbox__" + assert loaded_sandbox.started_at == sample_sandbox.started_at + assert loaded_sandbox.last_activity_at == sample_sandbox.last_activity_at + assert loaded_sandbox.expires_at == sample_sandbox.expires_at + def test_save_sandbox_missing_task_id( self, sandbox_manager_with_mock_redis, mock_redis_client ): @@ -961,13 +991,14 @@ async def test_collect_expired_sandboxes_terminates_old( self, sandbox_manager_with_mock_redis, mock_redis_client, - sample_sandbox_redis_data, mocker, sample_sandbox, ): - """Test terminates sandboxes older than 24 hours.""" + """Test terminates sandboxes idle for more than two hours.""" manager = sandbox_manager_with_mock_redis - mock_redis_client.zrangebyscore.return_value = ["12345"] + mock_redis_client.zrange.return_value = ["12345"] + sample_sandbox.last_activity_at = time.time() - (2 * 3600) - 60 + sample_sandbox.expires_at = time.time() + 3600 # Mock repository.load_sandbox to return a sandbox mocker.patch.object( @@ -990,9 +1021,9 @@ async def test_collect_expired_sandboxes_terminates_old( async def test_collect_expired_sandboxes_cleans_orphaned( self, sandbox_manager_with_mock_redis, mock_redis_client, mocker ): - """Test cleans orphaned ZSet entries.""" + """Test cleans orphaned active set entries.""" manager = sandbox_manager_with_mock_redis - mock_redis_client.zrangebyscore.return_value = ["orphaned-id"] + mock_redis_client.zrange.return_value = ["orphaned-id"] mock_redis_client.hget.return_value = None # No sandbox data await manager._collect_expired_sandboxes() @@ -1001,6 +1032,34 @@ async def test_collect_expired_sandboxes_cleans_orphaned( "wegent-sandbox:active", "orphaned-id" ) + @pytest.mark.asyncio + async def test_collect_expired_sandboxes_skips_unexpired( + self, + sandbox_manager_with_mock_redis, + mock_redis_client, + mocker, + sample_sandbox, + ): + """Test keeps sandboxes with recent activity even if expires_at is in the past.""" + manager = sandbox_manager_with_mock_redis + mock_redis_client.zrange.return_value = ["12345"] + sample_sandbox.last_activity_at = time.time() - 300 + sample_sandbox.expires_at = time.time() - 60 + + mocker.patch.object( + manager._repository, "load_sandbox", return_value=sample_sandbox + ) + mock_terminate = mocker.patch.object( + manager, + "terminate_sandbox", + new_callable=AsyncMock, + return_value=(True, "Terminated"), + ) + + await manager._collect_expired_sandboxes() + + mock_terminate.assert_not_called() + # ----- Scheduler Integration Tests ----- @pytest.mark.asyncio