diff --git a/src/mcp_server_uyuni/config.py b/src/mcp_server_uyuni/config.py index 872349c..b739d81 100644 --- a/src/mcp_server_uyuni/config.py +++ b/src/mcp_server_uyuni/config.py @@ -16,7 +16,7 @@ UYUNI_MCP_HOST = os.environ.get("UYUNI_MCP_HOST", "127.0.0.1") UYUNI_MCP_PORT = int(os.environ.get("UYUNI_MCP_PORT", "8000")) -UYUNI_AUTH_SERVER = os.environ.get("UYUNI_AUTH_SERVER") +UYUNI_AUTH_SERVER = os.environ.get("UYUNI_AUTH_SERVER", None) UYUNI_MCP_SSL_VERIFY = ( os.environ.get("UYUNI_MCP_SSL_VERIFY", "true").lower() diff --git a/src/mcp_server_uyuni/errors.py b/src/mcp_server_uyuni/errors.py new file mode 100644 index 0000000..ab50b9d --- /dev/null +++ b/src/mcp_server_uyuni/errors.py @@ -0,0 +1,92 @@ +"""Centralized API exception classes for the MCP server. + +This module exposes a small hierarchy of exceptions that describe +transport, HTTP and application-level failures when talking to Uyuni. +""" + +class APIError(Exception): + """Base class for API errors.""" + + +class HTTPError(APIError): + """HTTP status related error. + + Attributes: + status_code: HTTP status code returned by the server. + url: URL that was requested. + body: Optional response body. + """ + + def __init__(self, status_code: int, url: str, body: str | None = None): + msg = f"HTTP error {status_code} for {url}. Response body: {body!r}" + super().__init__(msg) + self.status_code = status_code + self.url = url + self.body = body + + +class AuthError(HTTPError): + """Authentication-related HTTP error. + """ + # No custom constructor: behave exactly like HTTPError. + pass + + +class NetworkError(APIError): + """Network/transport level error. + + Represents both timeouts and other connection-level failures. Callers + can inspect the `timed_out` attribute to distinguish timeouts. + """ + + def __init__(self, url: str, original: Exception | None = None, timed_out: bool = False): + if timed_out: + msg = ( + f"Timeout while contacting Uyuni at {url}. This may indicate a long-running " + "action or network issues. Original: {original}" + ) + else: + msg = f"Network error while contacting Uyuni at {url}: {original}" + super().__init__(msg) + self.url = url + self.original = original + self.timed_out = timed_out + + +class UnexpectedResponse(APIError): + """Application-level unexpected response from Uyuni. + + This represents business-logic level failures returned by the Uyuni API + (for example: resource not found, entity already exists, validation + failures, etc.). The `response` should be a human-readable string with + the message/reason returned by Uyuni. + """ + + def __init__(self, url: str, response: str | None = None): + msg = f"Unexpected response from Uyuni at {url}: {response!r}" + super().__init__(msg) + self.url = url + self.response = response + + +class NotFoundError(APIError): + """ + Raised when an entity/resource is not found in Uyuni or MCP APIs. + Can be used for missing systems, packages, events, etc. + The identifier is either an int or a str. + """ + def __init__(self, what: str, identifier: int | str = None): + msg = f"{what} not found" + (f" (identifier: {identifier})" if identifier is not None else "") + super().__init__(msg) + self.what = what + self.identifier: int | str | None = identifier + + +__all__ = [ + "APIError", + "HTTPError", + "AuthError", + "NetworkError", + "UnexpectedResponse", + "NotFoundError", +] diff --git a/src/mcp_server_uyuni/logging_config.py b/src/mcp_server_uyuni/logging_config.py index d778eb1..8fb3414 100644 --- a/src/mcp_server_uyuni/logging_config.py +++ b/src/mcp_server_uyuni/logging_config.py @@ -1,7 +1,7 @@ import sys import logging from typing import Union -from mcp_server_uyuni.constants import Transport +from .constants import Transport def get_logger( name: str = "mcp_server_uyuni", @@ -41,4 +41,4 @@ def get_logger( logger.addHandler(handler) - return logger \ No newline at end of file + return logger diff --git a/src/mcp_server_uyuni/server.py b/src/mcp_server_uyuni/server.py index ee5c1db..4ca96e6 100644 --- a/src/mcp_server_uyuni/server.py +++ b/src/mcp_server_uyuni/server.py @@ -23,11 +23,15 @@ from fastmcp.server.middleware import Middleware, MiddlewareContext from fastmcp import FastMCP, Context from mcp import LoggingLevel, ServerSession, types -from mcp_server_uyuni.logging_config import get_logger, Transport -from mcp_server_uyuni.uyuni_api import call as call_uyuni_api, TIMEOUT_HAPPENED -from mcp_server_uyuni.config import CONFIG +from .logging_config import get_logger, Transport +from .uyuni_api import call as call_uyuni_api, TIMEOUT_HAPPENED +from .config import CONFIG from .auth import AuthProvider +from .errors import ( + UnexpectedResponse, + NotFoundError +) class ActivationKeySchema(BaseModel): activation_key: str @@ -88,25 +92,32 @@ def _to_bool(value) -> bool: return str(value).lower() in ("true", "yes", "1") @mcp.tool() -async def get_list_of_active_systems(ctx: Context) -> List[Dict[str, Any]]: +async def list_systems(ctx: Context) -> List[Dict[str, Any]]: """ Fetches a list of active systems from the Uyuni server, returning their names and IDs. - The returned list contains dictionaries, each with a 'system_name' (str) and - a 'system_id' (int) for an active system. + The returned list contains system objects, each of which consists of a 'system_name' + and a numerical 'system_id' field for an active system. + + You SHOULD use the 'system_id' to call other system related tools. Returns: - List[Dict[str, Any]]: A list of system dictionaries (system_name and system_id). - Returns an empty list if the API call fails, - the response format is unexpected, or no systems are found. + A list of system objects (system_name and system_id). + Returns an empty list if no systems are found. + + Example: + [ + { "system_name": "ubuntu.example.com", "system_id": 100010000 }, + { "system_name": "opensuseleap15.example.com", "system_id": 100010001 } + ] """ log_string = "Getting list of active systems" logger.info(log_string) await ctx.info(log_string) - return await _get_list_of_active_systems(ctx.get_state('token')) + return await _list_systems(ctx.get_state('token')) -async def _get_list_of_active_systems(token: str) -> List[Dict[str, Union[str, int]]]: +async def _list_systems(token: str) -> List[Dict[str, Union[str, int]]]: async with httpx.AsyncClient(verify=CONFIG["UYUNI_MCP_SSL_VERIFY"]) as client: systems_data_result = await call_uyuni_api( @@ -114,8 +125,7 @@ async def _get_list_of_active_systems(token: str) -> List[Dict[str, Union[str, i method="GET", api_path="/rhn/manager/api/system/listSystems", error_context="fetching active systems", - token=token, - default_on_error=[] + token=token ) filtered_systems = [] @@ -124,159 +134,486 @@ async def _get_list_of_active_systems(token: str) -> List[Dict[str, Union[str, i if isinstance(system, dict): filtered_systems.append({'system_name': system.get('name'), 'system_id': system.get('id')}) else: - print(f"Warning: Unexpected item format in system list: {system}") - elif systems_data_result: # Log if not the default empty list but still not a list - print(f"Warning: Expected a list of systems, but received: {type(systems_data_result)}") + logger.warning(f"Unexpected item format in system list: {system}") + elif systems_data_result: + logger.warning(f"Expected a list of systems, but received: {type(systems_data_result)}") return filtered_systems -async def _resolve_system_id(system_identifier: Union[str, int], token: str) -> Optional[str]: - """ - Resolves a system identifier, which can be a name or an ID, to a numeric system ID string. - - If the identifier is numeric (or a string of digits), it's returned as a string. - If it's a non-numeric string, it's treated as a name and the ID is looked up via the system.getId API endpoint. - +@mcp.tool() +async def get_system_details(system_identifier: Union[str, int], ctx: Context): + """Gets details of the specified system. + Args: - system_identifier: The system name (e.g., "buildhost") or system ID (e.g., 1000010000). + system_identifier: The system name (e.g., "buildhost.example.com") or system ID (e.g., 1000010000). + Prefer using numerical system IDs instead of system names when possible. + Returns: - Optional[str]: The numeric system ID as a string if found, otherwise None. - """ - id_str = str(system_identifier) - if id_str.isdigit(): - return id_str - - # If it's not a digit string, it must be a name. - system_name = id_str - logger.info(f"System identifier '{system_name}' is not numeric, treating as a name and looking up ID.") + An object that contains the following attributes of the system: + - system_id: The numerical ID of the system within Uyuni server + - system_name: The registered system name, usually its main FQDN + - last_boot: The last boot time of the system known to Uyuni server + - uuid: UUID of the system if it is a virtual instance, null otherwise. + - cpu: An object with the following CPU attributes of the system: + - family: The CPU family + - mhz: The CPU clock speed + - model: The CPU model + - vendor: The CPU vendor + - arch: The CPU architecture + - network: Network addresses and the hostname of the system. + - hostname: The hostname of the system + - ip: The IPv4 address of the system + - ip6: The IPv6 address of the system + - installed_products: List of installed products on the system. + You can use this field to identify what OS the system is running. + + Example: + { + "system_id": "100010001", + "system_name": "opensuse.example.local", + "last_boot": "2025-04-01T15:21:56Z", + "uuid": "a8c3f40d-c1ae-406e-9f9b-96e7d5fdf5a3", + "cpu": { + "family": "15", + "mhz": "1896.436", + "model": "QEMU Virtual CPU", + "vendor": "AuthenticAMD", + "arch": "x86_64" + }, + "network": { + "hostname": "opensuse.example.local", + "ip": "192.168.122.193", + "ip6": "fe80::5054:ff:fe12:3456" + }, + "installed_products": [ + { + "release": "0", + "name": "SLES", + "isBaseProduct": true, + "arch": "x86_64", + "version": "15.7", + "friendlyName": "SUSE Linux Enterprise Server 15 SP7 x86_64" + }, + { + "release": "0", + "name": "sle-module-basesystem", + "isBaseProduct": false, + "arch": "x86_64", + "version": "15.7", + "friendlyName": "Basesystem Module 15 SP7 x86_64" + } + ] + } + """ + log_string = f"Getting details of system {system_identifier}" + logger.info(log_string) + await ctx.info(log_string) + return await _get_system_details(system_identifier, ctx.get_state('token')) + +async def _get_system_details(system_identifier: Union[str, int], token: str) -> Dict[str, Any]: + system_id = await _resolve_system_id(system_identifier, token) async with httpx.AsyncClient(verify=CONFIG["UYUNI_MCP_SSL_VERIFY"]) as client: - # The result from system.getId is an array of system structs - systems_list = await call_uyuni_api( + details_call: Coroutine = call_uyuni_api( client=client, method="GET", - api_path="/rhn/manager/api/system/getId", - params={'name': system_name}, - error_context=f"resolving system ID for name '{system_name}'", - token=token, - default_on_error=[] # Return an empty list on failure + api_path="/rhn/manager/api/system/getDetails", + params={'sid': system_id}, + error_context=f"Fetching details for system {system_id}", + token=token ) - - if not isinstance(systems_list, list): - logger.error(f"Expected a list of systems for name '{system_name}', but received: {type(systems_list)}") - return None - - if not systems_list: - logger.warning(f"System with name '{system_name}' not found.") - return None - - if len(systems_list) > 1: - logger.error(f"Multiple systems found with name '{system_name}'.") - return None - - first_system = systems_list[0] - if isinstance(first_system, dict) and 'id' in first_system: - resolved_id = str(first_system['id']) - logger.info(f"Found ID {resolved_id} for system name '{system_name}'.") - return resolved_id + uuid_call: Coroutine = call_uyuni_api( + client=client, + method="GET", + api_path="/rhn/manager/api/system/getUuid", + params={'sid': system_id}, + error_context=f"Fetching UUID for system {system_id}", + token=token + ) + cpu_call: Coroutine = call_uyuni_api( + client=client, + method="GET", + api_path="/rhn/manager/api/system/getCpu", + params={'sid': system_id}, + error_context=f"Fetching CPU information for system {system_id}", + token=token + ) + network_call: Coroutine = call_uyuni_api( + client=client, + method="GET", + api_path="/rhn/manager/api/system/getNetwork", + params={'sid': system_id}, + error_context=f"Fetching network information for system {system_id}", + token=token + ) + products_call: Coroutine = call_uyuni_api( + client=client, + method="GET", + api_path="/rhn/manager/api/system/getInstalledProducts", + params={'sid': system_id}, + error_context=f"Fetching installed product information for system {system_id}", + token=token + ) + + results = await asyncio.gather( + details_call, + uuid_call, + cpu_call, + network_call, + products_call + ) + + details_result, uuid_result, cpu_result, network_result, products_result = results + + if isinstance(details_result, dict): + # Only add the identifier if the API returned actual data + system_details = { + "system_id": details_result["id"], + "system_name": details_result["profile_name"], + "last_boot": details_result["last_boot"], + "uuid": uuid_result + } + + if isinstance(cpu_result, dict): + cpu_details = { + "family": cpu_result["family"], + "mhz": cpu_result["mhz"], + "model": cpu_result["model"], + "vendor": cpu_result["vendor"], + "arch": cpu_result["arch"] + } + system_details["cpu"] = cpu_details + else: + logger.error(f"Unexpected API response when getting CPU information for system {system_id}") + logger.error(cpu_result) + + if isinstance(network_result, dict): + network_details = { + "hostname": network_result["hostname"], + "ip": network_result["ip"], + "ip6": network_result["ip6"] + } + system_details["network"] = network_details + else: + logger.error(f"Unexpected API response when getting network information for system {system_id}") + logger.error(network_result) + + if isinstance(products_result, list): + base_product = [p["friendlyName"] for p in products_result if p["isBaseProduct"]] + system_details["installed_products"] = base_product + else: + logger.error(f"Unexpected API response when getting installed products for system {system_id}") + logger.error(products_result) + + return system_details else: - logger.error(f"System data for '{system_name}' is malformed. Expected a dict with 'id'. Got: {first_system}") - return None + logger.error(f"Unexpected API response when getting details for system {system_id}") + logger.error(details_result) + return {} @mcp.tool() -async def get_cpu_of_a_system(system_identifier: Union[str, int], ctx: Context) -> Dict[str, Any]: +async def get_system_event_history(system_identifier: Union[str, int], ctx: Context, offset: int = 0, limit: int = 10, earliest_date: str = None): + """Gets the event/action history of the specified system. - """Retrieves detailed CPU information for a specific system in the Uyuni server. + The output of this tool is paginated and can be controlled via 'offset' and 'limit' parameters. - Fetches CPU attributes such as model, core count, architecture, etc. + Optionally, the 'earliest_date' parameter can be set to an ISO-8601 date to specify the earliest date + for the events to be returned. + + You SHOULD use 'get_system_event_details' tool with an event ID to get the details of an event. + + You SHOULD use this tool to check the status of a reboot. A reboot is finished when + its related action is completed. Args: - system_identifier: The unique identifier of the system. It can be the system name (e.g. "buildhost") or the system ID (e.g. 1000010000). + system_identifier: The system name (e.g., "buildhost.example.com") or system ID (e.g., 1000010000). + Prefer using numerical system IDs instead of system names when possible. + offset: Number of results to skip + limit: Maximum number of results + earliest_date: The earliest ISO-8601 date-time string to filter the events (optional) Returns: - Dict[str, Any]: A dictionary containing the CPU attributes and the original system_identifier. - Returns an empty dictionary if the API call fails, - the response format is unexpected, or CPU data is not available. - """ - log_string = f"Getting CPU information of system with id {system_identifier}" + A list of event/action status, newest to oldest. + + A single event object contains the following attributes: + + - id: The ID of the event + - history_type: The type of the event + - status: Event's status (completed, failed, etc.) + - summary: A short summary of the event + - completed: ISO-8601 date & time of the event's completion timestamp + + Example: + [ + { + "id": 12, + "history_type": "System reboot", + "status": "Completed", + "summary": "System reboot scheduled by admin", + "completed": "2025-11-27T15:37:28Z" + }, + { + "id": 357, + "history_type": "Patch Update", + "status": "Failed" + "summary": "Patch Update: Security update for the Linux Kernel", + "completed": "2025-11-28T13:11:49Z" + } + ] + """ + log_string = f"Getting event history of system {system_identifier}" logger.info(log_string) await ctx.info(log_string) - return await _get_cpu_of_a_system(system_identifier, ctx.get_state('token')) + return await _get_system_event_history(system_identifier, limit, offset, earliest_date, ctx.get_state('token')) -async def _get_cpu_of_a_system(system_identifier: Union[str, int], token: str) -> Dict[str, Any]: +async def _get_system_event_history(system_identifier: Union[str, int], limit: int, offset: int, earliest_date: str, token: str) -> list[Any]: system_id = await _resolve_system_id(system_identifier, token) - if not system_id: - return {} # Helper function already logged the reason for failure. async with httpx.AsyncClient(verify=CONFIG["UYUNI_MCP_SSL_VERIFY"]) as client: - cpu_data_result = await call_uyuni_api( + params = {'sid': system_id, 'limit': limit, 'offset': offset} + if earliest_date: + params['earliestDate'] = earliest_date + + result = await call_uyuni_api( client=client, method="GET", - api_path="/rhn/manager/api/system/getCpu", - params={'sid': system_id}, - error_context=f"fetching CPU data for system {system_identifier}", - token=token, - default_on_error={} + api_path="/rhn/manager/api/system/getEventHistory", + params=params, + error_context=f"Fetching event history for system {system_id}", + token=token ) - if isinstance(cpu_data_result, dict): - # Only add the identifier if the API returned actual data - if cpu_data_result: - cpu_data_result['system_identifier'] = system_identifier - return cpu_data_result - # If not a dict but not the default empty dict, log it - elif cpu_data_result: - print(f"Warning: Expected a dict for CPU data, but received: {type(cpu_data_result)}") + if isinstance(result, list): + return result + else: + logger.error(f"Unexpected API response when getting event history for system {system_id}") + logger.error(result) return {} @mcp.tool() -async def get_all_systems_cpu_info(ctx: Context) -> List[Dict[str, Any]]: +async def get_system_event_details(system_identifier: Union[str, int], event_id: int, ctx: Context): + """Gets the details of the event associated with the especified server and event ID. + + The event ID must be a value returned by the 'get_system_event_history' tool. + + Args: + system_identifier: The system name (e.g., "buildhost.example.com") or system ID (e.g., 1000010000). + Prefer using numerical system IDs instead of system names when possible. + event_id: The ID of the event + + Returns: + An object that contains the details of the associated event. + + The event object contains the following attributes: + + - id: The ID of the event + - history_type: The type of the event + - status: Event's status (completed, failed, etc.) + - summary: A short summary of the event + - created: ISO-8601 date & time of the event's creation timestamp + - picked_up: ISO-8601 date & time when the event was picked up by the system + - completed: ISO-8601 date & time of the event's completion timestamp + - earliest_action: The earliest ISO-8601 date & time this action should occur + - result_msg: The result string of the action executed on the system + - result_code: The result code of the action executed on the system + - additional_info: Additional information on the event, if available + + Example: + [ + { + "id": 12, + "history_type": "System reboot", + "status": "Completed", + "summary": "System reboot scheduled by admin", + "completed": "2025-11-27T15:37:28Z" + }, + { + "id": 357, + "history_type": "Patch Update", + "status": "Failed" + "summary": "Patch Update: Security update for the Linux Kernel", + "completed": "2025-11-28T13:11:49Z" + } + ] + """ + log_string = f"Getting event history of system {system_identifier}" + logger.info(log_string) + await ctx.info(log_string) + return await _get_system_event_details(system_identifier, event_id, ctx.get_state('token')) + +async def _get_system_event_details(system_identifier: Union[str, int], event_id: int, token: str) -> Dict[str, Any]: + system_id = await _resolve_system_id(system_identifier, token) + + async with httpx.AsyncClient(verify=CONFIG["UYUNI_MCP_SSL_VERIFY"]) as client: + result = await call_uyuni_api( + client=client, + method="GET", + api_path="/rhn/manager/api/system/getEventDetails", + params={'sid': system_id, 'eid': event_id}, + error_context=f"Fetching event details for event {event_id}, system {system_id}", + token=token + ) + + if isinstance(result, dict): + return result + else: + logger.error(f"Unexpected API response when getting event details for event {event_id}, system {system_id}") + logger.error(result) + return {} + +@mcp.tool() +async def find_systems_by_name(name: str, ctx: Context) -> List[Dict[str, Union[str, int]]]: """ - Retrieves CPU information for all active systems in the Uyuni server. + Lists systems that match the provided hostname. - For each active system, this tool fetches its name, ID, and detailed CPU attributes. + Args: + name: The system name (e.g., "buildhost.example.com"). Returns: - List[Dict[str, Any]]: A list of dictionaries. Each dictionary contains: - - 'system_name' (str): The name of the system. - - 'system_id' (int): The unique ID of the system. - - 'cpu_info' (Dict[str, Any]): CPU attributes for the system. - Returns an empty list if no systems are found or if - fetching system list fails. Individual system CPU fetch - failures will result in empty 'cpu_info' for that system. + A list of system objects (system_name and system_id) that match the provided name. + Returns an empty list if no systems are found. + + Example: + [ + { "system_name": "ubuntu1.example.com", "system_id": 100010000 }, + { "system_name": "ubuntu2.example.com", "system_id": 100010001 } + ] + """ + log_string = f"Finding systems with name {name}" + logger.info(log_string) + await ctx.info(log_string) + + token = ctx.get_state('token') + async with httpx.AsyncClient(verify=CONFIG["UYUNI_MCP_SSL_VERIFY"]) as client: + systems_data_result = await call_uyuni_api( + client=client, + method="GET", + api_path="/rhn/manager/api/system/search/hostname", + params={'searchTerm': name}, + error_context=f"finding systems with name {name}", + token=token + ) + + filtered_systems = [] + if isinstance(systems_data_result, list): + for system in systems_data_result: + if isinstance(system, dict): + filtered_systems.append({'system_name': system.get('name'), 'system_id': system.get('id')}) + else: + logger.warning(f"Unexpected item format in system list: {system}") + elif systems_data_result: + logger.warning(f"Expected a list of systems, but received: {type(systems_data_result)}") + + return filtered_systems + +@mcp.tool() +async def find_systems_by_ip(ip_address: str, ctx: Context) -> List[Dict[str, Union[str, int]]]: """ + Lists systems that match the provided IP address. + + Args: + ip_address: The system IP address (e.g., "192.168.122.193"). - log_string = "Get CPU info for all systems" + Returns: + A list of system objects (system_name, system_id and ip) that match the provided IP address. + Returns an empty list if no systems are found. + + Example: + [ + { + "system_name": "ubuntu.example.com", + "system_id": 100010000, + "ip": "192.168.122.193" + } + ] + """ + log_string = f"Finding systems with IP address {ip_address}" logger.info(log_string) await ctx.info(log_string) - all_systems_cpu_data = [] - active_systems = await _get_list_of_active_systems(ctx.get_state('token')) + token = ctx.get_state('token') + async with httpx.AsyncClient(verify=CONFIG["UYUNI_MCP_SSL_VERIFY"]) as client: + systems_data_result = await call_uyuni_api( + client=client, + method="GET", + api_path="/rhn/manager/api/system/search/ip", + params={'searchTerm': ip_address}, + error_context=f"finding systems with IP address {ip_address}", + token=token + ) - if not active_systems: - print("Warning: No active systems found or failed to retrieve system list.") - return [] + filtered_systems = [] + if isinstance(systems_data_result, list): + for system in systems_data_result: + if isinstance(system, dict): + filtered_systems.append({'system_name': system.get('name'), 'system_id': system.get('id'), 'ip': system.get('ip')}) + else: + logger.warning(f"Unexpected item format in system list: {system}") + elif systems_data_result: + logger.warning(f"Expected a list of systems, but received: {type(systems_data_result)}") - for system_summary in active_systems: - system_id = system_summary.get('system_id') - system_name = system_summary.get('system_name') + return filtered_systems - if system_id is None: - print(f"Warning: Skipping system due to missing ID: {system_summary}") - continue +async def _resolve_system_id(system_identifier: Union[str, int], token: str) -> str: + """ + Resolves a system identifier, which can be a name or an ID, to a numeric system ID string. - print(f"Fetching CPU info for system: {system_name} (ID: {system_id})") - cpu_info = await _get_cpu_of_a_system(str(system_id), ctx.get_state('token')) + If the identifier is numeric (or a string of digits), it's returned as a string. + If it's a non-numeric string, it's treated as a name and the ID is looked up via the system.getId API endpoint. - all_systems_cpu_data.append({ - 'system_name': system_name, - 'system_id': system_id, - 'cpu_info': cpu_info - }) + Args: + system_identifier: The system name (e.g., "buildhost.example.com") or system ID (e.g., 1000010000). - return all_systems_cpu_data + Returns: + str: The numeric system ID as a string. -async def _fetch_cves_for_erratum(client: httpx.AsyncClient, advisory_name: str, system_id: int, + Raises: + NotFoundError: If no systems match the provided name. + UnexpectedResponse: If the Uyuni API returns an unexpected payload (non-list, malformed items, + or multiple matches for a single name). + """ + id_str = str(system_identifier) + if id_str.isdigit(): + return id_str + + # If it's not a digit string, it must be a name. + system_name = id_str + logger.info(f"System identifier '{system_name}' is not numeric, treating as a name and looking up ID.") + + async with httpx.AsyncClient(verify=CONFIG["UYUNI_MCP_SSL_VERIFY"]) as client: + api_path = "/rhn/manager/api/system/getId" + # The result from system.getId is an array of system structs + systems_list = await call_uyuni_api( + client=client, + method="GET", + api_path=api_path, + params={'name': system_name}, + error_context=f"resolving system ID for name '{system_name}'", + token=token + ) + + if not isinstance(systems_list, list): + logger.error(f"Expected a list of systems for name '{system_name}', but received: {type(systems_list)}") + raise UnexpectedResponse(CONFIG["UYUNI_SERVER"] + api_path, repr(systems_list)) + + if not systems_list: + logger.error(f"System with name '{system_name}' not found.") + raise NotFoundError("System", system_name) + + if len(systems_list) > 1: + logger.error(f"Multiple systems found with name '{system_name}'.") + raise UnexpectedResponse(CONFIG["UYUNI_SERVER"] + api_path, f"Multiple systems found for name {system_name}") + + first_system = systems_list[0] + if isinstance(first_system, dict) and 'id' in first_system: + resolved_id = str(first_system['id']) + logger.info(f"Found ID {resolved_id} for system name '{system_name}'.") + return resolved_id + else: + logger.error(f"System data for '{system_name}' is malformed. Expected a dict with 'id'. Got: {first_system}") + raise UnexpectedResponse(CONFIG["UYUNI_SERVER"] + api_path, f"Malformed system data: {first_system!r}") + +async def _fetch_cves_for_erratum(client: httpx.AsyncClient, advisory_name: str, system_id: int, list_cves_path: str, ctx: Context) -> List[str]: """ Internal helper to fetch CVEs for a given erratum advisory name. @@ -291,15 +628,18 @@ async def _fetch_cves_for_erratum(client: httpx.AsyncClient, advisory_name: str, List[str]: A list of CVE identifier strings. Returns an empty list on failure or if no CVEs are found. """ - log_string = f"Fetching CVEs for advisory {advisory_name}" - logger.info(log_string) - await ctx.info(log_string) + + msg = f"Fetching CVEs for advisory {advisory_name}" + logger.info(msg) + await ctx.info(msg) if not advisory_name: - print(f"Warning: advisory_name is missing for system ID {system_id}, cannot fetch CVEs.") + msg = f"advisory_name is missing for system ID {system_id}, cannot fetch CVEs." + logger.error(msg) + await ctx.error(msg) return [] - print(f"Fetching CVEs for advisory: {advisory_name} (system ID: {system_id})") + logger.info(f"Fetching CVEs for advisory: {advisory_name} (system ID: {system_id})") cve_list_from_api = await call_uyuni_api( client=client, method="GET", @@ -307,22 +647,16 @@ async def _fetch_cves_for_erratum(client: httpx.AsyncClient, advisory_name: str, error_context=f"fetching CVEs for advisory {advisory_name} (system ID: {system_id})", params={'advisoryName': advisory_name}, perform_login=False, # Login is handled by the calling function - default_on_error=None # Distinguish API error (None) from empty list [] ) processed_cves = [] if isinstance(cve_list_from_api, list): processed_cves = [str(cve) for cve in cve_list_from_api if cve] - elif cve_list_from_api is None: - # This means the API call might have failed OR API returned "result": null successfully. - # _call_uyuni_api would return default_on_error (None) on failure. - # If API returns "result": null, helper returns None. In both cases, processed_cves remains []. - pass return processed_cves @mcp.tool() -async def check_system_updates(system_identifier: Union[str, int], ctx: Context) -> Dict[str, Any]: +async def get_system_updates(system_identifier: Union[str, int], ctx: Context) -> Dict[str, Any]: """ Checks if a specific system in the Uyuni server has pending updates (relevant errata), @@ -340,25 +674,16 @@ async def check_system_updates(system_identifier: Union[str, int], ctx: Context) Each update dictionary will also include a 'cves' key containing a list of CVE identifiers associated with that update. Returns a dictionary with 'has_pending_updates': False and empty 'updates' - if the API call fails or the format is unexpected. + if no pending updates are found. """ log_string = f"Checking pending updates for system {system_identifier}" logger.info(log_string) await ctx.info(log_string) - return await _check_system_updates(system_identifier, ctx) + return await _get_system_updates(system_identifier, ctx) -async def _check_system_updates(system_identifier: Union[str, int], ctx: Context) -> Dict[str, Any]: +async def _get_system_updates(system_identifier: Union[str, int], ctx: Context) -> Dict[str, Any]: token = ctx.get_state('token') system_id = await _resolve_system_id(system_identifier, token) - default_error_response = { - 'system_identifier': system_identifier, - 'has_pending_updates': False, - 'update_count': 0, - 'updates': [] - } - if not system_id: - # Return a structure consistent with the success response, but indicating failure. - return default_error_response list_cves_api_path = '/rhn/manager/api/errata/listCves' @@ -369,8 +694,7 @@ async def _check_system_updates(system_identifier: Union[str, int], ctx: Context api_path="/rhn/manager/api/system/getRelevantErrata", params={'sid': system_id}, error_context=f"checking updates for system {system_identifier}", - token=token, - default_on_error=None # Distinguish API error from empty list + token=token ) unscheduled_errata_call: Coroutine = call_uyuni_api( @@ -379,8 +703,7 @@ async def _check_system_updates(system_identifier: Union[str, int], ctx: Context api_path="/rhn/manager/api/system/getUnscheduledErrata", params={'sid': str(system_id)}, error_context=f"checking unscheduled errata for system ID {system_id}", - token=token, - default_on_error=[] # Return empty list on failure + token=token ) results = await asyncio.gather( @@ -389,14 +712,6 @@ async def _check_system_updates(system_identifier: Union[str, int], ctx: Context ) relevant_updates_list, unscheduled_updates_list = results - if not isinstance(relevant_updates_list, list) or not isinstance(unscheduled_updates_list, list): - logger.error( - f"API calls for system {system_id} did not return lists as expected. " - f"Type of relevant_updates: {type(relevant_updates_list).__name__}, " - f"Type of unscheduled_updates: {type(unscheduled_updates_list).__name__}" - ) - return default_error_response - unscheduled_advisory_names = {erratum.get('advisory_name') for erratum in unscheduled_updates_list} enriched_updates_list = [] @@ -471,25 +786,29 @@ async def check_all_systems_for_updates(ctx: Context) -> List[Dict[str, Any]]: await ctx.info(log_string) systems_with_updates = [] - active_systems = await get_list_of_active_systems(ctx) # Get the list of all systems + active_systems = await _list_systems(ctx.get_state('token')) # Get the list of all systems if not active_systems: - print("Warning: No active systems found or failed to retrieve system list.") + msg = "No active systems found." + logger.warning(msg) + await ctx.warning(msg) return [] - print(f"Checking {len(active_systems)} systems for updates...") + msg = f"Checking {len(active_systems)} systems for updates..." + logger.info(msg) + await ctx.info(msg) - for system_summary in active_systems: + total_systems = len(active_systems) + for i, system_summary in enumerate(active_systems): system_id = system_summary.get('system_id') system_name = system_summary.get('system_name') - if system_id is None: - print(f"Warning: Skipping system due to missing ID: {system_summary}") - continue - - print(f"Checking updates for system: {system_name} (ID: {system_id})") - # Use the existing check_system_updates tool - update_check_result = await _check_system_updates(system_id, ctx) + await ctx.report_progress(i, total_systems) + msg = f"Checking updates for system: {system_name} (ID: {system_id})" + logger.info(msg) + await ctx.info(msg) + # Use the existing get_system_updates tool + update_check_result = await _get_system_updates(system_id, ctx) if update_check_result.get('has_pending_updates', False): # If the system has updates, add its info and update details to the result list @@ -500,18 +819,21 @@ async def check_all_systems_for_updates(ctx: Context) -> List[Dict[str, Any]]: 'updates': update_check_result.get('updates', []) }) # else: System has no updates, do nothing for this system + await ctx.report_progress(total_systems, total_systems) - print(f"Finished checking systems. Found {len(systems_with_updates)} systems with updates.") + msg = f"Finished checking systems. Found {len(systems_with_updates)} systems with updates." + logger.info(msg) + await ctx.info(msg) return systems_with_updates @write_tool() -async def schedule_apply_pending_updates_to_system(system_identifier: Union[str, int], ctx: Context, confirm: Union[bool, str] = False) -> str: +async def schedule_pending_updates_to_system(system_identifier: Union[str, int], ctx: Context, confirm: Union[bool, str] = False) -> str: """ Checks for pending updates on a system, schedules all of them to be applied, and returns the action ID of the scheduled task. - This tool first calls 'check_system_updates' to determine relevant errata. + This tool first calls 'get_system_updates' to determine relevant errata. If updates are found, it then calls the 'system/scheduleApplyErrata' API endpoint to apply all found errata. @@ -527,44 +849,41 @@ async def schedule_apply_pending_updates_to_system(system_identifier: Union[str, str: The action url if updates were successfully scheduled. Otherwise, returns an empty string. """ - log_string = f"Attempting to apply pending updates for system ID: {system_identifier}" - logger.info(log_string) - await ctx.info(log_string) + msg = f"Attempting to apply pending updates for system ID: {system_identifier}" + logger.info(msg) + await ctx.info(msg) is_confirmed = _to_bool(confirm) - if not is_confirmed: return f"CONFIRMATION REQUIRED: This will apply pending updates to the system {system_identifier}. Do you confirm?" token = ctx.get_state('token') - - # 1. Use check_system_updates to get relevant errata - update_info = await _check_system_updates(system_identifier, ctx) + update_info = await _get_system_updates(system_identifier, ctx) if not update_info or not update_info.get('has_pending_updates'): - print(f"No pending updates found for system {system_identifier}, or an error occurred while fetching update information.") - return "" + msg = f"No pending updates found for system {system_identifier}." + logger.info(msg) + return msg errata_list = update_info.get('updates', []) if not errata_list: # This case should ideally be covered by 'has_pending_updates' being false, # but good to have a safeguard. - print(f"Update check for system {system_identifier} indicated updates, but the updates list is empty.") - return "" + msg = f"Update check for system {system_identifier} indicated updates, but the updates list is empty." + logger.warning(msg) + return msg errata_ids = [erratum.get('update_id') for erratum in errata_list if erratum.get('update_id') is not None] - if not errata_ids: - print(f"Could not extract any valid errata IDs for system {system_identifier} from the update information: {errata_list}") - return "" + msg = f"Could not extract any valid errata IDs for system {system_identifier} from the update information: {errata_list}" + logger.error(msg) + return msg system_id = await _resolve_system_id(system_identifier, token) - if not system_id: - return "" # Helper function already logged the reason for failure. - - print(f"Found {len(errata_ids)} errata to apply for system {system_identifier} (ID: {system_id}). IDs: {errata_ids}") + msg = f"Found {len(errata_ids)} errata to apply for system {system_identifier} (ID: {system_id}). IDs: {errata_ids}" + logger.info(msg) + await ctx.info(msg) - # 2. Schedule apply errata using the API endpoint async with httpx.AsyncClient(verify=CONFIG["UYUNI_MCP_SSL_VERIFY"]) as client: payload = {"sid": int(system_id), "errataIds": errata_ids} api_result = await call_uyuni_api( @@ -573,22 +892,20 @@ async def schedule_apply_pending_updates_to_system(system_identifier: Union[str, api_path="/rhn/manager/api/system/scheduleApplyErrata", json_body=payload, error_context=f"scheduling errata application for system {system_identifier}", - token=token, - default_on_error=None # Helper will return None on error + token=token ) if isinstance(api_result, list) and api_result and isinstance(api_result[0], int): action_id = api_result[0] - print(f"Successfully scheduled action {action_id} to apply {len(errata_ids)} errata to system {system_identifier}.") + logger.info(f"Successfully scheduled action {action_id} to apply {len(errata_ids)} errata to system {system_identifier}.") return "Update successfully scheduled at " + CONFIG["UYUNI_SERVER"] + "/rhn/schedule/ActionDetails.do?aid=" + str(action_id) else: - # Error message already printed by _call_uyuni_api if it returned None - if api_result is not None: # Log if result is not None but also not the expected format - print(f"Failed to schedule errata for system {system_identifier} or unexpected API response format. Result: {api_result}") - return "" + msg = f"Failed to schedule errata for system {system_identifier}. Unexpected API response format. Result: {api_result}" + logger.error(msg) + return msg @write_tool() -async def schedule_apply_specific_update(system_identifier: Union[str, int], errata_id: Union[str, int], ctx: Context, confirm: Union[bool, str] = False) -> str: +async def schedule_specific_update(system_identifier: Union[str, int], errata_id: Union[str, int], ctx: Context, confirm: Union[bool, str] = False) -> str: """ Schedules a specific update (erratum) to be applied to a system. @@ -617,13 +934,10 @@ async def schedule_apply_specific_update(system_identifier: Union[str, int], err except (ValueError, TypeError): return f"Invalid errata ID '{errata_id}'. The ID must be an integer." - token = ctx.get_state('token') system_id = await _resolve_system_id(system_identifier, token) - if not system_id: - return "" # Helper function already logged the reason for failure. - print(f"Attempting to apply specific update (errata ID: {errata_id}) to system: {system_identifier}") + logger.info(f"Attempting to apply specific update (errata ID: {errata_id}) to system: {system_identifier}") if not is_confirmed: return f"CONFIRMATION REQUIRED: This will apply specific update (errata ID: {errata_id}) to the system {system_identifier}. Do you confirm?" @@ -631,31 +945,30 @@ async def schedule_apply_specific_update(system_identifier: Union[str, int], err async with httpx.AsyncClient(verify=CONFIG["UYUNI_MCP_SSL_VERIFY"]) as client: # The API expects a list of errata IDs, even if it's just one. payload = {"sid": int(system_id), "errataIds": [errata_id_int]} - api_result = await _call_uyuni_api( + api_result = await call_uyuni_api( client=client, method="POST", api_path="/rhn/manager/api/system/scheduleApplyErrata", json_body=payload, error_context=f"scheduling specific update (errata ID: {errata_id_int}) for system {system_identifier}", - token=token, - default_on_error=None # Helper returns None on error + token=token ) if isinstance(api_result, list) and api_result and isinstance(api_result[0], int): action_id = api_result[0] success_message = f"Update (errata ID: {errata_id_int}) successfully scheduled for system {system_identifier}. Action URL: {UYUNI_SERVER}/rhn/schedule/ActionDetails.do?aid={action_id}" - print(success_message) + logger.info(success_message) return success_message # Some schedule APIs might return int directly in result (though scheduleApplyErrata usually returns a list) elif isinstance(api_result, int): # Defensive check action_id = api_result success_message = f"Update (errata ID: {errata_id_int}) successfully scheduled. Action URL: {UYUNI_SERVER}/rhn/schedule/ActionDetails.do?aid={action_id}" - print(success_message) + logger.info(success_message) return success_message else: - if api_result is not None: # Log if not None but also not expected format - print(f"Failed to schedule specific update (errata ID: {errata_id_int}) for system {system_identifier} or unexpected API result format. Result: {api_result}") - return "" + msg = f"Failed to schedule specific update (errata ID: {errata_id_int}) for system {system_identifier} or unexpected API result format. Result: {api_result}" + logger.error(msg) + return msg @write_tool() async def add_system( @@ -720,7 +1033,7 @@ async def add_system( token = ctx.get_state('token') # Check if the system already exists - active_systems = await _get_list_of_active_systems(token) + active_systems = await _list_systems(token) for system in active_systems: if system.get('system_name') == host: message = f"System '{host}' already exists in Uyuni. No action taken." @@ -738,7 +1051,7 @@ async def add_system( # Unescape the raw string from the environment variable to convert literal '\n' to actual newlines for the JSON payload. ssh_priv_key = ssh_priv_key_raw.replace('\\n', '\n') - print(f"Attempting to add system: {host}") + logger.info(f"Attempting to add system: {host}") ssh_priv_key_pass = os.environ.get('UYUNI_SSH_PRIV_KEY_PASS') if not ssh_priv_key_pass: @@ -764,7 +1077,6 @@ async def add_system( json_body=payload, error_context=f"adding system {host}", token=token, - default_on_error=None, expect_timeout=True, ) @@ -777,7 +1089,7 @@ async def add_system( elif api_result == 1: # The API returns 1 on success logger.info("api_result was 1") success_message = f"System {host} successfully scheduled to be added." - print(success_message) + logger.info(success_message) return success_message else: logger.info(f"api result was NOT 1 {api_result}") @@ -813,11 +1125,9 @@ async def remove_system(system_identifier: Union[str, int], ctx: Context, cleanu token = ctx.get_state('token') system_id = await _resolve_system_id(system_identifier, token) - if not system_id: - return "" # Helper function already logged the reason for failure. # Check if the system exists before proceeding - active_systems = await _get_list_of_active_systems(token) + active_systems = await _list_systems(token) if not any(s.get('system_id') == int(system_id) for s in active_systems): message = f"System with ID {system_id} not found." logger.warning(message) @@ -836,8 +1146,7 @@ async def remove_system(system_identifier: Union[str, int], ctx: Context, cleanu api_path="/rhn/manager/api/system/deleteSystem", json_body={"sid": system_id, "cleanupType": cleanup_type}, error_context=f"removing system ID {system_id}", - token=token, - default_on_error=None + token=token ) if api_result == 1: @@ -850,9 +1159,9 @@ async def remove_system(system_identifier: Union[str, int], ctx: Context, cleanu return error_message @mcp.tool() -async def get_systems_needing_security_update_for_cve(cve_identifier: str, ctx: Context) -> List[Dict[str, Any]]: +async def list_systems_needing_update_for_cve(cve_identifier: str, ctx: Context) -> List[Dict[str, Any]]: """ - Finds systems requiring a security update due to a specific CVE identifier. + Finds systems requiring a security update for a specific CVE identifier. This tool identifies systems that are vulnerable to a given Common Vulnerabilities and Exposures (CVE) identifier. It first looks up the @@ -884,7 +1193,7 @@ async def get_systems_needing_security_update_for_cve(cve_identifier: str, ctx: token = ctx.get_state('token') async with httpx.AsyncClient(verify=CONFIG["UYUNI_MCP_SSL_VERIFY"]) as client: # 1. Call findByCve (login will be handled by the helper) - print(f"Searching for errata related to CVE: {cve_identifier}") + logger.info(f"Searching for errata related to CVE: {cve_identifier}") errata_list = await call_uyuni_api( client=client, method="GET", @@ -892,26 +1201,29 @@ async def get_systems_needing_security_update_for_cve(cve_identifier: str, ctx: params={'cveName': cve_identifier}, error_context=f"finding errata for CVE {cve_identifier}", token=token, - default_on_error=None # Distinguish API error from empty list ) if errata_list is None: # API call failed return [] if not isinstance(errata_list, list): - print(f"Warning: Expected a list of errata for CVE {cve_identifier}, but received: {type(errata_list)}") + msg = f"Expected a list of errata for CVE {cve_identifier}, but received: {type(errata_list)}" + logger.error(msg) + await ctx.error(msg) return [] if not errata_list: - print(f"No errata found for CVE {cve_identifier}.") + msg = f"No errata found for CVE {cve_identifier}." + logger.info(msg) + await ctx.info(msg) return [] # 2. For each erratum, call listAffectedSystems for erratum in errata_list: advisory_name = erratum.get('advisory_name') if not advisory_name: - print(f"Skipping erratum due to missing 'advisory_name': {erratum}") + logger.warning(f"Skipping erratum due to missing 'advisory_name': {erratum}") continue - print(f"Fetching systems affected by advisory: {advisory_name} (related to CVE: {cve_identifier})") + logger.info(f"Fetching systems affected by advisory: {advisory_name} (related to CVE: {cve_identifier})") systems_data_result = await call_uyuni_api( client=client, method="GET", @@ -919,13 +1231,12 @@ async def get_systems_needing_security_update_for_cve(cve_identifier: str, ctx: params={'advisoryName': advisory_name}, error_context=f"listing affected systems for advisory {advisory_name}", perform_login=False, # Login already performed for this client session - default_on_error=None # Distinguish API error from empty list ) if systems_data_result is None: # API call failed for this advisory continue # Move to the next advisory if not isinstance(systems_data_result, list): - print(f"Warning: Expected list of affected systems for {advisory_name}, got {type(systems_data_result)}") + logger.warning(f"Expected list of affected systems for {advisory_name}, got {type(systems_data_result)}") continue for system_info in systems_data_result: @@ -940,19 +1251,21 @@ async def get_systems_needing_security_update_for_cve(cve_identifier: str, ctx: 'cve_identifier': cve_identifier } else: - print(f"Warning: Received system data with missing ID or name for advisory {advisory_name}: {system_info}") + logger.warning(f"Received system data with missing ID or name for advisory {advisory_name}: {system_info}") else: - print(f"Warning: Unexpected item format in affected systems list for advisory {advisory_name}: {system_info}") + logger.warning(f"Unexpected item format in affected systems list for advisory {advisory_name}: {system_info}") if not affected_systems_map: - print(f"No systems found affected by CVE {cve_identifier} after checking all related errata.") + msg = f"No systems found affected by CVE {cve_identifier} after checking all related errata." + logger.info(msg) + await ctx.info(msg) else: - print(f"Found {len(affected_systems_map)} unique system(s) affected by CVE {cve_identifier}.") + logger.info(f"Found {len(affected_systems_map)} unique system(s) affected by CVE {cve_identifier}.") return list(affected_systems_map.values()) @mcp.tool() -async def get_systems_needing_reboot(ctx: Context) -> List[Dict[str, Any]]: # No change needed here +async def list_systems_needing_reboot(ctx: Context) -> List[Dict[str, Any]]: """ Fetches a list of systems from the Uyuni server that require a reboot. @@ -963,8 +1276,7 @@ async def get_systems_needing_reboot(ctx: Context) -> List[Dict[str, Any]]: # No Returns: List[Dict[str, Any]]: A list of system dictionaries (system_id, system_name, reboot_status) for systems requiring a reboot. Returns an empty list - if the API call fails, the response format is unexpected, - or no systems require a reboot. + if no systems require a reboot. """ log_string = "Fetch list of system that require a reboot." @@ -980,8 +1292,7 @@ async def get_systems_needing_reboot(ctx: Context) -> List[Dict[str, Any]]: # No method="GET", api_path=list_reboot_path, error_context="fetching systems needing reboot", - token=ctx.get_state('token'), - default_on_error=[] # Return empty list on error + token=ctx.get_state('token') ) if isinstance(reboot_data_result, list): @@ -996,9 +1307,9 @@ async def get_systems_needing_reboot(ctx: Context) -> List[Dict[str, Any]]: # No 'reboot_status': 'reboot_required' }) else: - print(f"Warning: Unexpected item format in reboot list: {system_info}") + logger.warning(f"Unexpected item format in reboot list: {system_info}") elif reboot_data_result: # Log if not default empty list but also not a list - print(f"Warning: Expected a list for systems needing reboot, but received: {type(reboot_data_result)}") + logger.warning(f"Expected a list for systems needing reboot, but received: {type(reboot_data_result)}") return systems_needing_reboot_list @@ -1017,6 +1328,7 @@ async def schedule_system_reboot(system_identifier: Union[str, int], ctx:Context a second time with `confirm=True`. The reboot is scheduled to occur as soon as possible (effectively "now"). + Returns: str: A message indicating the action ID if the reboot was successfully scheduled, e.g., "System reboot successfully scheduled. Action URL: ...". @@ -1030,8 +1342,6 @@ async def schedule_system_reboot(system_identifier: Union[str, int], ctx:Context token = ctx.get_state('token') system_id = await _resolve_system_id(system_identifier, token) - if not system_id: - return "" # Helper function already logged the reason for failure. if not is_confirmed: return f"CONFIRMATION REQUIRED: This will reboot system {system_identifier}. Do you confirm?" @@ -1049,8 +1359,7 @@ async def schedule_system_reboot(system_identifier: Union[str, int], ctx:Context api_path=schedule_reboot_path, json_body=payload, error_context=f"scheduling reboot for system {system_identifier}", - token=token, - default_on_error=None # Helper returns None on error + token=token ) # Uyuni's scheduleReboot API returns an integer action ID directly in 'result' @@ -1058,19 +1367,19 @@ async def schedule_system_reboot(system_identifier: Union[str, int], ctx:Context action_id = api_result action_detail_url = f"{CONFIG['UYUNI_SERVER']}/rhn/schedule/ActionDetails.do?aid={action_id}" success_message = f"System reboot successfully scheduled. Action URL: {action_detail_url}" - print(success_message) + logger.info(success_message) return success_message else: - # Error message already printed by _call_uyuni_api if it returned None - if api_result is not None: # Log if result is not None but also not an int - print(f"Failed to schedule reboot for system {system_identifier} or unexpected API result format. Result: {api_result}") - return "" + return "Unexpected API response format when scheduling reboot. Check server logs for details." @mcp.tool() async def list_all_scheduled_actions(ctx: Context) -> List[Dict[str, Any]]: """ Fetches a list of all scheduled actions from the Uyuni server. + You can use this tool to check the status of a reboot. A reboot is finished when + its related action is completed. + This includes completed, in-progress, failed, and archived actions. Each action in the list is a dictionary containing details such as action_id, name, type, scheduler, earliest execution time, @@ -1079,8 +1388,7 @@ async def list_all_scheduled_actions(ctx: Context) -> List[Dict[str, Any]]: Returns: List[Dict[str, Any]]: A list of action dictionaries. - Returns an empty list if the API call fails, - the response format is unexpected, or no actions are found. + Returns an empty list if no actions are found. """ log_string = "Listing all scheduled actions" @@ -1096,8 +1404,7 @@ async def list_all_scheduled_actions(ctx: Context) -> List[Dict[str, Any]]: method="GET", api_path=list_actions_path, error_context="listing all scheduled actions", - token=ctx.get_state('token'), - default_on_error=[] # Return empty list on error + token=ctx.get_state('token') ) if isinstance(api_result, list): @@ -1109,9 +1416,9 @@ async def list_all_scheduled_actions(ctx: Context) -> List[Dict[str, Any]]: modified_action['action_id'] = modified_action.pop('id') processed_actions_list.append(modified_action) else: - print(f"Warning: Unexpected item format in actions list: {action_dict}") + logger.warning(f"Unexpected item format in actions list: {action_dict}") elif api_result: # Log if not default empty list but also not a list - print(f"Warning: Expected a list for all scheduled actions, but received: {type(api_result)}") + logger.warning(f"Expected a list for all scheduled actions, but received: {type(api_result)}") return processed_actions_list @write_tool() @@ -1119,9 +1426,6 @@ async def cancel_action(action_id: int, ctx: Context, confirm: Union[bool, str] """ Cancels a specified action on the Uyuni server. - If the action ID is invalid or the action cannot be canceled, - the operation will fail. - Args: action_id: The integer ID of the action to be canceled. confirm: User confirmation is required to execute this action. This parameter @@ -1159,13 +1463,11 @@ async def cancel_action(action_id: int, ctx: Context, confirm: Union[bool, str] api_path=cancel_actions_path, json_body=payload, error_context=f"canceling action {action_id}", - token=ctx.get_state('token'), - default_on_error=0 # API returns 1 on success, so 0 can signify an error or unexpected response from helper + token=ctx.get_state('token') ) if api_result == 1: return f"Successfully canceled action: {action_id}" else: - # The _call_uyuni_api helper already prints detailed errors. return f"Failed to cancel action: {action_id}. The API did not return success (expected 1, got {api_result}). Check server logs for details." @mcp.tool() @@ -1180,8 +1482,7 @@ async def list_activation_keys(ctx: Context) -> List[Dict[str, str]]: List[Dict[str, str]]: A list of dictionaries, where each dictionary represents an activation key with 'key' and 'description' fields. Returns an empty list - if the API call fails, the response is not in - the expected format, or no keys are found. + if no keys are found. """ list_keys_path = '/rhn/manager/api/activationkey/listActivationKeys' @@ -1191,8 +1492,7 @@ async def list_activation_keys(ctx: Context) -> List[Dict[str, str]]: method="GET", api_path=list_keys_path, error_context="listing activation keys", - token=ctx.get_state('token'), - default_on_error=[] + token=ctx.get_state('token') ) filtered_keys = [] @@ -1201,7 +1501,9 @@ async def list_activation_keys(ctx: Context) -> List[Dict[str, str]]: if isinstance(key_data, dict): filtered_keys.append({'key': key_data.get('key'), 'description': key_data.get('description')}) else: - print(f"Warning: Unexpected item format in activation key list: {key_data}") + msg = f"Unexpected item format in activation key list: {key_data}" + logger.warning(msg) + await ctx.warning(msg) return filtered_keys async def get_unscheduled_errata(system_id: int, ctx: Context) -> List[Dict[str, Any]]: @@ -1210,8 +1512,6 @@ async def get_unscheduled_errata(system_id: int, ctx: Context) -> List[Dict[str, passed as parameter and have not ben scheduled yet. All elements in the result are patches that are applicable for the system. - If the system ID is invalid then the operation will fail. - Args: sid: The integer ID of the system for which we want to know the list of applicable errata. @@ -1233,8 +1533,7 @@ async def get_unscheduled_errata(system_id: int, ctx: Context) -> List[Dict[str, api_path=get_unscheduled_errata, params=payload, error_context=f"fetching unscheduled errata for system ID {system_id}", - token=ctx.get_state('token'), - default_on_error=None + token=ctx.get_state('token') ) if isinstance(unscheduled_errata_result, list): @@ -1243,17 +1542,17 @@ async def get_unscheduled_errata(system_id: int, ctx: Context) -> List[Dict[str, return unscheduled_errata_result else: - if unscheduled_errata_result is not None: - print(f"Failed to retrieve unscheduled errata for system ID {system_id} or \ - unexpected API result format. Result: {unscheduled_errata_result}") - return "" + msg = f"Failed to retrieve unscheduled errata for system ID {system_id}. Unexpected API result format. Result: {unscheduled_errata_result}" + logger.error(msg) + return msg def main_cli(): logger.info("Running Uyuni MCP server.") if CONFIG["UYUNI_MCP_TRANSPORT"] == Transport.HTTP.value: - mcp.add_middleware(AuthTokenMiddleware()) + if CONFIG["AUTH_SERVER"]: + mcp.add_middleware(AuthTokenMiddleware()) mcp.run(transport="streamable-http", host=CONFIG["UYUNI_MCP_HOST"], port=CONFIG["UYUNI_MCP_PORT"]) elif CONFIG["UYUNI_MCP_TRANSPORT"] == Transport.STDIO.value: mcp.run(transport="stdio") diff --git a/src/mcp_server_uyuni/uyuni_api.py b/src/mcp_server_uyuni/uyuni_api.py index 2ddd031..1bb6f69 100644 --- a/src/mcp_server_uyuni/uyuni_api.py +++ b/src/mcp_server_uyuni/uyuni_api.py @@ -1,8 +1,15 @@ from typing import Any, Dict, Optional import httpx -from mcp_server_uyuni.logging_config import get_logger -from mcp_server_uyuni.config import CONFIG +from .logging_config import get_logger +from .config import CONFIG +from .errors import ( + APIError, + HTTPError, + AuthError, + NetworkError, + UnexpectedResponse +) logger = get_logger(__name__) @@ -23,7 +30,6 @@ async def call( params: Dict[str, Any] = None, json_body: Dict[str, Any] = None, perform_login: bool = True, - default_on_error: Any = None, expected_result_key: str = 'result', expect_timeout: bool = False ) -> Any: @@ -35,72 +41,93 @@ async def call( # Safety check: Do not allow POST requests if write tools are disabled. # This acts as a secondary guard after the @write_tool decorator. if method.upper() == 'POST' and not CONFIG["UYUNI_MCP_WRITE_TOOLS_ENABLED"]: - error_msg = (f"Attempted to call a write API ({api_path}) while write tools are disabled. " - "Please set UYUNI_MCP_WRITE_TOOLS_ENABLED to 'true' to enable them.") + error_msg = ( + f"Attempted to call a write API ({api_path}) while write tools are disabled. " + "Please set UYUNI_MCP_WRITE_TOOLS_ENABLED to 'true' to enable them." + ) logger.error(error_msg) - return error_msg + raise APIError(error_msg) + full_api_url = CONFIG["UYUNI_SERVER"] + api_path if perform_login: try: if token: + # Try OIDC login with provided token login_response = await client.post( CONFIG["UYUNI_SERVER"] + '/rhn/manager/api/oidcLogin', headers={"Authorization": f"Bearer {token}"} ) + login_response.raise_for_status() elif CONFIG["UYUNI_USER"] and CONFIG["UYUNI_PASS"]: login_response = await client.post( CONFIG["UYUNI_SERVER"] + '/rhn/manager/api/login', json={"login": CONFIG["UYUNI_USER"], "password": CONFIG["UYUNI_PASS"]} ) - login_response.raise_for_status() + login_response.raise_for_status() + else: + logger.warning(f"perform_login=True but no token or username/password available for {error_context}; skipping login.") except httpx.HTTPStatusError as e: - logger.error(f"HTTP error during login for {error_context}: {e.request.url} - {e.response.status_code} - {e.response.text}") - return default_on_error + status = e.response.status_code if e.response is not None else None + body = e.response.text if e.response is not None else '' + logger.error(f"HTTP error during login for {error_context}: {getattr(e.request,'url',full_api_url)} - {status} - {body}") + if status in (401, 403): + raise AuthError(status, str(getattr(e.request, 'url', full_api_url)), body) + raise HTTPError(status or -1, str(getattr(e.request, 'url', full_api_url)), body) except httpx.RequestError as e: - logger.exception(f"Request error during login for {error_context}: {e.request.url} - {e}") - return default_on_error - except Exception as e: - logger.exception(f"An unexpected error occurred during login for {error_context}: {e}") - return default_on_error - - full_api_url = CONFIG["UYUNI_SERVER"] + api_path + logger.exception(f"Request error during login for {error_context}: {getattr(e.request,'url',full_api_url)} - {e}") + raise NetworkError(getattr(e.request, 'url', full_api_url), e) try: - if method.upper() == 'GET': + method_upper = method.upper() + if method_upper == 'GET': response = await client.get(full_api_url, params=params) - elif method.upper() == 'POST': + elif method_upper == 'POST': logger.info(f"POSTing to {full_api_url}") response = await client.post(full_api_url, json=json_body, params=params) - logger.info(f"POST response: {response.text}") + logger.debug(f"POST response status: {response.status_code}") else: - logger.info(f"Unsupported HTTP method '{method}' for {error_context}.") - return default_on_error + raise APIError(f"Unsupported HTTP method '{method}' for {error_context}.") + response.raise_for_status() - response_data = response.json() - if response_data.get('success'): - if expected_result_key in response_data: - return response_data[expected_result_key] - # If 'success' is true, but the expected_result_key is not there (e.g. 'result' is missing) - logger.info(f"API call for {error_context} succeeded but '{expected_result_key}' not found in response. Response: {response_data}") - return default_on_error - else: - print(f"API call for {error_context} reported failure. Response: {response_data}") - return default_on_error + # Parse JSON if possible, otherwise fall back to raw text + try: + response_data = response.json() + except Exception: + response_data = response.text + + # If response is a dict and follows Uyuni's {success: bool, result: ...} pattern + if isinstance(response_data, dict) and 'success' in response_data: + if response_data.get('success'): + # Prefer the expected_result_key but return full response dict as a fallback + return response_data.get(expected_result_key, response_data) + else: + logger.error(f"Uyuni API reported failure for {error_context}. Response: {response_data}") + raise UnexpectedResponse(full_api_url, response_data.get('message', response_data)) + + # Otherwise return whatever we received (list, dict, string, etc.) + return response_data except httpx.HTTPStatusError as e: - logger.error(f"HTTP error occurred while {error_context}: {e.request.url} - {e.response.status_code} - {e.response.text}") - return default_on_error + status = e.response.status_code if e.response is not None else None + body = e.response.text if e.response is not None else '' + logger.error(f"HTTP error occurred while {error_context}: {getattr(e.request,'url',full_api_url)} - {status} - {body}") + if status in (401, 403): + raise AuthError(status, str(getattr(e.request, 'url', full_api_url)), body) + raise HTTPError(status or -1, str(getattr(e.request, 'url', full_api_url)), body) except httpx.TimeoutException as e: - logger.info(f"timeout! timeout expected? {expect_timeout}") + logger.debug(f"Timeout! timeout expected? {expect_timeout}") if expect_timeout: - logger.info(f"A timeout occurred while {error_context} (expected for a long-running action): {e.request.url} - {e}") + logger.info(f"A timeout occurred while {error_context} (expected for a long-running action): {getattr(e.request,'url',full_api_url)} - {e}") return TIMEOUT_HAPPENED - logger.warning(f"A timeout occurred while {error_context}: {e.request.url} - {e}") - return default_on_error + logger.warning(f"A timeout occurred while {error_context}: {getattr(e.request,'url',full_api_url)} - {e}") + raise NetworkError(getattr(e.request, 'url', full_api_url), e, timed_out=True) except httpx.RequestError as e: - logger.exception(f"Request error occurred while {error_context}: {e.request.url} - {e}") - return default_on_error + logger.exception(f"Request error occurred while {error_context}: {getattr(e.request,'url',full_api_url)} - {e}") + raise NetworkError(getattr(e.request, 'url', full_api_url), e) + except UnexpectedResponse: + # Propagate API-specific failures unchanged + raise except Exception as e: # Catch other potential errors like JSONDecodeError logger.exception(f"An unexpected error occurred while {error_context}: {e}") - return default_on_error + raise APIError(f"Unexpected error while {error_context}: {e}")