Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,18 @@ Provides tools for accessing and analyzing CrowdStrike Falcon detections:
Provides tools for accessing and analyzing CrowdStrike Falcon incidents:

- `falcon_show_crowd_score`: Show CrowdScore in the environment
- `falcon_get_incident_details`: Get incidents by ID
- `falcon_search_incidents`: Query for incidents
- `falcon_get_behavior_details`: Get behaviors by ID
- `falcon_get_incident_details`: Get incidents by ID
- `falcon_search_behaviors`: Query for behaviors
- `falcon_get_behavior_details`: Get behaviors by ID

### Intel Module

Provides tools for accessing and analyzing CrowdStrike Intel:

- `falcon_search_actors`: Get info about actors
- `falcon_search_indicators`: Get info about indicators
- `falcon_search_reports`: Get info about reports

## MCP Configuration

Expand Down
17 changes: 2 additions & 15 deletions src/common/api_scopes.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,8 @@
"QueryBehaviors": ["incidents:read"],
# Intel operations
"QueryIntelActorEntities": ["actors-falcon-intelligence:read"],
"QueryIntelIndicatorEntities": ["indicators:read"],
"QueryIntelReportEntities": ["reports:read"],
"QueryIntelRuleEntities": ["rules:read"],
"GetIntelActorEntities": ["actors-falcon-intelligence:read"],
"GetIntelIndicatorEntities": ["indicators:read"],
"GetIntelReportPDF": ["reports:read"],
"GetIntelReportEntities": ["reports:read"],
"GetIntelRuleEntities": ["rules:read"],
"GetIntelRuleFile": ["rules:read"],
"GetLatestIntelIndicatorTimestamp": ["indicators:read"],
"GetMitreReport": ["actors-falcon-intelligence:read"],
"GetRuleDetails": ["rules:read"],
"GetRulesDetails": ["rules:read"],
"GetRulePreview": ["rules:read"],
"GetVulnerabilities": ["vulnerabilities:read"],
"QueryIntelIndicatorEntities": ["indicators-falcon-intelligence:read"],
"QueryIntelReportEntities": ["reports-falcon-intelligence:read"],
# Add more mappings as needed
}

Expand Down
1 change: 1 addition & 0 deletions src/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
# Import all module classes so they're available for auto-discovery
from .detections import DetectionsModule
from .incidents import IncidentsModule
from .intel import IntelModule
205 changes: 205 additions & 0 deletions src/modules/intel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# pylint: disable=too-many-arguments,too-many-positional-arguments,redefined-builtin
"""
Intel module for Falcon MCP Server

This module provides tools for accessing and analyzing CrowdStrike Falcon intelligence data.
"""
from typing import Dict, List, Optional, Any

from mcp.server import FastMCP
from pydantic import Field

from ..common.logging import get_logger
from ..common.errors import handle_api_response
from ..common.utils import prepare_api_parameters
from .base import BaseModule

logger = get_logger(__name__)


class IntelModule(BaseModule):
"""Module for accessing and analyzing CrowdStrike Falcon intelligence data."""

def register_tools(self, server: FastMCP) -> None:
"""Register tools with the MCP server.

Args:
server: MCP server instance
"""
# Register tools
self._add_tool(
server,
self.query_actor_entities,
name="search_actors"
)

self._add_tool(
server,
self.query_indicator_entities,
name="search_indicators"
)

self._add_tool(
server,
self.query_report_entities,
name="search_reports"
)

def query_actor_entities(
self,
filter: Optional[str] = Field(default=None, description="FQL query expression that should be used to limit the results."),
limit: Optional[int] = Field(default=100, ge=1, le=5000, description="Maximum number of records to return. (Max: 5000)"),
offset: Optional[int] = Field(default=0, ge=0, description="Starting index of overall result set from which to return ids."),
sort: Optional[str] = Field(default=None, description="The property to sort by. (Ex: created_date|desc)"),
q: Optional[str] = Field(default=None, description="Free text search across all indexed fields."),
) -> Dict[str, Any]:
"""Get info about actors that match provided FQL filters.

Args:
filter: FQL query expression that should be used to limit the results.
limit: Maximum number of records to return. (Max: 5000)
offset: Starting index of overall result set from which to return ids.
sort: The property to sort by. (Ex: created_date|desc)
q: Free text search across all indexed fields.

Returns:
Information about actors that match the provided filters.
"""
# Prepare parameters
params = prepare_api_parameters({
"filter": filter,
"limit": limit,
"offset": offset,
"sort": sort,
"q": q,
})

# Define the operation name
operation = "QueryIntelActorEntities"

logger.debug("Searching actors with params: %s", params)

# Make the API request
response = self.client.command(operation, parameters=params)

# Handle the response
return handle_api_response(
response,
operation=operation,
error_message="Failed to search actors",
default_result=[]
)

