Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 99 additions & 86 deletions src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import partial
from http import HTTPStatus
from typing import Any
from typing import Any, Final

import anyio
import pydantic_core
Expand Down Expand Up @@ -59,13 +60,20 @@
# Special key for the standalone GET stream
GET_STREAM_KEY = "_GET_stream"

# Buffer for the per-request `_request_streams` so the serial `message_router`
# can deposit a response and move on instead of head-of-line blocking the
# whole session on a lazily-started `sse_writer`. See #1764.
REQUEST_STREAM_BUFFER_SIZE: Final = 16

# Session ID validation pattern (visible ASCII characters ranging from 0x21 to 0x7E)
# Pattern ensures entire string contains only valid characters by using ^ and $ anchors
SESSION_ID_PATTERN = re.compile(r"^[\x21-\x7E]+$")

# Type aliases
StreamId = str
EventId = str
# An SSE event-dict as accepted by sse-starlette (`event`, `data`, `id`, `retry`).
SSEEvent = dict[str, Any]


@dataclass
Expand Down Expand Up @@ -169,7 +177,7 @@
MemoryObjectReceiveStream[EventMessage],
],
] = {}
self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[SSEEvent]] = {}
self._terminated = False
# Idle timeout cancel scope; managed by the session manager.
self.idle_scope: anyio.CancelScope | None = None
Expand Down Expand Up @@ -256,31 +264,48 @@

return SessionMessage(message, metadata=metadata)

async def _maybe_send_priming_event(
self,
request_id: RequestId,
sse_stream_writer: MemoryObjectSendStream[dict[str, Any]],
protocol_version: str,
) -> None:
"""Send priming event for SSE resumability if event_store is configured.
async def _mint_priming_event(self, stream_id: StreamId, protocol_version: str) -> SSEEvent | None:
"""Store the priming cursor for `stream_id` and return its SSE wire form.

Only sends priming events to clients with protocol version >= 2025-11-25,
which includes the fix for handling empty SSE data. Older clients would
crash trying to parse empty data as JSON.
Called before the request is dispatched so the priming row precedes
anything `message_router` can store for this stream. Returns `None`
when no event store is configured or the client predates 2025-11-25
(older clients cannot parse the empty-data event).
"""
if not self._event_store:
return
# Priming events have empty data which older clients cannot handle.
return None
if not is_version_at_least(protocol_version, "2025-11-25"):
return
priming_event_id = await self._event_store.store_event(
str(request_id), # Convert RequestId to StreamId (str)
None, # Priming event has no payload
)
priming_event: dict[str, str | int] = {"id": priming_event_id, "data": ""}
return None
priming_event_id = await self._event_store.store_event(stream_id, None)
priming_event: SSEEvent = {"id": priming_event_id, "data": ""}
if self._retry_interval is not None:
priming_event["retry"] = self._retry_interval
await sse_stream_writer.send(priming_event)
return priming_event

async def _run_sse_writer(
self,
request_id: RequestId,
sse_stream_writer: MemoryObjectSendStream[SSEEvent],
request_stream_reader: MemoryObjectReceiveStream[EventMessage],
priming_event: SSEEvent | None,
) -> None:
"""Forward `_request_streams[request_id]` onto the SSE wire for one POST."""
try:
async with sse_stream_writer, request_stream_reader:
if priming_event is not None:
await sse_stream_writer.send(priming_event)
async for event_message in request_stream_reader:
await sse_stream_writer.send(self._create_event_data(event_message))
if isinstance(event_message.message, JSONRPCResponse | JSONRPCError):
break
except anyio.ClosedResourceError: # pragma: lax no cover
logger.debug("SSE stream closed by close_sse_stream()")
except Exception: # pragma: lax no cover
logger.exception("Error in SSE writer")
finally:
logger.debug("Closing SSE writer")
self._sse_stream_writers.pop(request_id, None)
await self._clean_up_memory_streams(request_id)

def _create_error_response(
self,
Expand Down Expand Up @@ -334,7 +359,7 @@
"""Extract the session ID from request headers."""
return request.headers.get(MCP_SESSION_ID_HEADER)

def _create_event_data(self, event_message: EventMessage) -> dict[str, str]:
def _create_event_data(self, event_message: EventMessage) -> SSEEvent:
"""Create event data dictionary from an EventMessage."""
event_data = {
"event": "message",
Expand Down Expand Up @@ -521,13 +546,13 @@
else request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION)
)

# Extract the request ID outside the try block for proper scope
request_id = str(message.id)
# Register this stream for the request ID
self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](0)
request_stream_reader = self._request_streams[request_id][1]

if self.is_json_response_enabled:
self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](
REQUEST_STREAM_BUFFER_SIZE
)
request_stream_reader = self._request_streams[request_id][1]
# Process the message
metadata = ServerMessageMetadata(request_context=request)
session_message = SessionMessage(message, metadata=metadata)
Expand Down Expand Up @@ -571,50 +596,30 @@
finally:
await self._clean_up_memory_streams(request_id)
else:
# Create SSE stream
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)
# Mint the priming event before any per-request state exists:
# `EventStore.store_event` is user code and may raise, in which
# case the outer handler returns a 500 with nothing to clean up.
# Still strictly precedes dispatch, so storage order == wire order.
priming_event = await self._mint_priming_event(request_id, protocol_version)

# Store writer reference so close_sse_stream() can close it
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[SSEEvent](0)
self._sse_stream_writers[request_id] = sse_stream_writer
self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](
REQUEST_STREAM_BUFFER_SIZE
)
request_stream_reader = self._request_streams[request_id][1]

async def sse_writer():
# Get the request ID from the incoming request message
try:
async with sse_stream_writer, request_stream_reader:
# Send priming event for SSE resumability
await self._maybe_send_priming_event(request_id, sse_stream_writer, protocol_version)

# Process messages from the request-specific stream
async for event_message in request_stream_reader:
# Build the event data
event_data = self._create_event_data(event_message)
await sse_stream_writer.send(event_data)

# If response, remove from pending streams and close
if isinstance(event_message.message, JSONRPCResponse | JSONRPCError):
break
except anyio.ClosedResourceError: # pragma: lax no cover
# Expected when close_sse_stream() is called
logger.debug("SSE stream closed by close_sse_stream()")
except Exception: # pragma: lax no cover
logger.exception("Error in SSE writer")
finally:
logger.debug("Closing SSE writer")
self._sse_stream_writers.pop(request_id, None)
await self._clean_up_memory_streams(request_id)

# Create and start EventSourceResponse
# SSE stream mode (original behavior)
# Set up headers
headers = {
"Cache-Control": "no-cache, no-transform",
"Connection": "keep-alive",

Check failure on line 614 in src/mcp/server/streamable_http.py

View check run for this annotation

Claude / Claude Code Review

Buffered request stream lets close_sse_stream() discard the priming event before it reaches the wire, wedging the request

With the per-request stream now buffered (`REQUEST_STREAM_BUFFER_SIZE = 16`) and the priming event only put on the wire by the lazily-started `_run_sse_writer`, a tool can emit up to 16 notifications and call `ctx.close_sse_stream()` before the writer ever runs — `close_sse_stream()` discards the writer and request stream, the priming send hits `ClosedResourceError`, the SSE response carries zero events, and the client (which only auto-reconnects when it saw at least one event id, `src/mcp/clien
Comment on lines +603 to 614

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 The SSE branch of _handle_post_request registers self._sse_stream_writers[request_id] before starting the EventSourceResponse, but the except Exception: "SSE response error" block only closes the writer and calls _clean_up_memory_streams — it never pops _sse_stream_writers[request_id], so a startup failure that happens before _run_sse_writer is ever scheduled leaves a closed-writer entry in the dict for the transport's lifetime. This is the same stale-entry class the PR's new finally in replay_sender fixes; a one-line self._sse_stream_writers.pop(request_id, None) in that except block closes the gap (the gap itself is pre-existing in shape, so non-blocking).

Extended reasoning...

What the bug is. In the SSE (non-JSON) branch of _handle_post_request, the handler registers self._sse_stream_writers[request_id] = sse_stream_writer before constructing and starting the EventSourceResponse. The only places this entry is ever removed are close_sse_stream() (user-invoked), _run_sse_writer's finally, and replay_sender's new finally. The error path right below the registration — except Exception: logger.exception("SSE response error") — closes sse_stream_writer and calls _clean_up_memory_streams(request_id) (which only touches _request_streams), but never pops _sse_stream_writers[request_id].

The code path that triggers it. _run_sse_writer is the data_sender_callable of the EventSourceResponse; it is started lazily, two start_soon hops after the response is scheduled in the task group. If the task group fails before that callable ever runs — e.g. writer.send(session_message) raising ClosedResourceError because the session is being terminated concurrently, or the ASGI send failing during response startup — _run_sse_writer's finally never fires, so nothing removes the dict entry.

Why existing code doesn't prevent it. Cleanup of _sse_stream_writers relies entirely on _run_sse_writer (or explicit close_sse_stream() calls from user code). Neither terminate() nor connect()'s teardown clears _sse_stream_writers — they only clear _request_streams. So once the except block runs without the pop, the closed-writer entry persists for the transport's lifetime.

Step-by-step proof.

  1. Client POSTs a request with id "7"; the handler reaches the SSE branch and sets self._sse_stream_writers["7"] = sse_stream_writer (line ~606).
  2. The handler enters async with anyio.create_task_group() as tg, calls tg.start_soon(response, scope, receive, send), and then await writer.send(session_message).
  3. Suppose the session is being torn down concurrently and writer.send() raises ClosedResourceError (or the ASGI send fails while the response is starting). The task group unwinds before EventSourceResponse ever invokes its data_sender_callable, so _run_sse_writer("7", ...) never runs.
  4. The except Exception block at lines ~634-637 executes: it closes sse_stream_writer and calls _clean_up_memory_streams("7"), removing _request_streams["7"] — but _sse_stream_writers["7"] is left in place, holding a closed stream.
  5. Nothing else ever pops it: terminate() and connect() teardown only clear _request_streams, and close_sse_stream("7") is only called by user/tool code for an active stream. The entry lives until the transport object is garbage-collected.

Impact. Small and bounded: one closed-writer dict entry per failed SSE response startup, an exceptional path (pragma: lax no cover). It is not a correctness wedge — but it is exactly the stale-entry class this PR just fixed in replay_sender by adding a finally that pops the writer dict, so the POST error path is now the one writer path left without symmetric cleanup. The registration-before-try and the except-without-pop existed in the same shape before this PR; the PR rewrote this region without closing the gap.

How to fix. Add self._sse_stream_writers.pop(request_id, None) to the except Exception: "SSE response error" block, alongside the existing await sse_stream_writer.aclose() and _clean_up_memory_streams(request_id). That makes the POST error path consistent with _run_sse_writer's finally and replay_sender's new finally.

Comment on lines +599 to 614

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 With the per-request stream now buffered (REQUEST_STREAM_BUFFER_SIZE = 16) and the priming event only put on the wire by the lazily-started _run_sse_writer, a tool can emit up to 16 notifications and call ctx.close_sse_stream() before the writer ever runs — close_sse_stream() discards the writer and request stream, the priming send hits ClosedResourceError, the SSE response carries zero events, and the client (which only auto-reconnects when it saw at least one event id, src/mcp/client/streamable_http.py:429-432) never replays the stored response, so call_tool() hangs. The close-before-writer-starts race is pre-existing for tools that close after 0–1 messages, but on main the buffer-0 stream parked the router so any tool emitting ≥2 messages before close was structurally guaranteed the client held a priming cursor — this PR widens that window from ≤1 to ≤16 messages on exactly the resumability path it hardens. Consider draining/flushing on close instead of discarding, delivering the priming event eagerly from the POST handler, or having close_sse_stream() defer until the writer has started.

Extended reasoning...

What the bug is. In the SSE branch of _handle_post_request the per-request stream is now created with REQUEST_STREAM_BUFFER_SIZE = 16 and the priming event is no longer sent eagerly: it is minted in the handler (_mint_priming_event, before dispatch) but only put on the wire by _run_sse_writer — the EventSourceResponse data_sender_callable, started lazily two start_soon hops deep (the very lag #1764's reporters observe in production). During that lag, a tool using the documented polling pattern can run all the way to ctx.close_sse_stream(): close_sse_stream(request_id) (lines 219–227) pops and synchronously closes the sse_stream_writer and both halves of _request_streams[request_id]. When _run_sse_writer finally starts, its first await — sending the priming event — raises ClosedResourceError (caught and debug-logged), so the SSE response completes having carried zero events, and any notifications sitting in the 16-slot buffer are dropped from the wire.\n\nWhy the event store does not save you. close_sse_stream is only offered when an event store is configured, precisely so the client can reconnect with Last-Event-ID and replay what it missed. But the client transport only reconnects if it received at least one event id on the stream: src/mcp/client/streamable_http.py:429-432 ("Stream ended without response - reconnect if we received an event with ID", if last_event_id is not None). With zero events on the wire, last_event_id is None, no replay GET is ever issued, and the tool's eventual JSONRPCResponse — duly stored in the event store — is never delivered. The caller's call_tool() / send_request() hangs until its own timeout: a wedged request on the resumability path this PR is hardening.\n\nWhy this PR widens the window (and what part is pre-existing). On main the per-request stream had buffer 0, so the serial message_router parked on its first deposit for this request until sse_writer had started and reached its first receive() — and the old sse_writer sent the priming event before that first receive(). The session write_stream is also buffer-0, so the tool's second related send could not complete until the priming event was already handed into the response's content stream. In other words, on main any tool that emitted two or more related messages before closing had a structural guarantee that the client held a resumption cursor by close time. With the 16-slot buffer the router deposits and returns and the tool never blocks, so the unsafe window widens from "close after ≤1 message" to "close after ≤16 messages". The 0/1-message close pattern (the exact shape of tool_with_stream_close) was already exposed on main under the same writer-start lag — that part is pre-existing — but multi-notification polling tools that were previously safe by construction become unsafe with this change.\n\nStep-by-step proof. (1) Client POSTs tools/call id=7 (protocol ≥ 2025-11-25, event store configured). The handler stores the priming row, registers the 16-slot stream, start_soons the EventSourceResponse, and dispatches the request. (2) Under load, the data_sender_callable has not yet run — the production-observed lag from #1764. (3) The tool emits "progress 1" and "progress 2" (the router stores ids 4, 5 and deposits both into the 16-slot buffer without blocking), then calls await ctx.close_sse_stream() to switch the client to polling. (4) close_sse_stream closes the writer and both request-stream halves; the two buffered notifications are dropped. (5) _run_sse_writer starts, its priming send hits ClosedResourceError, and the SSE response completes with zero events. (6) The client saw no event id, does not reconnect (streamable_http.py:430), and the tool's response (stored later as id 6) is never delivered — the request hangs. On main, step (3)'s second send could not complete before the priming event was in the response pipeline, so step (6) could not occur for this sequence.\n\nOn the objection that the new window is unrealistic. It is true that on an idle in-process loop the data-sender task is ready within a couple of scheduler passes and essentially always wins the race — which is exactly why the existing tests (tool_with_stream_close, tool_with_multiple_notifications_and_close) never see this: they emit at most one notification before closing and the writer always starts first. But the PR's own motivation is that the lazily-started sse_writer lag is not reproducible on loopback yet is observed consistently in production by multiple #1764 reporters — the same premise that justifies the buffer fix is the condition under which this close-before-priming window now spans a tool's entire notification burst instead of at most one message. And while it is fair to say that on main the same stalled-writer regime was "worse" (the tool wedged at its second send and head-of-line blocked the whole session — the bug being fixed), the new failure mode is arguably harder to diagnose: the session stays healthy, the event store has everything, and exactly one request silently hangs with no resumption cursor and no error anywhere.\n\nHow to fix. Any of: (a) make close_sse_stream() drain-or-flush rather than discard — e.g. close only the send side of _request_streams[request_id] and let _run_sse_writer send the priming event plus the buffered messages before tearing down; (b) deliver the priming event eagerly from the POST handler (e.g. push it into the wire stream before dispatching the request) so a tool-initiated close can never strand the client without a cursor; or (c) have close_sse_stream() no-op / defer until the writer has actually started and sent the priming event. Option (b) also restores the invariant the PR description claims ("storage order == wire order" — and wire delivery before any tool-initiated close).

"Content-Type": CONTENT_TYPE_SSE,
**({MCP_SESSION_ID_HEADER: self.mcp_session_id} if self.mcp_session_id else {}),
}
response = EventSourceResponse(
content=sse_stream_reader,
data_sender_callable=sse_writer,
data_sender_callable=partial(
self._run_sse_writer, request_id, sse_stream_writer, request_stream_reader, priming_event
),
headers=headers,
)

Expand All @@ -633,20 +638,16 @@
finally:
await sse_stream_reader.aclose()

except Exception as err: # pragma: lax no cover
# Reached only when something raises during POST handling outside
# the per-SSE-stream guard above; whether tests reach this depends
# on client teardown timing.
except Exception as err:
logger.exception("Error handling POST request")
response = self._create_error_response(
f"Error handling POST request: {err}",
"Error handling POST request",
HTTPStatus.INTERNAL_SERVER_ERROR,
INTERNAL_ERROR,
)
await response(scope, receive, send)
if writer:
await writer.send(Exception(err))
return # pragma: no cover
await writer.send(Exception(err))
return
Comment on lines +641 to +650

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟣 Pre-existing issue (not introduced by this PR, but the PR rewrites exactly these lines and removes the coverage pragma): the outer except Exception in _handle_post_request unconditionally sends a fresh 500 and then await writer.send(Exception(err)), but it also wraps the notifications/responses branch which has already sent a 202 Accepted before its writer.send(session_message). If that send raises (e.g. ClosedResourceError from a concurrent DELETE/terminate()), the second await response(scope, receive, send) attempts another http.response.start on an already-responded connection and the resulting RuntimeError escapes handle_request as an unhandled ASGI error. Since these lines are being rewritten anyway, tracking whether a response has started (or catching Closed/BrokenResourceError around the 202-path writer.send) would close the gap.

Extended reasoning...

What the bug is. The rewritten outer error handler at the end of _handle_post_request (src/mcp/server/streamable_http.py:641-650) unconditionally builds a fresh 500 Response, calls await response(scope, receive, send), and then await writer.send(Exception(err)). However, this except wraps code paths where a response has already been sent on this connection — most concretely the notifications/responses branch, which sends a 202 Accepted (await response(scope, receive, send) for non-JSONRPCRequest messages) before doing await writer.send(session_message).

The code path that triggers it. writer here is _read_stream_writer. A concurrent DELETEterminate() (or session-shutdown teardown in connect()) closes that stream. So if a notification/response POST races termination, the 202 has already gone out on the wire, and then writer.send(session_message) raises anyio.ClosedResourceError (or BrokenResourceError if the session loop died). Control falls into the outer except Exception as err, which calls await response(scope, receive, send) a second time. On a real ASGI server (uvicorn/Starlette), starting a second http.response.start on a connection whose response is already complete raises RuntimeError. Because that RuntimeError is raised inside the except handler, it is not caught: it escapes _handle_post_request / handle_request to the ASGI server as an unhandled application error, and the trailing writer.send(Exception(err)) is never reached (it would also fail on the closed writer).

Step-by-step proof. (1) Client A POSTs a JSON-RPC notification; _handle_post_request reaches the if not isinstance(message, JSONRPCRequest) branch and sends 202 Accepted — http.response.start + body have gone out. (2) Concurrently, client B (or an idle-timeout path) issues DELETE /mcp; _handle_delete_requestterminate() closes _read_stream_writer. (3) Back in A's request task, await writer.send(session_message) raises ClosedResourceError. (4) The outer except logs, builds a 500, and calls await response(scope, receive, send) — the ASGI server raises RuntimeError("Response already started"). (5) That RuntimeError propagates out of handle_request; the manager awaits transport.handle_request directly in the per-request ASGI coroutine, so nothing upstream catches it — uvicorn logs "Exception in ASGI application" for a request that had already completed from the client's perspective.

Why existing code doesn't prevent it. The SSE branch and the JSON branch each have their own inner except blocks, so post-response failures there are swallowed before reaching the outer handler — the 202 path is the one already-responded path that funnels into this except. Nothing in the handler tracks whether a response has started.

Why this is pre-existing, and addressing the dissenting verifier. One verifier argued this is not actionable because the behavior is byte-for-byte identical to main: the old block also called await response(scope, receive, send) unconditionally (the removed if writer: guard only wrapped the writer.send, and writer is checked non-None at function entry anyway), and the trigger is a narrow teardown race whose only consequence is a server-side traceback for a request the client already got its 202 for — the session task group is unaffected and the skipped writer.send is moot since the writer being closed is what triggered the path. All of that is correct, which is why this is filed as pre-existing and non-blocking, not as a regression. The reason it is still worth a note is that this PR actively rewrites these exact lines, drops the # pragma: lax no cover, and asserts (via the new test_priming_store_failure_leaves_no_per_request_state test) that this is now a real, intentional 500 path — without guarding the case where a response has already started. Touching it now is essentially free.

How to fix. Either (a) track a response_started flag (set after each successful await response(...)) and only attempt the 500 when it is False, or (b) scope the outer except to the pre-response section / catch anyio.ClosedResourceError/BrokenResourceError separately around the 202-path writer.send and treat it as a no-op (the client already has its 202 and the session is being torn down). Either way, also skip the final writer.send(Exception(err)) when the writer is the thing that failed.


async def _handle_get_request(self, request: Request, send: Send) -> None:
"""Handle GET request to establish SSE.
Expand Down Expand Up @@ -697,13 +698,15 @@
return

# Create SSE stream
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[SSEEvent](0)

async def standalone_sse_writer():
try:
# Create a standalone message stream for server-initiated messages

self._request_streams[GET_STREAM_KEY] = anyio.create_memory_object_stream[EventMessage](0)
self._request_streams[GET_STREAM_KEY] = anyio.create_memory_object_stream[EventMessage](
REQUEST_STREAM_BUFFER_SIZE
)
standalone_stream_reader = self._request_streams[GET_STREAM_KEY][1]

async with sse_stream_writer, standalone_stream_reader:
Expand Down Expand Up @@ -871,7 +874,7 @@
replay_protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION)

# Create SSE stream for replay
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[SSEEvent](0)

async def replay_sender():
try:
Expand All @@ -886,22 +889,32 @@

# If stream ID not in mapping, create it
if stream_id and stream_id not in self._request_streams: # pragma: no branch
# Register SSE writer so close_sse_stream() can close it
self._sse_stream_writers[stream_id] = sse_stream_writer

# Send priming event for this new connection
await self._maybe_send_priming_event(stream_id, sse_stream_writer, replay_protocol_version)

# Create new request streams for this connection
self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](0)
msg_reader = self._request_streams[stream_id][1]

# Forward messages to SSE
async with msg_reader:
async for event_message in msg_reader:
event_data = self._create_event_data(event_message)

await sse_stream_writer.send(event_data)
try:
# Register SSE writer so close_sse_stream() can close it
self._sse_stream_writers[stream_id] = sse_stream_writer

# Prime the resumed connection so the client sees the stream
# is re-registered. The replay→live-tail ordering window here
# is pre-existing and tracked separately.
priming_event = await self._mint_priming_event(stream_id, replay_protocol_version)
if priming_event is not None:
await sse_stream_writer.send(priming_event)

# Create new request streams for this connection
self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](
REQUEST_STREAM_BUFFER_SIZE
)
msg_reader = self._request_streams[stream_id][1]

# Forward messages to SSE
async with msg_reader:
async for event_message in msg_reader:
event_data = self._create_event_data(event_message)

await sse_stream_writer.send(event_data)
finally:
self._sse_stream_writers.pop(stream_id, None)
await self._clean_up_memory_streams(stream_id)
except anyio.ClosedResourceError: # pragma: lax no cover
# Expected when close_sse_stream() is called
logger.debug("Replay SSE stream closed by close_sse_stream()")
Expand Down
77 changes: 77 additions & 0 deletions tests/interaction/transports/test_hosting_resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,43 @@ async def test_a_post_sse_stream_begins_with_a_priming_event_and_stamps_every_ev
)


@requirement("hosting:resume:priming")
async def test_the_priming_row_is_stored_before_any_handler_output_for_that_stream() -> None:
"""The priming cursor is the first row the event store records for a request's stream.

The POST handler stores the priming row before dispatching the request, so by construction
it precedes anything `message_router` can store for that stream id.
"""
store = SequencedEventStore()
mcp = MCPServer("resumable")

@mcp.tool()
async def burst(ctx: Context) -> str:
await ctx.info("a") # pyright: ignore[reportDeprecated]
await ctx.info("b") # pyright: ignore[reportDeprecated]
await ctx.info("c") # pyright: ignore[reportDeprecated]
return "done"

async with mounted_app(mcp, event_store=store) as (http, _):
session_id = await initialize_via_http(http)
with anyio.fail_after(5):
async with http.stream( # pragma: no branch
"POST", "/mcp", content=_tools_call(2, "burst", {}), headers=base_headers(session_id=session_id)
) as response:
await _read_events(response, 5)

# initialize wrote two rows (its own priming + response); everything after is this call.
call_rows = store._events[2:]
stream_id = call_rows[0][0]
assert [(s, None if m is None else type(m).__name__) for s, m in call_rows] == [
(stream_id, None),
(stream_id, "JSONRPCNotification"),
(stream_id, "JSONRPCNotification"),
(stream_id, "JSONRPCNotification"),
(stream_id, "JSONRPCResponse"),
]


@requirement("hosting:resume:replay")
@requirement("hosting:resume:stream-scoped")
@requirement("hosting:resume:buffered-replay")
Expand Down Expand Up @@ -182,6 +219,46 @@ async def count(ctx: Context) -> str:
)


