- Notifications
You must be signed in to change notification settings - Fork 2.9k
Implement SEP-990 Enterprise Managed OAuth#1721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
BinoyOza-okta wants to merge 16 commits into modelcontextprotocol:mainChoose a base branch from BinoyOza-okta:feature/sep-990-enterprise-oauth
base:main
Could not load branches
Branch not found: {{refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline, and old review comments may become outdated.
Uh oh!
There was an error while loading. Please reload this page.
Open
Changes from all commits
Commits
Show all changes
16 commits Select commit Hold shift + click to select a range
f0de9e7 - Implemented SEP-990 feature for providing support for Enterprise Ma…
BinoyOza-okta 9759c7a Added test cases for missing lines of code.
BinoyOza-okta 7f80d32 - Added tests cases for few of the missing lines. src/mcp/client/auth…
BinoyOza-okta 1ea72c5 - Fixed pre-commit errors.
BinoyOza-okta c07b7b9 - Tried to fix the ruff error.
BinoyOza-okta f431b54 - Fixed ruff errors.
BinoyOza-okta d4392ae - Removed server side changes for enterprise_managed_auth.py
BinoyOza-okta db2f02c - Added README.md changes for SEP-990 implementation for enterprise m…
BinoyOza-okta 5fb2c0f - Resolved pyright checks error.
BinoyOza-okta 005bad4 - Resolved README.md file fixes for removing unused imports.
BinoyOza-okta 73b12b7 - Resolved pyright errors.
BinoyOza-okta 8214778 - Added new test cases for the missing code lines.
BinoyOza-okta 04ffe5a - Fixed the failing test cases.
BinoyOza-okta 09c05aa - Fixed the test cases.
BinoyOza-okta 28bb315 - Added typing for request payload structures TokenExchangeRequestDat…
BinoyOza-okta 84162df - Updated test case to include IDJAGClaims type model to verify payload.
BinoyOza-okta File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Uh oh!
There was an error while loading. Please reload this page.
Jump to
Jump to file
Failed to load files.
Loading
Uh oh!
There was an error while loading. Please reload this page.
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -61,6 +61,7 @@ | ||
| - [Writing MCP Clients](#writing-mcp-clients) | ||
| - [Client Display Utilities](#client-display-utilities) | ||
| - [OAuth Authentication for Clients](#oauth-authentication-for-clients) | ||
| - [Enterprise Managed Authorization](#enterprise-managed-authorization) | ||
| - [Parsing Tool Results](#parsing-tool-results) | ||
| - [MCP Primitives](#mcp-primitives) | ||
| - [Server Capabilities](#server-capabilities) | ||
| @@ -2356,6 +2357,288 @@ _Full example: [examples/snippets/clients/oauth_client.py](https://github.com/mo | ||
| For a complete working example, see [`examples/clients/simple-auth-client/`](examples/clients/simple-auth-client/). | ||
| #### Enterprise Managed Authorization | ||
| The SDK includes support for Enterprise Managed Authorization (SEP-990), which enables MCP clients to connect to protected servers using enterprise Single Sign-On (SSO) systems. This implementation supports: | ||
| - **RFC 8693**: OAuth 2.0 Token Exchange (ID Token → ID-JAG) | ||
| - **RFC 7523**: JSON Web Token (JWT) Profile for OAuth 2.0 Authorization Grants (ID-JAG → Access Token) | ||
| - Integration with enterprise identity providers (Okta, Azure AD, etc.) | ||
| **Key Components:** | ||
| The `EnterpriseAuthOAuthClientProvider` class extends the standard OAuth provider to implement the enterprise authorization flow: | ||
| **Token Exchange Flow:** | ||
| 1. **Obtain ID Token** from your enterprise IdP (e.g., Okta, Azure AD) | ||
| 2. **Exchange ID Token for ID-JAG** using RFC 8693 Token Exchange | ||
| 3. **Exchange ID-JAG for Access Token** using RFC 7523 JWT Bearer Grant | ||
| 4. **Use Access Token** to call protected MCP server tools | ||
| **Using the Access Token with MCP Server:** | ||
| 1. Once you have obtained the access token, you can use it to authenticate requests to the MCP server | ||
| 2. The access token is automatically included in all subsequent requests to the MCP server, allowing you to access protected tools and resources based on your enterprise identity and permissions. | ||
| **Handling Token Expiration and Refresh:** | ||
| Access tokens have a limited lifetime and will expire. When tokens expire: | ||
| - **Check Token Expiration**: Use the `expires_in` field to determine when the token expires | ||
| - **Refresh Flow**: When expired, repeat the token exchange flow with a fresh ID token from your IdP | ||
| - **Automatic Refresh**: Implement automatic token refresh before expiration (recommended for production) | ||
| - **Error Handling**: Catch authentication errors and retry with refreshed tokens | ||
| **Important Notes:** | ||
| - **ID Token Expiration**: If the ID token from your IdP expires, you must re-authenticate with the IdP to obtain a new ID token before performing token exchange | ||
| - **Token Storage**: Store tokens securely and implement the `TokenStorage` interface to persist tokens between application restarts | ||
| - **Scope Changes**: If you need different scopes, you must obtain a new ID token from the IdP with the required scopes | ||
| - **Security**: Never log or expose access tokens or ID tokens in production environments | ||
| **Example Usage:** | ||
| <!-- snippet-source examples/snippets/clients/enterprise_managed_auth_client.py --> | ||
| ```python | ||
BinoyOza-okta marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading. Please reload this page. | ||
| import asyncio | ||
| from datetime import datetime, timedelta, timezone | ||
| from typing import Any | ||
| import httpx | ||
| from pydantic import AnyUrl | ||
| from mcp import ClientSession | ||
| from mcp.client.auth import OAuthTokenError, TokenStorage | ||
| from mcp.client.auth.extensions import ( | ||
| EnterpriseAuthOAuthClientProvider, | ||
| TokenExchangeParameters, | ||
| ) | ||
| from mcp.client.sse import sse_client | ||
| from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken | ||
| from mcp.types import CallToolResult | ||
| # Placeholder function for IdP authentication | ||
| async def get_id_token_from_idp() -> str: | ||
| """ | ||
| Placeholder function to get ID token from your IdP. | ||
| In production, implement actual IdP authentication flow. | ||
| """ | ||
| raise NotImplementedError("Implement your IdP authentication flow here") | ||
| # Define token storage implementation | ||
| class SimpleTokenStorage(TokenStorage): | ||
| def __init__(self) -> None: | ||
| self._tokens: OAuthToken | None = None | ||
| self._client_info: OAuthClientInformationFull | None = None | ||
| async def get_tokens(self) -> OAuthToken | None: | ||
| return self._tokens | ||
| async def set_tokens(self, tokens: OAuthToken) -> None: | ||
| self._tokens = tokens | ||
| async def get_client_info(self) -> OAuthClientInformationFull | None: | ||
| return self._client_info | ||
| async def set_client_info(self, client_info: OAuthClientInformationFull) -> None: | ||
| self._client_info = client_info | ||
| def is_token_expired(access_token: OAuthToken) -> bool: | ||
| """Check if the access token has expired.""" | ||
| if access_token.expires_in: | ||
| # Calculate expiration time | ||
| issued_at = datetime.now(timezone.utc) | ||
| expiration_time = issued_at + timedelta(seconds=access_token.expires_in) | ||
| return datetime.now(timezone.utc) >= expiration_time | ||
| return False | ||
| async def refresh_access_token( | ||
| enterprise_auth: EnterpriseAuthOAuthClientProvider, | ||
| client: httpx.AsyncClient, | ||
| id_token: str, | ||
| ) -> OAuthToken: | ||
| """Refresh the access token when it expires.""" | ||
| try: | ||
| # Update token exchange parameters with fresh ID token | ||
| enterprise_auth.token_exchange_params.subject_token = id_token | ||
| # Re-exchange for new ID-JAG | ||
| id_jag = await enterprise_auth.exchange_token_for_id_jag(client) | ||
| # Get new access token | ||
| access_token = await enterprise_auth.exchange_id_jag_for_access_token(client, id_jag) | ||
| return access_token | ||
| except Exception as e: | ||
| print(f"Token refresh failed:{e}") | ||
| # Re-authenticate with IdP if ID token is also expired | ||
| id_token = await get_id_token_from_idp() | ||
| return await refresh_access_token(enterprise_auth, client, id_token) | ||
| async def call_tool_with_retry( | ||
| session: ClientSession, | ||
| tool_name: str, | ||
| arguments: dict[str, Any], | ||
| enterprise_auth: EnterpriseAuthOAuthClientProvider, | ||
| client: httpx.AsyncClient, | ||
| id_token: str, | ||
| ) -> CallToolResult | None: | ||
| """Call a tool with automatic retry on token expiration.""" | ||
| max_retries = 1 | ||
| for attempt in range(max_retries + 1): | ||
| try: | ||
| result = await session.call_tool(tool_name, arguments) | ||
| return result | ||
| except OAuthTokenError: | ||
| if attempt < max_retries: | ||
| print("Token expired, refreshing...") | ||
| # Refresh token and reconnect | ||
| _access_token = await refresh_access_token(enterprise_auth, client, id_token) | ||
| # Note: In production, you'd need to reconnect the session here | ||
| else: | ||
| raise | ||
| return None | ||
| async def main() -> None: | ||
| # Step 1: Get ID token from your IdP (example with Okta) | ||
| id_token = await get_id_token_from_idp() # Your IdP authentication | ||
| # Step 2: Configure token exchange parameters | ||
| token_exchange_params = TokenExchangeParameters.from_id_token( | ||
| id_token=id_token, | ||
| mcp_server_auth_issuer="https://your-idp.com", # IdP issuer URL | ||
| mcp_server_resource_id="https://mcp-server.example.com", # MCP server resource ID | ||
| scope="mcp:tools mcp:resources", # Optional scopes | ||
| ) | ||
| # Step 3: Create enterprise auth provider | ||
| enterprise_auth = EnterpriseAuthOAuthClientProvider( | ||
| server_url="https://mcp-server.example.com", | ||
| client_metadata=OAuthClientMetadata( | ||
| client_name="Enterprise MCP Client", | ||
| redirect_uris=[AnyUrl("http://localhost:3000/callback")], | ||
| grant_types=["urn:ietf:params:oauth:grant-type:jwt-bearer"], | ||
| response_types=["token"], | ||
| ), | ||
| storage=SimpleTokenStorage(), | ||
| idp_token_endpoint="https://your-idp.com/oauth2/v1/token", | ||
| token_exchange_params=token_exchange_params, | ||
| ) | ||
| async with httpx.AsyncClient() as client: | ||
| # Step 4: Exchange ID token for ID-JAG | ||
| id_jag = await enterprise_auth.exchange_token_for_id_jag(client) | ||
| print(f"Obtained ID-JAG:{id_jag[:50]}...") | ||
| # Step 5: Exchange ID-JAG for access token | ||
| access_token = await enterprise_auth.exchange_id_jag_for_access_token(client, id_jag) | ||
| print(f"Access token obtained, expires in:{access_token.expires_in}s") | ||
| # Step 6: Check if token is expired (for demonstration) | ||
| if is_token_expired(access_token): | ||
| print("Token is expired, refreshing...") | ||
| access_token = await refresh_access_token(enterprise_auth, client, id_token) | ||
| # Step 7: Use the access token to connect to MCP server | ||
| headers ={"Authorization": f"Bearer{access_token.access_token}"} | ||
| async with sse_client(url="https://mcp-server.example.com", headers=headers) as (read, write): | ||
| async with ClientSession(read, write) as session: | ||
| await session.initialize() | ||
| # Call tools with automatic retry on token expiration | ||
| result = await call_tool_with_retry( | ||
| session, "enterprise_tool",{"param": "value"}, enterprise_auth, client, id_token | ||
| ) | ||
| if result: | ||
| print(f"Tool result:{result.content}") | ||
| # List available resources | ||
| resources = await session.list_resources() | ||
| for resource in resources.resources: | ||
| print(f"Resource:{resource.uri}") | ||
| async def maintain_active_session( | ||
| enterprise_auth: EnterpriseAuthOAuthClientProvider, | ||
| mcp_server_url: str, | ||
| ) -> None: | ||
| """Maintain an active session with automatic token refresh.""" | ||
| id_token_var = await get_id_token_from_idp() | ||
| async with httpx.AsyncClient() as client: | ||
| while True: | ||
| try: | ||
| # Update token exchange params with current ID token | ||
| enterprise_auth.token_exchange_params.subject_token = id_token_var | ||
| # Get access token | ||
| id_jag = await enterprise_auth.exchange_token_for_id_jag(client) | ||
| access_token = await enterprise_auth.exchange_id_jag_for_access_token(client, id_jag) | ||
| # Calculate refresh time (refresh before expiration) | ||
| refresh_in = access_token.expires_in - 60 if access_token.expires_in else 300 | ||
| # Use the token for MCP operations | ||
| headers ={"Authorization": f"Bearer{access_token.access_token}"} | ||
| async with sse_client(mcp_server_url, headers=headers) as (read, write): | ||
| async with ClientSession(read, write) as session: | ||
| await session.initialize() | ||
| # Perform operations... | ||
| # Schedule refresh before token expires | ||
| await asyncio.sleep(refresh_in) | ||
| except Exception as e: | ||
| print(f"Session error:{e}") | ||
| # Re-authenticate with IdP | ||
| id_token_var = await get_id_token_from_idp() | ||
| await asyncio.sleep(5) # Wait before retry | ||
| if __name__ == "__main__": | ||
| asyncio.run(main()) | ||
| ``` | ||
| _Full example: [examples/snippets/clients/enterprise_managed_auth_client.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/clients/enterprise_managed_auth_client.py)_ | ||
| <!-- /snippet-source --> | ||
| **Working with SAML Assertions:** | ||
| If your enterprise uses SAML instead of OIDC, you can exchange SAML assertions: | ||
| ```python | ||
| token_exchange_params = TokenExchangeParameters.from_saml_assertion( | ||
| saml_assertion=saml_assertion_string, | ||
| mcp_server_auth_issuer="https://your-idp.com", | ||
| mcp_server_resource_id="https://mcp-server.example.com", | ||
| scope="mcp:tools", | ||
| ) | ||
| ``` | ||
| **Decoding and Inspecting ID-JAG Tokens:** | ||
| You can decode ID-JAG tokens to inspect their claims: | ||
| ```python | ||
| from mcp.client.auth.extensions import decode_id_jag | ||
| # Decode without signature verification (for inspection only) | ||
| claims = decode_id_jag(id_jag) | ||
| print(f"Subject:{claims.sub}") | ||
| print(f"Issuer:{claims.iss}") | ||
| print(f"Audience:{claims.aud}") | ||
| print(f"Client ID:{claims.client_id}") | ||
| print(f"Resource:{claims.resource}") | ||
| ``` | ||
| ### Parsing Tool Results | ||
| When calling tools through MCP, the `CallToolResult` object contains the tool's response in a structured format. Understanding how to parse this result is essential for properly handling tool outputs. | ||
Oops, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.