-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Expand file tree
/
Copy pathremote_user.py
More file actions
116 lines (91 loc) · 5.09 KB
/
remote_user.py
File metadata and controls
116 lines (91 loc) · 5.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import logging
from django.conf import settings
from django.contrib.auth.backends import RemoteUserBackend as OriginalRemoteUserBackend
from django.contrib.auth.middleware import RemoteUserMiddleware as OriginalRemoteUserMiddleware
from drf_spectacular.extensions import OpenApiAuthenticationExtension
from netaddr import IPAddress
from rest_framework.authentication import RemoteUserAuthentication as OriginalRemoteUserAuthentication
from dojo.models import Dojo_Group
from dojo.pipeline import assign_user_to_groups, cleanup_old_groups_for_user
logger = logging.getLogger(__name__)
class RemoteUserAuthentication(OriginalRemoteUserAuthentication):
    """
    DRF remote-user authentication that only honours trusted proxies.

    The username header is consulted only when the request's ``REMOTE_ADDR``
    is contained in ``settings.AUTH_REMOTEUSER_TRUSTED_PROXY``.
    """

    def authenticate(self, request):
        """
        Authenticate ``request`` from the configured username header.

        Returns whatever the parent ``authenticate`` returns when the request
        comes from a trusted proxy and carries the header named by
        ``settings.AUTH_REMOTEUSER_USERNAME_HEADER``; otherwise returns
        ``None``.

        A missing, empty or malformed ``REMOTE_ADDR`` is treated as untrusted
        (fail closed) instead of raising.
        """
        # Local import: netaddr is already a dependency of this module; this
        # keeps the fix self-contained in this class.
        from netaddr import AddrFormatError

        raw_addr = request.META.get("REMOTE_ADDR", "")
        try:
            client_ip = IPAddress(raw_addr)
        except (AddrFormatError, TypeError, ValueError):
            # An address that cannot be parsed can never be matched against
            # the trusted proxy list, so the header must not be trusted.
            logger.warning("Cannot parse REMOTE_ADDR %r; skipping remote user authentication", raw_addr)
            return None

        # process only if request is coming from the trusted proxy node
        if client_ip not in settings.AUTH_REMOTEUSER_TRUSTED_PROXY:
            logger.debug("Requested came from untrusted proxy %s; This is list of trusted proxies: %s",
                         client_ip,
                         settings.AUTH_REMOTEUSER_TRUSTED_PROXY)
            return None

        # The parent implementation reads the username from ``self.header``.
        self.header = settings.AUTH_REMOTEUSER_USERNAME_HEADER
        if self.header in request.META:
            return super().authenticate(request)
        return None
class RemoteUserMiddleware(OriginalRemoteUserMiddleware):
    """
    Remote-user middleware that only honours trusted proxies.

    Does nothing unless ``settings.AUTH_REMOTEUSER_ENABLED`` is set, and only
    consults the username header when ``REMOTE_ADDR`` is contained in
    ``settings.AUTH_REMOTEUSER_TRUSTED_PROXY``.
    """

    def process_request(self, request):
        """
        Delegate to the parent middleware for requests from a trusted proxy.

        Returns the parent's ``process_request`` result when remote-user
        authentication is enabled, the request comes from a trusted proxy and
        the header named by ``settings.AUTH_REMOTEUSER_USERNAME_HEADER`` is
        present; otherwise returns ``None`` without touching the request.

        A missing, empty or malformed ``REMOTE_ADDR`` is treated as untrusted
        (fail closed) instead of raising.
        """
        if not settings.AUTH_REMOTEUSER_ENABLED:
            return None

        # Local import: netaddr is already a dependency of this module; this
        # keeps the fix self-contained in this class.
        from netaddr import AddrFormatError

        raw_addr = request.META.get("REMOTE_ADDR", "")
        try:
            client_ip = IPAddress(raw_addr)
        except (AddrFormatError, TypeError, ValueError):
            # An address that cannot be parsed can never be matched against
            # the trusted proxy list, so the header must not be trusted.
            logger.warning("Cannot parse REMOTE_ADDR %r; skipping remote user authentication", raw_addr)
            return None

        # process only if request is coming from the trusted proxy node
        if client_ip not in settings.AUTH_REMOTEUSER_TRUSTED_PROXY:
            logger.debug("Requested came from untrusted proxy %s; This is list of trusted proxies: %s",
                         client_ip,
                         settings.AUTH_REMOTEUSER_TRUSTED_PROXY)
            return None

        # The parent implementation reads the username from ``self.header``.
        self.header = settings.AUTH_REMOTEUSER_USERNAME_HEADER
        if self.header in request.META:
            return super().process_request(request)
        return None

    def process_response(self, request, response):
        """Record the authenticated username in ``request.META`` and return ``response`` unchanged."""
        # Set REMOTE_USER so uWSGI logs the username correctly for all auth methods
        if hasattr(request, "user") and request.user and request.user.is_authenticated:
            request.META["REMOTE_USER"] = request.user.username
        return response