@requirement("hosting:resume:priming")
async def test_a_pre_2025_11_25_reconnect_replays_without_minting_a_priming_event() -> None:
"""A pre-2025-11-25 client reconnecting via Last-Event-ID gets the replay with no priming row.

The store-length assertion is the load-bearing proof that no priming cursor was minted.
"""
release = anyio.Event()
store = SequencedEventStore()
mcp = MCPServer("resumable")

@mcp.tool()
async def count(ctx: Context) -> str:
await ctx.info("tick 1") # pyright: ignore[reportDeprecated]
await release.wait()
await ctx.info("tick 2") # pyright: ignore[reportDeprecated]
return "counted"

async with mounted_app(mcp, event_store=store, retry_interval=0) as (http, _):
session_id = await initialize_via_http(http)
with anyio.fail_after(5):
async with http.stream(
"POST", "/mcp", content=_tools_call(1, "count", {}), headers=base_headers(session_id=session_id)
) as response:
_, first = await _read_events(response, 2)
release.set()
await store.wait_until_stored(6)
old_client_headers = base_headers(session_id=session_id) | {
"mcp-protocol-version": "2025-06-18",
"last-event-id": first.id,
}
async with http.stream("GET", "/mcp", headers=old_client_headers) as replay: # pragma: no branch
assert replay.status_code == 200
missed = await _read_events(replay, 2)

assert [(event.id, bool(event.data)) for event in missed] == snapshot([("5", True), ("6", True)])
# No priming cursor was minted on reconnect: the store still holds only the six rows
# written before the GET (init priming+response, POST priming, tick 1, tick 2, result).
assert len(store._events) == 6


@requirement("hosting:resume:bad-event-id")
async def test_an_unknown_last_event_id_yields_an_empty_replay_stream() -> None:
"""A Last-Event-ID the event store cannot map produces an empty SSE stream rather than an error.
Expand Down
Loading
Loading