pFad - Phone/Frame/Anonymizer/Declutterfier! Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

URL: http://github.com/modelcontextprotocol/python-sdk/commit/9ed0b93cebbd2d008f8162786eb81d9c3992e045

="all" rel="stylesheet" href="https://github.githubassets.com/assets/global-9c8f61f9f58ad7b2.css" /> fix: handle ClosedResourceError in StreamableHTTP message router (#1384) · modelcontextprotocol/python-sdk@9ed0b93 · GitHub
Skip to content

Commit 9ed0b93

Browse files
Edison-A-NmaxisbeyKludex
authored
fix: handle ClosedResourceError in StreamableHTTP message router (#1384)
Co-authored-by: Max Isbey <224885523+maxisbey@users.noreply.github.com> Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
1 parent 72a3400 commit 9ed0b93

2 files changed

Lines changed: 284 additions & 1 deletion

File tree

src/mcp/server/streamable_http.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1000,11 +1000,16 @@ async def message_router(): # pragma: no cover
10001000
# Stream might be closed, remove from registry
10011001
self._request_streams.pop(request_stream_id, None)
10021002
else:
1003-
logging.debug(
1003+
logger.debug(
10041004
f"""Request stream {request_stream_id} not found
10051005
for message. Still processing message as the client
10061006
might reconnect and replay."""
10071007
)
1008+
except anyio.ClosedResourceError:
1009+
if self._terminated:
1010+
logger.debug("Read stream closed by client")
1011+
else:
1012+
logger.exception("Unexpected closure of read stream in message router")
10081013
except Exception:
10091014
logger.exception("Error in message router")
10101015

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
"""Test for issue #1363 - Race condition in StreamableHTTP transport causes ClosedResourceError.
2+
3+
This test reproduces the race condition described in issue #1363 where MCP servers
4+
in HTTP Streamable mode experience ClosedResourceError exceptions when requests
5+
fail validation early (e.g., due to incorrect Accept headers).
6+
7+
The race condition occurs because:
8+
1. Transport setup creates a message_router task
9+
2. Message router enters async for write_stream_reader loop
10+
3. write_stream_reader calls checkpoint() in receive(), yielding control
11+
4. Request handling processes HTTP request
12+
5. If validation fails early, request returns immediately
13+
6. Transport termination closes all streams including write_stream_reader
14+
7. Message router may still be in checkpoint() yield and hasn't returned to check stream state
15+
8. When message router resumes, it encounters a closed stream, raising ClosedResourceError
16+
"""
17+
18+
import logging
19+
import threading
20+
from collections.abc import AsyncGenerator
21+
from contextlib import asynccontextmanager
22+
23+
import anyio
24+
import httpx
25+
import pytest
26+
from starlette.applications import Starlette
27+
from starlette.routing import Mount
28+
29+
from mcp.server import Server
30+
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
31+
32+
SERVER_NAME = "test_race_condition_server"
33+
34+
35+
class RaceConditionTestServer(Server):
36+
def __init__(self):
37+
super().__init__(SERVER_NAME)
38+
39+
40+
def create_app(json_response: bool = False) -> Starlette:
41+
"""Create a Starlette application for testing."""
42+
app = RaceConditionTestServer()
43+
44+
# Create session manager
45+
session_manager = StreamableHTTPSessionManager(
46+
app=app,
47+
json_response=json_response,
48+
stateless=True, # Use stateless mode to trigger the race condition
49+
)
50+
51+
# Create Starlette app with lifespan
52+
@asynccontextmanager
53+
async def lifespan(app: Starlette) -> AsyncGenerator[None, None]:
54+
async with session_manager.run():
55+
yield
56+
57+
routes = [
58+
Mount("/", app=session_manager.handle_request),
59+
]
60+
61+
return Starlette(routes=routes, lifespan=lifespan)
62+
63+
64+
class ServerThread(threading.Thread):
65+
"""Thread that runs the ASGI application lifespan in a separate event loop."""
66+
67+
def __init__(self, app: Starlette):
68+
super().__init__(daemon=True)
69+
self.app = app
70+
self._stop_event = threading.Event()
71+
72+
def run(self) -> None:
73+
"""Run the lifespan in a new event loop."""
74+
75+
# Create a new event loop for this thread
76+
async def run_lifespan():
77+
# Use the lifespan context (always present in our tests)
78+
lifespan_context = getattr(self.app.router, "lifespan_context", None)
79+
assert lifespan_context is not None # Tests always create apps with lifespan
80+
async with lifespan_context(self.app):
81+
# Wait until stop is requested
82+
while not self._stop_event.is_set():
83+
await anyio.sleep(0.1)
84+
85+
anyio.run(run_lifespan)
86+
87+
def stop(self) -> None:
88+
"""Signal the thread to stop."""
89+
self._stop_event.set()
90+
91+
92+
def check_logs_for_race_condition_errors(caplog: pytest.LogCaptureFixture, test_name: str) -> None:
93+
"""
94+
Check logs for ClosedResourceError and other race condition errors.
95+
96+
Args:
97+
caplog: pytest log capture fixture
98+
test_name: Name of the test for better error messages
99+
"""
100+
# Check for specific race condition errors in logs
101+
errors_found: list[str] = []
102+
103+
for record in caplog.records: # pragma: no cover
104+
message = record.getMessage()
105+
if "ClosedResourceError" in message:
106+
errors_found.append("ClosedResourceError")
107+
if "Error in message router" in message:
108+
errors_found.append("Error in message router")
109+
if "anyio.ClosedResourceError" in message:
110+
errors_found.append("anyio.ClosedResourceError")
111+
112+
# Assert no race condition errors occurred
113+
if errors_found: # pragma: no cover
114+
error_msg = f"Test '{test_name}' found race condition errors in logs: {', '.join(set(errors_found))}\n"
115+
error_msg += "Log records:\n"
116+
for record in caplog.records:
117+
if any(err in record.getMessage() for err in ["ClosedResourceError", "Error in message router"]):
118+
error_msg += f" {record.levelname}: {record.getMessage()}\n"
119+
pytest.fail(error_msg)
120+
121+
122+
@pytest.mark.anyio
123+
async def test_race_condition_invalid_accept_headers(caplog: pytest.LogCaptureFixture):
124+
"""
125+
Test the race condition with invalid Accept headers.
126+
127+
This test reproduces the exact scenario described in issue #1363:
128+
- Send POST request with incorrect Accept headers (missing either application/json or text/event-stream)
129+
- Request fails validation early and returns quickly
130+
- This should trigger the race condition where message_router encounters ClosedResourceError
131+
"""
132+
app = create_app()
133+
server_thread = ServerThread(app)
134+
server_thread.start()
135+
136+
try:
137+
# Give the server thread a moment to start
138+
await anyio.sleep(0.1)
139+
140+
# Suppress WARNING logs (expected validation errors) and capture ERROR logs
141+
with caplog.at_level(logging.ERROR):
142+
# Test with missing text/event-stream in Accept header
143+
async with httpx.AsyncClient(
144+
transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0
145+
) as client:
146+
response = await client.post(
147+
"/",
148+
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
149+
headers={
150+
"Accept": "application/json", # Missing text/event-stream
151+
"Content-Type": "application/json",
152+
},
153+
)
154+
# Should get 406 Not Acceptable due to missing text/event-stream
155+
assert response.status_code == 406
156+
157+
# Test with missing application/json in Accept header
158+
async with httpx.AsyncClient(
159+
transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0
160+
) as client:
161+
response = await client.post(
162+
"/",
163+
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
164+
headers={
165+
"Accept": "text/event-stream", # Missing application/json
166+
"Content-Type": "application/json",
167+
},
168+
)
169+
# Should get 406 Not Acceptable due to missing application/json
170+
assert response.status_code == 406
171+
172+
# Test with completely invalid Accept header
173+
async with httpx.AsyncClient(
174+
transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0
175+
) as client:
176+
response = await client.post(
177+
"/",
178+
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
179+
headers={
180+
"Accept": "text/plain", # Invalid Accept header
181+
"Content-Type": "application/json",
182+
},
183+
)
184+
# Should get 406 Not Acceptable
185+
assert response.status_code == 406
186+
187+
# Give background tasks time to complete
188+
await anyio.sleep(0.2)
189+
190+
finally:
191+
server_thread.stop()
192+
server_thread.join(timeout=5.0)
193+
# Check logs for race condition errors
194+
check_logs_for_race_condition_errors(caplog, "test_race_condition_invalid_accept_headers")
195+
196+
197+
@pytest.mark.anyio
198+
async def test_race_condition_invalid_content_type(caplog: pytest.LogCaptureFixture):
199+
"""
200+
Test the race condition with invalid Content-Type headers.
201+
202+
This test reproduces the race condition scenario with Content-Type validation failure.
203+
"""
204+
app = create_app()
205+
server_thread = ServerThread(app)
206+
server_thread.start()
207+
208+
try:
209+
# Give the server thread a moment to start
210+
await anyio.sleep(0.1)
211+
212+
# Suppress WARNING logs (expected validation errors) and capture ERROR logs
213+
with caplog.at_level(logging.ERROR):
214+
# Test with invalid Content-Type
215+
async with httpx.AsyncClient(
216+
transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0
217+
) as client:
218+
response = await client.post(
219+
"/",
220+
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
221+
headers={
222+
"Accept": "application/json, text/event-stream",
223+
"Content-Type": "text/plain", # Invalid Content-Type
224+
},
225+
)
226+
assert response.status_code == 400
227+
228+
# Give background tasks time to complete
229+
await anyio.sleep(0.2)
230+
231+
finally:
232+
server_thread.stop()
233+
server_thread.join(timeout=5.0)
234+
# Check logs for race condition errors
235+
check_logs_for_race_condition_errors(caplog, "test_race_condition_invalid_content_type")
236+
237+
238+
@pytest.mark.anyio
239+
async def test_race_condition_message_router_async_for(caplog: pytest.LogCaptureFixture):
240+
"""
241+
Uses json_response=True to trigger the `if self.is_json_response_enabled` branch,
242+
which reproduces the ClosedResourceError when message_router is suspended
243+
in async for loop while transport cleanup closes streams concurrently.
244+
"""
245+
app = create_app(json_response=True)
246+
server_thread = ServerThread(app)
247+
server_thread.start()
248+
249+
try:
250+
# Give the server thread a moment to start
251+
await anyio.sleep(0.1)
252+
253+
# Suppress WARNING logs (expected validation errors) and capture ERROR logs
254+
with caplog.at_level(logging.ERROR):
255+
# Use httpx.ASGITransport to test the ASGI app directly
256+
async with httpx.AsyncClient(
257+
transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0
258+
) as client:
259+
# Send a valid initialize request
260+
response = await client.post(
261+
"/",
262+
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
263+
headers={
264+
"Accept": "application/json, text/event-stream",
265+
"Content-Type": "application/json",
266+
},
267+
)
268+
# Should get a successful response
269+
assert response.status_code in (200, 201)
270+
271+
# Give background tasks time to complete
272+
await anyio.sleep(0.2)
273+
274+
finally:
275+
server_thread.stop()
276+
server_thread.join(timeout=5.0)
277+
# Check logs for race condition errors in message router
278+
check_logs_for_race_condition_errors(caplog, "test_race_condition_message_router_async_for")

0 commit comments

Comments
 (0)
pFad - Phonifier reborn

Pfad - The Proxy pFad © 2024 Your Company Name. All rights reserved.





Check this box to remove all script contents from the fetched content.



Check this box to remove all images from the fetched content.


Check this box to remove all CSS styles from the fetched content.


Check this box to keep images inefficiently compressed and original size.

Note: This service is not intended for secure transactions such as banking, social media, email, or purchasing. Use at your own risk. We assume no liability whatsoever for broken pages.


Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy