diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index e47b6e7fb..645a5b40a 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -118,6 +118,12 @@ class InstrumentorConfig(TypedDict): "min_version": "1.0.0", "package_name": "xpander-sdk", }, + "mcp_agent": { + "module_name": "agentops.instrumentation.providers.mcp_agent", + "class_name": "MCPAgentInstrumentor", + "min_version": "0.1.0", + "package_name": "mcp-agent", + }, } # Combine all target packages for monitoring diff --git a/agentops/instrumentation/providers/mcp_agent/__init__.py b/agentops/instrumentation/providers/mcp_agent/__init__.py new file mode 100644 index 000000000..bf94137dd --- /dev/null +++ b/agentops/instrumentation/providers/mcp_agent/__init__.py @@ -0,0 +1,21 @@ +"""MCP Agent instrumentation for AgentOps. + +This package provides OpenTelemetry-based instrumentation for MCP Agent, +enabling telemetry collection and tracing for MCP-based agent workflows. +""" + +from agentops.instrumentation.common import LibraryInfo + +# Library information +_library_info = LibraryInfo(name="mcp-agent") +LIBRARY_NAME = _library_info.name +LIBRARY_VERSION = _library_info.version + +# Import after defining constants to avoid circular imports +from agentops.instrumentation.providers.mcp_agent.instrumentor import MCPAgentInstrumentor # noqa: E402 + +__all__ = [ + "LIBRARY_NAME", + "LIBRARY_VERSION", + "MCPAgentInstrumentor", +] \ No newline at end of file diff --git a/agentops/instrumentation/providers/mcp_agent/config.py b/agentops/instrumentation/providers/mcp_agent/config.py new file mode 100644 index 000000000..22a8c1b3c --- /dev/null +++ b/agentops/instrumentation/providers/mcp_agent/config.py @@ -0,0 +1,79 @@ +"""Configuration for MCP Agent instrumentation.""" + +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class Config: + """Configuration for MCP Agent instrumentation. + + This configuration controls how AgentOps instruments MCP Agent, + including what data to capture and how to integrate with its + existing telemetry system. + """ + + # Data capture settings + capture_prompts: bool = True + """Whether to capture prompts sent to agents.""" + + capture_completions: bool = True + """Whether to capture agent completions/responses.""" + + capture_errors: bool = True + """Whether to capture and report errors.""" + + capture_tool_calls: bool = True + """Whether to capture MCP tool calls and their results.""" + + capture_workflows: bool = True + """Whether to capture workflow execution details.""" + + # Integration settings + integrate_with_existing_telemetry: bool = True + """Whether to integrate with MCP Agent's existing OpenTelemetry setup.""" + + override_tracer_config: bool = False + """Whether to override MCP Agent's tracer configuration.""" + + # Performance settings + max_prompt_length: Optional[int] = 10000 + """Maximum length of prompts to capture (None for unlimited).""" + + max_completion_length: Optional[int] = 10000 + """Maximum length of completions to capture (None for unlimited).""" + + # Filtering settings + excluded_tools: Optional[list[str]] = None + """List of tool names to exclude from instrumentation.""" + + excluded_workflows: Optional[list[str]] = None + """List of workflow names to exclude from instrumentation.""" + + def should_capture_tool(self, tool_name: str) -> bool: + """Check if a tool should be captured.""" + if not self.capture_tool_calls: + return False + if self.excluded_tools and tool_name in self.excluded_tools: + return False + return True + + def should_capture_workflow(self, workflow_name: str) -> bool: + """Check if a workflow should be captured.""" + if not self.capture_workflows: + return False + if self.excluded_workflows and workflow_name in self.excluded_workflows: + return False + return True + + def truncate_prompt(self, prompt: str) -> str: + """Truncate prompt if needed.""" + if self.max_prompt_length and len(prompt) > self.max_prompt_length: + return prompt[:self.max_prompt_length] + "... [truncated]" + return prompt + + def truncate_completion(self, completion: str) -> str: + """Truncate completion if needed.""" + if self.max_completion_length and len(completion) > self.max_completion_length: + return completion[:self.max_completion_length] + "... [truncated]" + return completion \ No newline at end of file diff --git a/agentops/instrumentation/providers/mcp_agent/instrumentor.py b/agentops/instrumentation/providers/mcp_agent/instrumentor.py new file mode 100644 index 000000000..beb82ec36 --- /dev/null +++ b/agentops/instrumentation/providers/mcp_agent/instrumentor.py @@ -0,0 +1,316 @@ +"""MCP Agent Instrumentation for AgentOps + +This module provides comprehensive instrumentation for MCP Agent, including: +- Tracing of agent workflows and tool calls +- Integration with MCP Agent's telemetry system +- Metrics collection for agent operations +- Distributed tracing support + +The instrumentation hooks into MCP Agent's existing OpenTelemetry setup +and extends it with AgentOps-specific tracking. +""" + +from typing import Dict, Any, Optional +from wrapt import wrap_function_wrapper + +from opentelemetry.metrics import Meter +from opentelemetry import trace + +from agentops.logging import logger +from agentops.instrumentation.common import ( + CommonInstrumentor, + InstrumentorConfig, + WrapConfig, + StandardMetrics, + MetricsRecorder, +) +from agentops.instrumentation.providers.mcp_agent import LIBRARY_NAME, LIBRARY_VERSION +from agentops.instrumentation.providers.mcp_agent.config import Config +from agentops.instrumentation.providers.mcp_agent.wrappers import ( + handle_telemetry_manager_traced, + handle_tool_call_attributes, + handle_workflow_attributes, + handle_agent_execution_attributes, + handle_tracer_configuration, +) +from agentops.semconv import Meters + +_instruments = ("mcp-agent >= 0.1.0",) + + +class MCPAgentInstrumentor(CommonInstrumentor): + """ + Instrumentor for MCP Agent library. + + This instrumentor provides comprehensive telemetry integration with MCP Agent, + hooking into its existing OpenTelemetry infrastructure while adding AgentOps-specific + tracking and metrics. + """ + + def __init__(self, config: Optional[InstrumentorConfig] = None): + """Initialize the MCP Agent instrumentor.""" + super().__init__( + library_name=LIBRARY_NAME, + library_version=LIBRARY_VERSION, + instruments=_instruments, + config=config or InstrumentorConfig(), + ) + self._original_telemetry_manager = None + self._original_tracer_config = None + + def _instrument(self, **kwargs): + """Instrument MCP Agent library.""" + tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") + + if not tracer_provider: + tracer_provider = trace.get_tracer_provider() + + tracer = trace.get_tracer( + LIBRARY_NAME, + LIBRARY_VERSION, + tracer_provider=tracer_provider, + ) + + meter = meter_provider.get_meter( + name=LIBRARY_NAME, + version=LIBRARY_VERSION, + ) if meter_provider else None + + # Initialize metrics + metrics = self._initialize_metrics(meter) if meter else None + + # Get configuration + config = Config( + capture_prompts=self.config.capture_prompts, + capture_completions=self.config.capture_completions, + capture_errors=self.config.capture_errors, + ) + + # Hook into MCP Agent's telemetry system + self._instrument_telemetry_manager(tracer, metrics, config) + self._instrument_tracer_config(tracer, metrics, config) + self._instrument_tool_calls(tracer, metrics, config) + self._instrument_workflows(tracer, metrics, config) + self._instrument_agent_execution(tracer, metrics, config) + + def _uninstrument(self, **kwargs): + """Remove instrumentation from MCP Agent library.""" + # Restore original telemetry manager if saved + if self._original_telemetry_manager: + try: + import mcp_agent.tracing.telemetry as telemetry_module + telemetry_module.TelemetryManager = self._original_telemetry_manager + self._original_telemetry_manager = None + except ImportError: + pass + + # Restore original tracer config if saved + if self._original_tracer_config: + try: + import mcp_agent.tracing.tracer as tracer_module + tracer_module.TracingConfig = self._original_tracer_config + self._original_tracer_config = None + except ImportError: + pass + + # Unwrap all wrapped functions + for wrap_config in self._get_wrap_configs(): + try: + self._unwrap_function( + wrap_config.module_name, + wrap_config.object_name, + wrap_config.method_name, + ) + except Exception as e: + logger.debug(f"Failed to unwrap {wrap_config}: {e}") + + def _initialize_metrics(self, meter: Meter) -> StandardMetrics: + """Initialize MCP Agent-specific metrics.""" + metrics = StandardMetrics(meter) + + # Add MCP Agent-specific metrics + metrics.tool_calls_counter = meter.create_counter( + name=Meters.MCP_AGENT_TOOL_CALLS, + description="Number of MCP tool calls", + unit="1", + ) + + metrics.workflow_duration = meter.create_histogram( + name=Meters.MCP_AGENT_WORKFLOW_DURATION, + description="Duration of MCP Agent workflows", + unit="ms", + ) + + metrics.agent_executions_counter = meter.create_counter( + name=Meters.MCP_AGENT_EXECUTIONS, + description="Number of agent executions", + unit="1", + ) + + return metrics + + def _instrument_telemetry_manager( + self, + tracer: trace.Tracer, + metrics: Optional[StandardMetrics], + config: Config + ): + """Hook into MCP Agent's TelemetryManager to intercept trace creation.""" + try: + wrap_function_wrapper( + module="mcp_agent.tracing.telemetry", + name="TelemetryManager.traced", + wrapper=handle_telemetry_manager_traced(tracer, metrics, config), + ) + logger.debug("Successfully instrumented MCP Agent TelemetryManager") + except Exception as e: + logger.warning(f"Failed to instrument MCP Agent TelemetryManager: {e}") + + def _instrument_tracer_config( + self, + tracer: trace.Tracer, + metrics: Optional[StandardMetrics], + config: Config + ): + """Hook into MCP Agent's TracingConfig to monitor tracer configuration.""" + try: + wrap_function_wrapper( + module="mcp_agent.tracing.tracer", + name="TracingConfig.configure", + wrapper=handle_tracer_configuration(tracer, metrics, config), + ) + logger.debug("Successfully instrumented MCP Agent TracingConfig") + except Exception as e: + logger.warning(f"Failed to instrument MCP Agent TracingConfig: {e}") + + def _instrument_tool_calls( + self, + tracer: trace.Tracer, + metrics: Optional[StandardMetrics], + config: Config + ): + """Instrument MCP Agent tool calls.""" + wrap_configs = [ + WrapConfig( + module_name="mcp_agent.core.context", + object_name="Context", + method_name="call_tool", + wrapper=handle_tool_call_attributes(tracer, metrics, config), + ), + WrapConfig( + module_name="mcp_agent.executor.executor", + object_name="Executor", + method_name="execute_tool", + wrapper=handle_tool_call_attributes(tracer, metrics, config), + ), + ] + + for wrap_config in wrap_configs: + try: + self._wrap_method(wrap_config) + logger.debug(f"Successfully instrumented {wrap_config}") + except Exception as e: + logger.warning(f"Failed to instrument {wrap_config}: {e}") + + def _instrument_workflows( + self, + tracer: trace.Tracer, + metrics: Optional[StandardMetrics], + config: Config + ): + """Instrument MCP Agent workflows.""" + wrap_configs = [ + WrapConfig( + module_name="mcp_agent.workflows.base", + object_name="BaseWorkflow", + method_name="run", + wrapper=handle_workflow_attributes(tracer, metrics, config), + ), + WrapConfig( + module_name="mcp_agent.workflows.base", + object_name="BaseWorkflow", + method_name="arun", + wrapper=handle_workflow_attributes(tracer, metrics, config), + ), + ] + + for wrap_config in wrap_configs: + try: + self._wrap_method(wrap_config) + logger.debug(f"Successfully instrumented {wrap_config}") + except Exception as e: + logger.warning(f"Failed to instrument {wrap_config}: {e}") + + def _instrument_agent_execution( + self, + tracer: trace.Tracer, + metrics: Optional[StandardMetrics], + config: Config + ): + """Instrument MCP Agent execution.""" + wrap_configs = [ + WrapConfig( + module_name="mcp_agent.agents.base", + object_name="BaseAgent", + method_name="execute", + wrapper=handle_agent_execution_attributes(tracer, metrics, config), + ), + WrapConfig( + module_name="mcp_agent.agents.base", + object_name="BaseAgent", + method_name="aexecute", + wrapper=handle_agent_execution_attributes(tracer, metrics, config), + ), + ] + + for wrap_config in wrap_configs: + try: + self._wrap_method(wrap_config) + logger.debug(f"Successfully instrumented {wrap_config}") + except Exception as e: + logger.warning(f"Failed to instrument {wrap_config}: {e}") + + def _get_wrap_configs(self) -> list[WrapConfig]: + """Get all wrap configurations for uninstrumentation.""" + return [ + # Tool call wrapping + WrapConfig( + module_name="mcp_agent.core.context", + object_name="Context", + method_name="call_tool", + wrapper=None, + ), + WrapConfig( + module_name="mcp_agent.executor.executor", + object_name="Executor", + method_name="execute_tool", + wrapper=None, + ), + # Workflow wrapping + WrapConfig( + module_name="mcp_agent.workflows.base", + object_name="BaseWorkflow", + method_name="run", + wrapper=None, + ), + WrapConfig( + module_name="mcp_agent.workflows.base", + object_name="BaseWorkflow", + method_name="arun", + wrapper=None, + ), + # Agent execution wrapping + WrapConfig( + module_name="mcp_agent.agents.base", + object_name="BaseAgent", + method_name="execute", + wrapper=None, + ), + WrapConfig( + module_name="mcp_agent.agents.base", + object_name="BaseAgent", + method_name="aexecute", + wrapper=None, + ), + ] \ No newline at end of file diff --git a/agentops/instrumentation/providers/mcp_agent/wrappers.py b/agentops/instrumentation/providers/mcp_agent/wrappers.py new file mode 100644 index 000000000..db1b3afb7 --- /dev/null +++ b/agentops/instrumentation/providers/mcp_agent/wrappers.py @@ -0,0 +1,608 @@ +"""Wrapper functions for MCP Agent instrumentation. + +This module provides wrapper functions that hook into MCP Agent's +telemetry system and various components to provide comprehensive +observability. +""" + +import asyncio +import functools +import time +from typing import Any, Callable, Dict, Optional + +from opentelemetry import trace +from opentelemetry.trace import SpanKind, Status, StatusCode + +from agentops.logging import logger +from agentops.instrumentation.common import StandardMetrics +from agentops.instrumentation.providers.mcp_agent.config import Config +from agentops.semconv.span_attributes import SpanAttributes +from agentops.semconv.agent import AgentAttributes +from agentops.semconv.tool import ToolAttributes + + +# Create a unified Attributes class for convenience +class Attributes: + """Unified attributes for MCP Agent instrumentation.""" + # Agent attributes + AGENT_TYPE = "agent.type" + AGENT_NAME = AgentAttributes.AGENT_NAME + + # Operation attributes + OPERATION_TYPE = "operation.type" + + # Tool attributes + TOOL_NAME = ToolAttributes.TOOL_NAME + + # Input/Output attributes + INPUT_PROMPT = SpanAttributes.LLM_PROMPTS + OUTPUT_COMPLETION = SpanAttributes.LLM_COMPLETIONS + + +def handle_telemetry_manager_traced( + tracer: trace.Tracer, + metrics: Optional[StandardMetrics], + config: Config, +) -> Callable: + """Wrapper for MCP Agent's TelemetryManager.traced decorator. + + This hooks into MCP Agent's existing telemetry system to add + AgentOps-specific tracking while preserving their functionality. + """ + def wrapper(wrapped, instance, args, kwargs): + # Get the original decorator parameters + name = args[0] if args else kwargs.get("name") + kind = args[1] if len(args) > 1 else kwargs.get("kind", SpanKind.INTERNAL) + attributes = args[2] if len(args) > 2 else kwargs.get("attributes", {}) + + # Call the original traced method to get the decorator + original_decorator = wrapped(*args, **kwargs) + + # Create our enhanced decorator + def enhanced_decorator(func): + # Apply the original decorator + decorated_func = original_decorator(func) + + # Add our additional instrumentation + @functools.wraps(decorated_func) + async def async_wrapper(*func_args, **func_kwargs): + span_name = f"agentops.mcp_agent.{name or func.__qualname__}" + + with tracer.start_as_current_span( + span_name, + kind=kind, + attributes={ + Attributes.AGENT_TYPE: "mcp_agent", + Attributes.OPERATION_TYPE: "traced_function", + **attributes, + } + ) as span: + try: + # Record metrics if available + if metrics: + metrics.request_counter.add(1, { + "operation": name or func.__qualname__, + "agent_type": "mcp_agent", + }) + + # Execute the original decorated function + result = await decorated_func(*func_args, **func_kwargs) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + if config.capture_errors: + logger.error(f"Error in MCP Agent traced function {name}: {e}") + raise + + @functools.wraps(decorated_func) + def sync_wrapper(*func_args, **func_kwargs): + span_name = f"agentops.mcp_agent.{name or func.__qualname__}" + + with tracer.start_as_current_span( + span_name, + kind=kind, + attributes={ + Attributes.AGENT_TYPE: "mcp_agent", + Attributes.OPERATION_TYPE: "traced_function", + **attributes, + } + ) as span: + try: + # Record metrics if available + if metrics: + metrics.request_counter.add(1, { + "operation": name or func.__qualname__, + "agent_type": "mcp_agent", + }) + + # Execute the original decorated function + result = decorated_func(*func_args, **func_kwargs) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + if config.capture_errors: + logger.error(f"Error in MCP Agent traced function {name}: {e}") + raise + + # Return the appropriate wrapper based on function type + if asyncio.iscoroutinefunction(func): + return async_wrapper + else: + return sync_wrapper + + return enhanced_decorator + + return wrapper + + +def handle_tracer_configuration( + tracer: trace.Tracer, + metrics: Optional[StandardMetrics], + config: Config, +) -> Callable: + """Wrapper for MCP Agent's TracingConfig.configure method. + + This allows us to monitor when MCP Agent configures its tracing + and potentially integrate with or override its configuration. + """ + async def async_wrapper(wrapped, instance, args, kwargs): + settings = args[0] if args else kwargs.get("settings") + session_id = args[1] if len(args) > 1 else kwargs.get("session_id") + force = args[2] if len(args) > 2 else kwargs.get("force", False) + + with tracer.start_as_current_span( + "agentops.mcp_agent.tracer_config", + kind=SpanKind.INTERNAL, + attributes={ + Attributes.AGENT_TYPE: "mcp_agent", + Attributes.OPERATION_TYPE: "tracer_configuration", + "session_id": session_id or "unknown", + "force": force, + "otel_enabled": getattr(settings, "enabled", False) if settings else False, + } + ) as span: + try: + # Log configuration details if integration is enabled + if config.integrate_with_existing_telemetry: + logger.debug(f"MCP Agent configuring tracer with session_id: {session_id}") + + # Optionally override configuration + if config.override_tracer_config and settings: + logger.info("Overriding MCP Agent tracer configuration with AgentOps settings") + # Modify settings as needed + # For example, add AgentOps exporter + if hasattr(settings, "exporters") and "agentops" not in settings.exporters: + settings.exporters.append("agentops") + + # Call the original configure method + result = await wrapped(*args, **kwargs) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + logger.error(f"Error configuring MCP Agent tracer: {e}") + raise + + def sync_wrapper(wrapped, instance, args, kwargs): + # Handle sync version if needed + if asyncio.iscoroutinefunction(wrapped): + # Run async version in event loop + loop = asyncio.get_event_loop() + return loop.run_until_complete(async_wrapper(wrapped, instance, args, kwargs)) + else: + # Direct sync implementation + return wrapped(*args, **kwargs) + + return async_wrapper if asyncio.iscoroutinefunction(handle_tracer_configuration) else sync_wrapper + + +def handle_tool_call_attributes( + tracer: trace.Tracer, + metrics: Optional[StandardMetrics], + config: Config, +) -> Callable: + """Wrapper for MCP Agent tool calls.""" + + async def async_wrapper(wrapped, instance, args, kwargs): + tool_name = args[0] if args else kwargs.get("tool_name", "unknown") + tool_args = args[1] if len(args) > 1 else kwargs.get("arguments", {}) + + # Check if we should capture this tool + if not config.should_capture_tool(tool_name): + return await wrapped(*args, **kwargs) + + with tracer.start_as_current_span( + f"agentops.mcp_agent.tool_call.{tool_name}", + kind=SpanKind.CLIENT, + attributes={ + Attributes.AGENT_TYPE: "mcp_agent", + Attributes.OPERATION_TYPE: "tool_call", + Attributes.TOOL_NAME: tool_name, + "tool_args": str(tool_args)[:1000] if config.capture_prompts else "[hidden]", + } + ) as span: + start_time = time.time() + + try: + # Execute the tool call + result = await wrapped(*args, **kwargs) + + # Record metrics + if metrics: + duration_ms = (time.time() - start_time) * 1000 + metrics.tool_calls_counter.add(1, { + "tool_name": tool_name, + "status": "success", + }) + metrics.request_duration.record(duration_ms, { + "operation": "tool_call", + "tool_name": tool_name, + }) + + # Capture result if configured + if config.capture_completions and result: + span.set_attribute("tool_result", str(result)[:1000]) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + # Record error metrics + if metrics: + metrics.tool_calls_counter.add(1, { + "tool_name": tool_name, + "status": "error", + }) + metrics.error_counter.add(1, { + "operation": "tool_call", + "tool_name": tool_name, + }) + + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + if config.capture_errors: + logger.error(f"Error in MCP Agent tool call {tool_name}: {e}") + raise + + def sync_wrapper(wrapped, instance, args, kwargs): + tool_name = args[0] if args else kwargs.get("tool_name", "unknown") + tool_args = args[1] if len(args) > 1 else kwargs.get("arguments", {}) + + # Check if we should capture this tool + if not config.should_capture_tool(tool_name): + return wrapped(*args, **kwargs) + + with tracer.start_as_current_span( + f"agentops.mcp_agent.tool_call.{tool_name}", + kind=SpanKind.CLIENT, + attributes={ + Attributes.AGENT_TYPE: "mcp_agent", + Attributes.OPERATION_TYPE: "tool_call", + Attributes.TOOL_NAME: tool_name, + "tool_args": str(tool_args)[:1000] if config.capture_prompts else "[hidden]", + } + ) as span: + start_time = time.time() + + try: + # Execute the tool call + result = wrapped(*args, **kwargs) + + # Record metrics + if metrics: + duration_ms = (time.time() - start_time) * 1000 + metrics.tool_calls_counter.add(1, { + "tool_name": tool_name, + "status": "success", + }) + metrics.request_duration.record(duration_ms, { + "operation": "tool_call", + "tool_name": tool_name, + }) + + # Capture result if configured + if config.capture_completions and result: + span.set_attribute("tool_result", str(result)[:1000]) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + # Record error metrics + if metrics: + metrics.tool_calls_counter.add(1, { + "tool_name": tool_name, + "status": "error", + }) + metrics.error_counter.add(1, { + "operation": "tool_call", + "tool_name": tool_name, + }) + + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + if config.capture_errors: + logger.error(f"Error in MCP Agent tool call {tool_name}: {e}") + raise + + # Return appropriate wrapper based on function type + if asyncio.iscoroutinefunction(handle_tool_call_attributes): + return async_wrapper + return sync_wrapper + + +def handle_workflow_attributes( + tracer: trace.Tracer, + metrics: Optional[StandardMetrics], + config: Config, +) -> Callable: + """Wrapper for MCP Agent workflows.""" + + async def async_wrapper(wrapped, instance, args, kwargs): + workflow_name = getattr(instance, "__class__", type(instance)).__name__ + + # Check if we should capture this workflow + if not config.should_capture_workflow(workflow_name): + return await wrapped(*args, **kwargs) + + with tracer.start_as_current_span( + f"agentops.mcp_agent.workflow.{workflow_name}", + kind=SpanKind.SERVER, + attributes={ + Attributes.AGENT_TYPE: "mcp_agent", + Attributes.OPERATION_TYPE: "workflow", + "workflow_name": workflow_name, + } + ) as span: + start_time = time.time() + + try: + # Capture workflow input if configured + if config.capture_prompts and args: + span.set_attribute("workflow_input", str(args[0])[:1000]) + + # Execute the workflow + result = await wrapped(*args, **kwargs) + + # Record metrics + if metrics: + duration_ms = (time.time() - start_time) * 1000 + metrics.workflow_duration.record(duration_ms, { + "workflow_name": workflow_name, + "status": "success", + }) + + # Capture result if configured + if config.capture_completions and result: + span.set_attribute("workflow_result", str(result)[:1000]) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + # Record error metrics + if metrics: + metrics.error_counter.add(1, { + "operation": "workflow", + "workflow_name": workflow_name, + }) + + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + if config.capture_errors: + logger.error(f"Error in MCP Agent workflow {workflow_name}: {e}") + raise + + def sync_wrapper(wrapped, instance, args, kwargs): + workflow_name = getattr(instance, "__class__", type(instance)).__name__ + + # Check if we should capture this workflow + if not config.should_capture_workflow(workflow_name): + return wrapped(*args, **kwargs) + + with tracer.start_as_current_span( + f"agentops.mcp_agent.workflow.{workflow_name}", + kind=SpanKind.SERVER, + attributes={ + Attributes.AGENT_TYPE: "mcp_agent", + Attributes.OPERATION_TYPE: "workflow", + "workflow_name": workflow_name, + } + ) as span: + start_time = time.time() + + try: + # Capture workflow input if configured + if config.capture_prompts and args: + span.set_attribute("workflow_input", str(args[0])[:1000]) + + # Execute the workflow + result = wrapped(*args, **kwargs) + + # Record metrics + if metrics: + duration_ms = (time.time() - start_time) * 1000 + metrics.workflow_duration.record(duration_ms, { + "workflow_name": workflow_name, + "status": "success", + }) + + # Capture result if configured + if config.capture_completions and result: + span.set_attribute("workflow_result", str(result)[:1000]) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + # Record error metrics + if metrics: + metrics.error_counter.add(1, { + "operation": "workflow", + "workflow_name": workflow_name, + }) + + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + if config.capture_errors: + logger.error(f"Error in MCP Agent workflow {workflow_name}: {e}") + raise + + # Return appropriate wrapper based on function type + if asyncio.iscoroutinefunction(handle_workflow_attributes): + return async_wrapper + return sync_wrapper + + +def handle_agent_execution_attributes( + tracer: trace.Tracer, + metrics: Optional[StandardMetrics], + config: Config, +) -> Callable: + """Wrapper for MCP Agent execution.""" + + async def async_wrapper(wrapped, instance, args, kwargs): + agent_name = getattr(instance, "name", type(instance).__name__) + prompt = args[0] if args else kwargs.get("prompt", "") + + with tracer.start_as_current_span( + f"agentops.mcp_agent.agent_execution.{agent_name}", + kind=SpanKind.SERVER, + attributes={ + Attributes.AGENT_TYPE: "mcp_agent", + Attributes.OPERATION_TYPE: "agent_execution", + Attributes.AGENT_NAME: agent_name, + } + ) as span: + start_time = time.time() + + try: + # Capture prompt if configured + if config.capture_prompts and prompt: + truncated_prompt = config.truncate_prompt(str(prompt)) + span.set_attribute(Attributes.INPUT_PROMPT, truncated_prompt) + + # Execute the agent + result = await wrapped(*args, **kwargs) + + # Record metrics + if metrics: + duration_ms = (time.time() - start_time) * 1000 + metrics.agent_executions_counter.add(1, { + "agent_name": agent_name, + "status": "success", + }) + metrics.request_duration.record(duration_ms, { + "operation": "agent_execution", + "agent_name": agent_name, + }) + + # Capture result if configured + if config.capture_completions and result: + truncated_result = config.truncate_completion(str(result)) + span.set_attribute(Attributes.OUTPUT_COMPLETION, truncated_result) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + # Record error metrics + if metrics: + metrics.agent_executions_counter.add(1, { + "agent_name": agent_name, + "status": "error", + }) + metrics.error_counter.add(1, { + "operation": "agent_execution", + "agent_name": agent_name, + }) + + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + if config.capture_errors: + logger.error(f"Error in MCP Agent execution for {agent_name}: {e}") + raise + + def sync_wrapper(wrapped, instance, args, kwargs): + agent_name = getattr(instance, "name", type(instance).__name__) + prompt = args[0] if args else kwargs.get("prompt", "") + + with tracer.start_as_current_span( + f"agentops.mcp_agent.agent_execution.{agent_name}", + kind=SpanKind.SERVER, + attributes={ + Attributes.AGENT_TYPE: "mcp_agent", + Attributes.OPERATION_TYPE: "agent_execution", + Attributes.AGENT_NAME: agent_name, + } + ) as span: + start_time = time.time() + + try: + # Capture prompt if configured + if config.capture_prompts and prompt: + truncated_prompt = config.truncate_prompt(str(prompt)) + span.set_attribute(Attributes.INPUT_PROMPT, truncated_prompt) + + # Execute the agent + result = wrapped(*args, **kwargs) + + # Record metrics + if metrics: + duration_ms = (time.time() - start_time) * 1000 + metrics.agent_executions_counter.add(1, { + "agent_name": agent_name, + "status": "success", + }) + metrics.request_duration.record(duration_ms, { + "operation": "agent_execution", + "agent_name": agent_name, + }) + + # Capture result if configured + if config.capture_completions and result: + truncated_result = config.truncate_completion(str(result)) + span.set_attribute(Attributes.OUTPUT_COMPLETION, truncated_result) + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + # Record error metrics + if metrics: + metrics.agent_executions_counter.add(1, { + "agent_name": agent_name, + "status": "error", + }) + metrics.error_counter.add(1, { + "operation": "agent_execution", + "agent_name": agent_name, + }) + + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + + if config.capture_errors: + logger.error(f"Error in MCP Agent execution for {agent_name}: {e}") + raise + + # Return appropriate wrapper based on function type + if asyncio.iscoroutinefunction(handle_agent_execution_attributes): + return async_wrapper + return sync_wrapper \ No newline at end of file diff --git a/agentops/semconv/meters.py b/agentops/semconv/meters.py index 9d8dec934..13e43a239 100644 --- a/agentops/semconv/meters.py +++ b/agentops/semconv/meters.py @@ -22,3 +22,8 @@ class Meters: AGENT_RUNS = "gen_ai.agent.runs" AGENT_TURNS = "gen_ai.agent.turns" AGENT_EXECUTION_TIME = "gen_ai.agent.execution_time" + + # MCP Agent specific metrics + MCP_AGENT_TOOL_CALLS = "gen_ai.mcp_agent.tool_calls" + MCP_AGENT_WORKFLOW_DURATION = "gen_ai.mcp_agent.workflow_duration" + MCP_AGENT_EXECUTIONS = "gen_ai.mcp_agent.executions" diff --git a/docs/mcp_agent_integration.md b/docs/mcp_agent_integration.md new file mode 100644 index 000000000..3adc6cd71 --- /dev/null +++ b/docs/mcp_agent_integration.md @@ -0,0 +1,284 @@ +# MCP Agent Integration with AgentOps + +## Overview + +AgentOps now provides comprehensive integration support for [MCP Agent](https://github.com/lastmile-ai/mcp-agent), enabling seamless telemetry collection and observability for MCP-based agent workflows. This integration hooks directly into MCP Agent's existing OpenTelemetry infrastructure while adding AgentOps-specific tracking and metrics. + +## Features + +### Telemetry Integration +- **Automatic instrumentation** of MCP Agent's TelemetryManager +- **Tracer configuration monitoring** to track when MCP Agent configures its tracing +- **Seamless integration** with existing OpenTelemetry setup +- **Optional override** of tracer configuration for custom exporters + +### Comprehensive Tracking +- **Tool Calls**: Track all MCP tool invocations with arguments and results +- **Workflows**: Monitor workflow execution with input/output capture +- **Agent Execution**: Trace agent operations with prompt and completion tracking +- **Error Handling**: Automatic error capture and reporting + +### Performance Metrics +- Tool call counters and duration +- Workflow execution duration +- Agent execution statistics +- Request/response metrics + +## Installation + +```bash +# Install AgentOps with MCP Agent support +pip install agentops + +# Install MCP Agent (if not already installed) +pip install mcp-agent +``` + +## Quick Start + +### Basic Usage + +```python +import agentops +from mcp_agent import MCPAgent + +# Initialize AgentOps (this automatically instruments MCP Agent) +agentops.init(api_key="your-api-key") + +# Use MCP Agent as normal - telemetry is automatically captured +agent = MCPAgent() +result = await agent.execute("Your prompt here") + +# End the session +agentops.end_session() +``` + +### Manual Instrumentation + +If you need more control over the instrumentation: + +```python +from agentops.instrumentation.providers.mcp_agent import MCPAgentInstrumentor +from agentops.instrumentation.providers.mcp_agent.config import Config + +# Configure the instrumentor +config = Config( + capture_prompts=True, + capture_completions=True, + capture_tool_calls=True, + capture_workflows=True, + max_prompt_length=10000, + max_completion_length=10000, +) + +# Create and apply instrumentation +instrumentor = MCPAgentInstrumentor(config) +instrumentor.instrument() + +# Your MCP Agent code here... + +# Clean up when done +instrumentor.uninstrument() +``` + +## Configuration Options + +The MCP Agent integration can be configured with the following options: + +### Data Capture Settings + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `capture_prompts` | bool | True | Capture prompts sent to agents | +| `capture_completions` | bool | True | Capture agent completions/responses | +| `capture_errors` | bool | True | Capture and report errors | +| `capture_tool_calls` | bool | True | Capture MCP tool calls and results | +| `capture_workflows` | bool | True | Capture workflow execution details | + +### Integration Settings + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `integrate_with_existing_telemetry` | bool | True | Integrate with MCP Agent's existing OpenTelemetry setup | +| `override_tracer_config` | bool | False | Override MCP Agent's tracer configuration | + +### Performance Settings + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `max_prompt_length` | int | 10000 | Maximum length of prompts to capture | +| `max_completion_length` | int | 10000 | Maximum length of completions to capture | + +### Filtering Settings + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `excluded_tools` | list[str] | None | Tool names to exclude from instrumentation | +| `excluded_workflows` | list[str] | None | Workflow names to exclude from instrumentation | + +## Advanced Usage + +### Filtering Specific Tools or Workflows + +```python +config = Config( + excluded_tools=["debug_tool", "internal_tool"], + excluded_workflows=["TestWorkflow"], +) +instrumentor = MCPAgentInstrumentor(config) +``` + +### Custom Telemetry Integration + +The integration preserves MCP Agent's existing telemetry while adding AgentOps tracking: + +```python +from mcp_agent.tracing.telemetry import telemetry + +# MCP Agent's decorator still works as expected +@telemetry.traced("custom_operation") +async def my_function(): + # This will be tracked by both MCP Agent and AgentOps + return await some_operation() +``` + +### Working with MCP Agent's TracingConfig + +```python +from mcp_agent.config import OpenTelemetrySettings + +# Configure MCP Agent's tracing +settings = OpenTelemetrySettings( + enabled=True, + exporters=["console", "otlp"], + service_name="my-mcp-service", +) + +# AgentOps will detect and integrate with this configuration +await tracing_config.configure(settings) +``` + +## Metrics and Observability + +### Available Metrics + +The integration provides the following metrics: + +- `gen_ai.mcp_agent.tool_calls`: Number of MCP tool calls +- `gen_ai.mcp_agent.workflow_duration`: Duration of MCP Agent workflows +- `gen_ai.mcp_agent.executions`: Number of agent executions + +### Span Attributes + +Each span includes relevant attributes: + +- `agent.type`: Always "mcp_agent" +- `agent.name`: Name of the specific agent +- `operation.type`: Type of operation (tool_call, workflow, agent_execution) +- `tool.name`: Name of the tool being called +- `workflow_name`: Name of the workflow +- `gen_ai.prompt`: Input prompt (if capture_prompts is enabled) +- `gen_ai.completion`: Output completion (if capture_completions is enabled) + +## Troubleshooting + +### Common Issues + +1. **Instrumentation not working**: Ensure MCP Agent is imported before calling `agentops.init()` + +2. **Missing telemetry data**: Check that the configuration options are set correctly + +3. **Performance impact**: If experiencing performance issues, consider: + - Reducing `max_prompt_length` and `max_completion_length` + - Excluding non-critical tools/workflows + - Disabling prompt/completion capture for high-volume operations + +### Debug Logging + +Enable debug logging to troubleshoot issues: + +```python +import logging +logging.getLogger("agentops").setLevel(logging.DEBUG) +``` + +## Example: Complete Integration + +```python +import asyncio +import agentops +from mcp_agent import MCPAgent +from mcp_agent.tools import Tool + +# Initialize AgentOps +agentops.init(api_key="your-api-key") + +# Define a custom tool +async def search_tool(query: str) -> str: + # Tool implementation + return f"Results for: {query}" + +# Create MCP Agent with tools +agent = MCPAgent( + tools=[ + Tool( + name="search", + description="Search for information", + function=search_tool + ) + ] +) + +# Execute agent with automatic telemetry +async def main(): + result = await agent.execute( + "Search for information about OpenTelemetry" + ) + print(result) + + # End session with success + agentops.end_session("Success") + +# Run the example +asyncio.run(main()) +``` + +## API Reference + +### MCPAgentInstrumentor + +```python +class MCPAgentInstrumentor: + def __init__(self, config: Optional[Config] = None) + def instrument(self, **kwargs) -> None + def uninstrument(self, **kwargs) -> None +``` + +### Config + +```python +@dataclass +class Config: + capture_prompts: bool = True + capture_completions: bool = True + capture_errors: bool = True + capture_tool_calls: bool = True + capture_workflows: bool = True + integrate_with_existing_telemetry: bool = True + override_tracer_config: bool = False + max_prompt_length: Optional[int] = 10000 + max_completion_length: Optional[int] = 10000 + excluded_tools: Optional[list[str]] = None + excluded_workflows: Optional[list[str]] = None +``` + +## Contributing + +We welcome contributions to improve the MCP Agent integration! Please see our [contributing guidelines](../CONTRIBUTING.md) for more information. + +## Support + +For issues or questions about the MCP Agent integration: +- Open an issue on [GitHub](https://github.com/AgentOps-AI/agentops/issues) +- Check the [AgentOps documentation](https://docs.agentops.ai) +- Join our [Discord community](https://discord.gg/agentops) \ No newline at end of file diff --git a/examples/mcp_agent_example.py b/examples/mcp_agent_example.py new file mode 100644 index 000000000..d8c81721c --- /dev/null +++ b/examples/mcp_agent_example.py @@ -0,0 +1,200 @@ +""" +Example demonstrating MCP Agent integration with AgentOps. + +This example shows how to use AgentOps to automatically instrument +and track MCP Agent operations, including tool calls, workflows, +and agent executions. +""" + +import asyncio +import os +from typing import Dict, Any + +# Import AgentOps (this will auto-instrument MCP Agent when imported) +import agentops + +# Mock MCP Agent imports (replace with actual imports in production) +# from mcp_agent import MCPAgent +# from mcp_agent.tools import Tool +# from mcp_agent.workflows import BaseWorkflow + + +# For demonstration purposes, we'll create mock classes +class MockTool: + """Mock tool for demonstration.""" + + def __init__(self, name: str, function): + self.name = name + self.function = function + + async def execute(self, **kwargs): + """Execute the tool.""" + return await self.function(**kwargs) + + +class MockWorkflow: + """Mock workflow for demonstration.""" + + def __init__(self, name: str): + self.name = name + + async def run(self, input_data: str) -> Dict[str, Any]: + """Run the workflow.""" + print(f"Running workflow '{self.name}' with input: {input_data}") + + # Simulate some processing + await asyncio.sleep(0.1) + + return { + "status": "completed", + "result": f"Processed: {input_data}", + "workflow": self.name, + } + + +class MockMCPAgent: + """Mock MCP Agent for demonstration.""" + + def __init__(self, name: str, tools: list = None): + self.name = name + self.tools = tools or [] + + async def execute(self, prompt: str) -> str: + """Execute the agent with a prompt.""" + print(f"Agent '{self.name}' executing prompt: {prompt}") + + # Simulate tool usage + if self.tools: + for tool in self.tools: + if "search" in prompt.lower() and tool.name == "search": + result = await tool.execute(query=prompt) + return f"Agent response: {result}" + + # Default response + return f"Agent '{self.name}' processed: {prompt}" + + +# Define custom tools +async def search_tool(query: str) -> str: + """Search tool implementation.""" + print(f"Searching for: {query}") + await asyncio.sleep(0.1) # Simulate API call + return f"Found 10 results for '{query}'" + + +async def calculator_tool(expression: str) -> str: + """Calculator tool implementation.""" + print(f"Calculating: {expression}") + try: + # WARNING: eval is dangerous in production! + result = eval(expression) + return f"Result: {result}" + except Exception as e: + return f"Error: {e}" + + +async def main(): + """Main example function.""" + + # Initialize AgentOps + # In production, use your actual API key + api_key = os.getenv("AGENTOPS_API_KEY", "demo-api-key") + + print("Initializing AgentOps...") + agentops.init( + api_key=api_key, + tags=["mcp-agent-demo", "example"], + metadata={"example": "mcp_agent_integration"}, + ) + + try: + # Create tools + search = MockTool("search", search_tool) + calculator = MockTool("calculator", calculator_tool) + + # Create agent with tools + agent = MockMCPAgent( + name="DemoAgent", + tools=[search, calculator], + ) + + # Execute various operations that will be tracked + print("\n--- Agent Execution ---") + response1 = await agent.execute("Search for OpenTelemetry documentation") + print(f"Response: {response1}\n") + + response2 = await agent.execute("Calculate 42 * 17") + print(f"Response: {response2}\n") + + # Create and run a workflow + print("--- Workflow Execution ---") + workflow = MockWorkflow("DataProcessingWorkflow") + workflow_result = await workflow.run("Process this data") + print(f"Workflow result: {workflow_result}\n") + + # Simulate an error scenario (will be captured by AgentOps) + print("--- Error Scenario ---") + try: + await agent.execute("Calculate invalid/expression") + except Exception as e: + print(f"Caught error: {e}\n") + + # End session successfully + print("Ending AgentOps session...") + agentops.end_session("Success") + + print("\n✅ Example completed successfully!") + print("Check your AgentOps dashboard to see the captured telemetry.") + + except Exception as e: + print(f"\n❌ Error occurred: {e}") + agentops.end_session("Error", end_state_reason=str(e)) + raise + + +def demonstrate_manual_instrumentation(): + """Demonstrate manual instrumentation with custom configuration.""" + + from agentops.instrumentation.providers.mcp_agent import MCPAgentInstrumentor + from agentops.instrumentation.providers.mcp_agent.config import Config + + # Create custom configuration + config = Config( + capture_prompts=True, + capture_completions=True, + capture_tool_calls=True, + capture_workflows=True, + capture_errors=True, + max_prompt_length=5000, # Limit prompt capture + max_completion_length=5000, # Limit completion capture + excluded_tools=["debug_tool"], # Exclude specific tools + excluded_workflows=["TestWorkflow"], # Exclude specific workflows + ) + + # Create instrumentor with custom config + instrumentor = MCPAgentInstrumentor(config) + + # Apply instrumentation + print("Applying MCP Agent instrumentation with custom config...") + instrumentor.instrument() + + # Your MCP Agent code here... + + # Clean up + instrumentor.uninstrument() + print("Instrumentation removed.") + + +if __name__ == "__main__": + print("=" * 60) + print("MCP Agent + AgentOps Integration Example") + print("=" * 60) + + # Run the async example + asyncio.run(main()) + + # Optionally demonstrate manual instrumentation + print("\n" + "=" * 60) + print("Manual Instrumentation Example") + print("=" * 60) + demonstrate_manual_instrumentation() \ No newline at end of file diff --git a/tests/integration/test_mcp_agent_integration.py b/tests/integration/test_mcp_agent_integration.py new file mode 100644 index 000000000..8acdd55d0 --- /dev/null +++ b/tests/integration/test_mcp_agent_integration.py @@ -0,0 +1,239 @@ +"""Integration tests for MCP Agent instrumentation. + +This module tests the integration between AgentOps and MCP Agent, +ensuring that telemetry is properly captured and integrated. +""" + +import asyncio +import pytest +from unittest.mock import Mock, patch, MagicMock +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from agentops.instrumentation.providers.mcp_agent import MCPAgentInstrumentor +from agentops.instrumentation.providers.mcp_agent.config import Config + + +class TestMCPAgentIntegration: + """Test suite for MCP Agent integration.""" + + @pytest.fixture + def setup_tracing(self): + """Set up OpenTelemetry tracing for tests.""" + # Create in-memory exporter + exporter = InMemorySpanExporter() + + # Set up tracer provider + provider = TracerProvider() + processor = SimpleSpanProcessor(exporter) + provider.add_span_processor(processor) + + # Set global tracer provider + trace.set_tracer_provider(provider) + + yield exporter + + # Clean up + exporter.clear() + + @pytest.fixture + def instrumentor(self): + """Create an MCP Agent instrumentor instance.""" + config = Config( + capture_prompts=True, + capture_completions=True, + capture_errors=True, + capture_tool_calls=True, + capture_workflows=True, + ) + return MCPAgentInstrumentor(config) + + def test_instrumentor_initialization(self, instrumentor): + """Test that the instrumentor initializes correctly.""" + assert instrumentor is not None + assert instrumentor.library_name == "mcp-agent" + assert instrumentor.config.capture_prompts is True + assert instrumentor.config.capture_completions is True + + @patch("agentops.instrumentation.providers.mcp_agent.instrumentor.wrap_function_wrapper") + def test_telemetry_manager_instrumentation(self, mock_wrap, instrumentor, setup_tracing): + """Test that TelemetryManager is properly instrumented.""" + # Mock the mcp_agent module + with patch.dict("sys.modules", {"mcp_agent.tracing.telemetry": MagicMock()}): + # Instrument + instrumentor.instrument() + + # Verify TelemetryManager.traced was wrapped + calls = [call for call in mock_wrap.call_args_list + if "TelemetryManager.traced" in str(call)] + assert len(calls) > 0, "TelemetryManager.traced should be wrapped" + + @patch("agentops.instrumentation.providers.mcp_agent.instrumentor.wrap_function_wrapper") + def test_tracer_config_instrumentation(self, mock_wrap, instrumentor, setup_tracing): + """Test that TracingConfig is properly instrumented.""" + # Mock the mcp_agent module + with patch.dict("sys.modules", {"mcp_agent.tracing.tracer": MagicMock()}): + # Instrument + instrumentor.instrument() + + # Verify TracingConfig.configure was wrapped + calls = [call for call in mock_wrap.call_args_list + if "TracingConfig.configure" in str(call)] + assert len(calls) > 0, "TracingConfig.configure should be wrapped" + + def test_tool_call_wrapper(self, instrumentor, setup_tracing): + """Test tool call wrapper functionality.""" + from agentops.instrumentation.providers.mcp_agent.wrappers import handle_tool_call_attributes + + tracer = trace.get_tracer("test") + config = Config(capture_tool_calls=True, capture_prompts=True, capture_completions=True) + + # Create wrapper + wrapper = handle_tool_call_attributes(tracer, None, config) + + # Mock function to wrap + async def mock_tool_call(tool_name, arguments): + return {"result": "success"} + + # Wrap the function + wrapped = wrapper(mock_tool_call, None, ["test_tool", {"arg": "value"}], {}) + + # Execute and verify + loop = asyncio.get_event_loop() + result = loop.run_until_complete(wrapped) + + assert result == {"result": "success"} + + # Check spans were created + spans = setup_tracing.get_finished_spans() + assert len(spans) > 0 + assert any("tool_call" in span.name for span in spans) + + def test_workflow_wrapper(self, instrumentor, setup_tracing): + """Test workflow wrapper functionality.""" + from agentops.instrumentation.providers.mcp_agent.wrappers import handle_workflow_attributes + + tracer = trace.get_tracer("test") + config = Config(capture_workflows=True, capture_prompts=True, capture_completions=True) + + # Create wrapper + wrapper = handle_workflow_attributes(tracer, None, config) + + # Mock workflow class + class MockWorkflow: + async def run(self, input_data): + return {"workflow_result": input_data} + + workflow = MockWorkflow() + + # Wrap the method + wrapped = wrapper(workflow.run, workflow, ["test_input"], {}) + + # Execute and verify + loop = asyncio.get_event_loop() + result = loop.run_until_complete(wrapped) + + assert result == {"workflow_result": "test_input"} + + # Check spans were created + spans = setup_tracing.get_finished_spans() + assert len(spans) > 0 + assert any("workflow" in span.name for span in spans) + + def test_agent_execution_wrapper(self, instrumentor, setup_tracing): + """Test agent execution wrapper functionality.""" + from agentops.instrumentation.providers.mcp_agent.wrappers import handle_agent_execution_attributes + + tracer = trace.get_tracer("test") + config = Config( + capture_prompts=True, + capture_completions=True, + max_prompt_length=100, + max_completion_length=100, + ) + + # Create wrapper + wrapper = handle_agent_execution_attributes(tracer, None, config) + + # Mock agent class + class MockAgent: + name = "TestAgent" + + async def execute(self, prompt): + return f"Response to: {prompt}" + + agent = MockAgent() + + # Wrap the method + wrapped = wrapper(agent.execute, agent, ["Test prompt"], {}) + + # Execute and verify + loop = asyncio.get_event_loop() + result = loop.run_until_complete(wrapped) + + assert result == "Response to: Test prompt" + + # Check spans were created + spans = setup_tracing.get_finished_spans() + assert len(spans) > 0 + assert any("agent_execution" in span.name for span in spans) + + # Check attributes + for span in spans: + if "agent_execution" in span.name: + attrs = span.attributes + assert attrs.get("agent.name") == "TestAgent" + assert "gen_ai.prompt" in attrs + + def test_config_filtering(self): + """Test configuration filtering for tools and workflows.""" + config = Config( + capture_tool_calls=True, + capture_workflows=True, + excluded_tools=["excluded_tool"], + excluded_workflows=["ExcludedWorkflow"], + ) + + # Test tool filtering + assert config.should_capture_tool("allowed_tool") is True + assert config.should_capture_tool("excluded_tool") is False + + # Test workflow filtering + assert config.should_capture_workflow("AllowedWorkflow") is True + assert config.should_capture_workflow("ExcludedWorkflow") is False + + def test_config_truncation(self): + """Test configuration truncation for prompts and completions.""" + config = Config( + max_prompt_length=10, + max_completion_length=10, + ) + + # Test prompt truncation + long_prompt = "This is a very long prompt that should be truncated" + truncated = config.truncate_prompt(long_prompt) + assert truncated == "This is a ... [truncated]" + + # Test completion truncation + long_completion = "This is a very long completion that should be truncated" + truncated = config.truncate_completion(long_completion) + assert truncated == "This is a ... [truncated]" + + def test_uninstrument(self, instrumentor): + """Test that uninstrumentation works correctly.""" + with patch("agentops.instrumentation.providers.mcp_agent.instrumentor.wrap_function_wrapper"): + # Instrument first + instrumentor.instrument() + + # Then uninstrument + instrumentor.uninstrument() + + # Verify state is cleaned up + assert instrumentor._original_telemetry_manager is None + assert instrumentor._original_tracer_config is None + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) \ No newline at end of file