diff --git a/falcon_mcp/common/constants.py b/falcon_mcp/common/constants.py new file mode 100644 index 0000000..5352f7e --- /dev/null +++ b/falcon_mcp/common/constants.py @@ -0,0 +1,52 @@ +""" +Centralized constants for Falcon MCP. +""" + +# Common error codes and their meanings +ERROR_CODE_DESCRIPTIONS = { + 403: "Permission denied. The API credentials don't have the required access.", + 401: "Authentication failed. The API credentials are invalid or expired.", + 404: "Resource not found. The requested resource does not exist.", + 429: "Rate limit exceeded. Too many requests in a short period.", + 500: "Server error. An unexpected error occurred on the server.", + 503: "Service unavailable. The service is temporarily unavailable.", +} + + +class SearchLimits: + """Constants for search and pagination limits.""" + DEFAULT = 10 + DEFAULT_DISCOVER = 100 + MAX_DETECTIONS = 9999 + MAX_HOSTS = 5000 + MAX_INCIDENTS = 500 + MAX_CLOUD_SCORE = 2500 + MAX_DISCOVER_APPS = 1000 + MAX_INTEL = 5000 + MAX_SPOTLIGHT = 5000 + MAX_IDP_RESULTS = 200 + MAX_IDP_RELATIONSHIPS = 3 + DEFAULT_IDP_RELATIONSHIPS = 2 + + +class GraphQLDefaults: + """Default values for GraphQL query parameters.""" + FIRST_INCIDENTS = 10 + FIRST_ENTITIES_BATCH = 50 + DEFAULT_TIMELINE_LIMIT = 50 + DEFAULT_RELATIONSHIP_LIMIT = 50 + + +class ServerDefaults: + """Server-level configuration constants.""" + DEFAULT_HOST = "127.0.0.1" + DEFAULT_PORT = 8000 + DEFAULT_TRANSPORT = "stdio" + CORE_TOOLS_COUNT = 3 + + +class TransportTypes: + """Transport protocol identifiers.""" + STDIO = "stdio" + SSE = "sse" + STREAMABLE_HTTP = "streamable-http" diff --git a/falcon_mcp/common/errors.py b/falcon_mcp/common/errors.py index 640bfdc..da7d39b 100644 --- a/falcon_mcp/common/errors.py +++ b/falcon_mcp/common/errors.py @@ -7,20 +7,11 @@ from typing import Any, Dict, Optional from .api_scopes import get_required_scopes +from .constants import ERROR_CODE_DESCRIPTIONS from .logging import get_logger logger = get_logger(__name__) -# Common error codes and their meanings -ERROR_CODE_DESCRIPTIONS = { - 403: "Permission denied. The API credentials don't have the required access.", - 401: "Authentication failed. The API credentials are invalid or expired.", - 404: "Resource not found. The requested resource does not exist.", - 429: "Rate limit exceeded. Too many requests in a short period.", - 500: "Server error. An unexpected error occurred on the server.", - 503: "Service unavailable. The service is temporarily unavailable.", -} - class FalconError(Exception): """Base exception for all Falcon MCP server errors.""" diff --git a/falcon_mcp/modules/cloud.py b/falcon_mcp/modules/cloud.py index 35cf8c2..2b3f46d 100644 --- a/falcon_mcp/modules/cloud.py +++ b/falcon_mcp/modules/cloud.py @@ -16,6 +16,7 @@ from falcon_mcp.common.logging import get_logger from falcon_mcp.common.utils import prepare_api_parameters from falcon_mcp.modules.base import BaseModule +from falcon_mcp.common.constants import SearchLimits from falcon_mcp.resources.cloud import ( IMAGES_VULNERABILITIES_FQL_DOCUMENTATION, KUBERNETES_CONTAINERS_FQL_DOCUMENTATION, @@ -89,9 +90,9 @@ def search_kubernetes_containers( examples={"cloud:'AWS'", "cluster_name:'prod'"}, ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=9999, + le=SearchLimits.MAX_DETECTIONS, description="The maximum number of containers to return in this response (default: 10; max: 9999). Use with the offset parameter to manage pagination of results.", ), offset: int | None = Field( @@ -184,9 +185,9 @@ def search_images_vulnerabilities( examples={"cve_id:*'*2025*'", "cvss_score:>5"}, ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=9999, + le=SearchLimits.MAX_DETECTIONS, description="The maximum number of containers to return in this response (default: 10; max: 9999). Use with the offset parameter to manage pagination of results.", ), offset: int | None = Field( diff --git a/falcon_mcp/modules/detections.py b/falcon_mcp/modules/detections.py index c6d565b..c87840d 100644 --- a/falcon_mcp/modules/detections.py +++ b/falcon_mcp/modules/detections.py @@ -13,6 +13,7 @@ from falcon_mcp.common.logging import get_logger from falcon_mcp.modules.base import BaseModule +from falcon_mcp.common.constants import SearchLimits from falcon_mcp.resources.detections import SEARCH_DETECTIONS_FQL_DOCUMENTATION logger = get_logger(__name__) @@ -66,9 +67,9 @@ def search_detections( examples={"agent_id:'77d11725xxxxxxxxxxxxxxxxxxxxc48ca19'", "status:'new'"}, ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=9999, + le=SearchLimits.MAX_DETECTIONS, description="The maximum number of detections to return in this response (default: 10; max: 9999). Use with the offset parameter to manage pagination of results.", ), offset: int | None = Field( diff --git a/falcon_mcp/modules/discover.py b/falcon_mcp/modules/discover.py index cf4a20e..b0607b6 100644 --- a/falcon_mcp/modules/discover.py +++ b/falcon_mcp/modules/discover.py @@ -15,6 +15,7 @@ from falcon_mcp.common.logging import get_logger from falcon_mcp.common.utils import prepare_api_parameters from falcon_mcp.modules.base import BaseModule +from falcon_mcp.common.constants import SearchLimits from falcon_mcp.resources.discover import ( SEARCH_APPLICATIONS_FQL_DOCUMENTATION, SEARCH_UNMANAGED_ASSETS_FQL_DOCUMENTATION, @@ -96,9 +97,9 @@ def search_applications( examples={"browser_extension", "host_info", "install_usage"}, ), limit: int = Field( - default=100, + default=SearchLimits.DEFAULT_DISCOVER, ge=1, - le=1000, + le=SearchLimits.MAX_DISCOVER_APPS, description="Maximum number of items to return: 1-1000. Default is 100.", ), sort: str | None = Field( @@ -153,9 +154,9 @@ def search_unmanaged_assets( examples={"platform_name:'Windows'", "criticality:'Critical'"}, ), limit: int = Field( - default=100, + default=SearchLimits.DEFAULT_DISCOVER, ge=1, - le=5000, + le=SearchLimits.MAX_HOSTS, description="Maximum number of items to return: 1-5000. Default is 100.", ), offset: int | None = Field( diff --git a/falcon_mcp/modules/hosts.py b/falcon_mcp/modules/hosts.py index 5a907ba..b0adffb 100644 --- a/falcon_mcp/modules/hosts.py +++ b/falcon_mcp/modules/hosts.py @@ -13,6 +13,7 @@ from falcon_mcp.common.logging import get_logger from falcon_mcp.modules.base import BaseModule +from falcon_mcp.common.constants import SearchLimits from falcon_mcp.resources.hosts import SEARCH_HOSTS_FQL_DOCUMENTATION logger = get_logger(__name__) @@ -66,9 +67,9 @@ def search_hosts( examples={"platform_name:'Windows'", "hostname:'PC*'"}, ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=5000, + le=SearchLimits.MAX_HOSTS, description="The maximum records to return. [1-5000]", ), offset: int | None = Field( diff --git a/falcon_mcp/modules/idp.py b/falcon_mcp/modules/idp.py index 1abd41e..9098002 100644 --- a/falcon_mcp/modules/idp.py +++ b/falcon_mcp/modules/idp.py @@ -16,6 +16,7 @@ from falcon_mcp.common.logging import get_logger from falcon_mcp.common.utils import sanitize_input from falcon_mcp.modules.base import BaseModule +from falcon_mcp.common.constants import GraphQLDefaults, SearchLimits logger = get_logger(__name__) @@ -83,16 +84,16 @@ def investigate_entity( ), # Relationship Parameters (when relationship_analysis is included) relationship_depth: int = Field( - default=2, + default=SearchLimits.DEFAULT_IDP_RELATIONSHIPS, ge=1, - le=3, + le=SearchLimits.MAX_IDP_RELATIONSHIPS, description="Depth of relationship analysis (1-3 levels)", ), # General Parameters limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=200, + le=SearchLimits.MAX_IDP_RESULTS, description="Maximum number of results to return", ), include_associations: bool = Field( @@ -378,7 +379,7 @@ def _build_entity_details_query( if include_incidents: fields.append(""" - openIncidents(first: 10) { + openIncidents(first: {GraphQLDefaults.FIRST_INCIDENTS}) {{ nodes { type startTime @@ -430,7 +431,7 @@ def _build_entity_details_query( return f""" query {{ - entities(entityIds: {entity_ids_json}, first: 50) {{ + entities(entityIds: {entity_ids_json}, first: {GraphQLDefaults.FIRST_ENTITIES_BATCH}) {{ nodes {{ {fields_string} }} @@ -795,7 +796,7 @@ def _build_risk_assessment_query( return f""" query {{ - entities(entityIds: {entity_ids_json}, first: 50) {{ + entities(entityIds: {entity_ids_json}, first: {GraphQLDefaults.FIRST_ENTITIES_BATCH}) {{ nodes {{ entityId primaryDisplayName diff --git a/falcon_mcp/modules/incidents.py b/falcon_mcp/modules/incidents.py index 3e18180..550dbd2 100644 --- a/falcon_mcp/modules/incidents.py +++ b/falcon_mcp/modules/incidents.py @@ -11,6 +11,7 @@ from pydantic import AnyUrl, Field from falcon_mcp.modules.base import BaseModule +from falcon_mcp.common.constants import SearchLimits from falcon_mcp.resources.incidents import ( CROWD_SCORE_FQL_DOCUMENTATION, SEARCH_BEHAVIORS_FQL_DOCUMENTATION, @@ -105,9 +106,9 @@ def show_crowd_score( description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://incidents/crowd-score/fql-guide` resource when building this filter parameter.", ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=2500, + le=SearchLimits.MAX_CLOUD_SCORE, description="Maximum number of records to return. (Max: 2500)", ), offset: int | None = Field( @@ -171,9 +172,9 @@ def search_incidents( description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://incidents/search/fql-guide` resource when building this filter parameter.", ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=500, + le=SearchLimits.MAX_INCIDENTS, description="Maximum number of records to return. (Max: 500)", ), offset: int | None = Field( @@ -234,9 +235,9 @@ def search_behaviors( description="FQL Syntax formatted string used to limit the results. IMPORTANT: use the `falcon://incidents/behaviors/fql-guide` resource when building this filter parameter.", ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=500, + le=SearchLimits.MAX_INCIDENTS, description="Maximum number of records to return. (Max: 500)", ), offset: int | None = Field( diff --git a/falcon_mcp/modules/intel.py b/falcon_mcp/modules/intel.py index 48f256f..12111eb 100644 --- a/falcon_mcp/modules/intel.py +++ b/falcon_mcp/modules/intel.py @@ -12,6 +12,7 @@ from falcon_mcp.common.logging import get_logger from falcon_mcp.modules.base import BaseModule +from falcon_mcp.common.constants import SearchLimits from falcon_mcp.resources.intel import ( QUERY_ACTOR_ENTITIES_FQL_DOCUMENTATION, QUERY_INDICATOR_ENTITIES_FQL_DOCUMENTATION, @@ -96,9 +97,9 @@ def query_actor_entities( description="FQL query expression that should be used to limit the results. IMPORTANT: use the `falcon://intel/actors/fql-guide` resource when building this filter parameter.", ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=5000, + le=SearchLimits.MAX_INTEL, description="Maximum number of records to return. Max 5000", examples={10, 20, 100}, ), @@ -147,9 +148,9 @@ def query_indicator_entities( description="FQL query expression that should be used to limit the results. IMPORTANT: use the `falcon://intel/indicators/fql-guide` resource when building this filter parameter.", ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=5000, + le=SearchLimits.MAX_INTEL, description="Maximum number of records to return. (Max: 5000)", ), offset: int | None = Field( @@ -205,9 +206,9 @@ def query_report_entities( description="FQL query expression that should be used to limit the results. IMPORTANT: use the `falcon://intel/reports/fql-guide` resource when building this filter parameter.", ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=5000, + le=SearchLimits.MAX_INTEL, description="Maximum number of records to return. (Max: 5000)", ), offset: int | None = Field( diff --git a/falcon_mcp/modules/serverless.py b/falcon_mcp/modules/serverless.py index 8377f70..eadfdc1 100644 --- a/falcon_mcp/modules/serverless.py +++ b/falcon_mcp/modules/serverless.py @@ -15,6 +15,7 @@ from falcon_mcp.common.logging import get_logger from falcon_mcp.common.utils import prepare_api_parameters from falcon_mcp.modules.base import BaseModule +from falcon_mcp.common.constants import SearchLimits from falcon_mcp.resources.serverless import SERVERLESS_VULNERABILITIES_FQL_DOCUMENTATION logger = get_logger(__name__) @@ -61,7 +62,7 @@ def search_serverless_vulnerabilities( examples={"cloud_provider:'aws'", "severity:'HIGH'"}, ), limit: int | None = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, description="The upper-bound on the number of records to retrieve. (Default: 10)", ), diff --git a/falcon_mcp/modules/spotlight.py b/falcon_mcp/modules/spotlight.py index be5460b..224fa64 100644 --- a/falcon_mcp/modules/spotlight.py +++ b/falcon_mcp/modules/spotlight.py @@ -13,6 +13,7 @@ from falcon_mcp.common.logging import get_logger from falcon_mcp.modules.base import BaseModule +from falcon_mcp.common.constants import SearchLimits from falcon_mcp.resources.spotlight import SEARCH_VULNERABILITIES_FQL_DOCUMENTATION logger = get_logger(__name__) @@ -60,9 +61,9 @@ def search_vulnerabilities( examples={"status:'open'", "cve.severity:'HIGH'"}, ), limit: int = Field( - default=10, + default=SearchLimits.DEFAULT, ge=1, - le=5000, + le=SearchLimits.MAX_SPOTLIGHT, description="Maximum number of results to return. (Max: 5000, Default: 10)", ), offset: int | None = Field( diff --git a/falcon_mcp/server.py b/falcon_mcp/server.py index 4a1b471..807d329 100644 --- a/falcon_mcp/server.py +++ b/falcon_mcp/server.py @@ -16,6 +16,10 @@ from falcon_mcp import registry from falcon_mcp.client import FalconClient +from falcon_mcp.common.constants import ( + ServerDefaults, + TransportTypes, +) from falcon_mcp.common.logging import configure_logging, get_logger logger = get_logger(__name__) @@ -122,7 +126,7 @@ def _register_tools(self) -> int: name="falcon_list_modules", ) - tool_count = 3 # the tools added above + tool_count = ServerDefaults.CORE_TOOLS_COUNT # the tools added above # Register tools from modules for module in self.modules.values(): @@ -162,7 +166,7 @@ def list_modules(self) -> Dict[str, List[str]]: """Lists all available modules in the falcon-mcp server.""" return {"modules": registry.get_module_names()} - def run(self, transport: str = "stdio", host: str = "127.0.0.1", port: int = 8000): + def run(self, transport: str = TransportTypes.STDIO, host: str = ServerDefaults.DEFAULT_HOST, port: int = ServerDefaults.DEFAULT_PORT): """Run the MCP server. Args: @@ -170,7 +174,7 @@ def run(self, transport: str = "stdio", host: str = "127.0.0.1", port: int = 800 host: Host to bind to for HTTP transports (default: 127.0.0.1) port: Port to listen on for HTTP transports (default: 8000) """ - if transport == "streamable-http": + if transport == TransportTypes.STREAMABLE_HTTP: # For streamable-http, use uvicorn directly for custom host/port logger.info("Starting streamable-http server on %s:%d", host, port) @@ -184,7 +188,7 @@ def run(self, transport: str = "stdio", host: str = "127.0.0.1", port: int = 800 port=port, log_level="info" if not self.debug else "debug", ) - elif transport == "sse": + elif transport == TransportTypes.SSE: # For sse, use uvicorn directly for custom host/port (same pattern as streamable-http) logger.info("Starting sse server on %s:%d", host, port) @@ -244,8 +248,8 @@ def parse_args(): parser.add_argument( "--transport", "-t", - choices=["stdio", "sse", "streamable-http"], - default=os.environ.get("FALCON_MCP_TRANSPORT", "stdio"), + choices=[TransportTypes.STDIO, TransportTypes.SSE, TransportTypes.STREAMABLE_HTTP], + default=os.environ.get("FALCON_MCP_TRANSPORT", TransportTypes.STDIO), help="Transport protocol to use (default: stdio, env: FALCON_MCP_TRANSPORT)", ) @@ -281,7 +285,7 @@ def parse_args(): # HTTP transport configuration parser.add_argument( "--host", - default=os.environ.get("FALCON_MCP_HOST", "127.0.0.1"), + default=os.environ.get("FALCON_MCP_HOST", ServerDefaults.DEFAULT_HOST), help="Host to bind to for HTTP transports (default: 127.0.0.1, env: FALCON_MCP_HOST)", ) @@ -289,7 +293,7 @@ def parse_args(): "--port", "-p", type=int, - default=int(os.environ.get("FALCON_MCP_PORT", "8000")), + default=int(os.environ.get("FALCON_MCP_PORT", str(ServerDefaults.DEFAULT_PORT))), help="Port to listen on for HTTP transports (default: 8000, env: FALCON_MCP_PORT)", ) diff --git a/tests/test_streamable_http_transport.py b/tests/test_streamable_http_transport.py index 8b12f4d..0afd534 100644 --- a/tests/test_streamable_http_transport.py +++ b/tests/test_streamable_http_transport.py @@ -6,6 +6,7 @@ from unittest.mock import MagicMock, patch from falcon_mcp.server import FalconMCPServer +from falcon_mcp.common.constants import ServerDefaults, TransportTypes class TestStreamableHttpTransport(unittest.TestCase): @@ -21,6 +22,8 @@ def test_streamable_http_transport_initialization( mock_client, ): """Test streamable-http transport initialization.""" + test_host = "0.0.0.0" # nosec + test_port = "8080" # nosec # Setup mocks mock_client_instance = MagicMock() mock_client_instance.authenticate.return_value = True @@ -35,11 +38,11 @@ def test_streamable_http_transport_initialization( server = FalconMCPServer(debug=True) # Test streamable-http transport - server.run("streamable-http", host="0.0.0.0", port=8080) + server.run(TransportTypes.STREAMABLE_HTTP, host=test_host, port=test_port) # Verify uvicorn was called with correct parameters mock_uvicorn.run.assert_called_once_with( - mock_app, host="0.0.0.0", port=8080, log_level="debug" + mock_app, host=test_host, port=test_port, log_level="debug" ) # Verify streamable_http_app was called @@ -69,13 +72,13 @@ def test_streamable_http_default_parameters( server = FalconMCPServer(debug=False) # Test streamable-http transport with defaults - server.run("streamable-http") + server.run(TransportTypes.STREAMABLE_HTTP) # Verify uvicorn was called with default parameters mock_uvicorn.run.assert_called_once_with( mock_app, - host="127.0.0.1", - port=8000, + host=ServerDefaults.DEFAULT_HOST, + port=ServerDefaults.DEFAULT_PORT, log_level="info", ) @@ -99,10 +102,10 @@ def test_non_streamable_http_transport_unchanged( server = FalconMCPServer() # Test stdio transport (should use original method) - server.run("stdio") + server.run(TransportTypes.STDIO) # Verify the original run method was called - mock_server_instance.run.assert_called_once_with("stdio") + mock_server_instance.run.assert_called_once_with(TransportTypes.STDIO) # Verify streamable_http_app was NOT called mock_server_instance.streamable_http_app.assert_not_called() @@ -117,6 +120,8 @@ def test_streamable_http_custom_parameters( mock_client, ): """Test streamable-http transport with custom parameters.""" + test_host = "192.168.1.100" + test_port = "9000" # Setup mocks mock_client_instance = MagicMock() mock_client_instance.authenticate.return_value = True @@ -131,13 +136,13 @@ def test_streamable_http_custom_parameters( server = FalconMCPServer(debug=True) # Test streamable-http transport with custom parameters - server.run("streamable-http", host="192.168.1.100", port=9000) + server.run(TransportTypes.STREAMABLE_HTTP, host=test_host, port=test_port) # Verify uvicorn was called with custom parameters mock_uvicorn.run.assert_called_once_with( mock_app, - host="192.168.1.100", - port=9000, + host=test_host, + port=test_port, log_level="debug", ) @@ -163,13 +168,13 @@ def test_streamable_http_logging_levels( # Test with debug=True server_debug = FalconMCPServer(debug=True) - server_debug.run("streamable-http") + server_debug.run(TransportTypes.STREAMABLE_HTTP) # Verify debug log level mock_uvicorn.run.assert_called_with( mock_app, - host="127.0.0.1", - port=8000, + host=ServerDefaults.DEFAULT_HOST, + port=ServerDefaults.DEFAULT_PORT, log_level="debug", ) @@ -178,13 +183,13 @@ def test_streamable_http_logging_levels( # Test with debug=False server_info = FalconMCPServer(debug=False) - server_info.run("streamable-http") + server_info.run(TransportTypes.STREAMABLE_HTTP) # Verify info log level mock_uvicorn.run.assert_called_with( mock_app, - host="127.0.0.1", - port=8000, + host=ServerDefaults.DEFAULT_HOST, + port=ServerDefaults.DEFAULT_PORT, log_level="info", )