-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Buffer per-request StreamableHTTP streams to avoid serial-router head-of-line block #2934
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b0b398c
dde2df9
127b209
1f1cb48
ac6cc7d
45fcc1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,8 +12,9 @@ | |
| from collections.abc import AsyncGenerator, Awaitable, Callable | ||
| from contextlib import asynccontextmanager | ||
| from dataclasses import dataclass | ||
| from functools import partial | ||
| from http import HTTPStatus | ||
| from typing import Any | ||
| from typing import Any, Final | ||
|
|
||
| import anyio | ||
| import pydantic_core | ||
|
|
@@ -59,13 +60,20 @@ | |
| # Special key for the standalone GET stream | ||
| GET_STREAM_KEY = "_GET_stream" | ||
|
|
||
| # Buffer for the per-request `_request_streams` so the serial `message_router` | ||
| # can deposit a response and move on instead of head-of-line blocking the | ||
| # whole session on a lazily-started `sse_writer`. See #1764. | ||
| REQUEST_STREAM_BUFFER_SIZE: Final = 16 | ||
|
|
||
| # Session ID validation pattern (visible ASCII characters ranging from 0x21 to 0x7E) | ||
| # Pattern ensures entire string contains only valid characters by using ^ and $ anchors | ||
| SESSION_ID_PATTERN = re.compile(r"^[\x21-\x7E]+$") | ||
|
|
||
| # Type aliases | ||
| StreamId = str | ||
| EventId = str | ||
| # An SSE event-dict as accepted by sse-starlette (`event`, `data`, `id`, `retry`). | ||
| SSEEvent = dict[str, Any] | ||
|
|
||
|
|
||
| @dataclass | ||
|
|
@@ -169,7 +177,7 @@ | |
| MemoryObjectReceiveStream[EventMessage], | ||
| ], | ||
| ] = {} | ||
| self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {} | ||
| self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[SSEEvent]] = {} | ||
| self._terminated = False | ||
| # Idle timeout cancel scope; managed by the session manager. | ||
| self.idle_scope: anyio.CancelScope | None = None | ||
|
|
@@ -256,31 +264,48 @@ | |
|
|
||
| return SessionMessage(message, metadata=metadata) | ||
|
|
||
| async def _maybe_send_priming_event( | ||
| self, | ||
| request_id: RequestId, | ||
| sse_stream_writer: MemoryObjectSendStream[dict[str, Any]], | ||
| protocol_version: str, | ||
| ) -> None: | ||
| """Send priming event for SSE resumability if event_store is configured. | ||
| async def _mint_priming_event(self, stream_id: StreamId, protocol_version: str) -> SSEEvent | None: | ||
| """Store the priming cursor for `stream_id` and return its SSE wire form. | ||
|
|
||
| Only sends priming events to clients with protocol version >= 2025-11-25, | ||
| which includes the fix for handling empty SSE data. Older clients would | ||
| crash trying to parse empty data as JSON. | ||
| Called before the request is dispatched so the priming row precedes | ||
| anything `message_router` can store for this stream. Returns `None` | ||
| when no event store is configured or the client predates 2025-11-25 | ||
| (older clients cannot parse the empty-data event). | ||
| """ | ||
| if not self._event_store: | ||
| return | ||
| # Priming events have empty data which older clients cannot handle. | ||
| return None | ||
| if not is_version_at_least(protocol_version, "2025-11-25"): | ||
| return | ||
| priming_event_id = await self._event_store.store_event( | ||
| str(request_id), # Convert RequestId to StreamId (str) | ||
| None, # Priming event has no payload | ||
| ) | ||
| priming_event: dict[str, str | int] = {"id": priming_event_id, "data": ""} | ||
| return None | ||
| priming_event_id = await self._event_store.store_event(stream_id, None) | ||
| priming_event: SSEEvent = {"id": priming_event_id, "data": ""} | ||
| if self._retry_interval is not None: | ||
| priming_event["retry"] = self._retry_interval | ||
| await sse_stream_writer.send(priming_event) | ||
| return priming_event | ||
|
|
||
| async def _run_sse_writer( | ||
| self, | ||
| request_id: RequestId, | ||
| sse_stream_writer: MemoryObjectSendStream[SSEEvent], | ||
| request_stream_reader: MemoryObjectReceiveStream[EventMessage], | ||
| priming_event: SSEEvent | None, | ||
| ) -> None: | ||
| """Forward `_request_streams[request_id]` onto the SSE wire for one POST.""" | ||
| try: | ||
| async with sse_stream_writer, request_stream_reader: | ||
| if priming_event is not None: | ||
| await sse_stream_writer.send(priming_event) | ||
| async for event_message in request_stream_reader: | ||
| await sse_stream_writer.send(self._create_event_data(event_message)) | ||
| if isinstance(event_message.message, JSONRPCResponse | JSONRPCError): | ||
| break | ||
| except anyio.ClosedResourceError: # pragma: lax no cover | ||
| logger.debug("SSE stream closed by close_sse_stream()") | ||
| except Exception: # pragma: lax no cover | ||
| logger.exception("Error in SSE writer") | ||
| finally: | ||
| logger.debug("Closing SSE writer") | ||
| self._sse_stream_writers.pop(request_id, None) | ||
| await self._clean_up_memory_streams(request_id) | ||
|
|
||
| def _create_error_response( | ||
| self, | ||
|
|
@@ -334,7 +359,7 @@ | |
| """Extract the session ID from request headers.""" | ||
| return request.headers.get(MCP_SESSION_ID_HEADER) | ||
|
|
||
| def _create_event_data(self, event_message: EventMessage) -> dict[str, str]: | ||
| def _create_event_data(self, event_message: EventMessage) -> SSEEvent: | ||
| """Create event data dictionary from an EventMessage.""" | ||
| event_data = { | ||
| "event": "message", | ||
|
|
@@ -521,13 +546,13 @@ | |
| else request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION) | ||
| ) | ||
|
|
||
| # Extract the request ID outside the try block for proper scope | ||
| request_id = str(message.id) | ||
| # Register this stream for the request ID | ||
| self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](0) | ||
| request_stream_reader = self._request_streams[request_id][1] | ||
|
|
||
| if self.is_json_response_enabled: | ||
| self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage]( | ||
| REQUEST_STREAM_BUFFER_SIZE | ||
| ) | ||
| request_stream_reader = self._request_streams[request_id][1] | ||
| # Process the message | ||
| metadata = ServerMessageMetadata(request_context=request) | ||
| session_message = SessionMessage(message, metadata=metadata) | ||
|
|
@@ -571,50 +596,30 @@ | |
| finally: | ||
| await self._clean_up_memory_streams(request_id) | ||
| else: | ||
| # Create SSE stream | ||
| sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0) | ||
| # Mint the priming event before any per-request state exists: | ||
| # `EventStore.store_event` is user code and may raise, in which | ||
| # case the outer handler returns a 500 with nothing to clean up. | ||
| # Still strictly precedes dispatch, so storage order == wire order. | ||
| priming_event = await self._mint_priming_event(request_id, protocol_version) | ||
|
|
||
| # Store writer reference so close_sse_stream() can close it | ||
| sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[SSEEvent](0) | ||
| self._sse_stream_writers[request_id] = sse_stream_writer | ||
| self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage]( | ||
| REQUEST_STREAM_BUFFER_SIZE | ||
| ) | ||
| request_stream_reader = self._request_streams[request_id][1] | ||
|
|
||
| async def sse_writer(): | ||
| # Get the request ID from the incoming request message | ||
| try: | ||
| async with sse_stream_writer, request_stream_reader: | ||
| # Send priming event for SSE resumability | ||
| await self._maybe_send_priming_event(request_id, sse_stream_writer, protocol_version) | ||
|
|
||
| # Process messages from the request-specific stream | ||
| async for event_message in request_stream_reader: | ||
| # Build the event data | ||
| event_data = self._create_event_data(event_message) | ||
| await sse_stream_writer.send(event_data) | ||
|
|
||
| # If response, remove from pending streams and close | ||
| if isinstance(event_message.message, JSONRPCResponse | JSONRPCError): | ||
| break | ||
| except anyio.ClosedResourceError: # pragma: lax no cover | ||
| # Expected when close_sse_stream() is called | ||
| logger.debug("SSE stream closed by close_sse_stream()") | ||
| except Exception: # pragma: lax no cover | ||
| logger.exception("Error in SSE writer") | ||
| finally: | ||
| logger.debug("Closing SSE writer") | ||
| self._sse_stream_writers.pop(request_id, None) | ||
| await self._clean_up_memory_streams(request_id) | ||
|
|
||
| # Create and start EventSourceResponse | ||
| # SSE stream mode (original behavior) | ||
| # Set up headers | ||
| headers = { | ||
| "Cache-Control": "no-cache, no-transform", | ||
| "Connection": "keep-alive", | ||
|
Check failure on line 614 in src/mcp/server/streamable_http.py
|
||
|
Comment on lines
+599
to
614
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 With the per-request stream now buffered ( Extended reasoning...What the bug is. In the SSE branch of |
||
| "Content-Type": CONTENT_TYPE_SSE, | ||
| **({MCP_SESSION_ID_HEADER: self.mcp_session_id} if self.mcp_session_id else {}), | ||
| } | ||
| response = EventSourceResponse( | ||
| content=sse_stream_reader, | ||
| data_sender_callable=sse_writer, | ||
| data_sender_callable=partial( | ||
| self._run_sse_writer, request_id, sse_stream_writer, request_stream_reader, priming_event | ||
| ), | ||
| headers=headers, | ||
| ) | ||
|
|
||
|
|
@@ -633,20 +638,16 @@ | |
| finally: | ||
| await sse_stream_reader.aclose() | ||
|
|
||
| except Exception as err: # pragma: lax no cover | ||
| # Reached only when something raises during POST handling outside | ||
| # the per-SSE-stream guard above; whether tests reach this depends | ||
| # on client teardown timing. | ||
| except Exception as err: | ||
| logger.exception("Error handling POST request") | ||
| response = self._create_error_response( | ||
| f"Error handling POST request: {err}", | ||
| "Error handling POST request", | ||
| HTTPStatus.INTERNAL_SERVER_ERROR, | ||
| INTERNAL_ERROR, | ||
| ) | ||
| await response(scope, receive, send) | ||
| if writer: | ||
| await writer.send(Exception(err)) | ||
| return # pragma: no cover | ||
| await writer.send(Exception(err)) | ||
| return | ||
|
Comment on lines
+641
to
+650
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟣 Pre-existing issue (not introduced by this PR, but the PR rewrites exactly these lines and removes the coverage pragma): the outer Extended reasoning...What the bug is. The rewritten outer error handler at the end of The code path that triggers it. Step-by-step proof. (1) Client A POSTs a JSON-RPC notification; Why existing code doesn't prevent it. The SSE branch and the JSON branch each have their own inner Why this is pre-existing, and addressing the dissenting verifier. One verifier argued this is not actionable because the behavior is byte-for-byte identical to How to fix. Either (a) track a |
||
|
|
||
| async def _handle_get_request(self, request: Request, send: Send) -> None: | ||
| """Handle GET request to establish SSE. | ||
|
|
@@ -697,13 +698,15 @@ | |
| return | ||
|
|
||
| # Create SSE stream | ||
| sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0) | ||
| sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[SSEEvent](0) | ||
|
|
||
| async def standalone_sse_writer(): | ||
| try: | ||
| # Create a standalone message stream for server-initiated messages | ||
|
|
||
| self._request_streams[GET_STREAM_KEY] = anyio.create_memory_object_stream[EventMessage](0) | ||
| self._request_streams[GET_STREAM_KEY] = anyio.create_memory_object_stream[EventMessage]( | ||
| REQUEST_STREAM_BUFFER_SIZE | ||
| ) | ||
| standalone_stream_reader = self._request_streams[GET_STREAM_KEY][1] | ||
|
|
||
| async with sse_stream_writer, standalone_stream_reader: | ||
|
|
@@ -871,7 +874,7 @@ | |
| replay_protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION) | ||
|
|
||
| # Create SSE stream for replay | ||
| sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0) | ||
| sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[SSEEvent](0) | ||
|
|
||
| async def replay_sender(): | ||
| try: | ||
|
|
@@ -886,22 +889,32 @@ | |
|
|
||
| # If stream ID not in mapping, create it | ||
| if stream_id and stream_id not in self._request_streams: # pragma: no branch | ||
| # Register SSE writer so close_sse_stream() can close it | ||
| self._sse_stream_writers[stream_id] = sse_stream_writer | ||
|
|
||
| # Send priming event for this new connection | ||
| await self._maybe_send_priming_event(stream_id, sse_stream_writer, replay_protocol_version) | ||
|
|
||
| # Create new request streams for this connection | ||
| self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](0) | ||
| msg_reader = self._request_streams[stream_id][1] | ||
|
|
||
| # Forward messages to SSE | ||
| async with msg_reader: | ||
| async for event_message in msg_reader: | ||
| event_data = self._create_event_data(event_message) | ||
|
|
||
| await sse_stream_writer.send(event_data) | ||
| try: | ||
| # Register SSE writer so close_sse_stream() can close it | ||
| self._sse_stream_writers[stream_id] = sse_stream_writer | ||
|
|
||
| # Prime the resumed connection so the client sees the stream | ||
| # is re-registered. The replay→live-tail ordering window here | ||
| # is pre-existing and tracked separately. | ||
| priming_event = await self._mint_priming_event(stream_id, replay_protocol_version) | ||
| if priming_event is not None: | ||
| await sse_stream_writer.send(priming_event) | ||
|
|
||
| # Create new request streams for this connection | ||
| self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage]( | ||
| REQUEST_STREAM_BUFFER_SIZE | ||
| ) | ||
| msg_reader = self._request_streams[stream_id][1] | ||
|
|
||
| # Forward messages to SSE | ||
| async with msg_reader: | ||
| async for event_message in msg_reader: | ||
| event_data = self._create_event_data(event_message) | ||
|
|
||
| await sse_stream_writer.send(event_data) | ||
| finally: | ||
| self._sse_stream_writers.pop(stream_id, None) | ||
| await self._clean_up_memory_streams(stream_id) | ||
| except anyio.ClosedResourceError: # pragma: lax no cover | ||
| # Expected when close_sse_stream() is called | ||
| logger.debug("Replay SSE stream closed by close_sse_stream()") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 The SSE branch of
_handle_post_requestregistersself._sse_stream_writers[request_id]before starting theEventSourceResponse, but theexcept Exception: "SSE response error"block only closes the writer and calls_clean_up_memory_streams— it never pops_sse_stream_writers[request_id], so a startup failure that happens before_run_sse_writeris ever scheduled leaves a closed-writer entry in the dict for the transport's lifetime. This is the same stale-entry class the PR's newfinallyinreplay_senderfixes; a one-lineself._sse_stream_writers.pop(request_id, None)in that except block closes the gap (the gap itself is pre-existing in shape, so non-blocking).Extended reasoning...
What the bug is. In the SSE (non-JSON) branch of
_handle_post_request, the handler registersself._sse_stream_writers[request_id] = sse_stream_writerbefore constructing and starting theEventSourceResponse. The only places this entry is ever removed areclose_sse_stream()(user-invoked),_run_sse_writer'sfinally, andreplay_sender's newfinally. The error path right below the registration —except Exception: logger.exception("SSE response error")— closessse_stream_writerand calls_clean_up_memory_streams(request_id)(which only touches_request_streams), but never pops_sse_stream_writers[request_id].The code path that triggers it.
_run_sse_writeris thedata_sender_callableof theEventSourceResponse; it is started lazily, twostart_soonhops after the response is scheduled in the task group. If the task group fails before that callable ever runs — e.g.writer.send(session_message)raisingClosedResourceErrorbecause the session is being terminated concurrently, or the ASGIsendfailing during response startup —_run_sse_writer'sfinallynever fires, so nothing removes the dict entry.Why existing code doesn't prevent it. Cleanup of
_sse_stream_writersrelies entirely on_run_sse_writer(or explicitclose_sse_stream()calls from user code). Neitherterminate()norconnect()'s teardown clears_sse_stream_writers— they only clear_request_streams. So once the except block runs without the pop, the closed-writer entry persists for the transport's lifetime.Step-by-step proof.
"7"; the handler reaches the SSE branch and setsself._sse_stream_writers["7"] = sse_stream_writer(line ~606).async with anyio.create_task_group() as tg, callstg.start_soon(response, scope, receive, send), and thenawait writer.send(session_message).writer.send()raisesClosedResourceError(or the ASGIsendfails while the response is starting). The task group unwinds beforeEventSourceResponseever invokes itsdata_sender_callable, so_run_sse_writer("7", ...)never runs.except Exceptionblock at lines ~634-637 executes: it closessse_stream_writerand calls_clean_up_memory_streams("7"), removing_request_streams["7"]— but_sse_stream_writers["7"]is left in place, holding a closed stream.terminate()andconnect()teardown only clear_request_streams, andclose_sse_stream("7")is only called by user/tool code for an active stream. The entry lives until the transport object is garbage-collected.Impact. Small and bounded: one closed-writer dict entry per failed SSE response startup, an exceptional path (
pragma: lax no cover). It is not a correctness wedge — but it is exactly the stale-entry class this PR just fixed inreplay_senderby adding afinallythat pops the writer dict, so the POST error path is now the one writer path left without symmetric cleanup. The registration-before-try and the except-without-pop existed in the same shape before this PR; the PR rewrote this region without closing the gap.How to fix. Add
self._sse_stream_writers.pop(request_id, None)to theexcept Exception: "SSE response error"block, alongside the existingawait sse_stream_writer.aclose()and_clean_up_memory_streams(request_id). That makes the POST error path consistent with_run_sse_writer'sfinallyandreplay_sender's newfinally.