From 6cd7750aa95fbd964e48d8be7dbc6563c5309176 Mon Sep 17 00:00:00 2001 From: Keiven Chang Date: Sat, 4 Oct 2025 01:36:03 +0000 Subject: [PATCH 1/2] refactor: rename dynamo_check.py to sanity_check.py - Rename deploy/dynamo_check.py to deploy/sanity_check.py - Update README.md to mention sanity check before trying out Dynamo - Update usage instructions in sanity_check.py to reflect new path Signed-off-by: Keiven Chang --- .devcontainer/post-create.sh | 2 +- README.md | 10 + deploy/dynamo_check.py | 2241 +--------------------------------- deploy/sanity_check.py | 2240 +++++++++++++++++++++++++++++++++ 4 files changed, 2252 insertions(+), 2241 deletions(-) mode change 100755 => 120000 deploy/dynamo_check.py create mode 100755 deploy/sanity_check.py diff --git a/.devcontainer/post-create.sh b/.devcontainer/post-create.sh index c219bab56e..fb6a12e822 100755 --- a/.devcontainer/post-create.sh +++ b/.devcontainer/post-create.sh @@ -123,7 +123,7 @@ else echo "⚠️ SSH agent forwarding not configured - SSH_AUTH_SOCK is not set" fi -show_and_run $DYNAMO_HOME/deploy/dynamo_check.py +show_and_run $DYNAMO_HOME/deploy/sanity_check.py cat <[...] - def add_child(self, child: "NodeInfo") -> "NodeInfo": - """Add a child node and return it for chaining""" - self.children.append(child) - return child - - def add_metadata(self, key: str, value: str) -> "NodeInfo": - """Add metadata key-value pair""" - self.metadata[key] = value - return self - - def render( - self, prefix: str = "", is_last: bool = True, is_root: bool = True - ) -> List[str]: - """Render the tree node and its children as a list of strings""" - lines = [] - - # Determine the connector - if not is_root: - # Check if this is a sub-category item - if self.metadata and self.metadata.get("part_of_previous"): - connector = "│" - else: - connector = "└─" if is_last else "├─" - current_prefix = prefix + connector + " " - else: - current_prefix = "" - - # Build the line content - line_parts = [] - - # Add status symbol - if self.show_symbol and self.status != NodeStatus.NONE: - if self.status == NodeStatus.OK: - line_parts.append("✅") - elif self.status == NodeStatus.ERROR: - line_parts.append("❌") - elif self.status == NodeStatus.WARNING: - line_parts.append("⚠️") - elif self.status == NodeStatus.UNKNOWN: - line_parts.append("❓") - - # Add label and value - if self.desc: - line_parts.append(f"{self.label}: {self.desc}") - else: - line_parts.append(self.label) - - # Add metadata inline - consistent format for all - if self.metadata: - metadata_items = [] - for k, v in self.metadata.items(): - # Skip internal metadata that shouldn't be displayed - if k != "part_of_previous": - # Format all metadata consistently as "key=value" - metadata_items.append(f"{k}={v}") - - if metadata_items: - # Use consistent separator (comma) for all metadata - metadata_str = ", ".join(metadata_items) - line_parts[-1] += f", {metadata_str}" - - # Construct the full line - line_content = " ".join(line_parts) - if current_prefix or line_content: - lines.append(current_prefix + line_content) - - # Render children - for i, child in enumerate(self.children): - is_last_child = i == len(self.children) - 1 - if is_root: - child_prefix = "" - else: - child_prefix = prefix + (" " if is_last else "│ ") - lines.extend(child.render(child_prefix, is_last_child, False)) - - return lines - - def print_tree(self) -> None: - """Print the tree to console""" - for line in self.render(): - print(line) - - def has_errors(self) -> bool: - """Check if this node or any of its children have errors""" - # Check if this node has an error - if self.status
== NodeStatus.ERROR: - return True - - # Recursively check all children - for child in self.children: - if child.has_errors(): - return True - - return False - - def _replace_home_with_var(self, path: str) -> str: - """Replace home directory with $HOME in path.""" - home = os.path.expanduser("~") - if path.startswith(home): - return path.replace(home, "$HOME", 1) - return path - - def _is_inside_container(self) -> bool: - """Check if we're running inside a container.""" - # Check for common container indicators - container_indicators = [ - # Docker - os.path.exists("/.dockerenv"), - # Podman/containerd - os.path.exists("/run/.containerenv"), - # Check if cgroup contains docker/containerd - self._check_cgroup_for_container(), - # Check environment variables - os.environ.get("container") is not None, - os.environ.get("DOCKER_CONTAINER") is not None, - ] - return any(container_indicators) - - def _check_cgroup_for_container(self) -> bool: - """Check cgroup for container indicators.""" - try: - with open("/proc/1/cgroup", "r") as f: - content = f.read() - return any( - indicator in content.lower() - for indicator in ["docker", "containerd", "podman", "lxc"] - ) - except Exception: - return False - - def _get_gpu_container_remedies(self) -> str: - """Get remedies for GPU issues when running inside a container.""" - return "maybe try a docker restart?" - - def _format_timestamp_pdt(self, timestamp: float) -> str: - """Format timestamp as PDT time string.""" - dt_utc = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) - # Convert to PDT (UTC-7) - dt_pdt = dt_utc - datetime.timedelta(hours=7) - return dt_pdt.strftime("%Y-%m-%d %H:%M:%S PDT") - - -class SystemInfo(NodeInfo): - """Root node for system information""" - - def __init__( - self, - hostname: Optional[str] = None, - thorough_check: bool = False, - terse: bool = False, - ): - self.thorough_check = thorough_check - self.terse = terse - if hostname is None: - hostname = platform.node() - - # Get IP address - ip_address = self._get_ip_address() - - # Format label with hostname and IP - if ip_address: - label = f"System info (hostname={hostname}, IP={ip_address})" - else: - label = f"System info (hostname={hostname})" - - super().__init__(label=label, status=NodeStatus.INFO) - - # Suppress Prometheus endpoint warnings from planner module - self._suppress_planner_warnings() - - # Collect and add all system information - # Always show: OS, User, GPU, Framework, Dynamo - self.add_child(OSInfo()) - self.add_child(UserInfo()) - - # Add GPU info (always show, even if not found) - gpu_info = GPUInfo() - self.add_child(gpu_info) - - # Add Framework info (vllm, sglang, tensorrt_llm) - self.add_child(FrameworkInfo()) - - # In terse mode, only add other components if they have errors - if not self.terse: - # Add file permissions check - self.add_child(FilePermissionsInfo(thorough_check=self.thorough_check)) - - # Add Cargo (always show, even if not found) - self.add_child(CargoInfo(thorough_check=self.thorough_check)) - - # Add Maturin (Python-Rust build tool) - self.add_child(MaturinInfo()) - - # Add Python info - self.add_child(PythonInfo()) - else: - # In terse mode, only add components that have errors - self._add_error_only_components() - - # Add Dynamo workspace info (always show, even if not found) - self.add_child(DynamoInfo(thorough_check=self.thorough_check)) - - def _get_ip_address(self) -> Optional[str]: - """Get the primary IP address of the system.""" - try: - import socket - - # Get hostname - hostname = 
socket.gethostname() - # Get IP address - ip_address = socket.gethostbyname(hostname) - # Filter out localhost - if ip_address.startswith("127."): - # Try to get external IP by connecting to a public DNS - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - # Connect to Google DNS (doesn't actually send data) - s.connect(("8.8.8.8", 80)) - ip_address = s.getsockname()[0] - finally: - s.close() - return ip_address - except Exception: - return None - - def _suppress_planner_warnings(self) -> None: - """Suppress Prometheus endpoint warnings from planner module during import testing.""" - # The planner module logs a warning about Prometheus endpoint when imported - # outside of a Kubernetes cluster. Suppress this for cleaner output. - planner_logger = logging.getLogger("dynamo.planner.defaults") - planner_logger.setLevel(logging.ERROR) - # Also suppress the defaults._get_default_prometheus_endpoint logger - defaults_logger = logging.getLogger("defaults._get_default_prometheus_endpoint") - defaults_logger.setLevel(logging.ERROR) - - def _add_error_only_components(self) -> None: - """In terse mode, only add components that have errors""" - # Create components and check their status - components_to_check = [ - ("File System", FilePermissionsInfo(thorough_check=self.thorough_check)), - ("Cargo", CargoInfo(thorough_check=self.thorough_check)), - ("Maturin", MaturinInfo()), - ("Python", PythonInfo()), - ] - - for name, component in components_to_check: - # Only add if the component has an error status - if component.status == NodeStatus.ERROR: - self.add_child(component) - - -class UserInfo(NodeInfo): - """User information""" - - def __init__(self): - # Get user info - username = os.getenv("USER") or os.getenv("LOGNAME") or "unknown" - if username == "unknown": - try: - import pwd - - username = pwd.getpwuid(os.getuid()).pw_name - except Exception: - try: - import subprocess - - result = subprocess.run( - ["whoami"], capture_output=True, text=True, timeout=5 - ) - if result.returncode == 0: - username = result.stdout.strip() - except Exception: - pass - uid = os.getuid() - gid = os.getgid() - - desc = f"user={username}, uid={uid}, gid={gid}" - - # Add warning if running as root - status = NodeStatus.WARNING if uid == 0 else NodeStatus.INFO - if uid == 0: - desc += " ⚠️" - - super().__init__(label="User info", desc=desc, status=status) - - -class OSInfo(NodeInfo): - """Operating system information""" - - def __init__(self): - # Collect OS information - uname = platform.uname() - - # Try to get distribution info - distro = "" - version = "" - try: - if os.path.exists("/etc/os-release"): - with open("/etc/os-release", "r") as f: - for line in f: - if line.startswith("NAME="): - distro = line.split("=", 1)[1].strip().strip('"') - elif line.startswith("VERSION="): - version = line.split("=", 1)[1].strip().strip('"') - except Exception: - pass - - # Get memory info - mem_used_gb = None - mem_total_gb = None - try: - with open("/proc/meminfo", "r") as f: - meminfo = {} - for line in f: - if ":" in line: - k, v = line.split(":", 1) - meminfo[k.strip()] = v.strip() - - if "MemTotal" in meminfo and "MemAvailable" in meminfo: - total_kb = float(meminfo["MemTotal"].split()[0]) - avail_kb = float(meminfo["MemAvailable"].split()[0]) - mem_used_gb = (total_kb - avail_kb) / (1024 * 1024) - mem_total_gb = total_kb / (1024 * 1024) - except Exception: - pass - - # Get CPU cores - cores = os.cpu_count() - - # Build the value string - if distro: - value = f"{distro} {version} ({uname.system} {uname.release} 
{uname.machine})".strip() - else: - value = f"{uname.system} {uname.release} {uname.machine}" - - super().__init__(label="OS", desc=value, status=NodeStatus.INFO) - - # Add memory and cores as metadata - if mem_used_gb is not None and mem_total_gb is not None: - self.add_metadata("Memory", f"{mem_used_gb:.1f}/{mem_total_gb:.1f} GiB") - if mem_total_gb > 0 and (mem_used_gb / mem_total_gb) >= 0.9: - self.status = NodeStatus.WARNING - if cores: - self.add_metadata("Cores", str(cores)) - - -class GPUInfo(NodeInfo): - """NVIDIA GPU information""" - - def __init__(self): - # Find nvidia-smi executable (check multiple paths) - nvidia_smi = shutil.which("nvidia-smi") - if not nvidia_smi: - # Check common paths if `which` fails - for candidate in [ - "/usr/bin/nvidia-smi", - "/usr/local/bin/nvidia-smi", - "/usr/local/nvidia/bin/nvidia-smi", - ]: - if os.path.exists(candidate) and os.access(candidate, os.X_OK): - nvidia_smi = candidate - break - - if not nvidia_smi: - super().__init__( - label="NVIDIA GPU", desc="nvidia-smi not found", status=NodeStatus.ERROR - ) - return - - try: - # Get GPU list - result = subprocess.run( - [nvidia_smi, "-L"], capture_output=True, text=True, timeout=10 - ) - - if result.returncode != 0: - # Extract and process error message from stderr or stdout - error_msg = "nvidia-smi failed" - - # Try stderr first, then stdout - for output in [result.stderr, result.stdout]: - if output and output.strip(): - error_lines = output.strip().splitlines() - if error_lines: - error_msg = error_lines[0].strip() - break - - # Handle NVML-specific errors - if "Failed to initialize NVML" in error_msg: - error_msg = "No NVIDIA GPU detected (NVML initialization failed)" - # Add docker restart suggestion specifically for NVML failures in containers - if self._is_inside_container(): - error_msg += " - maybe try a docker restart?" 
- - super().__init__( - label="NVIDIA GPU", desc=error_msg, status=NodeStatus.ERROR - ) - return - - # Parse GPU names - gpu_names = [] - lines = result.stdout.strip().splitlines() - for line in lines: - # Example: "GPU 0: NVIDIA A100-SXM4-40GB (UUID: GPU-...)" - if ":" in line: - gpu_name = line.split(":", 1)[1].split("(")[0].strip() - gpu_names.append(gpu_name) - - # Check for zero GPUs - if not gpu_names: - # Get driver and CUDA even for zero GPUs - driver, cuda = self._get_driver_cuda_versions(nvidia_smi) - driver_cuda_str = "" - if driver or cuda: - parts = [] - if driver: - parts.append(f"driver {driver}") - if cuda: - parts.append(f"CUDA {cuda}") - driver_cuda_str = f", {', '.join(parts)}" - super().__init__( - label="NVIDIA GPU", - desc=f"not detected{driver_cuda_str}", - status=NodeStatus.ERROR, - ) - return - - # Get driver and CUDA versions - driver, cuda = self._get_driver_cuda_versions(nvidia_smi) - - # Handle single vs multiple GPUs - if len(gpu_names) == 1: - # Single GPU - compact format - value = gpu_names[0] - if driver or cuda: - driver_cuda = [] - if driver: - driver_cuda.append(f"driver {driver}") - if cuda: - driver_cuda.append(f"CUDA {cuda}") - value += f", {', '.join(driver_cuda)}" - - super().__init__(label="NVIDIA GPU", desc=value, status=NodeStatus.OK) - - # Add power and memory metadata for single GPU - self._add_power_memory_info(nvidia_smi, 0) - else: - # Multiple GPUs - show count in main label - value = f"{len(gpu_names)} GPUs" - if driver or cuda: - driver_cuda = [] - if driver: - driver_cuda.append(f"driver {driver}") - if cuda: - driver_cuda.append(f"CUDA {cuda}") - value += f", {', '.join(driver_cuda)}" - - super().__init__(label="NVIDIA GPU", desc=value, status=NodeStatus.OK) - - # Add each GPU as a child node - for i, name in enumerate(gpu_names): - gpu_child = NodeInfo( - label=f"GPU {i}", desc=name, status=NodeStatus.OK - ) - # Add power and memory for this specific GPU - power_mem = self._get_power_memory_string(nvidia_smi, i) - if power_mem: - gpu_child.add_metadata("Stats", power_mem) - self.add_child(gpu_child) - - except Exception: - super().__init__( - label="NVIDIA GPU", desc="detection failed", status=NodeStatus.ERROR - ) - - def _get_driver_cuda_versions( - self, nvidia_smi: str - ) -> Tuple[Optional[str], Optional[str]]: - """Get NVIDIA driver and CUDA versions using query method.""" - driver, cuda = None, None - try: - # Use query method for more reliable detection - result = subprocess.run( - [nvidia_smi, "--query-gpu=driver_version", "--format=csv,noheader"], - capture_output=True, - text=True, - timeout=10, - ) - if result.returncode == 0 and result.stdout.strip(): - driver = result.stdout.strip().splitlines()[0].strip() - - # Try to get CUDA version from nvidia-smi output - result = subprocess.run( - [nvidia_smi], capture_output=True, text=True, timeout=10 - ) - if result.returncode == 0: - import re - - m = re.search(r"CUDA Version:\s*([0-9.]+)", result.stdout) - if m: - cuda = m.group(1) - except Exception: - pass - return driver, cuda - - def _add_power_memory_info(self, nvidia_smi: str, gpu_index: int = 0): - """Add power and memory metadata for a specific GPU.""" - power_mem = self._get_power_memory_string(nvidia_smi, gpu_index) - if power_mem: - # Split into Power and Memory parts - if "; " in power_mem: - parts = power_mem.split("; ") - for part in parts: - if part.startswith("Power:"): - self.add_metadata("Power", part.replace("Power: ", "")) - elif part.startswith("Memory:"): - self.add_metadata("Memory", part.replace("Memory: 
", "")) - - def _get_power_memory_string( - self, nvidia_smi: str, gpu_index: int = 0 - ) -> Optional[str]: - """Get power and memory info string for a specific GPU.""" - try: - result = subprocess.run( - [ - nvidia_smi, - "--query-gpu=power.draw,power.limit,memory.used,memory.total", - "--format=csv,noheader,nounits", - ], - capture_output=True, - text=True, - timeout=10, - ) - if result.returncode == 0 and result.stdout.strip(): - lines = result.stdout.strip().splitlines() - if gpu_index < len(lines): - parts = lines[gpu_index].split(",") - if len(parts) >= 4: - power_draw = parts[0].strip() - power_limit = parts[1].strip() - mem_used = parts[2].strip() - mem_total = parts[3].strip() - - info_parts = [] - if power_draw and power_limit: - info_parts.append(f"Power: {power_draw}/{power_limit} W") - - if mem_used and mem_total: - # Add warning if memory usage is 90% or higher - warning = "" - try: - if float(mem_used) / float(mem_total) >= 0.9: - warning = " ⚠️" - except Exception: - pass - info_parts.append( - f"Memory: {mem_used}/{mem_total} MiB{warning}" - ) - - if info_parts: - return "; ".join(info_parts) - except Exception: - pass - return None - - -class FilePermissionsInfo(NodeInfo): - """File system check for development environment directories - - Checks writability of critical directories needed for: - - Dynamo development (top-level dynamo directory) - - Rust development (Cargo target directory + all files, RUSTUP_HOME, CARGO_HOME) - - Python development (site-packages) - - In thorough mode, also checks disk space for the dynamo working directory - and shows a warning if less than 10% free space is available. - - In fast mode, skips recursive file checking in Cargo target directory - for improved performance on large target directories. - """ - - def __init__(self, thorough_check: bool = False): - super().__init__(label="File System", status=NodeStatus.INFO) - self.thorough_check = thorough_check - - # Check top-level dynamo directory - self._check_dynamo_directory_permissions() - - # Check Rust toolchain directories (RUSTUP_HOME and CARGO_HOME) - self._check_rust_toolchain_permissions() - - # Check Cargo target directory (with optional recursive file checking) - self._check_cargo_target_permissions() - - # Check Python site-packages directory - self._check_site_packages_permissions() - - def _check_permissions_unified( - self, - candidate_paths: List[str], - label_prefix: str, - recursive: bool = False, - exclude_files: Optional[List[str]] = None, - ) -> List[NodeInfo]: - """Unified permission checking function - - Args: - candidate_paths: List of paths to check, uses first available one - label_prefix: Prefix for the node label - recursive: If True, check all files recursively; if False, check directory only - exclude_files: List of filenames to exclude from file checking (e.g., ['.git']) - - Returns: - List of NodeInfo objects for the results - """ - exclude_files = exclude_files or [] - results = [] - - # Find first available path - selected_path = None - for path in candidate_paths: - expanded_path = os.path.expanduser(path) - if os.path.exists(expanded_path): - selected_path = expanded_path - break - - if not selected_path: - # No paths exist - path_list = ", ".join(candidate_paths) - results.append( - NodeInfo( - label=f"{label_prefix} (tried: {path_list})", - desc="No candidate paths exist", - status=NodeStatus.ERROR, - ) - ) - return results - - try: - # Check if it's actually a directory - if not os.path.isdir(selected_path): - results.append( - NodeInfo( - 
label=f"{label_prefix} ({self._replace_home_with_var(selected_path)})", - desc="Path is not a directory", - status=NodeStatus.ERROR, - ) - ) - return results - - # Check if directory is effectively writable - if not self._is_effectively_writable(selected_path): - results.append( - NodeInfo( - label=f"{label_prefix} ({self._replace_home_with_var(selected_path)})", - desc="Directory not writable", - status=NodeStatus.ERROR, - ) - ) - return results - - if not recursive: - # Just check directory writability - # Check if running as root but directory is not owned by root - is_root = os.getuid() == 0 - is_root_owned = False - warning_symbol = "" - desc_text = "writable" - owner_name = None - - if is_root: - try: - stat_info = os.stat(selected_path) - is_root_owned = stat_info.st_uid == 0 - if not is_root_owned: - warning_symbol = " ⚠️" - # Get the owner name - try: - import pwd - - owner_name = pwd.getpwuid(stat_info.st_uid).pw_name - except Exception: - owner_name = f"uid={stat_info.st_uid}" - desc_text = f"writable (owned by {owner_name or 'root'})" - except Exception: - desc_text = "writable (owned by unknown)" - - # Add disk space info in thorough mode - status = NodeStatus.OK # Default status - if self.thorough_check: - disk_space, disk_warning = self._format_disk_space(selected_path) - desc_text += disk_space - # Override status if disk space is low - if disk_warning: - status = disk_warning - - results.append( - NodeInfo( - label=f"{label_prefix} ({self._replace_home_with_var(selected_path)}){warning_symbol}", - desc=desc_text, - status=status, - ) - ) - else: - # Check files recursively - ( - total_files, - non_writable_files, - non_writable_list, - ) = self._count_writable_files( - selected_path, recursive=True, exclude_files=exclude_files - ) - - # Create description based on results - desc, status = self._create_file_count_description( - total_files, non_writable_files, "files" - ) - - # Check if running as root but directory is not owned by root - is_root = os.getuid() == 0 - is_root_owned = False - warning_symbol = "" - owner_name = None - - if is_root: - try: - stat_info = os.stat(selected_path) - is_root_owned = stat_info.st_uid == 0 - if not is_root_owned: - warning_symbol = " ⚠️" - # Get the owner name - try: - import pwd - - owner_name = pwd.getpwuid(stat_info.st_uid).pw_name - except Exception: - owner_name = f"uid={stat_info.st_uid}" - # Modify description to indicate ownership - if "writable" in desc: - desc = desc.replace( - "writable", - f"writable (owned by {owner_name or 'root'})", - ) - except Exception: - # Modify description to indicate ownership - if "writable" in desc: - desc = desc.replace( - "writable", "writable (owned by unknown)" - ) - - # Add disk space info in thorough mode - if self.thorough_check: - disk_space, disk_warning = self._format_disk_space(selected_path) - desc += disk_space - # Override status if disk space is low - if disk_warning: - status = disk_warning - - results.append( - NodeInfo( - label=f"{label_prefix} ({self._replace_home_with_var(selected_path)}){warning_symbol}", - desc=desc, - status=status, - ) - ) - - # Add details for non-writable files if there are any (limit to first 10) - if non_writable_files > 0: - details_label = ( - f"Non-writable files (showing first 10 of {non_writable_files})" - ) - if non_writable_files <= 10: - details_label = f"Non-writable files ({non_writable_files})" - - details_node = NodeInfo( - label=details_label, - desc="; ".join(non_writable_list[:10]), - status=NodeStatus.WARNING, - ) - 
results.append(details_node) - - except Exception as e: - results.append( - NodeInfo( - label=f"{label_prefix} ({self._replace_home_with_var(selected_path)})", - desc=f"Permission check failed: {str(e)}", - status=NodeStatus.ERROR, - ) - ) - - return results - - def _is_effectively_writable(self, file_path: str) -> bool: - """Check if a file is effectively writable - - A file is considered effectively writable if: - 1. It's already writable (os.access check) - 2. We own the file (can chmod it) - 3. We are root (can do anything) - but only if os.access confirms write access - Note: Root may still be denied write access on NFS mounts due to root squashing - """ - try: - # First check if it's already writable - this works for all cases including NFS - if os.access(file_path, os.W_OK): - return True - - # Check if we own the file (and can therefore chmod it) - stat_info = os.stat(file_path) - if stat_info.st_uid == os.getuid(): - return True - - # For root, we still need to respect the os.access result - # Root privileges don't guarantee write access on NFS mounts - # If os.access(W_OK) returned False above, respect that even for root - return False - except Exception: - # If we can't stat the file, assume it's not writable - return False - - def _count_writable_files( - self, - directory: str, - recursive: bool = False, - exclude_files: Optional[List[str]] = None, - ) -> Tuple[int, int, List[str]]: - """Count total files and non-writable files in directory - - Returns: - Tuple of (total_files, non_writable_files, non_writable_list) - """ - exclude_files = exclude_files or [] - total_files = 0 - non_writable_files = 0 - non_writable_list = [] - - if recursive: - # Walk through all files in the directory tree recursively - for root, dirs, files in os.walk(directory): - for file in files: - file_path = os.path.join(root, file) - # Skip symbolic links - if os.path.islink(file_path): - continue - total_files += 1 - if not self._is_effectively_writable(file_path): - non_writable_files += 1 - rel_path = os.path.relpath(file_path, directory) - non_writable_list.append(rel_path) - else: - # Only check files in the immediate directory (non-recursive) - for item in os.listdir(directory): - if item in exclude_files: - continue - item_path = os.path.join(directory, item) - # Skip symbolic links and only check regular files - if os.path.isfile(item_path) and not os.path.islink(item_path): - total_files += 1 - try: - if not self._is_effectively_writable(item_path): - non_writable_files += 1 - non_writable_list.append(item) - except Exception: - non_writable_files += 1 - non_writable_list.append(item) - - return total_files, non_writable_files, non_writable_list - - def _create_file_count_description( - self, total_files: int, non_writable_files: int, context: str = "files" - ) -> Tuple[str, NodeStatus]: - """Create description and status for file count results""" - if total_files == 0: - return f"writable, no {context} found", NodeStatus.INFO - elif non_writable_files == 0: - return f"writable, all {total_files} {context} writable", NodeStatus.OK - else: - return ( - f"writable, {non_writable_files} of {total_files} {context} not writable", - NodeStatus.WARNING, - ) - - def _get_cargo_target_path_candidates(self) -> List[str]: - """Get candidate paths for cargo target directory""" - candidates = [] - - # Try to get target directory from cargo metadata (most accurate) - try: - result = subprocess.run( - ["cargo", "metadata", "--format-version=1", "--no-deps"], - capture_output=True, - text=True, - 
timeout=10, - cwd=".", - ) - if result.returncode == 0: - import json - - metadata = json.loads(result.stdout) - target_path = metadata.get("target_directory") - if target_path: - candidates.append(target_path) - except Exception: - pass - - # Add fallback candidates - cargo_target = os.environ.get("CARGO_TARGET_DIR") - if cargo_target: - candidates.append(cargo_target) - - candidates.append("~/.cargo/target") - return candidates - - def _check_dynamo_directory_permissions(self): - """Check top-level dynamo directory and key files writability""" - # Use the existing workspace detection logic - dynamo_root = DynamoInfo.find_workspace() - - if not dynamo_root: - self.add_child( - NodeInfo( - label="Dynamo workspace", - desc="workspace not found", - status=NodeStatus.ERROR, - ) - ) - return - - if not DynamoInfo.is_dynamo_workspace(dynamo_root): - self.add_child( - NodeInfo( - label="Dynamo workspace", - desc="not a valid dynamo workspace", - status=NodeStatus.ERROR, - ) - ) - return - - # Check dynamo root directory and files (exclude .git) - recursive = self.thorough_check - results = self._check_permissions_unified( - [dynamo_root], - "Dynamo workspace", - recursive=recursive, - exclude_files=[".git"], - ) - for result in results: - self.add_child(result) - - # Check .git directory separately - git_dir = os.path.join(dynamo_root, ".git") - if os.path.exists(git_dir): - git_results = self._check_permissions_unified( - [git_dir], "Dynamo .git directory", recursive=recursive - ) - for result in git_results: - self.add_child(result) - else: - self.add_child( - NodeInfo( - label="Dynamo .git directory", - desc="not available", - status=NodeStatus.WARNING, - ) - ) - - def _check_site_packages_permissions(self): - """Check site-packages directory writability""" - try: - import site - - # Get all candidate site-packages directories - site_packages_dirs = site.getsitepackages() - user_site = site.getusersitepackages() - if user_site: - site_packages_dirs.append(user_site) - - # Check each existing site-packages directory - recursive = self.thorough_check - for site_dir in site_packages_dirs: - if os.path.exists(site_dir): - results = self._check_permissions_unified( - [site_dir], "site-packages", recursive=recursive - ) - for result in results: - self.add_child(result) - - except Exception as e: - self.add_child( - NodeInfo( - label="Python site-packages", - desc=f"Permission check failed: {str(e)}", - status=NodeStatus.ERROR, - ) - ) - - def _check_cargo_target_permissions(self): - """Check Cargo target directory writability and file permissions""" - candidates = self._get_cargo_target_path_candidates() - recursive = self.thorough_check - results = self._check_permissions_unified( - candidates, "Cargo target", recursive=recursive - ) - - if not results or ( - len(results) == 1 - and results[0].status == NodeStatus.ERROR - and results[0].desc is not None - and "No candidate paths exist" in results[0].desc - ): - # No paths exist - show warning instead of error - self.add_child( - NodeInfo( - label="Cargo target", - desc="Path does not exist", - status=NodeStatus.WARNING, - ) - ) - else: - for result in results: - self.add_child(result) - - def _check_rust_toolchain_permissions(self): - """Check RUSTUP_HOME and CARGO_HOME directory writability - - These directories need recursive checking because: - - RUSTUP_HOME: rustup needs to write toolchain files, documentation, etc. - - CARGO_HOME: cargo needs to write registry cache, git repos, binaries, etc. 
- """ - # Check RUSTUP_HOME - rustup_env = os.environ.get("RUSTUP_HOME") - rustup_candidates = [rustup_env] if rustup_env is not None else [] - rustup_candidates.append("~/.rustup") - - recursive = self.thorough_check - rustup_results = self._check_permissions_unified( - rustup_candidates, "Rustup home", recursive=recursive - ) - for result in rustup_results: - self.add_child(result) - - # Check CARGO_HOME - cargo_env = os.environ.get("CARGO_HOME") - cargo_candidates = [cargo_env] if cargo_env is not None else [] - cargo_candidates.append("~/.cargo") - - cargo_results = self._check_permissions_unified( - cargo_candidates, "Cargo home", recursive=recursive - ) - for result in cargo_results: - self.add_child(result) - - def _format_disk_space(self, path: str) -> Tuple[str, Optional[NodeStatus]]: - """Format disk space information for a given path - - Returns: - Tuple of (formatted_string, warning_status_if_low_space) - """ - try: - # Get disk usage statistics - statvfs = os.statvfs(path) - - # Calculate sizes in bytes - total_bytes = statvfs.f_frsize * statvfs.f_blocks - free_bytes = statvfs.f_frsize * statvfs.f_bavail - used_bytes = total_bytes - free_bytes - - # Convert to human readable format - def format_bytes(bytes_val): - """Convert bytes to human readable format""" - for unit in ["B", "KB", "MB", "GB", "TB"]: - if bytes_val < 1024.0: - return f"{bytes_val:.1f} {unit}" - bytes_val /= 1024.0 - return f"{bytes_val:.1f} PB" - - # Calculate percentage used - percent_used = (used_bytes / total_bytes) * 100 - percent_free = 100 - percent_used - - formatted_string = f", {format_bytes(used_bytes)}/{format_bytes(total_bytes)} ({percent_used:.1f}% used)" - - # Return warning status if less than 10% free space - warning_status = NodeStatus.WARNING if percent_free < 10 else None - - return formatted_string, warning_status - - except Exception: - return "", None - - -class CargoInfo(NodeInfo): - """Cargo tool information""" - - def __init__(self, thorough_check: bool = False): - self.thorough_check = thorough_check - cargo_path = shutil.which("cargo") - cargo_version = None - - # Get cargo version - if cargo_path: - try: - result = subprocess.run( - ["cargo", "--version"], capture_output=True, text=True, timeout=5 - ) - if result.returncode == 0: - cargo_version = result.stdout.strip() - except Exception: - pass - - if not cargo_path and not cargo_version: - super().__init__( - label="Cargo", - desc="not found, install Rust toolchain to see cargo target directory", - status=NodeStatus.ERROR, - ) - return - - # Initialize with cargo path and version - value = "" - if cargo_path: - value = self._replace_home_with_var(cargo_path) - if cargo_version: - value += f", {cargo_version}" if value else cargo_version - - super().__init__(label="Cargo", desc=value, status=NodeStatus.OK) - - # Get cargo home directory from the environment (may not exist, which is OK) - cargo_home_env = os.environ.get("CARGO_HOME") - if cargo_home_env: - cargo_home = cargo_home_env - home_value = f"CARGO_HOME={self._replace_home_with_var(cargo_home)}" - else: - cargo_home = os.path.expanduser("~/.cargo") - home_value = ( - f"CARGO_HOME=, using {self._replace_home_with_var(cargo_home)}" - ) - - if cargo_home and os.path.exists(cargo_home): - status = NodeStatus.INFO - else: - home_value += " (directory does not exist)" - status = NodeStatus.WARNING - - home_node = NodeInfo( - label="Cargo home directory", desc=home_value, status=status - ) - self.add_child(home_node) - - # Get cargo target directory - cargo_target_env = 
os.environ.get("CARGO_TARGET_DIR") - cargo_target = self._get_cargo_target_directory() - - # Calculate total directory size (only if thorough check and directory exists) - size_str = "" - if cargo_target and os.path.exists(cargo_target) and self.thorough_check: - total_size_gb = self._get_directory_size_gb(cargo_target) - size_str = f", {total_size_gb:.1f} GB" if total_size_gb is not None else "" - - # Format the display value - if cargo_target_env: - display_cargo_target = ( - self._replace_home_with_var(cargo_target) if cargo_target else "unknown" - ) - target_value = f"CARGO_TARGET_DIR={display_cargo_target}{size_str}" - else: - display_cargo_target = ( - self._replace_home_with_var(cargo_target) if cargo_target else "unknown" - ) - target_value = ( - f"CARGO_TARGET_DIR=, using {display_cargo_target}{size_str}" - ) - - # Check directory existence and set status - if cargo_target and os.path.exists(cargo_target): - status = NodeStatus.INFO - target_node = NodeInfo( - label="Cargo target directory", - desc=target_value, - status=status, - ) - self.add_child(target_node) - # Add debug/release/binary info as children of target directory - self._add_build_info(target_node, cargo_target) - else: - target_value += " (directory does not exist)" - status = NodeStatus.WARNING if cargo_target_env else NodeStatus.INFO - target_node = NodeInfo( - label="Cargo target directory", - desc=target_value, - status=status, - ) - self.add_child(target_node) - - def _get_directory_size_gb(self, directory: str) -> Optional[float]: - """Get the size of a directory in GB.""" - try: - # Use du command to get directory size in bytes - result = subprocess.run( - ["du", "-sb", directory], capture_output=True, text=True, timeout=30 - ) - if result.returncode == 0: - # Parse output: "size_in_bytes\tdirectory_path" - size_bytes = int(result.stdout.split()[0]) - # Convert to GB - size_gb = size_bytes / (1024**3) - return size_gb - except Exception: - pass - return None - - def _get_cargo_target_directory(self) -> Optional[str]: - """Get cargo target directory using cargo metadata.""" - try: - # Use DynamoInfo's static method to find workspace - workspace_dir = DynamoInfo.find_workspace() - - # Run cargo metadata command to get target directory - cmd_args = ["cargo", "metadata", "--format-version=1", "--no-deps"] - kwargs: Dict[str, Any] = { - "capture_output": True, - "text": True, - "timeout": 10, - } - - # Add cwd if workspace_dir was found - if workspace_dir and os.path.isdir(workspace_dir): - kwargs["cwd"] = workspace_dir - - result = subprocess.run(cmd_args, **kwargs) - - if result.returncode == 0: - # Parse JSON output to extract target_directory - metadata = json.loads(result.stdout) - return metadata.get("target_directory") - except Exception: - pass - return None - - def _add_build_info(self, parent_node: NodeInfo, cargo_target: str): - """Add debug/release/binary information as children of target directory.""" - debug_dir = os.path.join(cargo_target, "debug") - release_dir = os.path.join(cargo_target, "release") - - # Check debug directory - if os.path.exists(debug_dir): - display_debug = self._replace_home_with_var(debug_dir) - debug_value = display_debug - - # Add size (only if thorough check) - if self.thorough_check: - debug_size_gb = self._get_directory_size_gb(debug_dir) - if debug_size_gb is not None: - debug_value += f", {debug_size_gb:.1f} GB" - - try: - debug_mtime = os.path.getmtime(debug_dir) - debug_time = self._format_timestamp_pdt(debug_mtime) - debug_value += f", modified={debug_time}" - except 
Exception: - debug_value += " (unable to read timestamp)" - - debug_node = NodeInfo( - label="Debug", desc=debug_value, status=NodeStatus.INFO - ) - parent_node.add_child(debug_node) - - # Check release directory - if os.path.exists(release_dir): - display_release = self._replace_home_with_var(release_dir) - release_value = display_release - - # Add size (only if thorough check) - if self.thorough_check: - release_size_gb = self._get_directory_size_gb(release_dir) - if release_size_gb is not None: - release_value += f", {release_size_gb:.1f} GB" - - try: - release_mtime = os.path.getmtime(release_dir) - release_time = self._format_timestamp_pdt(release_mtime) - release_value += f", modified={release_time}" - except Exception: - release_value += " (unable to read timestamp)" - - release_node = NodeInfo( - label="Release", desc=release_value, status=NodeStatus.INFO - ) - parent_node.add_child(release_node) - - # Find *.so file - so_file = self._find_so_file(cargo_target) - if so_file: - display_so = self._replace_home_with_var(so_file) - so_value = display_so - - # Add file size (only if thorough check) - if self.thorough_check: - try: - file_size_bytes = os.path.getsize(so_file) - file_size_mb = file_size_bytes / (1024**2) - so_value += f", {file_size_mb:.1f} MB" - except Exception: - pass - - try: - so_mtime = os.path.getmtime(so_file) - so_time = self._format_timestamp_pdt(so_mtime) - so_value += f", modified={so_time}" - except Exception: - so_value += " (unable to read timestamp)" - - binary_node = NodeInfo( - label="Binary", desc=so_value, status=NodeStatus.INFO - ) - parent_node.add_child(binary_node) - - def _find_so_file(self, target_directory: str) -> Optional[str]: - """Find the compiled *.so file in target directory.""" - # Check common locations for .so files - search_dirs = [ - os.path.join(target_directory, "debug"), - os.path.join(target_directory, "release"), - target_directory, - ] - - for search_dir in search_dirs: - if not os.path.exists(search_dir): - continue - - # Walk through directory looking for .so files - try: - for root, dirs, files in os.walk(search_dir): - for file in files: - if file.endswith(".so"): - return os.path.join(root, file) - # Don't recurse too deep - if root.count(os.sep) - search_dir.count(os.sep) > 2: - dirs[:] = [] # Stop recursion - except Exception: - pass - - return None - - -class MaturinInfo(NodeInfo): - """Maturin tool information (Python-Rust build tool)""" - - def __init__(self): - maturin_path = shutil.which("maturin") - if not maturin_path: - super().__init__(label="Maturin", desc="not found", status=NodeStatus.ERROR) - # Add installation hint as a child node - install_hint = NodeInfo( - label="Install with", - desc="uv pip install maturin[patchelf]", - status=NodeStatus.INFO, - ) - self.add_child(install_hint) - return - - try: - result = subprocess.run( - ["maturin", "--version"], capture_output=True, text=True, timeout=5 - ) - if result.returncode == 0: - version = result.stdout.strip() - # Include the maturin binary path like Cargo and Git do - display_maturin_path = self._replace_home_with_var(maturin_path) - super().__init__( - label="Maturin", - desc=f"{display_maturin_path}, {version}", - status=NodeStatus.OK, - ) - return - except Exception: - pass - - super().__init__(label="Maturin", desc="not found", status=NodeStatus.ERROR) - - -class PythonInfo(NodeInfo): - """Python installation information""" - - def __init__(self): - py_version = platform.python_version() - py_exec = sys.executable or "python" - display_py_exec = 
self._replace_home_with_var(py_exec) - - super().__init__( - label="Python", - desc=f"{py_version}, {display_py_exec}", - status=NodeStatus.OK if os.path.exists(py_exec) else NodeStatus.ERROR, - ) - - # Check for PyTorch (optional) - try: - torch = __import__("torch") - version = getattr(torch, "__version__", "installed") - - # Check CUDA availability - cuda_status = None - if hasattr(torch, "cuda"): - try: - cuda_available = torch.cuda.is_available() - cuda_status = ( - "✅torch.cuda.is_available" - if cuda_available - else "❌torch.cuda.is_available" - ) - except Exception: - pass - - # Get installation path - install_path = None - if hasattr(torch, "__file__") and torch.__file__: - file_path = torch.__file__ - if "site-packages" in file_path: - parts = file_path.split(os.sep) - for i, part in enumerate(parts): - if part == "site-packages": - install_path = os.sep.join(parts[: i + 1]) - break - elif file_path: - install_path = os.path.dirname(file_path) - - if install_path: - install_path = self._replace_home_with_var(install_path) - - package_info = PythonPackageInfo( - package_name="PyTorch", - version=version, - cuda_status=cuda_status, - install_path=install_path, - is_framework=False, - ) - self.add_child(package_info) - except ImportError: - pass # PyTorch is optional, don't show if not installed - - # Add PYTHONPATH - pythonpath = os.environ.get("PYTHONPATH", "") - self.add_child(PythonPathInfo(pythonpath)) - - -class FrameworkInfo(NodeInfo): - """LLM Framework information""" - - def __init__(self): - super().__init__(label="🤖Framework", status=NodeStatus.INFO) - - # Check for framework packages (mandatory to show) - frameworks_to_check = [ - ("vllm", "vLLM"), - ("sglang", "Sglang"), - ("tensorrt_llm", "tensorRT LLM"), - ] - - frameworks_found = 0 - - for module_name, display_name in frameworks_to_check: - # Regular import for all frameworks - try: - module = __import__(module_name) - version = getattr(module, "__version__", "installed") - frameworks_found += 1 - - # Get module path - module_path = None - if hasattr(module, "__file__") and module.__file__: - module_path = self._replace_home_with_var(module.__file__) - - # Get executable path - exec_path = None - exec_path_raw = shutil.which(module_name) - if exec_path_raw: - exec_path = self._replace_home_with_var(exec_path_raw) - - package_info = PythonPackageInfo( - package_name=display_name, - version=version, - module_path=module_path, - exec_path=exec_path, - is_framework=True, - is_installed=True, - ) - self.add_child(package_info) - except (ImportError, Exception): - # Framework not installed - don't add it - pass - - # If no frameworks found, set status to ERROR (X) and show what's missing - if frameworks_found == 0: - self.status = NodeStatus.ERROR - # List all the frameworks that were checked but not found - missing_frameworks = [] - for module_name, display_name in frameworks_to_check: - missing_frameworks.append(f"no {module_name}") - missing_text = ", ".join(missing_frameworks) - self.desc = missing_text - - -class PythonPackageInfo(NodeInfo): - """Python package information""" - - def __init__( - self, - package_name: str, - version: str, - cuda_status: Optional[str] = None, - module_path: Optional[str] = None, - exec_path: Optional[str] = None, - install_path: Optional[str] = None, - is_framework: bool = False, - is_installed: bool = True, - ): - # Build display value - display_value = version - - # Determine status based on whether package is installed - if not is_installed or version == "-": - # Framework not found 
- show with "-" and use UNKNOWN status for ❓ symbol - display_value = "-" - status = NodeStatus.UNKNOWN # Show ❓ for not found frameworks - else: - status = NodeStatus.OK - - # Add CUDA status for PyTorch - if cuda_status: - display_value = f"{version}, {cuda_status}" - # Don't add install path for PyTorch with CUDA status - # For frameworks, add module and exec paths - elif is_framework and (module_path or exec_path): - parts = [version] - if module_path: - parts.append(f"module={module_path}") - if exec_path: - parts.append(f"exec={exec_path}") - display_value = ", ".join(parts) - # For regular packages, add install path - elif install_path: - display_value = f"{version} ({install_path})" - - super().__init__(label=package_name, desc=display_value, status=status) - - -class PythonPathInfo(NodeInfo): - """PYTHONPATH environment variable information""" - - def __init__(self, pythonpath: str): - if pythonpath: - # Split by colon and replace home in each path - paths = pythonpath.split(":") - display_paths = [] - has_invalid_paths = False - - for p in paths: - display_path = self._replace_home_with_var(p) - # Check if path exists and is accessible - if not os.path.exists(p) or not os.access(p, os.R_OK): - display_paths.append( - f"{Colors.BRIGHT_RED}{display_path}{Colors.RESET}" - ) # Bright red path - has_invalid_paths = True - else: - display_paths.append(display_path) - - display_pythonpath = ":".join(display_paths) - status = NodeStatus.WARNING if has_invalid_paths else NodeStatus.INFO - else: - display_pythonpath = "not set" - status = ( - NodeStatus.INFO - ) # PYTHONPATH not set is fine with editable installs - - super().__init__(label="PYTHONPATH", desc=display_pythonpath, status=status) - - -class DynamoRuntimeInfo(NodeInfo): - """Dynamo runtime components information""" - - def __init__(self, workspace_dir: str, thorough_check: bool = False): - self.thorough_check = thorough_check - # Try to get package version - import importlib.metadata - - try: - version = importlib.metadata.version("ai-dynamo-runtime") - runtime_value = f"ai-dynamo-runtime {version}" - is_installed = True - except Exception: - runtime_value = "ai-dynamo-runtime - Not installed" - is_installed = False - - super().__init__( - label="Runtime components", - desc=runtime_value, - status=NodeStatus.INFO, # Will update based on components found - ) - - # Add package info if installed - if is_installed: - # Add dist-info directory - dist_info = self._find_dist_info() - if dist_info: - self.add_child(dist_info) - - # Add .pth file - pth_file = self._find_pth_file() - if pth_file: - self.add_child(pth_file) - - # Discover runtime components from source - components = self._discover_runtime_components(workspace_dir) - - # Find where each component actually is and add them - if components: - # Calculate max width for alignment - max_len = max(len(comp) for comp in components) - - components_found = False - for component in components: - try: - # Try to import to find actual location - module = __import__(component, fromlist=[""]) - module_path = getattr(module, "__file__", None) - - if module_path: - # Add timestamp for .so files - timestamp_str = "" - if module_path.endswith(".so"): - try: - stat = os.stat(module_path) - timestamp = self._format_timestamp_pdt(stat.st_mtime) - timestamp_str = f", modified={timestamp}" - except Exception: - pass - - display_path = self._replace_home_with_var(module_path) - padded_name = f"{component:<{max_len}}" - module_node = NodeInfo( - label=f"✅ {padded_name}", -
desc=f"{display_path}{timestamp_str}", - status=NodeStatus.NONE, - ) - self.add_child(module_node) - components_found = True - except ImportError as e: - # Module not importable - show as error - padded_name = f"{component:<{max_len}}" - error_msg = str(e) if str(e) else "Import failed" - module_node = NodeInfo( - label=padded_name, desc=error_msg, status=NodeStatus.ERROR - ) - self.add_child(module_node) - # Don't set components_found to True for failed imports - - # Update status and value based on whether we found components - if components_found: - self.status = NodeStatus.OK - # If not installed but components work via PYTHONPATH, update the message - if not is_installed: - self.desc = "ai-dynamo-runtime (via PYTHONPATH)" - else: - self.status = NodeStatus.ERROR - else: - # No components discovered at all - self.status = NodeStatus.ERROR - - # Final check: if no children at all (no components found), ensure it's an error - if not self.children: - self.status = NodeStatus.ERROR - - def _discover_runtime_components(self, workspace_dir: str) -> list: - """Discover ai-dynamo-runtime components from filesystem. - - Returns: - List of runtime component module names - Example: ['dynamo._core', 'dynamo.nixl_connect', 'dynamo.llm', 'dynamo.runtime'] - - Note: Always includes 'dynamo._core' (compiled Rust module), then scans - lib/bindings/python/src/dynamo/ for additional components. - """ - components = ["dynamo._core"] # Always include compiled Rust module - - if not workspace_dir: - return components - - # Scan runtime components (llm, runtime, nixl_connect, etc.) - runtime_path = os.path.join(workspace_dir, "lib/bindings/python/src/dynamo") - if not os.path.exists(runtime_path): - return components - - for item in os.listdir(runtime_path): - item_path = os.path.join(runtime_path, item) - if os.path.isdir(item_path) and os.path.exists( - os.path.join(item_path, "__init__.py") - ): - components.append(f"dynamo.{item}") - - return components - - def _find_dist_info(self) -> Optional[NodeInfo]: - """Find the dist-info directory for ai-dynamo-runtime.""" - import site - - for site_dir in site.getsitepackages(): - pattern = os.path.join(site_dir, "ai_dynamo_runtime*.dist-info") - matches = glob.glob(pattern) - if matches: - path = matches[0] - display_path = self._replace_home_with_var(path) - try: - stat = os.stat(path) - timestamp = self._format_timestamp_pdt(stat.st_ctime) - return NodeInfo( - label=f" {display_path}", - desc=f"created={timestamp}", - status=NodeStatus.INFO, - metadata={"part_of_previous": True}, - ) - except Exception: - return NodeInfo( - label=f" {display_path}", - status=NodeStatus.INFO, - metadata={"part_of_previous": True}, - ) - return None - - def _find_pth_file(self) -> Optional[NodeInfo]: - """Find the .pth file for ai-dynamo-runtime.""" - import site - - for site_dir in site.getsitepackages(): - pth_path = os.path.join(site_dir, "ai_dynamo_runtime.pth") - if os.path.exists(pth_path): - display_path = self._replace_home_with_var(pth_path) - try: - stat = os.stat(pth_path) - timestamp = self._format_timestamp_pdt(stat.st_mtime) - node = NodeInfo( - label=f" {display_path}", - desc=f"modified={timestamp}", - status=NodeStatus.INFO, - metadata={"part_of_previous": True}, - ) - - # Read where it points to - with open(pth_path, "r") as f: - content = f.read().strip() - if content: - display_content = self._replace_home_with_var(content) - points_to = NodeInfo( - label="→", desc=display_content, status=NodeStatus.INFO - ) - node.add_child(points_to) - - return node -
except Exception: - return NodeInfo(label=display_path, status=NodeStatus.INFO) - return None - - -class DynamoFrameworkInfo(NodeInfo): - """Dynamo framework components information""" - - def __init__(self, workspace_dir: str, thorough_check: bool = False): - self.thorough_check = thorough_check - # Try to get package version - import importlib.metadata - - try: - version = importlib.metadata.version("ai-dynamo") - framework_value = f"ai-dynamo {version}" - is_installed = True - except Exception: - framework_value = "ai-dynamo - Not installed" - is_installed = False - - super().__init__( - label="Framework components", - desc=framework_value, - status=NodeStatus.INFO, # Will update based on components found - ) - - # Add package info if installed - if is_installed: - import glob - import site - - for site_dir in site.getsitepackages(): - # Look specifically for ai_dynamo (not ai_dynamo_runtime) - dist_pattern = os.path.join(site_dir, "ai_dynamo-*.dist-info") - matches = glob.glob(dist_pattern) - if matches: - path = matches[0] - display_path = self._replace_home_with_var(path) - try: - stat = os.stat(path) - timestamp = self._format_timestamp_pdt(stat.st_ctime) - dist_node = NodeInfo( - label=f" {display_path}", - desc=f"created={timestamp}", - status=NodeStatus.INFO, - metadata={"part_of_previous": True}, - ) - self.add_child(dist_node) - except Exception: - dist_node = NodeInfo( - label=f" {display_path}", - status=NodeStatus.INFO, - metadata={"part_of_previous": True}, - ) - self.add_child(dist_node) - break - - # Discover framework components from source - components = self._discover_framework_components(workspace_dir) - - # Find where each component actually is and add them - if components: - # Sort components for consistent output - components.sort() - - # Calculate max width for alignment - max_len = max(len(comp) for comp in components) - - components_found = False - for component in components: - try: - # Try to import to find actual location - module = __import__(component, fromlist=[""]) - module_path = getattr(module, "__file__", None) - - if module_path: - display_path = self._replace_home_with_var(module_path) - padded_name = f"{component:<{max_len}}" - component_node = NodeInfo( - label=f"✅ {padded_name}", - desc=display_path, - status=NodeStatus.NONE, - ) - self.add_child(component_node) - components_found = True - except ImportError as e: - # Module not importable - show as error - padded_name = f"{component:<{max_len}}" - error_msg = str(e) if str(e) else "Import failed" - component_node = NodeInfo( - label=padded_name, desc=error_msg, status=NodeStatus.ERROR - ) - self.add_child(component_node) - # Don't set components_found to True for failed imports - - # Update status and value based on whether we found components - if components_found: - self.status = NodeStatus.OK - # If not installed but components work via PYTHONPATH, update the message - if not is_installed: - self.desc = "ai-dynamo (via PYTHONPATH)" - else: - self.status = NodeStatus.ERROR - else: - # No components discovered at all - self.status = NodeStatus.ERROR - - def _discover_framework_components(self, workspace_dir: str) -> list: - """Discover ai-dynamo framework components from filesystem. - - Returns: - List of framework component module names - Example: ['dynamo.frontend', 'dynamo.planner', 'dynamo.vllm', 'dynamo.sglang', 'dynamo.llama_cpp'] - - Note: Scans components/src/dynamo/... directory for modules with __init__.py files.
- """ - components: List[str] = [] - - if not workspace_dir: - return components - - # Scan the components/src/dynamo/... Python directory for __init__.py files - components_path = os.path.join(workspace_dir, "components", "src", "dynamo") - if os.path.exists(components_path): - for item in os.listdir(components_path): - item_path = os.path.join(components_path, item) - if os.path.isdir(item_path): - # Check for dynamo module in src - module_path = os.path.join(item_path, "__init__.py") - if os.path.exists(module_path): - components.append(f"dynamo.{item}") - - return components - - -class DynamoInfo(NodeInfo): - """Dynamo workspace information""" - - def __init__(self, thorough_check: bool = False): - self.thorough_check = thorough_check - - # Find workspace directory - workspace_dir = DynamoInfo.find_workspace() - - if not workspace_dir: - # Show error when workspace is not found - super().__init__( - label="Dynamo", - desc="workspace not found - cannot detect Runtime and Framework components", - status=NodeStatus.ERROR, - ) - # Add helpful information about where we looked - search_paths = NodeInfo( - label="Searched in", - desc="current dir, ~/dynamo, DYNAMO_HOME, /workspace", - status=NodeStatus.INFO, - ) - self.add_child(search_paths) - hint = NodeInfo( - label="Hint", - desc="Run from a Dynamo workspace directory or set DYNAMO_HOME", - status=NodeStatus.INFO, - ) - self.add_child(hint) - return - - # Get git info - sha, date = self._get_git_info(workspace_dir) - - # Build main label - display_workspace = self._replace_home_with_var(workspace_dir) - if sha and date: - value = f"{display_workspace}, SHA: {sha}, Date: {date}" - else: - value = display_workspace - - super().__init__(label="Dynamo", desc=value, status=NodeStatus.INFO) - - # Always add runtime components - runtime_info = DynamoRuntimeInfo( - workspace_dir, thorough_check=self.thorough_check - ) - self.add_child(runtime_info) - - # Always add framework components - framework_info = DynamoFrameworkInfo( - workspace_dir, thorough_check=self.thorough_check - ) - self.add_child(framework_info) - - def _get_git_info(self, workspace_dir: str) -> Tuple[Optional[str], Optional[str]]: - """Get git SHA and date for the workspace.""" - try: - # Get short SHA - result = subprocess.run( - ["git", "rev-parse", "--short", "HEAD"], - capture_output=True, - text=True, - cwd=workspace_dir, - timeout=5, - ) - sha = result.stdout.strip() if result.returncode == 0 else None - - # Get commit date - result = subprocess.run( - ["git", "show", "-s", "--format=%ci", "HEAD"], - capture_output=True, - text=True, - cwd=workspace_dir, - timeout=5, - ) - if result.returncode == 0 and result.stdout.strip(): - # Convert to PDT format - date_str = result.stdout.strip() - # Parse and format as PDT - try: - # Parse the git date (format: 2025-08-30 23:22:29 +0000) - import datetime as dt_module - - # Split off timezone info - date_part = date_str.rsplit(" ", 1)[0] - dt = dt_module.datetime.strptime(date_part, "%Y-%m-%d %H:%M:%S") - # Convert to PDT (UTC-7) - dt_pdt = dt - dt_module.timedelta(hours=7) - date = dt_pdt.strftime("%Y-%m-%d %H:%M:%S PDT") - except Exception: - date = date_str - else: - date = None - - return sha, date - except Exception: - return None, None - - @staticmethod - def find_workspace() -> Optional[str]: - """Find dynamo workspace directory.""" - candidates = [] - - # Check DYNAMO_HOME environment variable first - dynamo_home = os.environ.get("DYNAMO_HOME") - if dynamo_home: - candidates.append(dynamo_home) - - # Then check common 
locations - candidates.extend( - [ - ".", # Current directory - os.path.expanduser("~/dynamo"), - "/workspace", - ] - ) - - for candidate in candidates: - if DynamoInfo.is_dynamo_workspace(candidate): - return os.path.abspath(candidate) - return None - - @staticmethod - def is_dynamo_workspace(path: str) -> bool: - """Check if directory is a dynamo workspace.""" - if not os.path.exists(path): - return False - - # Check for indicators of a dynamo workspace - indicators = [ - "README.md", - "components", - "lib/bindings/python", - "lib/runtime", - "Cargo.toml", - ] - - # Require at least 3 indicators to be confident - found = 0 - for indicator in indicators: - check_path = os.path.join(path, indicator) - if os.path.exists(check_path): - found += 1 - - return found >= 3 - - -def has_framework_errors(tree: NodeInfo) -> bool: - """Check if there are framework component errors in the tree""" - # Find the Dynamo node - for child in tree.children: - if child.label and "Dynamo" in child.label: - # Find the Framework components node - for dynamo_child in child.children: - if dynamo_child.label and "Framework components" in dynamo_child.label: - # Use the has_errors() method to check the entire subtree - return dynamo_child.has_errors() - return False - - -def show_installation_recommendation(): - """Show installation recommendations for missing components.""" - print("\nTo install missing components for development (not production):") - print(" Runtime: (cd lib/bindings/python && maturin develop)") - print(" Framework: uv pip install -e .") - print(" or export PYTHONPATH=$DYNAMO_HOME/components/src\n") - - -def main(): - """Main function - collect and display system information""" - import argparse - import sys - - # Parse command line arguments - parser = argparse.ArgumentParser( - description="Display system information for Dynamo project" - ) - parser.add_argument( - "--thorough-check", - action="store_true", - help="Enable thorough checking (file permissions, directory sizes, disk space, etc.)", - ) - parser.add_argument( - "--terse", - action="store_true", - help="Show only essential information (OS, User, GPU, Framework, Dynamo) and errors", - ) - args = parser.parse_args() - - # Validate mutual exclusion - if args.thorough_check and args.terse: - parser.error("--thorough-check and --terse cannot be used together") - - # Simply create a SystemInfo instance - it collects everything in its constructor - tree = SystemInfo(thorough_check=args.thorough_check, terse=args.terse) - tree.print_tree() - - # Check if there are framework component errors and show installation recommendation - if has_framework_errors(tree): - show_installation_recommendation() - - # Exit with non-zero status if there are any errors - if tree.has_errors(): - sys.exit(1) - else: - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/deploy/dynamo_check.py b/deploy/dynamo_check.py new file mode 120000 index 0000000000..bf0591926a --- /dev/null +++ b/deploy/dynamo_check.py @@ -0,0 +1 @@ +sanity_check.py \ No newline at end of file diff --git a/deploy/sanity_check.py b/deploy/sanity_check.py new file mode 100755 index 0000000000..f6ea22001f --- /dev/null +++ b/deploy/sanity_check.py @@ -0,0 +1,2240 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +""" +Dynamo System Information Checker + +A comprehensive diagnostic tool that displays system configuration and Dynamo project status +in a hierarchical tree format. This script checks for: + +- System resources (OS, CPU, memory, GPU) +- Development tools (Cargo/Rust, Maturin, Python) +- LLM frameworks (vllm, sglang, tensorrt_llm) +- Dynamo runtime and framework components +- File system (permissions and disk space, detailed with --thorough-check) +- Installation status and component availability + +The output uses status indicators: +- ✅ Component found and working +- ❌ Component missing or error +- ⚠️ Warning condition +- ❓ Component not found (for optional items) + +By default, the tool runs quickly by checking only directory permissions and skipping +size calculations. Use --thorough-check for detailed file-level permission analysis, +directory size information, and disk space checking. + +Exit codes: +- 0: All critical components are present +- 1: One or more errors detected (❌ status) + +Example output (default mode): + +System info (hostname=jensen-linux, IP=10.111.122.133) +├─ OS Ubuntu 24.04.1 LTS (Noble Numbat) (Linux 6.11.0-28-generic x86_64), Memory=26.7/125.5 GiB, Cores=32 +├─ User info: user=ubuntu, uid=1000, gid=1000 +├─ ✅ NVIDIA GPU NVIDIA RTX 6000 Ada Generation, driver 570.133.07, CUDA 12.8, Power=26.14/300.00 W, Memory=289/49140 MiB +├─ File System +│ ├─ ✅ Dynamo workspace ($HOME/dynamo) writable +│ ├─ ✅ Dynamo .git directory writable +│ ├─ ✅ Rustup home ($HOME/.rustup) writable +│ ├─ ✅ Cargo home ($HOME/.cargo) writable +│ ├─ ✅ Cargo target ($HOME/dynamo/.build/target) writable +│ └─ ✅ Python site-packages ($HOME/dynamo/venv/lib/python3.12/site-packages) writable +├─ ✅ Cargo $HOME/.cargo/bin/cargo, cargo 1.89.0 (c24e10642 2025-06-23) +│ ├─ Cargo home directory CARGO_HOME=$HOME/.cargo +│ └─ Cargo target directory CARGO_TARGET_DIR=$HOME/dynamo/.build/target +│ ├─ Debug $HOME/dynamo/.build/target/debug, modified=2025-08-30 16:26:49 PDT +│ ├─ Release $HOME/dynamo/.build/target/release, modified=2025-08-30 18:21:12 PDT +│ └─ Binary $HOME/dynamo/.build/target/debug/libdynamo_llm_capi.so, modified=2025-08-30 16:25:37 PDT +├─ ✅ Maturin /opt/dynamo/venv/bin/maturin, maturin 1.9.3 +├─ ✅ Python 3.12.3, /opt/dynamo/venv/bin/python +│ ├─ ✅ PyTorch 2.7.1+cu128, ✅torch.cuda.is_available +│ └─ PYTHONPATH not set +├─ 🤖Framework +│ ├─ ✅ vLLM: 0.10.1.1, module=/opt/vllm/vllm/__init__.py, exec=/opt/dynamo/venv/bin/vllm +│ └─ ✅ Sglang: 0.3.0, module=/opt/sglang/sglang/__init__.py +└─ Dynamo $HOME/dynamo, SHA: a03d29066, Date: 2025-08-30 16:22:29 PDT + ├─ ✅ Runtime components ai-dynamo-runtime 0.4.1 + │ │ /opt/dynamo/venv/lib/python3.12/site-packages/ai_dynamo_runtime-0.4.1.dist-info: created=2025-08-30 19:14:29 PDT + │ │ /opt/dynamo/venv/lib/python3.12/site-packages/ai_dynamo_runtime.pth: modified=2025-08-30 19:14:29 PDT + │ │ └─ →: $HOME/dynamo/lib/bindings/python/src + │ ├─ ✅ dynamo._core $HOME/dynamo/lib/bindings/python/src/dynamo/_core.cpython-312-x86_64-linux-gnu.so, modified=2025-08-30 19:14:29 PDT + │ ├─ ✅ dynamo.logits_processing $HOME/dynamo/lib/bindings/python/src/dynamo/logits_processing/__init__.py + │ ├─ ✅ dynamo.nixl_connect $HOME/dynamo/lib/bindings/python/src/dynamo/nixl_connect/__init__.py + │ ├─ ✅ dynamo.llm $HOME/dynamo/lib/bindings/python/src/dynamo/llm/__init__.py + │ └─ ✅ dynamo.runtime $HOME/dynamo/lib/bindings/python/src/dynamo/runtime/__init__.py + └─ ✅ Framework components ai-dynamo 0.5.0 + │ 
/opt/dynamo/venv/lib/python3.12/site-packages/ai_dynamo-0.5.0.dist-info: created=2025-09-05 16:20:35 PDT + ├─ ✅ dynamo.frontend $HOME/dynamo/components/src/dynamo/frontend/__init__.py + ├─ ✅ dynamo.llama_cpp $HOME/dynamo/components/src/dynamo/llama_cpp/__init__.py + ├─ ✅ dynamo.mocker $HOME/dynamo/components/src/dynamo/mocker/__init__.py + ├─ ✅ dynamo.planner $HOME/dynamo/components/src/dynamo/planner/__init__.py + ├─ ✅ dynamo.sglang $HOME/dynamo/components/src/dynamo/sglang/__init__.py + ├─ ✅ dynamo.trtllm $HOME/dynamo/components/src/dynamo/trtllm/__init__.py + └─ ✅ dynamo.vllm $HOME/dynamo/components/src/dynamo/vllm/__init__.py + +Usage: + python deploy/sanity_check.py [--thorough-check] [--terse] + +Options: + --thorough-check Enable thorough checking (file permissions, directory sizes, etc.) + --terse Enable terse output mode +""" + +import datetime +import glob +import json +import logging +import os +import platform +import shutil +import subprocess +import sys +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict, List, Optional, Tuple + + +# ANSI color constants +class Colors: + """ANSI color escape sequences for terminal output.""" + + RESET = "\033[0m" + BRIGHT_RED = "\033[38;5;196m" + + +class NodeStatus(Enum): + """Status of a tree node""" + + OK = "ok" # ✅ Success/available + ERROR = "error" # ❌ Error/not found + WARNING = "warn" # ⚠️ Warning + INFO = "info" # No symbol, just information + NONE = "none" # No status indicator + UNKNOWN = "unknown" # ❓ Unknown/not found + + +@dataclass +class NodeInfo: + """Base class for all information nodes in the tree structure""" + + # Core properties + label: str # Main text/description + desc: Optional[str] = None # Primary value/description + status: NodeStatus = NodeStatus.NONE # Status indicator + + # Additional metadata as key-value pairs + metadata: Dict[str, Any] = field(default_factory=dict) + + # Tree structure + children: List["NodeInfo"] = field(default_factory=list) + + # Display control + show_symbol: bool = True # Whether to show status symbol + + def add_child(self, child: "NodeInfo") -> "NodeInfo": + """Add a child node and return it for chaining""" + self.children.append(child) + return child + + def add_metadata(self, key: str, value: str) -> "NodeInfo": + """Add metadata key-value pair""" + self.metadata[key] = value + return self + + def render( + self, prefix: str = "", is_last: bool = True, is_root: bool = True + ) -> List[str]: + """Render the tree node and its children as a list of strings""" + lines = [] + + # Determine the connector + if not is_root: + # Check if this is a sub-category item + if self.metadata and self.metadata.get("part_of_previous"): + connector = "│" + else: + connector = "└─" if is_last else "├─" + current_prefix = prefix + connector + " " + else: + current_prefix = "" + + # Build the line content + line_parts = [] + + # Add status symbol + if self.show_symbol and self.status != NodeStatus.NONE: + if self.status == NodeStatus.OK: + line_parts.append("✅") + elif self.status == NodeStatus.ERROR: + line_parts.append("❌") + elif self.status == NodeStatus.WARNING: + line_parts.append("⚠️") + elif self.status == NodeStatus.UNKNOWN: + line_parts.append("❓") + + # Add label and value + if self.desc: + line_parts.append(f"{self.label}: {self.desc}") + else: + line_parts.append(self.label) + + # Add metadata inline - consistent format for all + if self.metadata: + metadata_items = [] + for k, v in self.metadata.items(): + # Skip internal metadata that shouldn't be 
displayed + if k != "part_of_previous": + # Format all metadata consistently as "key=value" + metadata_items.append(f"{k}={v}") + + if metadata_items: + # Use consistent separator (comma) for all metadata + metadata_str = ", ".join(metadata_items) + line_parts[-1] += f", {metadata_str}" + + # Construct the full line + line_content = " ".join(line_parts) + if current_prefix or line_content: + lines.append(current_prefix + line_content) + + # Render children + for i, child in enumerate(self.children): + is_last_child = i == len(self.children) - 1 + if is_root: + child_prefix = "" + else: + child_prefix = prefix + (" " if is_last else "│ ") + lines.extend(child.render(child_prefix, is_last_child, False)) + + return lines + + def print_tree(self) -> None: + """Print the tree to console""" + for line in self.render(): + print(line) + + def has_errors(self) -> bool: + """Check if this node or any of its children have errors""" + # Check if this node has an error + if self.status == NodeStatus.ERROR: + return True + + # Recursively check all children + for child in self.children: + if child.has_errors(): + return True + + return False + + def _replace_home_with_var(self, path: str) -> str: + """Replace home directory with $HOME in path.""" + home = os.path.expanduser("~") + if path.startswith(home): + return path.replace(home, "$HOME", 1) + return path + + def _is_inside_container(self) -> bool: + """Check if we're running inside a container.""" + # Check for common container indicators + container_indicators = [ + # Docker + os.path.exists("/.dockerenv"), + # Podman/containerd + os.path.exists("/run/.containerenv"), + # Check if cgroup contains docker/containerd + self._check_cgroup_for_container(), + # Check environment variables + os.environ.get("container") is not None, + os.environ.get("DOCKER_CONTAINER") is not None, + ] + return any(container_indicators) + + def _check_cgroup_for_container(self) -> bool: + """Check cgroup for container indicators.""" + try: + with open("/proc/1/cgroup", "r") as f: + content = f.read() + return any( + indicator in content.lower() + for indicator in ["docker", "containerd", "podman", "lxc"] + ) + except Exception: + return False + + def _get_gpu_container_remedies(self) -> str: + """Get remedies for GPU issues when running inside a container.""" + return "maybe try a docker restart?" 
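+
+    # A minimal usage sketch of the tree API above, kept as a comment (the
+    # values are made up for illustration; the classes are the ones defined
+    # in this file):
+    #
+    #   root = NodeInfo(label="System info")
+    #   gpu = root.add_child(
+    #       NodeInfo(label="NVIDIA GPU", desc="A100", status=NodeStatus.OK)
+    #   )
+    #   gpu.add_metadata("Power", "60/300 W")
+    #   root.print_tree()
+    #
+    # which renders roughly as:
+    #
+    #   System info
+    #   └─ ✅ NVIDIA GPU: A100, Power=60/300 W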
+ + def _format_timestamp_pdt(self, timestamp: float) -> str: + """Format timestamp as PDT time string.""" + dt_utc = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) + # Convert to PDT (UTC-7) + dt_pdt = dt_utc - datetime.timedelta(hours=7) + return dt_pdt.strftime("%Y-%m-%d %H:%M:%S PDT") + + +class SystemInfo(NodeInfo): + """Root node for system information""" + + def __init__( + self, + hostname: Optional[str] = None, + thorough_check: bool = False, + terse: bool = False, + ): + self.thorough_check = thorough_check + self.terse = terse + if hostname is None: + hostname = platform.node() + + # Get IP address + ip_address = self._get_ip_address() + + # Format label with hostname and IP + if ip_address: + label = f"System info (hostname={hostname}, IP={ip_address})" + else: + label = f"System info (hostname={hostname})" + + super().__init__(label=label, status=NodeStatus.INFO) + + # Suppress Prometheus endpoint warnings from planner module + self._suppress_planner_warnings() + + # Collect and add all system information + # Always show: OS, User, GPU, Framework, Dynamo + self.add_child(OSInfo()) + self.add_child(UserInfo()) + + # Add GPU info (always show, even if not found) + gpu_info = GPUInfo() + self.add_child(gpu_info) + + # Add Framework info (vllm, sglang, tensorrt_llm) + self.add_child(FrameworkInfo()) + + # In terse mode, only add other components if they have errors + if not self.terse: + # Add file permissions check + self.add_child(FilePermissionsInfo(thorough_check=self.thorough_check)) + + # Add Cargo (always show, even if not found) + self.add_child(CargoInfo(thorough_check=self.thorough_check)) + + # Add Maturin (Python-Rust build tool) + self.add_child(MaturinInfo()) + + # Add Python info + self.add_child(PythonInfo()) + else: + # In terse mode, only add components that have errors + self._add_error_only_components() + + # Add Dynamo workspace info (always show, even if not found) + self.add_child(DynamoInfo(thorough_check=self.thorough_check)) + + def _get_ip_address(self) -> Optional[str]: + """Get the primary IP address of the system.""" + try: + import socket + + # Get hostname + hostname = socket.gethostname() + # Get IP address + ip_address = socket.gethostbyname(hostname) + # Filter out localhost + if ip_address.startswith("127."): + # Try to get external IP by connecting to a public DNS + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # Connect to Google DNS (doesn't actually send data) + s.connect(("8.8.8.8", 80)) + ip_address = s.getsockname()[0] + finally: + s.close() + return ip_address + except Exception: + return None + + def _suppress_planner_warnings(self) -> None: + """Suppress Prometheus endpoint warnings from planner module during import testing.""" + # The planner module logs a warning about Prometheus endpoint when imported + # outside of a Kubernetes cluster. Suppress this for cleaner output. 
+ planner_logger = logging.getLogger("dynamo.planner.defaults") + planner_logger.setLevel(logging.ERROR) + # Also suppress the defaults._get_default_prometheus_endpoint logger + defaults_logger = logging.getLogger("defaults._get_default_prometheus_endpoint") + defaults_logger.setLevel(logging.ERROR) + + def _add_error_only_components(self) -> None: + """In terse mode, only add components that have errors""" + # Create components and check their status + components_to_check = [ + ("File System", FilePermissionsInfo(thorough_check=self.thorough_check)), + ("Cargo", CargoInfo(thorough_check=self.thorough_check)), + ("Maturin", MaturinInfo()), + ("Python", PythonInfo()), + ] + + for name, component in components_to_check: + # Only add if the component has an error status + if component.status == NodeStatus.ERROR: + self.add_child(component) + + +class UserInfo(NodeInfo): + """User information""" + + def __init__(self): + # Get user info + username = os.getenv("USER") or os.getenv("LOGNAME") or "unknown" + if username == "unknown": + try: + import pwd + + username = pwd.getpwuid(os.getuid()).pw_name + except Exception: + try: + import subprocess + + result = subprocess.run( + ["whoami"], capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + username = result.stdout.strip() + except Exception: + pass + uid = os.getuid() + gid = os.getgid() + + desc = f"user={username}, uid={uid}, gid={gid}" + + # Add warning if running as root + status = NodeStatus.WARNING if uid == 0 else NodeStatus.INFO + if uid == 0: + desc += " ⚠️" + + super().__init__(label="User info", desc=desc, status=status) + + +class OSInfo(NodeInfo): + """Operating system information""" + + def __init__(self): + # Collect OS information + uname = platform.uname() + + # Try to get distribution info + distro = "" + version = "" + try: + if os.path.exists("/etc/os-release"): + with open("/etc/os-release", "r") as f: + for line in f: + if line.startswith("NAME="): + distro = line.split("=", 1)[1].strip().strip('"') + elif line.startswith("VERSION="): + version = line.split("=", 1)[1].strip().strip('"') + except Exception: + pass + + # Get memory info + mem_used_gb = None + mem_total_gb = None + try: + with open("/proc/meminfo", "r") as f: + meminfo = {} + for line in f: + if ":" in line: + k, v = line.split(":", 1) + meminfo[k.strip()] = v.strip() + + if "MemTotal" in meminfo and "MemAvailable" in meminfo: + total_kb = float(meminfo["MemTotal"].split()[0]) + avail_kb = float(meminfo["MemAvailable"].split()[0]) + mem_used_gb = (total_kb - avail_kb) / (1024 * 1024) + mem_total_gb = total_kb / (1024 * 1024) + except Exception: + pass + + # Get CPU cores + cores = os.cpu_count() + + # Build the value string + if distro: + value = f"{distro} {version} ({uname.system} {uname.release} {uname.machine})".strip() + else: + value = f"{uname.system} {uname.release} {uname.machine}" + + super().__init__(label="OS", desc=value, status=NodeStatus.INFO) + + # Add memory and cores as metadata + if mem_used_gb is not None and mem_total_gb is not None: + self.add_metadata("Memory", f"{mem_used_gb:.1f}/{mem_total_gb:.1f} GiB") + if mem_total_gb > 0 and (mem_used_gb / mem_total_gb) >= 0.9: + self.status = NodeStatus.WARNING + if cores: + self.add_metadata("Cores", str(cores)) + + +class GPUInfo(NodeInfo): + """NVIDIA GPU information""" + + def __init__(self): + # Find nvidia-smi executable (check multiple paths) + nvidia_smi = shutil.which("nvidia-smi") + if not nvidia_smi: + # Check common paths if `which` fails + for candidate in [ + 
"/usr/bin/nvidia-smi", + "/usr/local/bin/nvidia-smi", + "/usr/local/nvidia/bin/nvidia-smi", + ]: + if os.path.exists(candidate) and os.access(candidate, os.X_OK): + nvidia_smi = candidate + break + + if not nvidia_smi: + super().__init__( + label="NVIDIA GPU", desc="nvidia-smi not found", status=NodeStatus.ERROR + ) + return + + try: + # Get GPU list + result = subprocess.run( + [nvidia_smi, "-L"], capture_output=True, text=True, timeout=10 + ) + + if result.returncode != 0: + # Extract and process error message from stderr or stdout + error_msg = "nvidia-smi failed" + + # Try stderr first, then stdout + for output in [result.stderr, result.stdout]: + if output and output.strip(): + error_lines = output.strip().splitlines() + if error_lines: + error_msg = error_lines[0].strip() + break + + # Handle NVML-specific errors + if "Failed to initialize NVML" in error_msg: + error_msg = "No NVIDIA GPU detected (NVML initialization failed)" + # Add docker restart suggestion specifically for NVML failures in containers + if self._is_inside_container(): + error_msg += " - maybe try a docker restart?" + + super().__init__( + label="NVIDIA GPU", desc=error_msg, status=NodeStatus.ERROR + ) + return + + # Parse GPU names + gpu_names = [] + lines = result.stdout.strip().splitlines() + for line in lines: + # Example: "GPU 0: NVIDIA A100-SXM4-40GB (UUID: GPU-...)" + if ":" in line: + gpu_name = line.split(":", 1)[1].split("(")[0].strip() + gpu_names.append(gpu_name) + + # Check for zero GPUs + if not gpu_names: + # Get driver and CUDA even for zero GPUs + driver, cuda = self._get_driver_cuda_versions(nvidia_smi) + driver_cuda_str = "" + if driver or cuda: + parts = [] + if driver: + parts.append(f"driver {driver}") + if cuda: + parts.append(f"CUDA {cuda}") + driver_cuda_str = f", {', '.join(parts)}" + super().__init__( + label="NVIDIA GPU", + desc=f"not detected{driver_cuda_str}", + status=NodeStatus.ERROR, + ) + return + + # Get driver and CUDA versions + driver, cuda = self._get_driver_cuda_versions(nvidia_smi) + + # Handle single vs multiple GPUs + if len(gpu_names) == 1: + # Single GPU - compact format + value = gpu_names[0] + if driver or cuda: + driver_cuda = [] + if driver: + driver_cuda.append(f"driver {driver}") + if cuda: + driver_cuda.append(f"CUDA {cuda}") + value += f", {', '.join(driver_cuda)}" + + super().__init__(label="NVIDIA GPU", desc=value, status=NodeStatus.OK) + + # Add power and memory metadata for single GPU + self._add_power_memory_info(nvidia_smi, 0) + else: + # Multiple GPUs - show count in main label + value = f"{len(gpu_names)} GPUs" + if driver or cuda: + driver_cuda = [] + if driver: + driver_cuda.append(f"driver {driver}") + if cuda: + driver_cuda.append(f"CUDA {cuda}") + value += f", {', '.join(driver_cuda)}" + + super().__init__(label="NVIDIA GPU", desc=value, status=NodeStatus.OK) + + # Add each GPU as a child node + for i, name in enumerate(gpu_names): + gpu_child = NodeInfo( + label=f"GPU {i}", desc=name, status=NodeStatus.OK + ) + # Add power and memory for this specific GPU + power_mem = self._get_power_memory_string(nvidia_smi, i) + if power_mem: + gpu_child.add_metadata("Stats", power_mem) + self.add_child(gpu_child) + + except Exception: + super().__init__( + label="NVIDIA GPU", desc="detection failed", status=NodeStatus.ERROR + ) + + def _get_driver_cuda_versions( + self, nvidia_smi: str + ) -> Tuple[Optional[str], Optional[str]]: + """Get NVIDIA driver and CUDA versions using query method.""" + driver, cuda = None, None + try: + # Use query method for more 
reliable detection + result = subprocess.run( + [nvidia_smi, "--query-gpu=driver_version", "--format=csv,noheader"], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0 and result.stdout.strip(): + driver = result.stdout.strip().splitlines()[0].strip() + + # Try to get CUDA version from nvidia-smi output + result = subprocess.run( + [nvidia_smi], capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + import re + + m = re.search(r"CUDA Version:\s*([0-9.]+)", result.stdout) + if m: + cuda = m.group(1) + except Exception: + pass + return driver, cuda + + def _add_power_memory_info(self, nvidia_smi: str, gpu_index: int = 0): + """Add power and memory metadata for a specific GPU.""" + power_mem = self._get_power_memory_string(nvidia_smi, gpu_index) + if power_mem: + # Split into Power and Memory parts + if "; " in power_mem: + parts = power_mem.split("; ") + for part in parts: + if part.startswith("Power:"): + self.add_metadata("Power", part.replace("Power: ", "")) + elif part.startswith("Memory:"): + self.add_metadata("Memory", part.replace("Memory: ", "")) + + def _get_power_memory_string( + self, nvidia_smi: str, gpu_index: int = 0 + ) -> Optional[str]: + """Get power and memory info string for a specific GPU.""" + try: + result = subprocess.run( + [ + nvidia_smi, + "--query-gpu=power.draw,power.limit,memory.used,memory.total", + "--format=csv,noheader,nounits", + ], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0 and result.stdout.strip(): + lines = result.stdout.strip().splitlines() + if gpu_index < len(lines): + parts = lines[gpu_index].split(",") + if len(parts) >= 4: + power_draw = parts[0].strip() + power_limit = parts[1].strip() + mem_used = parts[2].strip() + mem_total = parts[3].strip() + + info_parts = [] + if power_draw and power_limit: + info_parts.append(f"Power: {power_draw}/{power_limit} W") + + if mem_used and mem_total: + # Add warning if memory usage is 90% or higher + warning = "" + try: + if float(mem_used) / float(mem_total) >= 0.9: + warning = " ⚠️" + except Exception: + pass + info_parts.append( + f"Memory: {mem_used}/{mem_total} MiB{warning}" + ) + + if info_parts: + return "; ".join(info_parts) + except Exception: + pass + return None + + +class FilePermissionsInfo(NodeInfo): + """File system check for development environment directories + + Checks writability of critical directories needed for: + - Dynamo development (top-level dynamo directory) + - Rust development (Cargo target directory + all files, RUSTUP_HOME, CARGO_HOME) + - Python development (site-packages) + + In thorough mode, also checks disk space for the dynamo working directory + and shows a warning if less than 10% free space is available. + + In fast mode, skips recursive file checking in Cargo target directory + for improved performance on large target directories. 
+ """ + + def __init__(self, thorough_check: bool = False): + super().__init__(label="File System", status=NodeStatus.INFO) + self.thorough_check = thorough_check + + # Check top-level dynamo directory + self._check_dynamo_directory_permissions() + + # Check Rust toolchain directories (RUSTUP_HOME and CARGO_HOME) + self._check_rust_toolchain_permissions() + + # Check Cargo target directory (with optional recursive file checking) + self._check_cargo_target_permissions() + + # Check Python site-packages directory + self._check_site_packages_permissions() + + def _check_permissions_unified( + self, + candidate_paths: List[str], + label_prefix: str, + recursive: bool = False, + exclude_files: Optional[List[str]] = None, + ) -> List[NodeInfo]: + """Unified permission checking function + + Args: + candidate_paths: List of paths to check, uses first available one + label_prefix: Prefix for the node label + recursive: If True, check all files recursively; if False, check directory only + exclude_files: List of filenames to exclude from file checking (e.g., ['.git']) + + Returns: + List of NodeInfo objects for the results + """ + exclude_files = exclude_files or [] + results = [] + + # Find first available path + selected_path = None + for path in candidate_paths: + expanded_path = os.path.expanduser(path) + if os.path.exists(expanded_path): + selected_path = expanded_path + break + + if not selected_path: + # No paths exist + path_list = ", ".join(candidate_paths) + results.append( + NodeInfo( + label=f"{label_prefix} (tried: {path_list})", + desc="No candidate paths exist", + status=NodeStatus.ERROR, + ) + ) + return results + + try: + # Check if it's actually a directory + if not os.path.isdir(selected_path): + results.append( + NodeInfo( + label=f"{label_prefix} ({self._replace_home_with_var(selected_path)})", + desc="Path is not a directory", + status=NodeStatus.ERROR, + ) + ) + return results + + # Check if directory is effectively writable + if not self._is_effectively_writable(selected_path): + results.append( + NodeInfo( + label=f"{label_prefix} ({self._replace_home_with_var(selected_path)})", + desc="Directory not writable", + status=NodeStatus.ERROR, + ) + ) + return results + + if not recursive: + # Just check directory writability + # Check if running as root but directory is not owned by root + is_root = os.getuid() == 0 + is_root_owned = False + warning_symbol = "" + desc_text = "writable" + owner_name = None + + if is_root: + try: + stat_info = os.stat(selected_path) + is_root_owned = stat_info.st_uid == 0 + if not is_root_owned: + warning_symbol = " ⚠️" + # Get the owner name + try: + import pwd + + owner_name = pwd.getpwuid(stat_info.st_uid).pw_name + except Exception: + owner_name = f"uid={stat_info.st_uid}" + desc_text = f"writable (owned by {owner_name or 'root'})" + except Exception: + desc_text = "writable (owned by unknown)" + + # Add disk space info in thorough mode + status = NodeStatus.OK # Default status + if self.thorough_check: + disk_space, disk_warning = self._format_disk_space(selected_path) + desc_text += disk_space + # Override status if disk space is low + if disk_warning: + status = disk_warning + + results.append( + NodeInfo( + label=f"{label_prefix} ({self._replace_home_with_var(selected_path)}){warning_symbol}", + desc=desc_text, + status=status, + ) + ) + else: + # Check files recursively + ( + total_files, + non_writable_files, + non_writable_list, + ) = self._count_writable_files( + selected_path, recursive=True, exclude_files=exclude_files + ) + + # 
Create description based on results + desc, status = self._create_file_count_description( + total_files, non_writable_files, "files" + ) + + # Check if running as root but directory is not owned by root + is_root = os.getuid() == 0 + is_root_owned = False + warning_symbol = "" + owner_name = None + + if is_root: + try: + stat_info = os.stat(selected_path) + is_root_owned = stat_info.st_uid == 0 + if not is_root_owned: + warning_symbol = " ⚠️" + # Get the owner name + try: + import pwd + + owner_name = pwd.getpwuid(stat_info.st_uid).pw_name + except Exception: + owner_name = f"uid={stat_info.st_uid}" + # Modify description to indicate ownership + if "writable" in desc: + desc = desc.replace( + "writable", + f"writable (owned by {owner_name or 'root'})", + ) + except Exception: + # Modify description to indicate ownership + if "writable" in desc: + desc = desc.replace( + "writable", "writable (owned by unknown)" + ) + + # Add disk space info in thorough mode + if self.thorough_check: + disk_space, disk_warning = self._format_disk_space(selected_path) + desc += disk_space + # Override status if disk space is low + if disk_warning: + status = disk_warning + + results.append( + NodeInfo( + label=f"{label_prefix} ({self._replace_home_with_var(selected_path)}){warning_symbol}", + desc=desc, + status=status, + ) + ) + + # Add details for non-writable files if there are any (limit to first 10) + if non_writable_files > 0: + details_label = ( + f"Non-writable files (showing first 10 of {non_writable_files})" + ) + if non_writable_files <= 10: + details_label = f"Non-writable files ({non_writable_files})" + + details_node = NodeInfo( + label=details_label, + desc="; ".join(non_writable_list[:10]), + status=NodeStatus.WARNING, + ) + results.append(details_node) + + except Exception as e: + results.append( + NodeInfo( + label=f"{label_prefix} ({self._replace_home_with_var(selected_path)})", + desc=f"Permission check failed: {str(e)}", + status=NodeStatus.ERROR, + ) + ) + + return results + + def _is_effectively_writable(self, file_path: str) -> bool: + """Check if a file is effectively writable + + A file is considered effectively writable if: + 1. It's already writable (os.access check) + 2. We own the file (can chmod it) + 3. 
We are root (can do anything) - but only if os.access confirms write access + Note: Root may still be denied write access on NFS mounts due to root squashing + """ + try: + # First check if it's already writable - this works for all cases including NFS + if os.access(file_path, os.W_OK): + return True + + # Check if we own the file (and can therefore chmod it) + stat_info = os.stat(file_path) + if stat_info.st_uid == os.getuid(): + return True + + # For root, we still need to respect the os.access result + # Root privileges don't guarantee write access on NFS mounts + # If os.access(W_OK) returned False above, respect that even for root + return False + except Exception: + # If we can't stat the file, assume it's not writable + return False + + def _count_writable_files( + self, + directory: str, + recursive: bool = False, + exclude_files: Optional[List[str]] = None, + ) -> Tuple[int, int, List[str]]: + """Count total files and non-writable files in directory + + Returns: + Tuple of (total_files, non_writable_files, non_writable_list) + """ + exclude_files = exclude_files or [] + total_files = 0 + non_writable_files = 0 + non_writable_list = [] + + if recursive: + # Walk through all files in the directory tree recursively, + # pruning excluded entries (e.g., .git) so exclude_files is honored + for root, dirs, files in os.walk(directory): + dirs[:] = [d for d in dirs if d not in exclude_files] + for file in files: + if file in exclude_files: + continue + file_path = os.path.join(root, file) + # Skip symbolic links + if os.path.islink(file_path): + continue + total_files += 1 + if not self._is_effectively_writable(file_path): + non_writable_files += 1 + rel_path = os.path.relpath(file_path, directory) + non_writable_list.append(rel_path) + else: + # Only check files in the immediate directory (non-recursive) + for item in os.listdir(directory): + if item in exclude_files: + continue + item_path = os.path.join(directory, item) + # Skip symbolic links and only check regular files + if os.path.isfile(item_path) and not os.path.islink(item_path): + total_files += 1 + try: + if not self._is_effectively_writable(item_path): + non_writable_files += 1 + non_writable_list.append(item) + except Exception: + non_writable_files += 1 + non_writable_list.append(item) + + return total_files, non_writable_files, non_writable_list + + def _create_file_count_description( + self, total_files: int, non_writable_files: int, context: str = "files" + ) -> Tuple[str, NodeStatus]: + """Create description and status for file count results""" + if total_files == 0: + return f"writable, no {context} found", NodeStatus.INFO + elif non_writable_files == 0: + return f"writable, all {total_files} {context} writable", NodeStatus.OK + else: + return ( + f"writable, {non_writable_files} of {total_files} {context} not writable", + NodeStatus.WARNING, + ) + + def _get_cargo_target_path_candidates(self) -> List[str]: + """Get candidate paths for cargo target directory""" + candidates = [] + + # Try to get target directory from cargo metadata (most accurate) + try: + result = subprocess.run( + ["cargo", "metadata", "--format-version=1", "--no-deps"], + capture_output=True, + text=True, + timeout=10, + cwd=".", + ) + if result.returncode == 0: + import json + + metadata = json.loads(result.stdout) + target_path = metadata.get("target_directory") + if target_path: + candidates.append(target_path) + except Exception: + pass + + # Add fallback candidates + cargo_target = os.environ.get("CARGO_TARGET_DIR") + if cargo_target: + candidates.append(cargo_target) + + candidates.append("~/.cargo/target") + return candidates + + def _check_dynamo_directory_permissions(self): + """Check top-level dynamo
directory and key files writability""" + # Use the existing workspace detection logic + dynamo_root = DynamoInfo.find_workspace() + + if not dynamo_root: + self.add_child( + NodeInfo( + label="Dynamo workspace", + desc="workspace not found", + status=NodeStatus.ERROR, + ) + ) + return + + if not DynamoInfo.is_dynamo_workspace(dynamo_root): + self.add_child( + NodeInfo( + label="Dynamo workspace", + desc="not a valid dynamo workspace", + status=NodeStatus.ERROR, + ) + ) + return + + # Check dynamo root directory and files (exclude .git) + recursive = self.thorough_check + results = self._check_permissions_unified( + [dynamo_root], + "Dynamo workspace", + recursive=recursive, + exclude_files=[".git"], + ) + for result in results: + self.add_child(result) + + # Check .git directory separately + git_dir = os.path.join(dynamo_root, ".git") + if os.path.exists(git_dir): + git_results = self._check_permissions_unified( + [git_dir], "Dynamo .git directory", recursive=recursive + ) + for result in git_results: + self.add_child(result) + else: + self.add_child( + NodeInfo( + label="Dynamo .git directory", + desc="not available", + status=NodeStatus.WARNING, + ) + ) + + def _check_site_packages_permissions(self): + """Check site-packages directory writability""" + try: + import site + + # Get all candidate site-packages directories + site_packages_dirs = site.getsitepackages() + user_site = site.getusersitepackages() + if user_site: + site_packages_dirs.append(user_site) + + # Check each existing site-packages directory + recursive = self.thorough_check + for site_dir in site_packages_dirs: + if os.path.exists(site_dir): + results = self._check_permissions_unified( + [site_dir], "site-packages", recursive=recursive + ) + for result in results: + self.add_child(result) + + except Exception as e: + self.add_child( + NodeInfo( + label="Python site-packages", + desc=f"Permission check failed: {str(e)}", + status=NodeStatus.ERROR, + ) + ) + + def _check_cargo_target_permissions(self): + """Check Cargo target directory writability and file permissions""" + candidates = self._get_cargo_target_path_candidates() + recursive = self.thorough_check + results = self._check_permissions_unified( + candidates, "Cargo target", recursive=recursive + ) + + if not results or ( + len(results) == 1 + and results[0].status == NodeStatus.ERROR + and results[0].desc is not None + and "No candidate paths exist" in results[0].desc + ): + # No paths exist - show warning instead of error + self.add_child( + NodeInfo( + label="Cargo target", + desc="Path does not exist", + status=NodeStatus.WARNING, + ) + ) + else: + for result in results: + self.add_child(result) + + def _check_rust_toolchain_permissions(self): + """Check RUSTUP_HOME and CARGO_HOME directory writability + + These directories need recursive checking because: + - RUSTUP_HOME: rustup needs to write toolchain files, documentation, etc. + - CARGO_HOME: cargo needs to write registry cache, git repos, binaries, etc. 
+ """ + # Check RUSTUP_HOME + rustup_env = os.environ.get("RUSTUP_HOME") + rustup_candidates = [rustup_env] if rustup_env is not None else [] + rustup_candidates.append("~/.rustup") + + recursive = self.thorough_check + rustup_results = self._check_permissions_unified( + rustup_candidates, "Rustup home", recursive=recursive + ) + for result in rustup_results: + self.add_child(result) + + # Check CARGO_HOME + cargo_env = os.environ.get("CARGO_HOME") + cargo_candidates = [cargo_env] if cargo_env is not None else [] + cargo_candidates.append("~/.cargo") + + cargo_results = self._check_permissions_unified( + cargo_candidates, "Cargo home", recursive=recursive + ) + for result in cargo_results: + self.add_child(result) + + def _format_disk_space(self, path: str) -> Tuple[str, Optional[NodeStatus]]: + """Format disk space information for a given path + + Returns: + Tuple of (formatted_string, warning_status_if_low_space) + """ + try: + # Get disk usage statistics + statvfs = os.statvfs(path) + + # Calculate sizes in bytes + total_bytes = statvfs.f_frsize * statvfs.f_blocks + free_bytes = statvfs.f_frsize * statvfs.f_bavail + used_bytes = total_bytes - free_bytes + + # Convert to human readable format + def format_bytes(bytes_val): + """Convert bytes to human readable format""" + for unit in ["B", "KB", "MB", "GB", "TB"]: + if bytes_val < 1024.0: + return f"{bytes_val:.1f} {unit}" + bytes_val /= 1024.0 + return f"{bytes_val:.1f} PB" + + # Calculate percentage used + percent_used = (used_bytes / total_bytes) * 100 + percent_free = 100 - percent_used + + formatted_string = f", {format_bytes(used_bytes)}/{format_bytes(total_bytes)} ({percent_used:.1f}% used)" + + # Return warning status if less than 10% free space + warning_status = NodeStatus.WARNING if percent_free < 10 else None + + return formatted_string, warning_status + + except Exception: + return "", None + + +class CargoInfo(NodeInfo): + """Cargo tool information""" + + def __init__(self, thorough_check: bool = False): + self.thorough_check = thorough_check + cargo_path = shutil.which("cargo") + cargo_version = None + + # Get cargo version + if cargo_path: + try: + result = subprocess.run( + ["cargo", "--version"], capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + cargo_version = result.stdout.strip() + except Exception: + pass + + if not cargo_path and not cargo_version: + super().__init__( + label="Cargo", + desc="not found, install Rust toolchain to see cargo target directory", + status=NodeStatus.ERROR, + ) + return + + # Initialize with cargo path and version + value = "" + if cargo_path: + value = self._replace_home_with_var(cargo_path) + if cargo_version: + value += f", {cargo_version}" if value else cargo_version + + super().__init__(label="Cargo", desc=value, status=NodeStatus.OK) + + # Get cargo home directory from the environment (may not exist, which is OK) + cargo_home_env = os.environ.get("CARGO_HOME") + if cargo_home_env: + cargo_home = cargo_home_env + home_value = f"CARGO_HOME={self._replace_home_with_var(cargo_home)}" + else: + cargo_home = os.path.expanduser("~/.cargo") + home_value = ( + f"CARGO_HOME=, using {self._replace_home_with_var(cargo_home)}" + ) + + if cargo_home and os.path.exists(cargo_home): + status = NodeStatus.INFO + else: + home_value += " (directory does not exist)" + status = NodeStatus.WARNING + + home_node = NodeInfo( + label="Cargo home directory", desc=home_value, status=status + ) + self.add_child(home_node) + + # Get cargo target directory + cargo_target_env = 
os.environ.get("CARGO_TARGET_DIR") + cargo_target = self._get_cargo_target_directory() + + # Calculate total directory size (only if thorough check and directory exists) + size_str = "" + if cargo_target and os.path.exists(cargo_target) and self.thorough_check: + total_size_gb = self._get_directory_size_gb(cargo_target) + size_str = f", {total_size_gb:.1f} GB" if total_size_gb is not None else "" + + # Format the display value + if cargo_target_env: + display_cargo_target = ( + self._replace_home_with_var(cargo_target) if cargo_target else "unknown" + ) + target_value = f"CARGO_TARGET_DIR={display_cargo_target}{size_str}" + else: + display_cargo_target = ( + self._replace_home_with_var(cargo_target) if cargo_target else "unknown" + ) + target_value = ( + f"CARGO_TARGET_DIR=, using {display_cargo_target}{size_str}" + ) + + # Check directory existence and set status + if cargo_target and os.path.exists(cargo_target): + status = NodeStatus.INFO + target_node = NodeInfo( + label="Cargo target directory", + desc=target_value, + status=status, + ) + self.add_child(target_node) + # Add debug/release/binary info as children of target directory + self._add_build_info(target_node, cargo_target) + else: + target_value += " (directory does not exist)" + status = NodeStatus.WARNING if cargo_target_env else NodeStatus.INFO + target_node = NodeInfo( + label="Cargo target directory", + desc=target_value, + status=status, + ) + self.add_child(target_node) + + def _get_directory_size_gb(self, directory: str) -> Optional[float]: + """Get the size of a directory in GB.""" + try: + # Use du command to get directory size in bytes + result = subprocess.run( + ["du", "-sb", directory], capture_output=True, text=True, timeout=30 + ) + if result.returncode == 0: + # Parse output: "size_in_bytes\tdirectory_path" + size_bytes = int(result.stdout.split()[0]) + # Convert to GB + size_gb = size_bytes / (1024**3) + return size_gb + except Exception: + pass + return None + + def _get_cargo_target_directory(self) -> Optional[str]: + """Get cargo target directory using cargo metadata.""" + try: + # Use DynamoInfo's static method to find workspace + workspace_dir = DynamoInfo.find_workspace() + + # Run cargo metadata command to get target directory + cmd_args = ["cargo", "metadata", "--format-version=1", "--no-deps"] + kwargs: Dict[str, Any] = { + "capture_output": True, + "text": True, + "timeout": 10, + } + + # Add cwd if workspace_dir was found + if workspace_dir and os.path.isdir(workspace_dir): + kwargs["cwd"] = workspace_dir + + result = subprocess.run(cmd_args, **kwargs) + + if result.returncode == 0: + # Parse JSON output to extract target_directory + metadata = json.loads(result.stdout) + return metadata.get("target_directory") + except Exception: + pass + return None + + def _add_build_info(self, parent_node: NodeInfo, cargo_target: str): + """Add debug/release/binary information as children of target directory.""" + debug_dir = os.path.join(cargo_target, "debug") + release_dir = os.path.join(cargo_target, "release") + + # Check debug directory + if os.path.exists(debug_dir): + display_debug = self._replace_home_with_var(debug_dir) + debug_value = display_debug + + # Add size (only if thorough check) + if self.thorough_check: + debug_size_gb = self._get_directory_size_gb(debug_dir) + if debug_size_gb is not None: + debug_value += f", {debug_size_gb:.1f} GB" + + try: + debug_mtime = os.path.getmtime(debug_dir) + debug_time = self._format_timestamp_pdt(debug_mtime) + debug_value += f", modified={debug_time}" + except 
Exception: + debug_value += " (unable to read timestamp)" + + debug_node = NodeInfo( + label="Debug", desc=debug_value, status=NodeStatus.INFO + ) + parent_node.add_child(debug_node) + + # Check release directory + if os.path.exists(release_dir): + display_release = self._replace_home_with_var(release_dir) + release_value = display_release + + # Add size (only if thorough check) + if self.thorough_check: + release_size_gb = self._get_directory_size_gb(release_dir) + if release_size_gb is not None: + release_value += f", {release_size_gb:.1f} GB" + + try: + release_mtime = os.path.getmtime(release_dir) + release_time = self._format_timestamp_pdt(release_mtime) + release_value += f", modified={release_time}" + except Exception: + release_value += " (unable to read timestamp)" + + release_node = NodeInfo( + label="Release", desc=release_value, status=NodeStatus.INFO + ) + parent_node.add_child(release_node) + + # Find *.so file + so_file = self._find_so_file(cargo_target) + if so_file: + display_so = self._replace_home_with_var(so_file) + so_value = display_so + + # Add file size (only if thorough check) + if self.thorough_check: + try: + file_size_bytes = os.path.getsize(so_file) + file_size_mb = file_size_bytes / (1024**2) + so_value += f", {file_size_mb:.1f} MB" + except Exception: + pass + + try: + so_mtime = os.path.getmtime(so_file) + so_time = self._format_timestamp_pdt(so_mtime) + so_value += f", modified={so_time}" + except Exception: + so_value += " (unable to read timestamp)" + + binary_node = NodeInfo( + label="Binary", desc=so_value, status=NodeStatus.INFO + ) + parent_node.add_child(binary_node) + + def _find_so_file(self, target_directory: str) -> Optional[str]: + """Find the compiled *.so file in target directory.""" + # Check common locations for .so files + search_dirs = [ + os.path.join(target_directory, "debug"), + os.path.join(target_directory, "release"), + target_directory, + ] + + for search_dir in search_dirs: + if not os.path.exists(search_dir): + continue + + # Walk through directory looking for .so files + try: + for root, dirs, files in os.walk(search_dir): + for file in files: + if file.endswith(".so"): + return os.path.join(root, file) + # Don't recurse too deep + if root.count(os.sep) - search_dir.count(os.sep) > 2: + dirs[:] = [] # Stop recursion + except Exception: + pass + + return None + + +class MaturinInfo(NodeInfo): + """Maturin tool information (Python-Rust build tool)""" + + def __init__(self): + maturin_path = shutil.which("maturin") + if not maturin_path: + super().__init__(label="Maturin", desc="not found", status=NodeStatus.ERROR) + # Add installation hint as a child node + install_hint = NodeInfo( + label="Install with", + desc="uv pip install maturin[patchelf]", + status=NodeStatus.INFO, + ) + self.add_child(install_hint) + return + + try: + result = subprocess.run( + ["maturin", "--version"], capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + version = result.stdout.strip() + # Include the maturin binary path like Cargo and Git do + display_maturin_path = self._replace_home_with_var(maturin_path) + super().__init__( + label="Maturin", + desc=f"{display_maturin_path}, {version}", + status=NodeStatus.OK, + ) + return + except Exception: + pass + + super().__init__(label="Maturin", desc="not found", status=NodeStatus.ERROR) + + +class PythonInfo(NodeInfo): + """Python installation information""" + + def __init__(self): + py_version = platform.python_version() + py_exec = sys.executable or "python" + display_py_exec = 
self._replace_home_with_var(py_exec) + + super().__init__( + label="Python", + desc=f"{py_version}, {display_py_exec}", + status=NodeStatus.OK if os.path.exists(py_exec) else NodeStatus.ERROR, + ) + + # Check for PyTorch (optional) + try: + torch = __import__("torch") + version = getattr(torch, "__version__", "installed") + + # Check CUDA availability + cuda_status = None + if hasattr(torch, "cuda"): + try: + cuda_available = torch.cuda.is_available() + cuda_status = ( + "✅torch.cuda.is_available" + if cuda_available + else "❌torch.cuda.is_available" + ) + except Exception: + pass + + # Get installation path + install_path = None + if hasattr(torch, "__file__") and torch.__file__: + file_path = torch.__file__ + if "site-packages" in file_path: + parts = file_path.split(os.sep) + for i, part in enumerate(parts): + if part == "site-packages": + install_path = os.sep.join(parts[: i + 1]) + break + elif file_path: + install_path = os.path.dirname(file_path) + + if install_path: + install_path = self._replace_home_with_var(install_path) + + package_info = PythonPackageInfo( + package_name="PyTorch", + version=version, + cuda_status=cuda_status, + install_path=install_path, + is_framework=False, + ) + self.add_child(package_info) + except ImportError: + pass # PyTorch is optional, don't show if not installed + + # Add PYTHONPATH + pythonpath = os.environ.get("PYTHONPATH", "") + self.add_child(PythonPathInfo(pythonpath)) + + +class FrameworkInfo(NodeInfo): + """LLM Framework information""" + + def __init__(self): + super().__init__(label="🤖Framework", status=NodeStatus.INFO) + + # Check for framework packages (mandatory to show) + frameworks_to_check = [ + ("vllm", "vLLM"), + ("sglang", "Sglang"), + ("tensorrt_llm", "TensorRT LLM"), + ] + + frameworks_found = 0 + + for module_name, display_name in frameworks_to_check: + # Regular import for all frameworks + try: + module = __import__(module_name) + version = getattr(module, "__version__", "installed") + frameworks_found += 1 + + # Get module path + module_path = None + if hasattr(module, "__file__") and module.__file__: + module_path = self._replace_home_with_var(module.__file__) + + # Get executable path + exec_path = None + exec_path_raw = shutil.which(module_name) + if exec_path_raw: + exec_path = self._replace_home_with_var(exec_path_raw) + + package_info = PythonPackageInfo( + package_name=display_name, + version=version, + module_path=module_path, + exec_path=exec_path, + is_framework=True, + is_installed=True, + ) + self.add_child(package_info) + except Exception: + # Framework not installed (or failed to import) - don't add it + pass + + # If no frameworks found, set status to ERROR (X) and show what's missing + if frameworks_found == 0: + self.status = NodeStatus.ERROR + # List all the frameworks that were checked but not found + missing_frameworks = [] + for module_name, display_name in frameworks_to_check: + missing_frameworks.append(f"no {module_name}") + missing_text = ", ".join(missing_frameworks) + self.desc = missing_text + + +class PythonPackageInfo(NodeInfo): + """Python package information""" + + def __init__( + self, + package_name: str, + version: str, + cuda_status: Optional[str] = None, + module_path: Optional[str] = None, + exec_path: Optional[str] = None, + install_path: Optional[str] = None, + is_framework: bool = False, + is_installed: bool = True, + ): + # Build display value + display_value = version + + # Determine status based on whether package is installed + if not is_installed or version == "-": + # Framework not found
- show with "-" and use UNKNOWN status for ❓ symbol + display_value = "-" + status = NodeStatus.UNKNOWN # Show ❓ for not found frameworks + else: + status = NodeStatus.OK + + # Add CUDA status for PyTorch + if cuda_status: + display_value = f"{version}, {cuda_status}" + # Don't add install path for PyTorch with CUDA status + # For frameworks, add module and exec paths + elif is_framework and (module_path or exec_path): + parts = [version] + if module_path: + parts.append(f"module={module_path}") + if exec_path: + parts.append(f"exec={exec_path}") + display_value = ", ".join(parts) + # For regular packages, add install path + elif install_path: + display_value = f"{version} ({install_path})" + + super().__init__(label=package_name, desc=display_value, status=status) + + +class PythonPathInfo(NodeInfo): + """PYTHONPATH environment variable information""" + + def __init__(self, pythonpath: str): + if pythonpath: + # Split by colon and replace home in each path + paths = pythonpath.split(":") + display_paths = [] + has_invalid_paths = False + + for p in paths: + display_path = self._replace_home_with_var(p) + # Check if path exists and is accessible + if not os.path.exists(p) or not os.access(p, os.R_OK): + display_paths.append( + f"{Colors.BRIGHT_RED}{display_path}{Colors.RESET}" + ) # Bright red path + has_invalid_paths = True + else: + display_paths.append(display_path) + + display_pythonpath = ":".join(display_paths) + status = NodeStatus.WARNING if has_invalid_paths else NodeStatus.INFO + else: + display_pythonpath = "not set" + status = ( + NodeStatus.INFO + ) # PYTHONPATH not set is fine with editable installs + + super().__init__(label="PYTHONPATH", desc=display_pythonpath, status=status) + + +class DynamoRuntimeInfo(NodeInfo): + """Dynamo runtime components information""" + + def __init__(self, workspace_dir: str, thorough_check: bool = False): + self.thorough_check = thorough_check + # Try to get package version + import importlib.metadata + + try: + version = importlib.metadata.version("ai-dynamo-runtime") + runtime_value = f"ai-dynamo-runtime {version}" + is_installed = True + except Exception: + runtime_value = "ai-dynamo-runtime - Not installed" + is_installed = False + + super().__init__( + label="Runtime components", + desc=runtime_value, + status=NodeStatus.INFO, # Will update based on components found + ) + + # Add package info if installed + if is_installed: + # Add dist-info directory + dist_info = self._find_dist_info() + if dist_info: + self.add_child(dist_info) + + # Add .pth file + pth_file = self._find_pth_file() + if pth_file: + self.add_child(pth_file) + + # Discover runtime components from source + components = self._discover_runtime_components(workspace_dir) + + # Find where each component actually is and add them + if components: + # Calculate max width for alignment + max_len = max(len(comp) for comp in components) + + components_found = False + for component in components: + try: + # Try to import to find actual location + module = __import__(component, fromlist=[""]) + module_path = getattr(module, "__file__", None) + + if module_path: + # Add timestamp for .so files + timestamp_str = "" + if module_path.endswith(".so"): + try: + stat = os.stat(module_path) + timestamp = self._format_timestamp_pdt(stat.st_mtime) + timestamp_str = f", modified={timestamp}" + except Exception: + pass + + display_path = self._replace_home_with_var(module_path) + padded_name = f"{component:<{max_len}}" + module_node = NodeInfo( + label=f"✅ {padded_name}",
desc=f"{display_path}{timestamp_str}", + status=NodeStatus.NONE, + ) + self.add_child(module_node) + components_found = True + except ImportError as e: + # Module not importable - show as error + padded_name = f"{component:<{max_len}}" + error_msg = str(e) if str(e) else "Import failed" + module_node = NodeInfo( + label=padded_name, desc=error_msg, status=NodeStatus.ERROR + ) + self.add_child(module_node) + # Don't set components_found to True for failed imports + + # Update status and value based on whether we found components + if components_found: + self.status = NodeStatus.OK + # If not installed but components work via PYTHONPATH, update the message + if not is_installed: + self.desc = "ai-dynamo-runtime (via PYTHONPATH)" + else: + self.status = NodeStatus.ERROR + else: + # No components discovered at all + self.status = NodeStatus.ERROR + + # Final check: if no children at all (no components found), ensure it's an error + if not self.children: + self.status = NodeStatus.ERROR + + def _discover_runtime_components(self, workspace_dir: str) -> list: + """Discover ai-dynamo-runtime components from filesystem. + + Returns: + List of runtime component module names + Example: ['dynamo._core', 'dynamo.nixl_connect', 'dynamo.llm', 'dynamo.runtime'] + + Note: Always includes 'dynamo._core' (compiled Rust module), then scans + lib/bindings/python/src/dynamo/ for additional components. + """ + components = ["dynamo._core"] # Always include compiled Rust module + + if not workspace_dir: + return components + + # Scan runtime components (llm, runtime, nixl_connect, etc.) + runtime_path = os.path.join(workspace_dir, "lib/bindings/python/src/dynamo") + if not os.path.exists(runtime_path): + return components + + for item in os.listdir(runtime_path): + item_path = os.path.join(runtime_path, item) + if os.path.isdir(item_path) and os.path.exists( + os.path.join(item_path, "__init__.py") + ): + components.append(f"dynamo.{item}") + + return components + + def _find_dist_info(self) -> Optional[NodeInfo]: + """Find the dist-info directory for ai-dynamo-runtime.""" + import site + + for site_dir in site.getsitepackages(): + pattern = os.path.join(site_dir, "ai_dynamo_runtime*.dist-info") + matches = glob.glob(pattern) + if matches: + path = matches[0] + display_path = self._replace_home_with_var(path) + try: + stat = os.stat(path) + timestamp = self._format_timestamp_pdt(stat.st_ctime) + return NodeInfo( + label=f" {display_path}", + desc=f"created={timestamp}", + status=NodeStatus.INFO, + metadata={"part_of_previous": True}, + ) + except Exception: + return NodeInfo( + label=f" {display_path}", + status=NodeStatus.INFO, + metadata={"part_of_previous": True}, + ) + return None + + def _find_pth_file(self) -> Optional[NodeInfo]: + """Find the .pth file for ai-dynamo-runtime.""" + import site + + for site_dir in site.getsitepackages(): + pth_path = os.path.join(site_dir, "ai_dynamo_runtime.pth") + if os.path.exists(pth_path): + display_path = self._replace_home_with_var(pth_path) + try: + stat = os.stat(pth_path) + timestamp = self._format_timestamp_pdt(stat.st_mtime) + node = NodeInfo( + label=f" {display_path}", + desc=f"modified={timestamp}", + status=NodeStatus.INFO, + metadata={"part_of_previous": True}, + ) + + # Read where it points to + with open(pth_path, "r") as f: + content = f.read().strip() + if content: + display_content = self._replace_home_with_var(content) + points_to = NodeInfo( + label="→", desc=display_content, status=NodeStatus.INFO + ) + node.add_child(points_to) + + return node +
+                except Exception:
+                    return NodeInfo(
+                        label=f" {display_path}",
+                        status=NodeStatus.INFO,
+                        metadata={"part_of_previous": True},
+                    )
+        return None
+
+
+class DynamoFrameworkInfo(NodeInfo):
+    """Dynamo framework components information"""
+
+    def __init__(self, workspace_dir: str, thorough_check: bool = False):
+        self.thorough_check = thorough_check
+        # Try to get package version
+        import importlib.metadata
+
+        try:
+            version = importlib.metadata.version("ai-dynamo")
+            framework_value = f"ai-dynamo {version}"
+            is_installed = True
+        except Exception:
+            framework_value = "ai-dynamo - Not installed"
+            is_installed = False
+
+        super().__init__(
+            label="Framework components",
+            desc=framework_value,
+            status=NodeStatus.INFO,  # Will update based on components found
+        )
+
+        # Add package info if installed
+        if is_installed:
+            import glob
+            import site
+
+            for site_dir in site.getsitepackages():
+                # Look specifically for ai_dynamo (not ai_dynamo_runtime)
+                dist_pattern = os.path.join(site_dir, "ai_dynamo-*.dist-info")
+                matches = glob.glob(dist_pattern)
+                if matches:
+                    path = matches[0]
+                    display_path = self._replace_home_with_var(path)
+                    try:
+                        stat = os.stat(path)
+                        timestamp = self._format_timestamp_pdt(stat.st_ctime)
+                        dist_node = NodeInfo(
+                            label=f" {display_path}",
+                            desc=f"created={timestamp}",
+                            status=NodeStatus.INFO,
+                            metadata={"part_of_previous": True},
+                        )
+                        self.add_child(dist_node)
+                    except Exception:
+                        dist_node = NodeInfo(
+                            label=f" {display_path}",
+                            status=NodeStatus.INFO,
+                            metadata={"part_of_previous": True},
+                        )
+                        self.add_child(dist_node)
+                    break
+
+        # Discover framework components from source
+        components = self._discover_framework_components(workspace_dir)
+
+        # Find where each component actually is and add them
+        if components:
+            # Sort components for consistent output
+            components.sort()
+
+            # Calculate max width for alignment
+            max_len = max(len(comp) for comp in components)
+
+            components_found = False
+            for component in components:
+                try:
+                    # Try to import to find actual location
+                    module = __import__(component, fromlist=[""])
+                    module_path = getattr(module, "__file__", None)
+
+                    if module_path:
+                        display_path = self._replace_home_with_var(module_path)
+                        padded_name = f"{component:<{max_len}}"
+                        component_node = NodeInfo(
+                            label=f"✅ {padded_name}",
+                            desc=display_path,
+                            status=NodeStatus.NONE,
+                        )
+                        self.add_child(component_node)
+                        components_found = True
+                except ImportError as e:
+                    # Module not importable - show as error
+                    padded_name = f"{component:<{max_len}}"
+                    error_msg = str(e) if str(e) else "Import failed"
+                    component_node = NodeInfo(
+                        label=padded_name, desc=error_msg, status=NodeStatus.ERROR
+                    )
+                    self.add_child(component_node)
+                    # Don't set components_found to True for failed imports
+
+            # Update status and value based on whether we found components
+            if components_found:
+                self.status = NodeStatus.OK
+                # If not installed but components work via PYTHONPATH, update the message
+                if not is_installed:
+                    self.desc = "ai-dynamo (via PYTHONPATH)"
+            else:
+                self.status = NodeStatus.ERROR
+        else:
+            # No components discovered at all
+            self.status = NodeStatus.ERROR
+
+    def _discover_framework_components(self, workspace_dir: str) -> list:
+        """Discover ai-dynamo framework components from filesystem.
+
+        Returns:
+            List of framework component module names
+            Example: ['dynamo.frontend', 'dynamo.planner', 'dynamo.vllm', 'dynamo.sglang', 'dynamo.llama_cpp']
+
+        Note: Scans components/src/dynamo/... directory for modules with __init__.py files.
+        """
+        components: List[str] = []
+
+        if not workspace_dir:
+            return components
+
+        # Scan the components/src/dynamo/... Python directory for __init__.py files
+        components_path = os.path.join(workspace_dir, "components", "src", "dynamo")
+        if os.path.exists(components_path):
+            for item in os.listdir(components_path):
+                item_path = os.path.join(components_path, item)
+                if os.path.isdir(item_path):
+                    # Check for dynamo module in src
+                    module_path = os.path.join(item_path, "__init__.py")
+                    if os.path.exists(module_path):
+                        components.append(f"dynamo.{item}")
+
+        return components
+
+
+class DynamoInfo(NodeInfo):
+    """Dynamo workspace information"""
+
+    def __init__(self, thorough_check: bool = False):
+        self.thorough_check = thorough_check
+
+        # Find workspace directory
+        workspace_dir = DynamoInfo.find_workspace()
+
+        if not workspace_dir:
+            # Show error when workspace is not found
+            super().__init__(
+                label="Dynamo",
+                desc="workspace not found - cannot detect Runtime and Framework components",
+                status=NodeStatus.ERROR,
+            )
+            # Add helpful information about where we looked
+            search_paths = NodeInfo(
+                label="Searched in",
+                desc="current dir, ~/dynamo, DYNAMO_HOME, /workspace",
+                status=NodeStatus.INFO,
+            )
+            self.add_child(search_paths)
+            hint = NodeInfo(
+                label="Hint",
+                desc="Run from a Dynamo workspace directory or set DYNAMO_HOME",
+                status=NodeStatus.INFO,
+            )
+            self.add_child(hint)
+            return
+
+        # Get git info
+        sha, date = self._get_git_info(workspace_dir)
+
+        # Build main label
+        display_workspace = self._replace_home_with_var(workspace_dir)
+        if sha and date:
+            value = f"{display_workspace}, SHA: {sha}, Date: {date}"
+        else:
+            value = display_workspace
+
+        super().__init__(label="Dynamo", desc=value, status=NodeStatus.INFO)
+
+        # Always add runtime components
+        runtime_info = DynamoRuntimeInfo(
+            workspace_dir, thorough_check=self.thorough_check
+        )
+        self.add_child(runtime_info)
+
+        # Always add framework components
+        framework_info = DynamoFrameworkInfo(
+            workspace_dir, thorough_check=self.thorough_check
+        )
+        self.add_child(framework_info)
+
+    def _get_git_info(self, workspace_dir: str) -> Tuple[Optional[str], Optional[str]]:
+        """Get git SHA and date for the workspace."""
+        try:
+            # Get short SHA
+            result = subprocess.run(
+                ["git", "rev-parse", "--short", "HEAD"],
+                capture_output=True,
+                text=True,
+                cwd=workspace_dir,
+                timeout=5,
+            )
+            sha = result.stdout.strip() if result.returncode == 0 else None
+
+            # Get commit date
+            result = subprocess.run(
+                ["git", "show", "-s", "--format=%ci", "HEAD"],
+                capture_output=True,
+                text=True,
+                cwd=workspace_dir,
+                timeout=5,
+            )
+            if result.returncode == 0 and result.stdout.strip():
+                # Convert to PDT format
+                date_str = result.stdout.strip()
+                # Parse and format as PDT
+                try:
+                    # Parse the git date (format: 2025-08-30 23:22:29 +0000),
+                    # honoring the UTC offset git reports instead of assuming UTC
+                    import datetime as dt_module
+
+                    dt = dt_module.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S %z")
+                    # Convert to PDT (UTC-7)
+                    pdt_tz = dt_module.timezone(dt_module.timedelta(hours=-7))
+                    date = dt.astimezone(pdt_tz).strftime("%Y-%m-%d %H:%M:%S PDT")
+                except Exception:
+                    date = date_str
+            else:
+                date = None
+
+            return sha, date
+        except Exception:
+            return None, None
+
+    @staticmethod
+    def find_workspace() -> Optional[str]:
+        """Find dynamo workspace directory."""
+        candidates = []
+
+        # Check DYNAMO_HOME environment variable first
+        dynamo_home = os.environ.get("DYNAMO_HOME")
+        if dynamo_home:
+            candidates.append(dynamo_home)
+
+        # Then check common
locations
+        candidates.extend(
+            [
+                ".",  # Current directory
+                os.path.expanduser("~/dynamo"),
+                "/workspace",
+            ]
+        )
+
+        for candidate in candidates:
+            if DynamoInfo.is_dynamo_workspace(candidate):
+                return os.path.abspath(candidate)
+        return None
+
+    @staticmethod
+    def is_dynamo_workspace(path: str) -> bool:
+        """Check if directory is a dynamo workspace."""
+        if not os.path.exists(path):
+            return False
+
+        # Check for indicators of a dynamo workspace
+        indicators = [
+            "README.md",
+            "components",
+            "lib/bindings/python",
+            "lib/runtime",
+            "Cargo.toml",
+        ]
+
+        # Require at least 3 indicators to be confident
+        found = 0
+        for indicator in indicators:
+            check_path = os.path.join(path, indicator)
+            if os.path.exists(check_path):
+                found += 1
+
+        return found >= 3
+
+
+def has_framework_errors(tree: NodeInfo) -> bool:
+    """Check if there are framework component errors in the tree"""
+    # Find the Dynamo node
+    for child in tree.children:
+        if child.label and "Dynamo" in child.label:
+            # Find the Framework components node
+            for dynamo_child in child.children:
+                if dynamo_child.label and "Framework components" in dynamo_child.label:
+                    # Use the has_errors() method to check the entire subtree
+                    return dynamo_child.has_errors()
+    return False
+
+
+def show_installation_recommendation():
+    """Show installation recommendations for missing components."""
+    print("\nTo install missing components for development (not production):")
+    print("  Runtime:   (cd lib/bindings/python && maturin develop)")
+    print("  Framework: uv pip install -e .")
+    print("             or export PYTHONPATH=$DYNAMO_HOME/components/src\n")
+
+
+def main():
+    """Main function - collect and display system information"""
+    import argparse
+    import sys
+
+    # Parse command line arguments
+    parser = argparse.ArgumentParser(
+        description="Display system information for Dynamo project"
+    )
+    parser.add_argument(
+        "--thorough-check",
+        action="store_true",
+        help="Enable thorough checking (file permissions, directory sizes, disk space, etc.)",
+    )
+    parser.add_argument(
+        "--terse",
+        action="store_true",
+        help="Show only essential information (OS, User, GPU, Framework, Dynamo) and errors",
+    )
+    args = parser.parse_args()
+
+    # Validate mutual exclusion
+    if args.thorough_check and args.terse:
+        parser.error("--thorough-check and --terse cannot be used together")
+
+    # Simply create a SystemInfo instance - it collects everything in its constructor
+    tree = SystemInfo(thorough_check=args.thorough_check, terse=args.terse)
+    tree.print_tree()
+
+    # Check if there are framework component errors and show installation recommendation
+    if has_framework_errors(tree):
+        show_installation_recommendation()
+
+    # Exit with non-zero status if there are any errors
+    if tree.has_errors():
+        sys.exit(1)
+    else:
+        sys.exit(0)
+
+
+if __name__ == "__main__":
+    main()

From 4ceb2da0d1b6c3ad456a541057ff1e22ec777f82 Mon Sep 17 00:00:00 2001
From: Keiven Chang
Date: Tue, 7 Oct 2025 00:32:32 +0000
Subject: [PATCH 2/2] fixed a corner case where the DYNAMO_HOME export
 contains a literal ~ char

Signed-off-by: Keiven Chang
---
 deploy/sanity_check.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/deploy/sanity_check.py b/deploy/sanity_check.py
index f6ea22001f..fd3db7eab2 100755
--- a/deploy/sanity_check.py
+++ b/deploy/sanity_check.py
@@ -2134,7 +2134,7 @@ def find_workspace() -> Optional[str]:
         # Check DYNAMO_HOME environment variable first
         dynamo_home = os.environ.get("DYNAMO_HOME")
         if dynamo_home:
-            candidates.append(dynamo_home)
+            candidates.append(os.path.expanduser(dynamo_home))

         # Then check common locations
         candidates.extend(
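
A minimal sketch of the corner case PATCH 2/2 fixes (the ~/dynamo value is
illustrative, not taken from the patch): a quoted export such as
export DYNAMO_HOME="~/dynamo" stores a literal "~" in the environment, which
the shell never expands, so the raw value fails the workspace existence check
until expanduser() is applied:

    import os

    # Simulate a quoted export: the shell does not expand "~" inside quotes
    os.environ["DYNAMO_HOME"] = "~/dynamo"

    raw = os.environ["DYNAMO_HOME"]
    print(os.path.exists(raw))                      # False: "~" stays a literal character
    print(os.path.exists(os.path.expanduser(raw)))  # True whenever $HOME/dynamo exists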