Skip to content

[RAPTOR-14353] Gavrenkov/poc drum watchdog v2 #1632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions custom_model_runner/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

#### [1.16.24] - 2025-08-19
##### Changed
- Add NIM watchdog to automatically restart the server if it becomes unresponsive

#### [1.16.23] - 2025-08-18
##### Changed
- Add OTEL metrics and logs configuration.
Expand Down
2 changes: 1 addition & 1 deletion custom_model_runner/datarobot_drum/drum/description.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
This is proprietary source code of DataRobot, Inc. and its affiliates.
Released under the terms of DataRobot Tool and Utility Agreement.
"""
version = "1.16.23"
version = "1.16.24"
__version__ = version
project_name = "datarobot-drum"
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
Released under the terms of DataRobot Tool and Utility Agreement.
"""
import logging
import os
import sys
import time
from pathlib import Path
from threading import Thread

import requests
from flask import Response, jsonify, request
Expand All @@ -22,6 +25,7 @@
ModelInfoKeys,
RunLanguage,
TargetType,
URL_PREFIX_ENV_VAR_NAME,
)
from datarobot_drum.drum.exceptions import DrumCommonException
from datarobot_drum.drum.model_metadata import read_model_metadata_yaml
Expand Down Expand Up @@ -71,6 +75,7 @@ def __init__(self, params: dict):
"run_predictor_total", "finish", StatsOperation.SUB, "start"
)
self._predictor = self._setup_predictor()
self._server_watchdog = None

def _setup_predictor(self):
if self._run_language == RunLanguage.PYTHON:
Expand Down Expand Up @@ -301,10 +306,95 @@ def _run_flask_app(self, app):
processes = self._params.get("processes")
logger.info("Number of webserver processes: %s", processes)
try:
if str(os.environ.get("USE_NIM_WATCHDOG", "false")).lower() in ["true", "1", "yes"]:
# Start the watchdog thread before running the app
self._server_watchdog = Thread(
target=self.watchdog,
args=(port,), # Pass host and port as arguments
daemon=True,
name="OpenAI Watchdog",
)
self._server_watchdog.start()

app.run(host, port, threaded=False, processes=processes)
except OSError as e:
raise DrumCommonException("{}: host: {}; port: {}".format(e, host, port))

def watchdog(self, port):
"""
Watchdog thread that periodically checks if the server is alive by making
GET requests to the /ping/ endpoint. Makes 3 attempts with quadratic backoff
before terminating the Flask app.
"""

logger.info("Starting watchdog to monitor server health...")

import os

url_host = os.environ.get("TEST_URL_HOST", "localhost")
url_prefix = os.environ.get(URL_PREFIX_ENV_VAR_NAME, "")
health_url = f"http://{url_host}:{port}/{url_prefix}/info/"

request_timeout = 120
check_interval = 10 # seconds
max_attempts = 5

attempt = 0
base_sleep_time = 2

while True:
try:
# Check if server is responding to health checks
logger.debug(f"Server health check")
response = requests.get(health_url, timeout=request_timeout)
logger.debug(f"Server health check status: {response.status_code}")
# Connection succeeded, reset attempts and wait for next check
attempt = 0
time.sleep(check_interval) # Regular check interval
continue

except Exception as e:
attempt += 1
logger.error(f"health_url {health_url}")
logger.error(
f"Server health check failed (attempt {attempt}/{max_attempts}): {str(e)}"
)

if attempt >= max_attempts:
logger.error(
"All health check attempts failed. Forcefully killing all processes."
)

# First try clean termination
try:
self._terminate()
except Exception as e:
logger.error(f"Error during clean termination: {str(e)}")

# Force kill all processes
import subprocess

# Use more direct system commands to kill processes
try:
# Kill packedge jobs first (more aggressive approach)
logger.info("Killing Python package jobs")
# Run `busybox ps` and capture output
result = subprocess.run(["busybox", "ps"], capture_output=True, text=True)
# Parse lines, skip the header
lines = result.stdout.strip().split("\n")[1:]
# Extract the PID (first column)
pids = [int(line.split()[0]) for line in lines]
for pid in pids:
print("Killing pid:", pid)
subprocess.run(f"kill {pid}", shell=True)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semgrep identified an issue in your code:
Found 'subprocess' function 'run' with 'shell=True'. This is dangerous because this call will spawn the command using a shell process. Doing so propagates current shell settings and variables, which makes it much easier for a malicious actor to execute commands. Use 'shell=False' instead.

To resolve this comment:

✨ Commit Assistant fix suggestion

Suggested change
subprocess.run(f"kill {pid}", shell=True)
subprocess.run(f"kill {pid}", shell=False)
View step-by-step instructions
  1. Change the subprocess.run(f"kill {pid}", shell=True) call to avoid using the shell.
  2. Update the line to pass the command and arguments as a list, like this: subprocess.run(["kill", str(pid)]).
    This prevents shell injection vulnerabilities and makes the command execution safer, especially when working with dynamic input.
💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by subprocess-shell-true.

You can view more details about this finding in the Semgrep AppSec Platform.

except Exception as kill_error:
logger.error(f"Error during process killing: {str(kill_error)}")

# Quadratic backoff
sleep_time = base_sleep_time * (attempt**2)
logger.info(f"Retrying in {sleep_time} seconds...")
time.sleep(sleep_time)

def terminate(self):
terminate_op = getattr(self._predictor, "terminate", None)
if callable(terminate_op):
Expand Down