Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,54 +57,68 @@ async def traced_method(wrapped, instance, args, kwargs):
tool_arguments = args[1] if len(args) > 1 else {}

entity_name = tool_key if tool_key else "unknown_tool"
span_name = f"{entity_name}.tool"

with self._tracer.start_as_current_span(span_name) as span:
span.set_attribute(SpanAttributes.TRACELOOP_SPAN_KIND, TraceloopSpanKindValues.TOOL.value)
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, entity_name)
# Create parent server.mcp span
with self._tracer.start_as_current_span("mcp.server") as mcp_span:
mcp_span.set_attribute(SpanAttributes.TRACELOOP_SPAN_KIND, "server")
mcp_span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, "mcp.server")

if self._should_send_prompts():
try:
input_data = {
"tool_name": entity_name,
"arguments": tool_arguments
}
json_input = json.dumps(input_data, cls=self._get_json_encoder())
truncated_input = self._truncate_json_if_needed(json_input)
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_INPUT, truncated_input)
except (TypeError, ValueError):
pass # Skip input logging if serialization fails

try:
result = await wrapped(*args, **kwargs)

# Add output in traceloop format
if self._should_send_prompts() and result:
# Create nested tool span
span_name = f"{entity_name}.tool"
with self._tracer.start_as_current_span(span_name) as tool_span:
tool_span.set_attribute(SpanAttributes.TRACELOOP_SPAN_KIND, TraceloopSpanKindValues.TOOL.value)
tool_span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, entity_name)

if self._should_send_prompts():
try:
# Convert FastMCP Content objects to serializable format
output_data = []
for item in result:
if hasattr(item, 'text'):
output_data.append({"type": "text", "content": item.text})
elif hasattr(item, '__dict__'):
output_data.append(item.__dict__)
else:
output_data.append(str(item))

json_output = json.dumps(output_data, cls=self._get_json_encoder())
truncated_output = self._truncate_json_if_needed(json_output)
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_OUTPUT, truncated_output)
input_data = {
"tool_name": entity_name,
"arguments": tool_arguments
}
json_input = json.dumps(input_data, cls=self._get_json_encoder())
truncated_input = self._truncate_json_if_needed(json_input)
tool_span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_INPUT, truncated_input)
except (TypeError, ValueError):
pass # Skip output logging if serialization fails
pass # Skip input logging if serialization fails

span.set_status(Status(StatusCode.OK))
return result

except Exception as e:
span.set_attribute(ERROR_TYPE, type(e).__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
try:
result = await wrapped(*args, **kwargs)

# Add output in traceloop format to tool span
if self._should_send_prompts() and result:
try:
# Convert FastMCP Content objects to serializable format
output_data = []
for item in result:
if hasattr(item, 'text'):
output_data.append({"type": "text", "content": item.text})
elif hasattr(item, '__dict__'):
output_data.append(item.__dict__)
else:
output_data.append(str(item))

json_output = json.dumps(output_data, cls=self._get_json_encoder())
truncated_output = self._truncate_json_if_needed(json_output)
tool_span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_OUTPUT, truncated_output)

# Also add response to MCP span
mcp_span.set_attribute(SpanAttributes.MCP_RESPONSE_VALUE, truncated_output)
except (TypeError, ValueError):
pass # Skip output logging if serialization fails

tool_span.set_status(Status(StatusCode.OK))
mcp_span.set_status(Status(StatusCode.OK))
return result

except Exception as e:
tool_span.set_attribute(ERROR_TYPE, type(e).__name__)
tool_span.record_exception(e)
tool_span.set_status(Status(StatusCode.ERROR, str(e)))

mcp_span.set_attribute(ERROR_TYPE, type(e).__name__)
mcp_span.record_exception(e)
mcp_span.set_status(Status(StatusCode.ERROR, str(e)))
raise

return traced_method

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE

from opentelemetry.instrumentation.mcp.version import __version__
from opentelemetry.instrumentation.mcp.utils import dont_throw
from opentelemetry.instrumentation.mcp.utils import dont_throw, Config
from opentelemetry.instrumentation.mcp.fastmcp_instrumentation import FastMCPInstrumentor

_instruments = ("mcp >= 1.6.0",)


class McpInstrumentor(BaseInstrumentor):
def __init__(self):
def __init__(self, exception_logger=None):
super().__init__()
Config.exception_logger = exception_logger
self._fastmcp_instrumentor = FastMCPInstrumentor()

def instrumentation_dependencies(self) -> Collection[str]:
Expand All @@ -38,6 +39,20 @@ def _instrument(self, **kwargs):
# Instrument FastMCP
self._fastmcp_instrumentor.instrument(tracer)

# Instrument FastMCP Client to create a session-level span
register_post_import_hook(
lambda _: wrap_function_wrapper(
"fastmcp.client", "Client.__aenter__", self._fastmcp_client_enter_wrapper(tracer)
),
"fastmcp.client",
)
register_post_import_hook(
lambda _: wrap_function_wrapper(
"fastmcp.client", "Client.__aexit__", self._fastmcp_client_exit_wrapper(tracer)
),
"fastmcp.client",
)