class PersistentRemoteUserMiddleware(RemoteUserMiddleware):
    """
    ``RemoteUserMiddleware`` variant with ``force_logout_if_no_header`` disabled.

    Mirrors the upstream Django class of the same name, see
    https://github.com/django/django/blob/6654289f5b350dfca3dc4f6abab777459b906756/django/contrib/auth/middleware.py#L128
    """

    # Overrides the flag inherited from the parent middleware.
    force_logout_if_no_header = False
class RemoteUserBackend(OriginalRemoteUserBackend):
    """Remote-user backend that syncs profile fields and groups from request headers."""

    def configure_user(self, request, user, *, created=True):
        """
        Update ``user`` from the headers configured in settings and return it.

        For each of the email, first-name and last-name header settings that is
        set and present in ``request.META``, the matching user attribute is
        overwritten when it differs. The user is saved only if one of those
        attributes changed.

        When the groups header is set and present, its comma-separated value
        is passed to ``assign_user_to_groups`` and, if
        ``settings.AUTH_REMOTEUSER_GROUPS_CLEANUP`` is enabled, to
        ``cleanup_old_groups_for_user``.

        ``created`` is accepted for signature compatibility and is not used.
        """
        meta = request.META

        # (settings attribute naming the header, user attribute, debug message)
        profile_fields = (
            ("AUTH_REMOTEUSER_EMAIL_HEADER", "email",
             "Updating email for user %s to value %s"),
            ("AUTH_REMOTEUSER_FIRSTNAME_HEADER", "first_name",
             "Updating first_name for user %s to value %s"),
            ("AUTH_REMOTEUSER_LASTNAME_HEADER", "last_name",
             "Updating last_name for user %s to value %s"),
        )

        needs_save = False
        for setting_name, attr, message in profile_fields:
            # Settings are read lazily, one per iteration, in the listed order.
            header = getattr(settings, setting_name)
            if not (header and header in meta):
                continue
            if getattr(user, attr) != meta[header]:
                setattr(user, attr, meta[header])
                logger.debug(message, user.username, getattr(user, attr))
                needs_save = True

        groups_header = settings.AUTH_REMOTEUSER_GROUPS_HEADER
        groups_supplied = bool(groups_header) and groups_header in meta

        if groups_supplied:
            assign_user_to_groups(user, meta[groups_header].split(","), Dojo_Group.REMOTE)

        # The header value is split again so each helper receives its own list.
        if settings.AUTH_REMOTEUSER_GROUPS_CLEANUP and groups_supplied:
            cleanup_old_groups_for_user(user, meta[groups_header].split(","))

        if needs_save:
            user.save()

        return user
class RemoteUserScheme(OpenApiAuthenticationExtension):
    """OpenAPI security-scheme extension for ``dojo.remote_user.RemoteUserAuthentication``."""

    target_class = "dojo.remote_user.RemoteUserAuthentication"
    name = "remoteUserAuth"
    match_subclasses = True
    priority = 1

    def get_security_definition(self, auto_schema):
        """
        Return the API-key header definition for remote-user authentication.

        Returns an empty dict when
        ``settings.AUTH_REMOTEUSER_VISIBLE_IN_SWAGGER`` is falsy. Otherwise the
        header name is derived from
        ``settings.AUTH_REMOTEUSER_USERNAME_HEADER`` by dropping a leading
        ``HTTP_``, replacing underscores with dashes and capitalizing the
        result.
        """
        if settings.AUTH_REMOTEUSER_VISIBLE_IN_SWAGGER:
            meta_key = settings.AUTH_REMOTEUSER_USERNAME_HEADER
            display_name = meta_key.removeprefix("HTTP_").replace("_", "-").capitalize()
            return {
                "type": "apiKey",
                "in": "header",
                "name": display_name,
            }
        return {}