diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index c1c8a0f61..63eff3750 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -288,6 +288,8 @@ def _create_error_response( status_code: HTTPStatus, error_code: int = INVALID_REQUEST, headers: dict[str, str] | None = None, + *, + request_id: RequestId | None = None, ) -> Response: """Create an error response with a simple string message.""" response_headers = {"Content-Type": CONTENT_TYPE_JSON} @@ -300,7 +302,7 @@ def _create_error_response( # Return a properly formatted JSON error response error_response = JSONRPCError( jsonrpc="2.0", - id=None, + id=request_id, error=ErrorData(code=error_code, message=error_message), ) @@ -436,7 +438,7 @@ async def _validate_accept_header(self, request: Request, scope: Scope, send: Se return False return True - async def _handle_post_request(self, scope: Scope, request: Request, receive: Receive, send: Send) -> None: + async def _handle_post_request(self, scope: Scope, request: Request, receive: Receive, send: Send) -> None: # noqa: C901 """Handle POST requests containing JSON-RPC messages.""" writer = self._read_stream_writer if writer is None: # pragma: no cover @@ -523,6 +525,19 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re # Extract the request ID outside the try block for proper scope request_id = str(message.id) + + # Reject duplicate request IDs — a client that reuses an id while + # the original request is still in flight violates the JSON-RPC + # spec and would silently overwrite the prior stream slot. + if request_id in self._request_streams: + response = self._create_error_response( + f"Conflict: Request ID {request_id!r} is already in flight on this session", + HTTPStatus.CONFLICT, + request_id=message.id, + ) + await response(scope, receive, send) + return + # Register this stream for the request ID self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](0) request_stream_reader = self._request_streams[request_id][1] diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 7ceac8e86..0a1efa8c7 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -2384,3 +2384,73 @@ async def asgi_receive() -> Message: assert body_chunks[-1] == {"type": "http.response.body", "body": b"", "more_body": False} assert "Error in standalone SSE writer" not in caplog.text assert "Error in standalone SSE response" not in caplog.text + + +@pytest.mark.anyio +async def test_duplicate_request_id_rejected_with_409() -> None: + """A POST with a request id already in _request_streams is rejected 409. + + When a client reuses a JSON-RPC request id while the original request + is still in flight, the server must surface the violation instead of + silently overwriting the prior stream slot (gh-2655). + """ + transport = StreamableHTTPServerTransport( + mcp_session_id="test-session", + security_settings=TransportSecuritySettings(enable_dns_rebinding_protection=False), + ) + # Satisfy the read-stream guard so the POST handler proceeds. + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) + transport._read_stream_writer = read_stream_writer # pyright: ignore[reportPrivateUsage] + + # Seed an in-flight request stream so the duplicate check triggers. + duplicate_id = "dup-1" + seeded_send, seeded_receive = anyio.create_memory_object_stream[EventMessage](0) + transport._request_streams[duplicate_id] = (seeded_send, seeded_receive) # pyright: ignore[reportPrivateUsage] + + sent: list[Message] = [] + + async def asgi_send(message: Message) -> None: + sent.append(message) + + async def asgi_receive() -> Message: + return { + "type": "http.request", + "body": json.dumps( + { + "jsonrpc": "2.0", + "method": "tools/call", + "params": {"name": "test", "arguments": {}}, + "id": duplicate_id, + } + ).encode(), + "more_body": False, + } + + scope: Scope = { + "type": "http", + "method": "POST", + "path": "/mcp", + "query_string": b"", + "headers": [ + (b"accept", b"application/json, text/event-stream"), + (b"content-type", b"application/json"), + (b"mcp-session-id", b"test-session"), + ], + } + + async with read_stream_writer, read_stream, seeded_send, seeded_receive: + with anyio.fail_after(5): + await transport.handle_request(scope, asgi_receive, asgi_send) + + status = next(m["status"] for m in sent if m["type"] == "http.response.start") + assert status == 409 + + body = b"".join(m["body"] for m in sent if m["type"] == "http.response.body") + error_response = json.loads(body) + assert error_response["jsonrpc"] == "2.0" + assert error_response["id"] == duplicate_id + assert error_response["error"]["code"] == -32600 + assert duplicate_id in error_response["error"]["message"] + + # The original in-flight stream must not have been replaced. + assert duplicate_id in transport._request_streams # pyright: ignore[reportPrivateUsage]