register_post_import_hook(
lambda _: wrap_function_wrapper(
"mcp.client.sse", "sse_client", self._transport_wrapper(tracer)
Expand Down Expand Up @@ -181,6 +196,52 @@ async def traced_method(wrapped, instance, args, kwargs):

return traced_method

def _fastmcp_client_enter_wrapper(self, tracer):
"""Wrapper for FastMCP Client.__aenter__ to start a session trace"""
@dont_throw
async def traced_method(wrapped, instance, args, kwargs):
# Start a root span for the MCP client session and make it current
span_context_manager = tracer.start_as_current_span("mcp.client.session")
span = span_context_manager.__enter__()
span.set_attribute(SpanAttributes.TRACELOOP_SPAN_KIND, "session")
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, "mcp.client.session")

# Store the span context manager on the instance to properly exit it later
setattr(instance, '_tracing_session_context_manager', span_context_manager)

try:
# Call the original method
result = await wrapped(*args, **kwargs)
return result
except Exception as e:
span.set_attribute(ERROR_TYPE, type(e).__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
return traced_method

def _fastmcp_client_exit_wrapper(self, tracer):
"""Wrapper for FastMCP Client.__aexit__ to end the session trace"""
@dont_throw
async def traced_method(wrapped, instance, args, kwargs):
try:
# Call the original method first
result = await wrapped(*args, **kwargs)

# End the session span context manager
context_manager = getattr(instance, '_tracing_session_context_manager', None)
if context_manager:
context_manager.__exit__(None, None, None)

return result
except Exception as e:
# End the session span context manager with exception info
context_manager = getattr(instance, '_tracing_session_context_manager', None)
if context_manager:
context_manager.__exit__(type(e), e, e.__traceback__)
raise
return traced_method

async def _handle_tool_call(self, tracer, method, params, args, kwargs, wrapped):
"""Handle tools/call with tool semantics"""
# Extract the actual tool name
Expand Down
37 changes: 36 additions & 1 deletion packages/opentelemetry-instrumentation-mcp/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/opentelemetry-instrumentation-mcp/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pytest = "^8.2.2"
pytest-sugar = "1.0.0"
pytest-recording = "^0.13.1"
opentelemetry-sdk = "^1.27.0"
pytest-asyncio = "^1.2.0"

[build-system]
requires = ["poetry-core"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
async def test_fastmcp_server_mcp_parent_span(span_exporter, tracer_provider) -> None:
"""Test that FastMCP tool calls have mcp.server as parent span."""
from fastmcp import FastMCP, Client

# Create a simple FastMCP server
server = FastMCP("test-server")

@server.tool()
async def test_tool(x: int) -> int:
"""A simple test tool."""
return x * 2

# Use in-memory client to connect to the server
async with Client(server) as client:
# Test tool calling
result = await client.call_tool("test_tool", {"x": 5})
assert len(result) == 1
assert result[0].text == "10"

# Get the finished spans
spans = span_exporter.get_finished_spans()

# Debug: Print span details with parent info
print(f"\nTotal spans: {len(spans)}")
for i, span in enumerate(spans):
parent_id = span.parent.span_id if span.parent else "None"
print(f"Span {i}: name='{span.name}', span_id={span.get_span_context().span_id}, "
f"parent_id={parent_id}, trace_id={span.get_span_context().trace_id}")

# Look specifically for mcp.server and tool spans
server_mcp_spans = [span for span in spans if span.name == 'mcp.server']
tool_spans = [span for span in spans if span.name.endswith('.tool')]

print(f"\nMCP Server spans: {len(server_mcp_spans)}")
print(f"Tool spans: {len(tool_spans)}")

# Check if we have the expected spans
assert len(server_mcp_spans) >= 1, f"Expected at least 1 mcp.server span, found {len(server_mcp_spans)}"
assert len(tool_spans) >= 1, f"Expected at least 1 tool span, found {len(tool_spans)}"

# Find server-side spans (should be in same trace)
server_side_spans = []
for server_span in server_mcp_spans:
for tool_span in tool_spans:
if (server_span.get_span_context().trace_id == tool_span.get_span_context().trace_id and
tool_span.parent and
tool_span.parent.span_id == server_span.get_span_context().span_id):
server_side_spans.append((server_span, tool_span))
break

print(f"\nFound {len(server_side_spans)} server-side span pairs")

# Verify we found at least one proper parent-child relationship
assert len(server_side_spans) >= 1, "Expected at least one mcp.server span to be parent of a tool span"

# Check the specific parent-child relationship
server_span, tool_span = server_side_spans[0]
assert tool_span.parent.span_id == server_span.get_span_context().span_id, \
"Tool span should be child of mcp.server span"
assert server_span.get_span_context().trace_id == tool_span.get_span_context().trace_id, \
"Parent and child should be in same trace"

# Verify MCP server span attributes
assert server_span.attributes.get('traceloop.span.kind') == 'server', \
"Server span should have server span kind"
assert server_span.attributes.get('traceloop.entity.name') == 'mcp.server', \
"Server span should have mcp.server entity name"

# Verify tool span attributes
assert tool_span.attributes.get('traceloop.span.kind') == 'tool', \
"Tool span should have tool span kind"
assert tool_span.attributes.get('traceloop.entity.name') == 'test_tool', \
"Tool span should have correct entity name"
Loading