forked from modelcontextprotocol/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbearer_auth.py
More file actions
107 lines (85 loc) · 3.28 KB
/
bearer_auth.py
File metadata and controls
107 lines (85 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import time
from typing import Any
from starlette.authentication import (
AuthCredentials,
AuthenticationBackend,
SimpleUser,
)
from starlette.exceptions import HTTPException
from starlette.requests import HTTPConnection
from starlette.types import Receive, Scope, Send
from mcp.server.auth.provider import AccessToken, OAuthAuthorizationServerProvider
class AuthenticatedUser(SimpleUser):
"""User with authentication info."""
def __init__(self, auth_info: AccessToken):
super().__init__(auth_info.client_id)
self.access_token = auth_info
self.scopes = auth_info.scopes
class BearerAuthBackend(AuthenticationBackend):
"""
Authentication backend that validates Bearer tokens.
"""
def __init__(
self,
provider: OAuthAuthorizationServerProvider[Any, Any, Any],
):
self.provider = provider
async def authenticate(self, conn: HTTPConnection):
auth_header = next(
(
conn.headers.get(key)
for key in conn.headers
if key.lower() == "authorization"
),
None,
)
if not auth_header or not auth_header.lower().startswith("bearer "):
return None
token = auth_header[7:] # Remove "Bearer " prefix
# Validate the token with the provider
auth_info = await self.provider.load_access_token(token)
if not auth_info:
return None
if auth_info.expires_at and auth_info.expires_at < int(time.time()):
return None
return AuthCredentials(auth_info.scopes), AuthenticatedUser(auth_info)
class RequireAuthMiddleware:
"""
Middleware that requires a valid Bearer token in the Authorization header.
This will validate the token with the auth provider and store the resulting
auth info in the request state.
"""
def __init__(
self,
app: Any,
required_scopes: list[str] | None = None,
resource_metadata_url: str | None = None,
):
"""
Initialize the middleware.
Args:
app: ASGI application
required_scopes: Optional list of scopes that the token must have
resource_metadata_url: Optional resource metadata URL
"""
self.app = app
self.required_scopes = required_scopes
self.resource_metadata_url = resource_metadata_url
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
auth_user = scope.get("user")
if not isinstance(auth_user, AuthenticatedUser):
headers = (
{"WWW-Authenticate": f'Bearer resource="{self.resource_metadata_url}"'}
if self.resource_metadata_url
else None
)
raise HTTPException(status_code=401, detail="Unauthorized", headers=headers)
auth_credentials = scope.get("auth")
for required_scope in self.required_scopes or []:
# auth_credentials should always be provided; this is just paranoia
if (
auth_credentials is None
or required_scope not in auth_credentials.scopes
):
raise HTTPException(status_code=403, detail="Insufficient scope")
await self.app(scope, receive, send)