-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuser.py
More file actions
134 lines (116 loc) · 5.82 KB
/
user.py
File metadata and controls
134 lines (116 loc) · 5.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
user.py
Authors: Rasmus Welander, Diogo Castro, Giuseppe Lo Presti.
Emails: rasmus.oscar.welander@cern.ch, diogo.castro@cern.ch, giuseppe.lopresti@cern.ch
Last updated: 30/08/2024
"""
import logging
from typing import Optional
import cs3.identity.user.v1beta1.resources_pb2 as cs3iur
import cs3.identity.user.v1beta1.user_api_pb2 as cs3iu
from cs3.gateway.v1beta1.gateway_api_pb2_grpc import GatewayAPIStub
from .config import Config
from .statuscodehandler import StatusCodeHandler
class User:
"""
User class to handle user related API calls with CS3 Gateway API.
"""
def __init__(
self,
config: Config,
log: logging.Logger,
gateway: GatewayAPIStub,
status_code_handler: StatusCodeHandler,
) -> None:
"""
Initializes the User class with logger, auth, and gateway stub,
:param log: Logger instance for logging.
:param gateway: GatewayAPIStub instance for interacting with CS3 Gateway.
:param auth: An instance of the auth class.
"""
self._log: logging.Logger = log
self._gateway: GatewayAPIStub = gateway
self._config: Config = config
self._status_code_handler: StatusCodeHandler = status_code_handler
def get_user(self, idp, opaque_id) -> cs3iur.User:
"""
Get the user information provided the idp and opaque_id.
:param idp: Identity provider.
:param opaque_id: Opaque user id.
:return: User information.
:raises: return NotFoundException (User not found)
:raises: AuthenticationException (Operation not permitted)
:raises: UnknownException (Unknown error)
"""
req = cs3iu.GetUserRequest(user_id=cs3iur.UserId(idp=idp, opaque_id=opaque_id), skip_fetching_user_groups=True)
res = self._gateway.GetUser(request=req)
self._status_code_handler.handle_errors(res.status, "get user", f'opaque_id="{opaque_id}"')
self._log.debug(f'msg="Invoked GetUser" opaque_id="{res.user.id.opaque_id}" trace="{res.status.trace}"')
return res.user
def get_user_by_claim(self, claim, value) -> cs3iur.User:
"""
Get the user information provided the claim and value.
:param claim: Claim to search for.
:param value: Value to search for.
:return: User information.
:raises: NotFoundException (User not found)
:raises: AuthenticationException (Operation not permitted)
:raises: UnknownException (Unknown error)
"""
req = cs3iu.GetUserByClaimRequest(claim=claim, value=value, skip_fetching_user_groups=True)
res = self._gateway.GetUserByClaim(request=req)
self._status_code_handler.handle_errors(res.status, "get user by claim", f'claim="{claim}" value="{value}"')
self._log.debug(f'msg="Invoked GetUserByClaim" opaque_id="{res.user.id.opaque_id}" trace="{res.status.trace}"')
return res.user
def get_user_groups(self, idp, opaque_id) -> list[str]:
"""
Get the groups the user is a part of.
:param idp: Identity provider.
:param opaque_id: Opaque user id.
:return: A list of the groups the user is part of.
:raises: NotFoundException (User not found)
:raises: AuthenticationException (Operation not permitted)
:raises: UnknownException (Unknown error)
"""
req = cs3iu.GetUserGroupsRequest(user_id=cs3iur.UserId(idp=idp, opaque_id=opaque_id))
res = self._gateway.GetUserGroups(request=req)
self._status_code_handler.handle_errors(res.status, "get user groups", f'opaque_id="{opaque_id}"')
self._log.debug(f'msg="Invoked GetUserGroups" opaque_id="{opaque_id}" trace="{res.status.trace}"')
return res.groups
def find_users(self, auth_token: tuple, filters) -> list[cs3iur.User]:
"""
Find a user based on a filter.
:param auth_token: tuple in the form ('x-access-token', <token>) (see auth.get_token/auth.check_token)
:param filters: Filters to search for.
:return: a list of user(s).
:raises: NotFoundException (User not found)
:raises: AuthenticationException (Operation not permitted)
:raises: UnknownException (Unknown error)
"""
req = cs3iu.FindUsersRequest(filters=filters, skip_fetching_user_groups=True)
res = self._gateway.FindUsers(request=req, metadata=[auth_token])
self._status_code_handler.handle_errors(res.status, "find users")
self._log.debug(f'msg="Invoked FindUsers" filter="{filter}" trace="{res.status.trace}"')
return res.users
@classmethod
def create_find_user_filter(cls, filter_type: str, query: Optional[str] = None, user_type: Optional[str] = None) -> cs3iu.Filter:
"""
Create a filter for finding users.
:param filter_type: The type of filter to create. Supported types: TYPE_QUERY, TYPE_USER_TYPE.
:param query: The query string for TYPE_QUERY filter.
:param user_type: The user type for TYPE_USER_TYPE filter. Supported types: USER_TYPE_PRIMARY,
USER_TYPE_SECONDARY, USER_TYPE_SERVICE, USER_TYPE_GUEST, USER_TYPE_FEDERATED, USER_TYPE_LIGHTWEIGHT,
USER_TYPE_SPACE_OWNER.
:return: A filter object.
:raises: ValueError (Unsupported filter type)
"""
filter_type = cs3iu.Filter.Type.Value(filter_type.upper())
if filter_type == cs3iu.Filter.Type.TYPE_QUERY:
return cs3iu.Filter(type=filter_type, query=query)
elif filter_type == cs3iu.Filter.Type.TYPE_USERTYPE:
if user_type is None:
raise ValueError("user_type must be provided for TYPE_USERTYPE filter")
user_type = cs3iur.UserType.Value(user_type.upper())
return cs3iu.Filter(type=filter_type, usertype=user_type)
else:
raise ValueError(f"Unsupported filter type: {filter_type}")