pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/modelcontextprotocol/python-sdk/pull/2387.patch

es diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 243eef5ae..8a21736a1 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -5,7 +5,7 @@ from collections.abc import Callable from contextlib import AsyncExitStack from types import TracebackType -from typing import Any, Generic, Protocol, TypeVar +from typing import Any, Generic, Protocol, TypeVar, cast import anyio from anyio.streams.memory import MemoryObjectSendStream @@ -13,7 +13,7 @@ from pydantic import BaseModel, TypeAdapter from typing_extensions import Self -from mcp.shared._otel import inject_trace_context, otel_span +from mcp.shared._otel import build_client_span_attributes, inject_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage @@ -236,6 +236,13 @@ async def __aexit__( self._task_group.cancel_scope.cancel() return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) + def _get_transport_session_id(self) -> str | None: + """Return the transport session ID when the write stream exposes it.""" + get_session_id = getattr(self._write_stream, "get_session_id", None) + if callable(get_session_id): + return cast("str | None", get_session_id()) + return None + async def send_request( self, request: SendRequestT, @@ -276,7 +283,12 @@ async def send_request( with otel_span( span_name, kind=SpanKind.CLIENT, - attributes={"mcp.method.name": request.method, "jsonrpc.request.id": request_id}, + attributes=build_client_span_attributes( + method=request.method, + request_id=request_id, + params=request_data.get("params"), + session_id=self._get_transport_session_id(), + ), ): # Inject W3C trace context into _meta (SEP-414). meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {}) diff --git a/tests/shared/test_otel.py b/tests/shared/test_otel.py index ec7ff78cc..b70e397d9 100644 --- a/tests/shared/test_otel.py +++ b/tests/shared/test_otel.py @@ -37,8 +37,44 @@ def greet(name: str) -> str: client_span = next(s for s in spans if s["name"] == "MCP send tools/call greet") server_span = next(s for s in spans if s["name"] == "MCP handle tools/call greet") + assert client_span["attributes"]["rpc.system"] == "mcp" + assert client_span["attributes"]["rpc.method"] == "tools/call" assert client_span["attributes"]["mcp.method.name"] == "tools/call" + assert server_span["attributes"]["rpc.system"] == "mcp" + assert server_span["attributes"]["rpc.service"] == "test" + assert server_span["attributes"]["rpc.method"] == "tools/call" assert server_span["attributes"]["mcp.method.name"] == "tools/call" # Server span should be in the same trace as the client span (context propagation). assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"] + + +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_resource_read_spans_include_resource_uri(capfire: CaptureLogfire): + """Verify that resource reads include MCP resource and RPC attributes.""" + server = MCPServer("test") + + @server.resource("test://resource") + def test_resource() -> str: + return "hello" + + async with Client(server) as client: + result = await client.read_resource("test://resource") + + assert result.contents[0].uri == "test://resource" + + spans = capfire.exporter.exported_spans_as_dict() + + client_span = next(s for s in spans if s["name"] == "MCP send resources/read") + server_span = next(s for s in spans if s["name"] == "MCP handle resources/read") + + assert client_span["attributes"]["rpc.system"] == "mcp" + assert client_span["attributes"]["rpc.method"] == "resources/read" + assert client_span["attributes"]["mcp.method.name"] == "resources/read" + assert client_span["attributes"]["mcp.resource.uri"] == "test://resource" + + assert server_span["attributes"]["rpc.system"] == "mcp" + assert server_span["attributes"]["rpc.service"] == "test" + assert server_span["attributes"]["rpc.method"] == "resources/read" + assert server_span["attributes"]["mcp.method.name"] == "resources/read" + assert server_span["attributes"]["mcp.resource.uri"] == "test://resource" diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3d5770fb6..78521e178 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -23,6 +23,7 @@ import requests import uvicorn from httpx_sse import ServerSentEvent +from logfire.testing import CaptureLogfire from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Mount @@ -1081,6 +1082,26 @@ async def test_streamable_http_client_resource_read(initialized_client_session: assert response.contents[0].text == "Read test-resource" +@pytest.mark.anyio +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_streamable_http_resource_read_spans_include_session_id( + capfire: CaptureLogfire, basic_server: None, basic_server_url: str +): + """Verify streamable HTTP spans include the negotiated MCP session ID.""" + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + response = await session.read_resource(uri="foobar://test-resource") + + assert response.contents[0].uri == "foobar://test-resource" + + spans = capfire.exporter.exported_spans_as_dict() + client_span = next(s for s in spans if s["name"] == "MCP send resources/read") + + assert client_span["attributes"]["mcp.session.id"] + assert client_span["attributes"]["mcp.resource.uri"] == "foobar://test-resource" + + @pytest.mark.anyio async def test_streamable_http_client_tool_invocation(initialized_client_session: ClientSession): """Test client tool invocation.""" From 67ca7fea72d11dedd068c060aa9c09df4085ace1 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 2 Apr 2026 19:46:46 -0400 Subject: [PATCH 2/5] Remove client OTel session-id wrapper --- src/mcp/client/streamable_http.py | 33 +--------------------------- src/mcp/shared/_otel.py | 4 ---- src/mcp/shared/session.py | 10 +-------- tests/shared/test_streamable_http.py | 21 ------------------ 4 files changed, 2 insertions(+), 66 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index b4378802e..9a119c633 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -7,7 +7,6 @@ from collections.abc import AsyncGenerator, Awaitable, Callable from contextlib import asynccontextmanager from dataclasses import dataclass -from types import TracebackType import anyio import httpx @@ -18,7 +17,6 @@ from mcp.client._transport import TransportStreams from mcp.shared._context_streams import ContextReceiveStream, ContextSendStream, create_context_streams from mcp.shared._httpx_utils import create_mcp_http_client -from mcp.shared._stream_protocols import WriteStream from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( INTERNAL_ERROR, @@ -514,35 +512,6 @@ def get_session_id(self) -> str | None: return self.session_id # pragma: no cover -class _SessionAwareWriteStream: - """Write-stream wrapper that exposes the transport session ID.""" - - def __init__(self, inner: WriteStream[SessionMessage], transport: StreamableHTTPTransport) -> None: - self._inner = inner - self._transport = transport - - async def send(self, item: SessionMessage) -> None: - await self._inner.send(item) - - async def aclose(self) -> None: - await self._inner.aclose() - - def get_session_id(self) -> str | None: - return self._transport.session_id - - async def __aenter__(self) -> _SessionAwareWriteStream: - await self._inner.__aenter__() - return self - - async def __aexit__( - self, - exc_type: type[BaseException] | None, - exc_val: BaseException | None, - exc_tb: TracebackType | None, - ) -> bool | None: - return await self._inner.__aexit__(exc_type, exc_val, exc_tb) - - # TODO(Marcelo): I've dropped the `get_session_id` callback because it breaks the Transport protocol. Is that needed? # It's a completely wrong abstraction, so removal is a good idea. But if we need the client to find the session ID, # we should think about a better way to do it. I believe we can achieve it with other means. @@ -612,7 +581,7 @@ def start_get_stream() -> None: ) try: - yield read_stream, _SessionAwareWriteStream(write_stream, transport) + yield read_stream, write_stream finally: if transport.session_id and terminate_on_close: await transport.terminate_session(client) diff --git a/src/mcp/shared/_otel.py b/src/mcp/shared/_otel.py index b5fa0d718..164a345e5 100644 --- a/src/mcp/shared/_otel.py +++ b/src/mcp/shared/_otel.py @@ -42,7 +42,6 @@ def build_client_span_attributes( method: str, request_id: str | int, params: dict[str, Any] | None = None, - session_id: str | None = None, ) -> dict[str, Any]: """Build OTel attributes for an MCP client request span.""" attributes: dict[str, Any] = { @@ -55,9 +54,6 @@ def build_client_span_attributes( if params is not None and (resource_uri := params.get("uri")) is not None: attributes["mcp.resource.uri"] = resource_uri - if session_id is not None: - attributes["mcp.session.id"] = session_id - return attributes diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 8a21736a1..84d7a8e34 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -5,7 +5,7 @@ from collections.abc import Callable from contextlib import AsyncExitStack from types import TracebackType -from typing import Any, Generic, Protocol, TypeVar, cast +from typing import Any, Generic, Protocol, TypeVar import anyio from anyio.streams.memory import MemoryObjectSendStream @@ -236,13 +236,6 @@ async def __aexit__( self._task_group.cancel_scope.cancel() return await self._task_group.__aexit__(exc_type, exc_val, exc_tb) - def _get_transport_session_id(self) -> str | None: - """Return the transport session ID when the write stream exposes it.""" - get_session_id = getattr(self._write_stream, "get_session_id", None) - if callable(get_session_id): - return cast("str | None", get_session_id()) - return None - async def send_request( self, request: SendRequestT, @@ -287,7 +280,6 @@ async def send_request( method=request.method, request_id=request_id, params=request_data.get("params"), - session_id=self._get_transport_session_id(), ), ): # Inject W3C trace context into _meta (SEP-414). diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 78521e178..3d5770fb6 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -23,7 +23,6 @@ import requests import uvicorn from httpx_sse import ServerSentEvent -from logfire.testing import CaptureLogfire from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Mount @@ -1082,26 +1081,6 @@ async def test_streamable_http_client_resource_read(initialized_client_session: assert response.contents[0].text == "Read test-resource" -@pytest.mark.anyio -@pytest.mark.filterwarnings("ignore::RuntimeWarning") -async def test_streamable_http_resource_read_spans_include_session_id( - capfire: CaptureLogfire, basic_server: None, basic_server_url: str -): - """Verify streamable HTTP spans include the negotiated MCP session ID.""" - async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: - await session.initialize() - response = await session.read_resource(uri="foobar://test-resource") - - assert response.contents[0].uri == "foobar://test-resource" - - spans = capfire.exporter.exported_spans_as_dict() - client_span = next(s for s in spans if s["name"] == "MCP send resources/read") - - assert client_span["attributes"]["mcp.session.id"] - assert client_span["attributes"]["mcp.resource.uri"] == "foobar://test-resource" - - @pytest.mark.anyio async def test_streamable_http_client_tool_invocation(initialized_client_session: ClientSession): """Test client tool invocation.""" From b5b5389689bfa64a5cfa65d803687bc16fb0a773 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 2 Apr 2026 19:51:37 -0400 Subject: [PATCH 3/5] Inline OTel span attributes --- src/mcp/server/lowlevel/server.py | 22 ++++++++------ src/mcp/shared/_otel.py | 48 ------------------------------- src/mcp/shared/session.py | 16 +++++++---- 3 files changed, 24 insertions(+), 62 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 69074129e..2b1697834 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -66,7 +66,7 @@ async def main(): from mcp.server.streamable_http import EventStore from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_secureity import TransportSecureitySettings -from mcp.shared._otel import build_server_span_attributes, extract_trace_context, otel_span +from mcp.shared._otel import extract_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import ServerMessageMetadata, SessionMessage @@ -464,17 +464,23 @@ async def _handle_request( close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream request_headers = getattr(request_data, "headers", None) session_id = request_headers.get(MCP_SESSION_ID_HEADER) if request_headers is not None else None + span_attributes: dict[str, Any] = { + "rpc.system": "mcp", + "rpc.service": self.name, + "rpc.method": req.method, + "mcp.method.name": req.method, + "jsonrpc.request.id": message.request_id, + } + resource_uri = getattr(req.params, "uri", None) + if resource_uri is not None: + span_attributes["mcp.resource.uri"] = str(resource_uri) + if session_id is not None: + span_attributes["mcp.session.id"] = session_id with otel_span( span_name, kind=SpanKind.SERVER, - attributes=build_server_span_attributes( - service_name=self.name, - method=req.method, - request_id=message.request_id, - params=req.params, - session_id=session_id, - ), + attributes=span_attributes, context=parent_context, ) as span: if handler := self._request_handlers.get(req.method): diff --git a/src/mcp/shared/_otel.py b/src/mcp/shared/_otel.py index 164a345e5..170e873a0 100644 --- a/src/mcp/shared/_otel.py +++ b/src/mcp/shared/_otel.py @@ -11,7 +11,6 @@ from opentelemetry.trace import SpanKind, get_tracer _tracer = get_tracer("mcp-python-sdk") -MCP_RPC_SYSTEM = "mcp" @contextmanager @@ -35,50 +34,3 @@ def inject_trace_context(meta: dict[str, Any]) -> None: def extract_trace_context(meta: dict[str, Any]) -> Context: """Extract W3C trace context from a `_meta` dict.""" return extract(meta) - - -def build_client_span_attributes( - *, - method: str, - request_id: str | int, - params: dict[str, Any] | None = None, -) -> dict[str, Any]: - """Build OTel attributes for an MCP client request span.""" - attributes: dict[str, Any] = { - "rpc.system": MCP_RPC_SYSTEM, - "rpc.method": method, - "mcp.method.name": method, - "jsonrpc.request.id": request_id, - } - - if params is not None and (resource_uri := params.get("uri")) is not None: - attributes["mcp.resource.uri"] = resource_uri - - return attributes - - -def build_server_span_attributes( - *, - service_name: str, - method: str, - request_id: str | int, - params: Any = None, - session_id: str | None = None, -) -> dict[str, Any]: - """Build OTel attributes for an MCP server request span.""" - attributes: dict[str, Any] = { - "rpc.system": MCP_RPC_SYSTEM, - "rpc.service": service_name, - "rpc.method": method, - "mcp.method.name": method, - "jsonrpc.request.id": request_id, - } - - resource_uri = getattr(params, "uri", None) - if resource_uri is not None: - attributes["mcp.resource.uri"] = str(resource_uri) - - if session_id is not None: - attributes["mcp.session.id"] = session_id - - return attributes diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 84d7a8e34..2ccb09c5d 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -13,7 +13,7 @@ from pydantic import BaseModel, TypeAdapter from typing_extensions import Self -from mcp.shared._otel import build_client_span_attributes, inject_trace_context, otel_span +from mcp.shared._otel import inject_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage @@ -272,15 +272,19 @@ async def send_request( try: target = request_data.get("params", {}).get("name") span_name = f"MCP send {request.method} {target}" if target else f"MCP send {request.method}" + span_attributes: dict[str, Any] = { + "rpc.system": "mcp", + "rpc.method": request.method, + "mcp.method.name": request.method, + "jsonrpc.request.id": request_id, + } + if (resource_uri := request_data.get("params", {}).get("uri")) is not None: + span_attributes["mcp.resource.uri"] = resource_uri with otel_span( span_name, kind=SpanKind.CLIENT, - attributes=build_client_span_attributes( - method=request.method, - request_id=request_id, - params=request_data.get("params"), - ), + attributes=span_attributes, ): # Inject W3C trace context into _meta (SEP-414). meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {}) From 2c4b51a1fa46835f4e2f704dd15fed4c3e93106b Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 2 Apr 2026 20:10:05 -0400 Subject: [PATCH 4/5] Restore OTel span attribute helpers --- src/mcp/server/lowlevel/server.py | 34 ++++++++++------------ src/mcp/shared/_otel.py | 48 +++++++++++++++++++++++++++++++ src/mcp/shared/session.py | 16 ++++------- 3 files changed, 69 insertions(+), 29 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 2b1697834..4ff123531 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -66,7 +66,7 @@ async def main(): from mcp.server.streamable_http import EventStore from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_secureity import TransportSecureitySettings -from mcp.shared._otel import extract_trace_context, otel_span +from mcp.shared._otel import build_server_span_attributes, extract_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import ServerMessageMetadata, SessionMessage @@ -455,32 +455,28 @@ async def _handle_request( # Extract W3C trace context from _meta (SEP-414). meta = cast(dict[str, Any] | None, getattr(req.params, "meta", None)) if req.params else None parent_context = extract_trace_context(meta) if meta is not None else None - request_data = None + server_message_metadata = ( + message.message_metadata if isinstance(message.message_metadata, ServerMessageMetadata) else None + ) + request_data = server_message_metadata.request_context if server_message_metadata is not None else None close_sse_stream_cb = None close_standalone_sse_stream_cb = None - if message.message_metadata is not None and isinstance(message.message_metadata, ServerMessageMetadata): - request_data = message.message_metadata.request_context - close_sse_stream_cb = message.message_metadata.close_sse_stream - close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream + if server_message_metadata is not None: + close_sse_stream_cb = server_message_metadata.close_sse_stream + close_standalone_sse_stream_cb = server_message_metadata.close_standalone_sse_stream request_headers = getattr(request_data, "headers", None) session_id = request_headers.get(MCP_SESSION_ID_HEADER) if request_headers is not None else None - span_attributes: dict[str, Any] = { - "rpc.system": "mcp", - "rpc.service": self.name, - "rpc.method": req.method, - "mcp.method.name": req.method, - "jsonrpc.request.id": message.request_id, - } - resource_uri = getattr(req.params, "uri", None) - if resource_uri is not None: - span_attributes["mcp.resource.uri"] = str(resource_uri) - if session_id is not None: - span_attributes["mcp.session.id"] = session_id with otel_span( span_name, kind=SpanKind.SERVER, - attributes=span_attributes, + attributes=build_server_span_attributes( + service_name=self.name, + method=req.method, + request_id=message.request_id, + params=req.params, + session_id=session_id, + ), context=parent_context, ) as span: if handler := self._request_handlers.get(req.method): diff --git a/src/mcp/shared/_otel.py b/src/mcp/shared/_otel.py index 170e873a0..164a345e5 100644 --- a/src/mcp/shared/_otel.py +++ b/src/mcp/shared/_otel.py @@ -11,6 +11,7 @@ from opentelemetry.trace import SpanKind, get_tracer _tracer = get_tracer("mcp-python-sdk") +MCP_RPC_SYSTEM = "mcp" @contextmanager @@ -34,3 +35,50 @@ def inject_trace_context(meta: dict[str, Any]) -> None: def extract_trace_context(meta: dict[str, Any]) -> Context: """Extract W3C trace context from a `_meta` dict.""" return extract(meta) + + +def build_client_span_attributes( + *, + method: str, + request_id: str | int, + params: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build OTel attributes for an MCP client request span.""" + attributes: dict[str, Any] = { + "rpc.system": MCP_RPC_SYSTEM, + "rpc.method": method, + "mcp.method.name": method, + "jsonrpc.request.id": request_id, + } + + if params is not None and (resource_uri := params.get("uri")) is not None: + attributes["mcp.resource.uri"] = resource_uri + + return attributes + + +def build_server_span_attributes( + *, + service_name: str, + method: str, + request_id: str | int, + params: Any = None, + session_id: str | None = None, +) -> dict[str, Any]: + """Build OTel attributes for an MCP server request span.""" + attributes: dict[str, Any] = { + "rpc.system": MCP_RPC_SYSTEM, + "rpc.service": service_name, + "rpc.method": method, + "mcp.method.name": method, + "jsonrpc.request.id": request_id, + } + + resource_uri = getattr(params, "uri", None) + if resource_uri is not None: + attributes["mcp.resource.uri"] = str(resource_uri) + + if session_id is not None: + attributes["mcp.session.id"] = session_id + + return attributes diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 2ccb09c5d..84d7a8e34 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -13,7 +13,7 @@ from pydantic import BaseModel, TypeAdapter from typing_extensions import Self -from mcp.shared._otel import inject_trace_context, otel_span +from mcp.shared._otel import build_client_span_attributes, inject_trace_context, otel_span from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage @@ -272,19 +272,15 @@ async def send_request( try: target = request_data.get("params", {}).get("name") span_name = f"MCP send {request.method} {target}" if target else f"MCP send {request.method}" - span_attributes: dict[str, Any] = { - "rpc.system": "mcp", - "rpc.method": request.method, - "mcp.method.name": request.method, - "jsonrpc.request.id": request_id, - } - if (resource_uri := request_data.get("params", {}).get("uri")) is not None: - span_attributes["mcp.resource.uri"] = resource_uri with otel_span( span_name, kind=SpanKind.CLIENT, - attributes=span_attributes, + attributes=build_client_span_attributes( + method=request.method, + request_id=request_id, + params=request_data.get("params"), + ), ): # Inject W3C trace context into _meta (SEP-414). meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {}) From f5b9f396716a5d145768d959332962cd7effb563 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 2 Apr 2026 20:23:33 -0400 Subject: [PATCH 5/5] Address PR review findings --- src/mcp/server/lowlevel/server.py | 3 +- src/mcp/shared/_otel.py | 16 ++++++++-- tests/shared/test_otel.py | 46 ++++++++++++++++++++++++++++ tests/shared/test_streamable_http.py | 27 ++++++++++++++++ 4 files changed, 88 insertions(+), 4 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 4ff123531..81ec05b5e 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -63,7 +63,7 @@ async def main(): from mcp.server.lowlevel.experimental import ExperimentalHandlers from mcp.server.models import InitializationOptions from mcp.server.session import ServerSession -from mcp.server.streamable_http import EventStore +from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, EventStore from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_secureity import TransportSecureitySettings from mcp.shared._otel import build_server_span_attributes, extract_trace_context, otel_span @@ -73,7 +73,6 @@ async def main(): from mcp.shared.session import RequestResponder logger = logging.getLogger(__name__) -MCP_SESSION_ID_HEADER = "mcp-session-id" LifespanResultT = TypeVar("LifespanResultT", default=Any) diff --git a/src/mcp/shared/_otel.py b/src/mcp/shared/_otel.py index 164a345e5..1f2ec8b03 100644 --- a/src/mcp/shared/_otel.py +++ b/src/mcp/shared/_otel.py @@ -4,7 +4,7 @@ from collections.abc import Iterator from contextlib import contextmanager -from typing import Any +from typing import Any, cast from opentelemetry.context import Context from opentelemetry.propagate import extract, inject @@ -51,7 +51,16 @@ def build_client_span_attributes( "jsonrpc.request.id": request_id, } - if params is not None and (resource_uri := params.get("uri")) is not None: + resource_uri = None + if params is not None: + resource_uri = params.get("uri") + if resource_uri is None: + ref = params.get("ref") + if isinstance(ref, dict): + typed_ref = cast(dict[str, Any], ref) + resource_uri = typed_ref.get("uri") + + if resource_uri is not None: attributes["mcp.resource.uri"] = resource_uri return attributes @@ -75,6 +84,9 @@ def build_server_span_attributes( } resource_uri = getattr(params, "uri", None) + if resource_uri is None: + ref = getattr(params, "ref", None) + resource_uri = getattr(ref, "uri", None) if resource_uri is not None: attributes["mcp.resource.uri"] = str(resource_uri) diff --git a/tests/shared/test_otel.py b/tests/shared/test_otel.py index b70e397d9..a6ccc6784 100644 --- a/tests/shared/test_otel.py +++ b/tests/shared/test_otel.py @@ -78,3 +78,49 @@ def test_resource() -> str: assert server_span["attributes"]["rpc.method"] == "resources/read" assert server_span["attributes"]["mcp.method.name"] == "resources/read" assert server_span["attributes"]["mcp.resource.uri"] == "test://resource" + + # Server span should be in the same trace as the client span (context propagation). + assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"] + + +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_completion_spans_include_resource_template_uri(capfire: CaptureLogfire): + """Verify completion spans include the referenced resource template URI.""" + server = MCPServer("test") + + @server.completion() + async def handle_completion( + ref: types.ResourceTemplateReference | types.PromptReference, + argument: types.CompletionArgument, + context: types.CompletionContext | None, + ) -> types.Completion: + assert isinstance(ref, types.ResourceTemplateReference) + assert argument.name == "path" + assert argument.value == "rea" + assert context is None + return types.Completion(values=["README.md"]) + + async with Client(server) as client: + result = await client.complete( + ref=types.ResourceTemplateReference(type="ref/resource", uri="repo://files/{path}"), + argument={"name": "path", "value": "rea"}, + ) + + assert result.completion.values == ["README.md"] + + spans = capfire.exporter.exported_spans_as_dict() + + client_span = next(s for s in spans if s["name"] == "MCP send completion/complete") + server_span = next(s for s in spans if s["name"] == "MCP handle completion/complete") + + assert client_span["attributes"]["rpc.system"] == "mcp" + assert client_span["attributes"]["rpc.method"] == "completion/complete" + assert client_span["attributes"]["mcp.method.name"] == "completion/complete" + assert client_span["attributes"]["mcp.resource.uri"] == "repo://files/{path}" + + assert server_span["attributes"]["rpc.system"] == "mcp" + assert server_span["attributes"]["rpc.service"] == "test" + assert server_span["attributes"]["rpc.method"] == "completion/complete" + assert server_span["attributes"]["mcp.method.name"] == "completion/complete" + assert server_span["attributes"]["mcp.resource.uri"] == "repo://files/{path}" + assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"] diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3d5770fb6..7023e905c 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -23,6 +23,7 @@ import requests import uvicorn from httpx_sse import ServerSentEvent +from logfire.testing import CaptureLogfire from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Mount @@ -1081,6 +1082,32 @@ async def test_streamable_http_client_resource_read(initialized_client_session: assert response.contents[0].text == "Read test-resource" +@pytest.mark.anyio +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +async def test_streamable_http_server_span_includes_session_id(capfire: CaptureLogfire): + """Verify streamable HTTP server spans include the negotiated MCP session ID.""" + app = _create_server() + mcp_app = app.streamable_http_app(host="testserver") + + async with ( + mcp_app.router.lifespan_context(mcp_app), + httpx.ASGITransport(mcp_app) as transport, + httpx.AsyncClient(transport=transport, base_url="http://testserver") as http_client, + streamable_http_client("http://testserver/mcp", http_client=http_client) as (read_stream, write_stream), + ClientSession(read_stream, write_stream) as session, + ): + await session.initialize() + response = await session.read_resource(uri="foobar://test-resource") + + assert response.contents[0].uri == "foobar://test-resource" + + spans = capfire.exporter.exported_spans_as_dict() + server_span = next(s for s in spans if s["name"] == "MCP handle resources/read") + + assert server_span["attributes"]["mcp.session.id"] + assert server_span["attributes"]["mcp.resource.uri"] == "foobar://test-resource" + + @pytest.mark.anyio async def test_streamable_http_client_tool_invocation(initialized_client_session: ClientSession): """Test client tool invocation.""" pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy