Skip to content

Commit 53138e6

Browse files
committed
Add mcp_proxy for bidirectional message forwarding
Adds a convenience function for proxying messages between two MCP transports, enabling bidirectional message forwarding with proper error handling. Features: - Bidirectional forwarding between client and server transports - Optional error callback (sync or async) for exceptions on streams - Graceful handling of closed/broken streams - Clean shutdown on context exit This is a simpler reimplementation of the proxy pattern from #1711/#1763, addressing all review feedback.
1 parent a3a4b8d commit 53138e6

File tree

3 files changed

+384
-0
lines changed

3 files changed

+384
-0
lines changed

‎src/mcp/__init__.py‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from .client.sessionimportClientSession
22
from .client.session_groupimportClientSessionGroup
33
from .client.stdioimportStdioServerParameters, stdio_client
4+
from .proxyimportMessageStream, mcp_proxy
45
from .server.sessionimportServerSession
56
from .server.stdioimportstdio_server
67
from .shared.exceptionsimportMcpError, UrlElicitationRequiredError
@@ -97,6 +98,7 @@
9798
"LoggingLevel",
9899
"LoggingMessageNotification",
99100
"McpError",
101+
"MessageStream",
100102
"Notification",
101103
"PingRequest",
102104
"ProgressNotification",
@@ -130,6 +132,7 @@
130132
"ToolUseContent",
131133
"UnsubscribeRequest",
132134
"UrlElicitationRequiredError",
135+
"mcp_proxy",
133136
"stdio_client",
134137
"stdio_server",
135138
]

‎src/mcp/proxy.py‎

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
"""Utilities for proxying messages between MCP transports."""
2+
3+
fromcollections.abcimportAsyncGenerator, Awaitable, Callable
4+
fromcontextlibimportasynccontextmanager
5+
6+
importanyio
7+
fromanyio.streams.memoryimportMemoryObjectReceiveStream, MemoryObjectSendStream
8+
9+
frommcp.shared.messageimportSessionMessage
10+
11+
MessageStream=tuple[
12+
MemoryObjectReceiveStream[SessionMessage|Exception],
13+
MemoryObjectSendStream[SessionMessage],
14+
]
15+
16+
17+
@asynccontextmanager
18+
asyncdefmcp_proxy(
19+
client_streams: MessageStream,
20+
server_streams: MessageStream,
21+
on_error: Callable[[Exception], None|Awaitable[None]] |None=None,
22+
) ->AsyncGenerator[None, None]:
23+
"""Proxy messages bidirectionally between two MCP transports.
24+
25+
Sets up bidirectional message forwarding between two transport pairs.
26+
Messages from the client are forwarded to the server, and vice versa.
27+
When the context exits, both forwarding directions are cancelled.
28+
29+
Args:
30+
client_streams: A tuple of (read_stream, write_stream) for the client side.
31+
server_streams: A tuple of (read_stream, write_stream) for the server side.
32+
on_error: Optional callback for handling exceptions received on streams.
33+
Can be sync or async. Called with the Exception object.
34+
35+
Example:
36+
```python
37+
async with mcp_proxy(
38+
client_streams=(client_read, client_write),
39+
server_streams=(server_read, server_write),
40+
on_error=lambda e: print(f"Error:{e}"),
41+
):
42+
# Proxy is active, forwarding messages bidirectionally
43+
await some_operation()
44+
# Forwarding stops when exiting the context
45+
```
46+
"""
47+
client_read, client_write=client_streams
48+
server_read, server_write=server_streams
49+
50+
asyncdefforward(
51+
read: MemoryObjectReceiveStream[SessionMessage|Exception],
52+
write: MemoryObjectSendStream[SessionMessage],
53+
) ->None:
54+
asyncformsginread:
55+
ifisinstance(msg, Exception):
56+
ifon_error:
57+
try:
58+
result=on_error(msg)
59+
ifisinstance(result, Awaitable):
60+
awaitresult
61+
exceptException:
62+
pass# Don't let callback errors crash the proxy
63+
else:
64+
try:
65+
awaitwrite.send(msg)
66+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
67+
return# Destination closed, stop this direction
68+
69+
asyncwithanyio.create_task_group() astg:
70+
tg.start_soon(forward, client_read, server_write)
71+
tg.start_soon(forward, server_read, client_write)
72+
try:
73+
yield
74+
finally:
75+
tg.cancel_scope.cancel()

0 commit comments

Comments
(0)