From 5d3c43db2f3515ca00f4bb696554dd59f267b16a Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Fri, 16 Jan 2026 18:20:05 +0100 Subject: [PATCH 1/5] refactor: move StreamableHTTPASGIApp to streamable_http module - Move StreamableHTTPASGIApp from fastmcp/server.py to streamable_http.py - Move streamable_http_app() implementation from FastMCP to lowlevel Server - Add session_manager property to lowlevel Server, FastMCP delegates to it - Fix circular import by using TYPE_CHECKING in streamable_http_manager.py Claude-Generated-By: Claude Code (cli/claude-opus-4-5=100%) Claude-Steers: 8 Claude-Permission-Prompts: 1 Claude-Escapes: 0 --- src/mcp/server/fastmcp/server.py | 132 ++------------------ src/mcp/server/lowlevel/server.py | 139 ++++++++++++++++++++++ src/mcp/server/streamable_http.py | 15 ++- src/mcp/server/streamable_http_manager.py | 6 +- 4 files changed, 170 insertions(+), 122 deletions(-) diff --git a/src/mcp/server/fastmcp/server.py b/src/mcp/server/fastmcp/server.py index 75f2d2237..cb6d6e0f9 100644 --- a/src/mcp/server/fastmcp/server.py +++ b/src/mcp/server/fastmcp/server.py @@ -165,7 +165,6 @@ def __init__( if auth_server_provider and not token_verifier: # pragma: no cover self._token_verifier = ProviderTokenVerifier(auth_server_provider) self._custom_starlette_routes: list[Route] = [] - self._session_manager: StreamableHTTPSessionManager | None = None # Set up MCP protocol handlers self._setup_handlers() @@ -211,14 +210,7 @@ def session_manager(self) -> StreamableHTTPSessionManager: Raises: RuntimeError: If called before streamable_http_app() has been called. """ - if self._session_manager is None: # pragma: no cover - raise RuntimeError( - "Session manager can only be accessed after" - "calling streamable_http_app()." - "The session manager is created lazily" - "to avoid unnecessary initialization." - ) - return self._session_manager # pragma: no cover + return self._mcp_server.session_manager @overload def run(self, transport: Literal["stdio"] = ...) -> None: ... @@ -929,107 +921,19 @@ def streamable_http_app( host: str = "127.0.0.1", ) -> Starlette: """Return an instance of the StreamableHTTP server app.""" - from starlette.middleware import Middleware - - # Auto-enable DNS rebinding protection for localhost (IPv4 and IPv6) - if transport_security is None and host in ("127.0.0.1", "localhost", "::1"): - transport_security = TransportSecuritySettings( - enable_dns_rebinding_protection=True, - allowed_hosts=["127.0.0.1:*", "localhost:*", "[::1]:*"], - allowed_origins=["http://127.0.0.1:*", "http://localhost:*", "http://[::1]:*"], - ) - - # Create session manager on first call (lazy initialization) - if self._session_manager is None: # pragma: no branch - self._session_manager = StreamableHTTPSessionManager( - app=self._mcp_server, - event_store=event_store, - retry_interval=retry_interval, - json_response=json_response, - stateless=stateless_http, - security_settings=transport_security, - ) - - # Create the ASGI handler - streamable_http_app = StreamableHTTPASGIApp(self._session_manager) - - # Create routes - routes: list[Route | Mount] = [] - middleware: list[Middleware] = [] - required_scopes: list[str] = [] - - # Set up auth if configured - if self.settings.auth: # pragma: no cover - required_scopes = self.settings.auth.required_scopes or [] - - # Add auth middleware if token verifier is available - if self._token_verifier: - middleware = [ - Middleware( - AuthenticationMiddleware, - backend=BearerAuthBackend(self._token_verifier), - ), - Middleware(AuthContextMiddleware), - ] - - # Add auth endpoints if auth server provider is configured - if self._auth_server_provider: - from mcp.server.auth.routes import create_auth_routes - - routes.extend( - create_auth_routes( - provider=self._auth_server_provider, - issuer_url=self.settings.auth.issuer_url, - service_documentation_url=self.settings.auth.service_documentation_url, - client_registration_options=self.settings.auth.client_registration_options, - revocation_options=self.settings.auth.revocation_options, - ) - ) - - # Set up routes with or without auth - if self._token_verifier: # pragma: no cover - # Determine resource metadata URL - resource_metadata_url = None - if self.settings.auth and self.settings.auth.resource_server_url: - from mcp.server.auth.routes import build_resource_metadata_url - - # Build compliant metadata URL for WWW-Authenticate header - resource_metadata_url = build_resource_metadata_url(self.settings.auth.resource_server_url) - - routes.append( - Route( - streamable_http_path, - endpoint=RequireAuthMiddleware(streamable_http_app, required_scopes, resource_metadata_url), - ) - ) - else: - # Auth is disabled, no wrapper needed - routes.append( - Route( - streamable_http_path, - endpoint=streamable_http_app, - ) - ) - - # Add protected resource metadata endpoint if configured as RS - if self.settings.auth and self.settings.auth.resource_server_url: # pragma: no cover - from mcp.server.auth.routes import create_protected_resource_routes - - routes.extend( - create_protected_resource_routes( - resource_url=self.settings.auth.resource_server_url, - authorization_servers=[self.settings.auth.issuer_url], - scopes_supported=self.settings.auth.required_scopes, - ) - ) - - routes.extend(self._custom_starlette_routes) - - return Starlette( + return self._mcp_server.streamable_http_app( + streamable_http_path=streamable_http_path, + json_response=json_response, + stateless_http=stateless_http, + event_store=event_store, + retry_interval=retry_interval, + transport_security=transport_security, + host=host, + auth=self.settings.auth, + token_verifier=self._token_verifier, + auth_server_provider=self._auth_server_provider, + custom_starlette_routes=self._custom_starlette_routes, debug=self.settings.debug, - routes=routes, - middleware=middleware, - lifespan=lambda app: self.session_manager.run(), ) async def list_prompts(self) -> list[MCPPrompt]: @@ -1071,16 +975,6 @@ async def get_prompt(self, name: str, arguments: dict[str, Any] | None = None) - raise ValueError(str(e)) -class StreamableHTTPASGIApp: - """ASGI application for Streamable HTTP server transport.""" - - def __init__(self, session_manager: StreamableHTTPSessionManager): - self.session_manager = session_manager - - async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: # pragma: no cover - await self.session_manager.handle_request(scope, receive, send) - - class Context(BaseModel, Generic[ServerSessionT, LifespanContextT, RequestT]): """Context object providing access to MCP capabilities. diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 9d600a6b8..ae31ae732 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -79,15 +79,27 @@ async def main(): import anyio import jsonschema from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +from starlette.applications import Starlette +from starlette.middleware import Middleware +from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.routing import Mount, Route from typing_extensions import TypeVar import mcp.types as types +from mcp.server.auth.middleware.auth_context import AuthContextMiddleware +from mcp.server.auth.middleware.bearer_auth import BearerAuthBackend, RequireAuthMiddleware +from mcp.server.auth.provider import OAuthAuthorizationServerProvider, TokenVerifier +from mcp.server.auth.routes import build_resource_metadata_url, create_auth_routes, create_protected_resource_routes +from mcp.server.auth.settings import AuthSettings from mcp.server.experimental.request_context import Experimental from mcp.server.lowlevel.experimental import ExperimentalHandlers from mcp.server.lowlevel.func_inspection import create_call_wrapper from mcp.server.lowlevel.helper_types import ReadResourceContents from mcp.server.models import InitializationOptions from mcp.server.session import ServerSession +from mcp.server.streamable_http import EventStore, StreamableHTTPASGIApp +from mcp.server.streamable_http_manager import StreamableHTTPSessionManager +from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.context import RequestContext from mcp.shared.exceptions import McpError, UrlElicitationRequiredError from mcp.shared.message import ServerMessageMetadata, SessionMessage @@ -162,6 +174,7 @@ def __init__( self.notification_handlers: dict[type, Callable[..., Awaitable[None]]] = {} self._tool_cache: dict[str, types.Tool] = {} self._experimental_handlers: ExperimentalHandlers | None = None + self._session_manager: StreamableHTTPSessionManager | None = None logger.debug("Initializing server %r", name) def create_initialization_options( @@ -258,6 +271,20 @@ def experimental(self) -> ExperimentalHandlers: self._experimental_handlers = ExperimentalHandlers(self, self.request_handlers, self.notification_handlers) return self._experimental_handlers + @property + def session_manager(self) -> StreamableHTTPSessionManager: + """Get the StreamableHTTP session manager. + + Raises: + RuntimeError: If called before streamable_http_app() has been called. + """ + if self._session_manager is None: + raise RuntimeError( + "Session manager can only be accessed after calling streamable_http_app(). " + "The session manager is created lazily to avoid unnecessary initialization." + ) + return self._session_manager + def list_prompts(self): def decorator( func: Callable[[], Awaitable[list[types.Prompt]]] @@ -801,6 +828,118 @@ async def _handle_notification(self, notify: Any): except Exception: # pragma: no cover logger.exception("Uncaught exception in notification handler") + def streamable_http_app( + self, + *, + streamable_http_path: str = "/mcp", + json_response: bool = False, + stateless_http: bool = False, + event_store: EventStore | None = None, + retry_interval: int | None = None, + transport_security: TransportSecuritySettings | None = None, + host: str = "127.0.0.1", + auth: AuthSettings | None = None, + token_verifier: TokenVerifier | None = None, + auth_server_provider: (OAuthAuthorizationServerProvider[Any, Any, Any] | None) = None, + custom_starlette_routes: list[Route] | None = None, + debug: bool = False, + ) -> Starlette: + """Return an instance of the StreamableHTTP server app.""" + # Auto-enable DNS rebinding protection for localhost (IPv4 and IPv6) + if transport_security is None and host in ("127.0.0.1", "localhost", "::1"): + transport_security = TransportSecuritySettings( + enable_dns_rebinding_protection=True, + allowed_hosts=["127.0.0.1:*", "localhost:*", "[::1]:*"], + allowed_origins=["http://127.0.0.1:*", "http://localhost:*", "http://[::1]:*"], + ) + + session_manager = StreamableHTTPSessionManager( + app=self, + event_store=event_store, + retry_interval=retry_interval, + json_response=json_response, + stateless=stateless_http, + security_settings=transport_security, + ) + self._session_manager = session_manager + + # Create the ASGI handler + streamable_http_app = StreamableHTTPASGIApp(session_manager) + + # Create routes + routes: list[Route | Mount] = [] + middleware: list[Middleware] = [] + required_scopes: list[str] = [] + + # Set up auth if configured + if auth: # pragma: no cover + required_scopes = auth.required_scopes or [] + + # Add auth middleware if token verifier is available + if token_verifier: + middleware = [ + Middleware( + AuthenticationMiddleware, + backend=BearerAuthBackend(token_verifier), + ), + Middleware(AuthContextMiddleware), + ] + + # Add auth endpoints if auth server provider is configured + if auth_server_provider: + routes.extend( + create_auth_routes( + provider=auth_server_provider, + issuer_url=auth.issuer_url, + service_documentation_url=auth.service_documentation_url, + client_registration_options=auth.client_registration_options, + revocation_options=auth.revocation_options, + ) + ) + + # Set up routes with or without auth + if token_verifier: # pragma: no cover + # Determine resource metadata URL + resource_metadata_url = None + if auth and auth.resource_server_url: + # Build compliant metadata URL for WWW-Authenticate header + resource_metadata_url = build_resource_metadata_url(auth.resource_server_url) + + routes.append( + Route( + streamable_http_path, + endpoint=RequireAuthMiddleware(streamable_http_app, required_scopes, resource_metadata_url), + ) + ) + else: + # Auth is disabled, no wrapper needed + routes.append( + Route( + streamable_http_path, + endpoint=streamable_http_app, + ) + ) + + # Add protected resource metadata endpoint if configured as RS + if auth and auth.resource_server_url: # pragma: no cover + routes.extend( + create_protected_resource_routes( + resource_url=auth.resource_server_url, + authorization_servers=[auth.issuer_url], + scopes_supported=auth.required_scopes, + ) + ) + + if custom_starlette_routes: + routes.extend(custom_starlette_routes) + + return Starlette( + debug=debug, + routes=routes, + middleware=middleware, + lifespan=lambda app: session_manager.run(), + ) + async def _ping_handler(request: types.PingRequest) -> types.ServerResult: return types.ServerResult(types.EmptyResult()) diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 137a7da39..55df07c2d 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -14,7 +14,10 @@ from contextlib import asynccontextmanager from dataclasses import dataclass from http import HTTPStatus -from typing import Any +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from mcp.server.streamable_http_manager import StreamableHTTPSessionManager import anyio from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream @@ -1054,3 +1057,13 @@ async def message_router(): # pragma: no cover except Exception as e: # pragma: no cover # During cleanup, we catch all exceptions since streams might be in various states logger.debug(f"Error closing streams: {e}") + + +class StreamableHTTPASGIApp: + """ASGI application for Streamable HTTP server transport.""" + + def __init__(self, session_manager: "StreamableHTTPSessionManager"): + self.session_manager = session_manager + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: # pragma: no cover + await self.session_manager.handle_request(scope, receive, send) diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 6a17f9c53..4e8c54a53 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -6,7 +6,7 @@ import logging from collections.abc import AsyncIterator from http import HTTPStatus -from typing import Any +from typing import TYPE_CHECKING, Any from uuid import uuid4 import anyio @@ -15,7 +15,6 @@ from starlette.responses import Response from starlette.types import Receive, Scope, Send -from mcp.server.lowlevel.server import Server as MCPServer from mcp.server.streamable_http import ( MCP_SESSION_ID_HEADER, EventStore, @@ -24,6 +23,9 @@ from mcp.server.transport_security import TransportSecuritySettings from mcp.types import INVALID_REQUEST, ErrorData, JSONRPCError +if TYPE_CHECKING: + from mcp.server.lowlevel.server import Server as MCPServer + logger = logging.getLogger(__name__) From bb67ae637204c0ef371655f58867961be5fbca08 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Fri, 16 Jan 2026 18:23:13 +0100 Subject: [PATCH 2/5] refactor: move StreamableHTTPASGIApp to streamable_http_manager module Move StreamableHTTPASGIApp from streamable_http.py to streamable_http_manager.py since it directly depends on StreamableHTTPSessionManager. Claude-Generated-By: Claude Code (cli/claude-opus-4-5=100%) Claude-Steers: 0 Claude-Permission-Prompts: 2 Claude-Escapes: 0 --- src/mcp/server/lowlevel/server.py | 4 ++-- src/mcp/server/streamable_http.py | 15 +-------------- src/mcp/server/streamable_http_manager.py | 10 ++++++++++ 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index ae31ae732..809401879 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -97,8 +97,8 @@ async def main(): from mcp.server.lowlevel.helper_types import ReadResourceContents from mcp.server.models import InitializationOptions from mcp.server.session import ServerSession -from mcp.server.streamable_http import EventStore, StreamableHTTPASGIApp -from mcp.server.streamable_http_manager import StreamableHTTPSessionManager +from mcp.server.streamable_http import EventStore +from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.context import RequestContext from mcp.shared.exceptions import McpError, UrlElicitationRequiredError diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 55df07c2d..137a7da39 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -14,10 +14,7 @@ from contextlib import asynccontextmanager from dataclasses import dataclass from http import HTTPStatus -from typing import TYPE_CHECKING, Any - -if TYPE_CHECKING: - from mcp.server.streamable_http_manager import StreamableHTTPSessionManager +from typing import Any import anyio from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream @@ -1057,13 +1054,3 @@ async def message_router(): # pragma: no cover except Exception as e: # pragma: no cover # During cleanup, we catch all exceptions since streams might be in various states logger.debug(f"Error closing streams: {e}") - - -class StreamableHTTPASGIApp: - """ASGI application for Streamable HTTP server transport.""" - - def __init__(self, session_manager: "StreamableHTTPSessionManager"): - self.session_manager = session_manager - - async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: # pragma: no cover - await self.session_manager.handle_request(scope, receive, send) diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 4e8c54a53..3213eceff 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -292,3 +292,13 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE media_type="application/json", ) await response(scope, receive, send) + + +class StreamableHTTPASGIApp: + """ASGI application for Streamable HTTP server transport.""" + + def __init__(self, session_manager: StreamableHTTPSessionManager): + self.session_manager = session_manager + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: # pragma: no cover + await self.session_manager.handle_request(scope, receive, send) From d68c85fffc9dfe0e3a9095c95b48fd57b8b649b2 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Fri, 16 Jan 2026 18:26:26 +0100 Subject: [PATCH 3/5] chore: add pragma no cover for untested branches Claude-Generated-By: Claude Code (cli/claude-opus-4-5=100%) Claude-Steers: 0 Claude-Permission-Prompts: 1 Claude-Escapes: 0 --- src/mcp/server/fastmcp/server.py | 2 +- src/mcp/server/lowlevel/server.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/mcp/server/fastmcp/server.py b/src/mcp/server/fastmcp/server.py index cb6d6e0f9..ef75e1fdf 100644 --- a/src/mcp/server/fastmcp/server.py +++ b/src/mcp/server/fastmcp/server.py @@ -210,7 +210,7 @@ def session_manager(self) -> StreamableHTTPSessionManager: Raises: RuntimeError: If called before streamable_http_app() has been called. """ - return self._mcp_server.session_manager + return self._mcp_server.session_manager # pragma: no cover @overload def run(self, transport: Literal["stdio"] = ...) -> None: ... diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 809401879..49987093f 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -278,12 +278,12 @@ def session_manager(self) -> StreamableHTTPSessionManager: Raises: RuntimeError: If called before streamable_http_app() has been called. """ - if self._session_manager is None: + if self._session_manager is None: # pragma: no cover raise RuntimeError( "Session manager can only be accessed after calling streamable_http_app(). " "The session manager is created lazily to avoid unnecessary initialization." ) - return self._session_manager + return self._session_manager # pragma: no cover def list_prompts(self): def decorator( @@ -930,7 +930,7 @@ def streamable_http_app( ) ) - if custom_starlette_routes: + if custom_starlette_routes: # pragma: no cover routes.extend(custom_starlette_routes) return Starlette( From a7117e5116a7d993d292edfabee77524281b3a79 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Sat, 17 Jan 2026 11:31:03 +0100 Subject: [PATCH 4/5] docs: add migration guide entry for StreamableHTTPASGIApp move Claude-Generated-By: Claude Code (cli/claude-opus-4-5=100%) Claude-Steers: 1 Claude-Permission-Prompts: 1 Claude-Escapes: 0 --- docs/migration.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/docs/migration.md b/docs/migration.md index eac51061c..7acb839b0 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -222,6 +222,22 @@ Affected types: The `ClientSession.read_resource()`, `subscribe_resource()`, and `unsubscribe_resource()` methods now accept both `str` and `AnyUrl` for backwards compatibility. +### `StreamableHTTPASGIApp` moved to `streamable_http_manager` module + +The `StreamableHTTPASGIApp` class has been moved from `mcp.server.fastmcp.server` to `mcp.server.streamable_http_manager`. + +**Before (v1):** + +```python +from mcp.server.fastmcp.server import StreamableHTTPASGIApp +``` + +**After (v2):** + +```python +from mcp.server.streamable_http_manager import StreamableHTTPASGIApp +``` + ## Deprecations From 6196f2bbb66e56838238376826b7e9a27bf877d2 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Sat, 17 Jan 2026 11:32:01 +0100 Subject: [PATCH 5/5] docs: add migration guide for StreamableHTTPASGIApp move and new lowlevel streamable_http_app() Claude-Generated-By: Claude Code (cli/claude-opus-4-5=100%) Claude-Steers: 0 Claude-Permission-Prompts: 1 Claude-Escapes: 1 --- docs/migration.md | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/docs/migration.md b/docs/migration.md index 7acb839b0..1d2119507 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -244,7 +244,29 @@ from mcp.server.streamable_http_manager import StreamableHTTPASGIApp ## New Features - +### `streamable_http_app()` available on lowlevel Server + +The `streamable_http_app()` method is now available directly on the lowlevel `Server` class, not just `FastMCP`. This allows using the streamable HTTP transport without the FastMCP wrapper. + +```python +from mcp.server.lowlevel.server import Server + +server = Server("my-server") + +# Register handlers... +@server.list_tools() +async def list_tools(): + return [...] + +# Create a Starlette app for streamable HTTP +app = server.streamable_http_app( + streamable_http_path="/mcp", + json_response=False, + stateless_http=False, +) +``` + +The lowlevel `Server` also now exposes a `session_manager` property to access the `StreamableHTTPSessionManager` after calling `streamable_http_app()`. ## Need Help?