def query_indicator_entities(
self,
filter: Optional[str] = Field(default=None, description="FQL query expression that should be used to limit the results."),
limit: Optional[int] = Field(default=100, ge=1, le=5000, description="Maximum number of records to return. (Max: 5000)"),
offset: Optional[int] = Field(default=0, ge=0, description="Starting index of overall result set from which to return ids."),
sort: Optional[str] = Field(default=None, description="The property to sort by. (Ex: created_date|desc)"),
q: Optional[str] = Field(default=None, description="Free text search across all indexed fields."),
include_deleted: Optional[bool] = Field(default=False, description="Flag indicating if both published and deleted indicators should be returned."),
include_relations: Optional[bool] = Field(default=False, description="Flag indicating if related indicators should be returned."),
) -> List[Dict[str, Any]]:
"""Get info about indicators that match provided FQL filters.

Args:
filter: FQL query expression that should be used to limit the results.
limit: Maximum number of records to return. (Max: 5000)
offset: Starting index of overall result set from which to return ids.
sort: The property to sort by. (Ex: created_date|desc)
q: Free text search across all indexed fields.
include_deleted: Flag indicating if both published and deleted indicators should be returned.
include_relations: Flag indicating if related indicators should be returned.

Returns:
List of indicators that match the provided filters.
"""
# Prepare parameters
params = prepare_api_parameters({
"filter": filter,
"limit": limit,
"offset": offset,
"sort": sort,
"q": q,
"include_deleted": include_deleted,
"include_relations": include_relations,
})

# Define the operation name
operation = "QueryIntelIndicatorEntities"

logger.debug("Searching indicators with params: %s", params)

# Make the API request
response = self.client.command(operation, parameters=params)

# Handle the response
result = handle_api_response(
response,
operation=operation,
error_message="Failed to search indicators",
default_result=[]
)

# If handle_api_response returns an error dict instead of a list,
# it means there was an error, so we return it wrapped in a list
if isinstance(result, dict) and "error" in result:
return [result]

return result

def query_report_entities(
self,
filter: Optional[str] = Field(default=None, description="FQL query expression that should be used to limit the results."),
limit: int = Field(default=100, ge=1, le=5000, description="Maximum number of records to return. (Max: 5000)"),
offset: int = Field(default=0, ge=0, description="Starting index of overall result set from which to return ids."),
sort: Optional[str] = Field(default=None, description="The property to sort by. (Ex: created_date|desc)"),
q: Optional[str] = Field(default=None, description="Free text search across all indexed fields."),
include_deleted: Optional[bool] = Field(default=False, description="Flag indicating if both published and deleted reports should be returned."),
) -> List[Dict[str, Any]]:
"""Get info about reports that match provided FQL filters.

Args:
filter: FQL query expression that should be used to limit the results. Review the following table for a complete list of available filters.
limit: Maximum number of records to return. (Max: 5000)
offset: Starting index of overall result set from which to return ids.
sort: The property to sort by. (Ex: created_date|desc)
q: Free text search across all indexed fields.
include_deleted: Flag indicating if both published and deleted reports should be returned.
fields: The fields to return, or a predefined set of fields in the form of the collection name surrounded by two underscores.

Returns:
List of reports that match the provided filters.
"""
# Prepare parameters
params = prepare_api_parameters({
"filter": filter,
"limit": limit,
"offset": offset,
"sort": sort,
"q": q,
"include_deleted": include_deleted,
})

# Define the operation name
operation = "QueryIntelReportEntities"

logger.debug("Searching reports with params: %s", params)

# Make the API request
response = self.client.command(operation, parameters=params)

# Handle the response
result = handle_api_response(
response,
operation=operation,
error_message="Failed to search reports",
default_result=[]
)

# If handle_api_response returns an error dict instead of a list,
# it means there was an error, so we return it wrapped in a list
if self._is_error(result):
return [result]

return result
3 changes: 3 additions & 0 deletions tests/e2e/modules/test_incidents.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,19 @@ def assertions(tools, result):
self.assertGreaterEqual(len(tools), 1, "Expected at least 1 tool call")
used_tool = tools[len(tools) - 1]
self.assertEqual(used_tool['input']['tool_name'], "show_crowd_score")

# Verify the output contains the expected data
output = json.loads(used_tool['output'])
self.assertEqual(output["average_score"], 53) # (50+70+40)/3 = 53.33 rounded to 53
self.assertEqual(output["average_adjusted_score"], 63) # (60+80+50)/3 = 63.33 rounded to 63
self.assertEqual(len(output["scores"]), 3)

# Verify API call parameters
self.assertGreaterEqual(self._mock_api_instance.command.call_count, 1, "Expected at least 1 API call")
api_call_params = self._mock_api_instance.command.call_args_list[0][1].get('parameters', {})
self.assertEqual(api_call_params.get('limit'), 100) # Default limit
self.assertEqual(api_call_params.get('offset'), 0) # Default offset

# Verify result contains CrowdScore information
self.assertIn("CrowdScore", result)
self.assertIn("53", result) # Average score should be mentioned
Expand Down
Loading
Loading