forked from kernelci/kernelci-api
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodels.py
More file actions
288 lines (226 loc) · 8.59 KB
/
models.py
File metadata and controls
288 lines (226 loc) · 8.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Copyright (C) 2023 Collabora Limited
# Author: Jeny Sadadia <jeny.sadadia@collabora.com>
# Disable flag as user models don't require any public methods
# at the moment
# pylint: disable=too-few-public-methods
# pylint: disable=no-name-in-module
"""Server-side model definitions"""
from datetime import datetime
from typing import Optional, TypeVar, List
from pydantic import (
BaseModel,
EmailStr,
Field,
field_validator,
)
from typing_extensions import Annotated
from fastapi import Query
from fastapi_pagination import LimitOffsetPage, LimitOffsetParams
from fastapi_users.db import BeanieBaseUser
from fastapi_users import schemas
from beanie import (
Indexed,
Document,
PydanticObjectId,
)
from kernelci.api.models_base import DatabaseModel, ModelId
# PubSub model definitions
class Subscription(BaseModel):
"""Pub/Sub subscription object model"""
id: int = Field(
description='Subscription ID'
)
channel: str = Field(
description='Subscription channel name'
)
user: str = Field(
description=("Username of the user that created the "
"subscription (owner)")
)
promiscuous: bool = Field(
description='Listen to all users messages',
default=False)
class SubscriptionStats(Subscription):
"""Pub/Sub subscription statistics object model"""
created: datetime = Field(
description='Timestamp of connection creation'
)
last_poll: Optional[datetime] = Field(
default=None,
description='Timestamp when connection last polled for data'
)
# MongoDB-based durable Pub/Sub models
# Note: Event storage uses EventHistory model from kernelci-core
# (stored in 'eventhistory' collection with sequence_id, channel, owner fields)
class SubscriberState(BaseModel):
"""Tracks subscriber position for durable event delivery
Only created when subscriber_id is provided during subscription.
Enables catch-up on missed events after reconnection.
"""
subscriber_id: str = Field(
description='Unique subscriber identifier (client-provided)'
)
channel: str = Field(
description='Subscribed channel name'
)
user: str = Field(
description='Username of subscriber (for ownership validation)'
)
promiscuous: bool = Field(
default=False,
description='If true, receive all messages regardless of owner'
)
last_event_id: int = Field(
default=0,
description='Last acknowledged event ID (implicit ACK on next poll)'
)
created_at: datetime = Field(
default_factory=datetime.utcnow,
description='Subscription creation timestamp'
)
last_poll: Optional[datetime] = Field(
default=None,
description='Last poll timestamp (used for stale cleanup)'
)
# User model definitions
class UserGroup(DatabaseModel):
"""API model to group associated user accounts"""
name: str = Field(
description="User group name"
)
@classmethod
def get_indexes(cls):
"""Get an index to bind unique constraint to group name"""
return [
cls.Index('name', {'unique': True}),
]
class UserGroupCreateRequest(BaseModel):
"""Create user group request schema for API router"""
name: str = Field(description="User group name")
class User(BeanieBaseUser, Document, # pylint: disable=too-many-ancestors
DatabaseModel):
"""API User model"""
username: Annotated[str, Indexed(unique=True)]
groups: List[UserGroup] = Field(
default=[],
description="A list of groups that the user belongs to"
)
@field_validator('groups')
def validate_groups(cls, groups): # pylint: disable=no-self-argument
"""Unique group constraint"""
unique_names = {group.name for group in groups}
if len(unique_names) != len(groups):
raise ValueError("Groups must have unique names.")
return groups
class Settings(BeanieBaseUser.Settings):
"""Configurations"""
# MongoDB collection name for model
name = "user"
@classmethod
def get_indexes(cls):
"""Get indices"""
return [
cls.Index('email', {'unique': True}),
]
class UserRead(schemas.BaseUser[PydanticObjectId], ModelId):
"""Schema for reading a user"""
username: Annotated[str, Indexed(unique=True)]
groups: List[UserGroup] = Field(default=[])
@field_validator('groups')
def validate_groups(cls, groups): # pylint: disable=no-self-argument
"""Unique group constraint"""
unique_names = {group.name for group in groups}
if len(unique_names) != len(groups):
raise ValueError("Groups must have unique names.")
return groups
class UserCreateRequest(schemas.BaseUserCreate):
"""Create user request schema for API router"""
username: Annotated[str, Indexed(unique=True)]
groups: List[str] = Field(default=[])
@field_validator('groups')
def validate_groups(cls, groups): # pylint: disable=no-self-argument
"""Unique group constraint"""
unique_names = set(groups)
if len(unique_names) != len(groups):
raise ValueError("Groups must have unique names.")
return groups
class UserCreate(schemas.BaseUserCreate):
"""Schema used for sending create user request to 'fastapi-users' router"""
username: Annotated[str, Indexed(unique=True)]
groups: List[UserGroup] = Field(default=[])
@field_validator('groups')
def validate_groups(cls, groups): # pylint: disable=no-self-argument
"""Unique group constraint"""
unique_names = {group.name for group in groups}
if len(unique_names) != len(groups):
raise ValueError("Groups must have unique names.")
return groups
class UserUpdateRequest(schemas.BaseUserUpdate):
"""Update user request schema for API router"""
username: Annotated[Optional[str], Indexed(unique=True),
Field(default=None)]
groups: List[str] = Field(default=[])
@field_validator('groups')
def validate_groups(cls, groups): # pylint: disable=no-self-argument
"""Unique group constraint"""
unique_names = set(groups)
if len(unique_names) != len(groups):
raise ValueError("Groups must have unique names.")
return groups
class UserUpdate(schemas.BaseUserUpdate):
"""Schema used for sending update user request to 'fastapi-users' router"""
username: Annotated[Optional[str], Indexed(unique=True),
Field(default=None)]
groups: List[UserGroup] = Field(default=[])
@field_validator('groups')
def validate_groups(cls, groups): # pylint: disable=no-self-argument
"""Unique group constraint"""
unique_names = {group.name for group in groups}
if len(unique_names) != len(groups):
raise ValueError("Groups must have unique names.")
return groups
# Invite-only user onboarding models
class UserInviteRequest(BaseModel):
"""Admin invite request schema for API router"""
username: Annotated[str, Indexed(unique=True)]
email: EmailStr
groups: List[str] = Field(default=[])
is_superuser: bool = False
send_email: bool = True
return_token: bool = False
resend_if_exists: bool = False
@field_validator('groups')
def validate_groups(cls, groups): # pylint: disable=no-self-argument
"""Unique group constraint"""
unique_names = set(groups)
if len(unique_names) != len(groups):
raise ValueError("Groups must have unique names.")
return groups
class InviteAcceptRequest(BaseModel):
"""Accept invite request schema for API router"""
token: str
password: str
class UserInviteResponse(BaseModel):
"""Invite response schema"""
user: UserRead
email_sent: bool
public_base_url: str
accept_invite_url: str
invite_url: Optional[str] = None
token: Optional[str] = None
class InviteUrlResponse(BaseModel):
"""Resolved public URL info for invite/accept endpoints"""
public_base_url: str
accept_invite_url: str
# Pagination models
class CustomLimitOffsetParams(LimitOffsetParams):
"""Model to set custom constraint on limit
The model is required to redefine limit parameter to remove the number
validation on maximum value"""
limit: int = Query(50, ge=1, description="Page size limit")
class PageModel(LimitOffsetPage[TypeVar("T")]):
"""Model for pagination
This model is required to serialize paginated model data response"""
__params_type__ = CustomLimitOffsetParams