-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmiddleware.py
More file actions
131 lines (101 loc) · 3.86 KB
/
middleware.py
File metadata and controls
131 lines (101 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from urllib.parse import parse_qs
import jwt
from channels.db import database_sync_to_async
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.authtoken.models import Token
from users.models import CustomUser
from django.contrib.auth.models import AnonymousUser
# Active user model, resolved once when this module is imported.
# TokenAuthentication.authenticate uses it for the primary-key lookup.
User = get_user_model()
class TokenAuthentication:
    """
    Simple token based authentication.

    Clients should authenticate by passing the token in the query parameters.
    For example:

        ?token=401f7ac837da42b97f613d789819ff93537bee6a

    Two lookups are offered: ``authenticate_credentials`` resolves a key
    against the token model, while ``authenticate`` decodes a JWT signed with
    ``settings.SECRET_KEY``.
    """

    # Optional token model override; ``None`` selects the default ``Token``.
    # A custom token model may be used, but must have the following properties.
    # * key -- The string identifying the token
    # * user -- The user to which the token belongs
    model = None

    def get_model(self) -> type[Token]:
        """Return the token model class: ``self.model`` if set, else ``Token``."""
        if self.model is not None:
            return self.model
        return Token

    def authenticate_credentials(self, key: str) -> CustomUser:
        """
        Return the active user that owns the stored token ``key``.

        Args:
            key: The string identifying the token.

        Returns:
            The user attached to the matching token.

        Raises:
            AuthenticationFailed: If no token has this key, or its user is
                not active.
        """
        model = self.get_model()
        try:
            token = model.objects.select_related("user").get(key=key)
        except model.DoesNotExist as exc:
            raise AuthenticationFailed(_("Invalid token.")) from exc

        if not token.user.is_active:
            raise AuthenticationFailed(_("User inactive or deleted."))

        return token.user

    def authenticate(self, token: str) -> CustomUser:
        """
        Return the active user identified by a signed JWT.

        Args:
            token: Encoded JWT signed with ``settings.SECRET_KEY`` (HS256)
                whose payload carries a ``user_id`` claim.

        Returns:
            The user whose primary key equals the ``user_id`` claim.

        Raises:
            AuthenticationFailed: If the token is expired or otherwise
                invalid, has no ``user_id`` claim, or refers to a user that
                does not exist or is not active.
        """
        try:
            payload = jwt.decode(
                jwt=token, key=settings.SECRET_KEY, algorithms=["HS256"]
            )
        # ExpiredSignatureError is a subclass of InvalidTokenError, so it must
        # be handled first to keep its dedicated message.
        except jwt.exceptions.ExpiredSignatureError as exc:
            raise AuthenticationFailed(_("Token expired.")) from exc
        except jwt.exceptions.InvalidTokenError as exc:
            # Covers malformed tokens, bad signatures and failed claim checks.
            raise AuthenticationFailed(_("Invalid token.")) from exc

        try:
            user_id = payload["user_id"]
        except KeyError as exc:
            # A validly signed token without the expected claim is unusable.
            raise AuthenticationFailed(_("Invalid token.")) from exc

        try:
            user = User.objects.get(pk=user_id)
        except (User.DoesNotExist, ValueError, TypeError) as exc:
            # DoesNotExist: the account was removed after the token was issued.
            # ValueError/TypeError: the claim cannot be used as a primary key.
            raise AuthenticationFailed(_("User inactive or deleted.")) from exc

        # Same active-user rule that authenticate_credentials enforces.
        if not user.is_active:
            raise AuthenticationFailed(_("User inactive or deleted."))

        return user
@database_sync_to_async
def get_user(scope: dict) -> CustomUser | AnonymousUser:
    """
    Resolve the user for a connection scope from its ``token`` entry.

    Args:
        scope: Connection scope; must contain a ``"token"`` key.

    Returns:
        The user produced by ``TokenAuthentication.authenticate``, or an
        ``AnonymousUser`` when authentication fails or yields nothing.

    Raises:
        ValueError: If ``scope`` has no ``"token"`` key.
    """
    if "token" not in scope:
        raise ValueError(
            "Cannot find token in scope. You should wrap your consumer in "
            "TokenAuthMiddleware."
        )

    try:
        authenticated = TokenAuthentication().authenticate(scope["token"])
    except AuthenticationFailed:
        # A rejected token is not an error here: fall back to anonymous.
        return AnonymousUser()

    if authenticated:
        return authenticated
    return AnonymousUser()
class TokenAuthMiddleware:
    """
    ASGI middleware that authenticates a connection from its query string.

    The first ``token`` query parameter is stored in ``scope["token"]`` and
    resolved to a user through ``get_user``; the result is stored in
    ``scope["user"]``. When no usable token is supplied, ``scope["user"]`` is
    an ``AnonymousUser``.
    """

    def __init__(self, app):
        # Store the ASGI application we were passed
        self.app = app

    async def __call__(self, scope, receive, send):
        """
        Populate ``scope["user"]`` and delegate to the wrapped application.

        Args:
            scope: ASGI connection scope; updated in place.
            receive: ASGI receive callable, passed through unchanged.
            send: ASGI send callable, passed through unchanged.

        Returns:
            Whatever the wrapped application returns.
        """
        # TODO: also check whether scope["user"] is already populated before
        # overwriting it.
        token = self._extract_token(scope)
        user = None
        if token is not None:
            scope["token"] = token
            try:
                user = await get_user(scope)
            except (ValueError, KeyError, IndexError):
                # Lookup failures of these kinds are treated as "not
                # authenticated" rather than aborting the connection.
                user = None

        scope["user"] = user if user is not None else AnonymousUser()
        return await self.app(scope, receive, send)

    @staticmethod
    def _extract_token(scope):
        """Return the first ``token`` query parameter, or ``None`` if absent."""
        # The key may be missing or None for some scopes; treat both as empty.
        raw_query = scope.get("query_string") or b""
        try:
            params = parse_qs(raw_query.decode())
        except UnicodeDecodeError:
            # Client-supplied bytes that are not valid UTF-8: no usable token.
            return None
        # parse_qs drops blank values, so "?token=" yields no entry here.
        values = params.get("token")
        return values[0] if values else None