-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathauth.py
More file actions
103 lines (81 loc) · 2.9 KB
/
auth.py
File metadata and controls
103 lines (81 loc) · 2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# Copyright (C) 2025 Collabora Limited
# Author: Denys Fedoryshchenko <denys.f@collabora.com>
# SPDX-License-Identifier: LGPL-2.1-or-later
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status, Cookie
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from database import SessionLocal
from models import User
# SECRET_KEY, ALGORITHM and ACCESS_TOKEN_EXPIRE_MINUTES are imported from config.py.
# Shared passlib context used by verify_password() and get_password_hash();
# bcrypt is the only hashing scheme configured here.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_db():
    """Yield a database session and close it once the consumer is done.

    Written as a generator so it can be plugged in with ``Depends(get_db)``;
    the ``finally`` clause closes the session even if the consumer raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
def verify_password(plain_password, hashed_password):
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns whatever its
    ``verify`` call returns.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Hash ``password`` with the module-level ``pwd_context`` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` expiry claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is never modified.
        expires_delta: Lifetime of the token. When ``None``, the token lives
            for ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The value produced by ``jwt.encode`` for the claims, signed with
        ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Compare against None instead of relying on truthiness: timedelta(0) is
    # falsy and would otherwise be silently replaced by the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value.
    expire = datetime.now(timezone.utc) + expires_delta

    claims = {**data, "exp": expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(
    access_token: Optional[str] = Cookie(None), db: Session = Depends(get_db)
):
    """Resolve the authenticated user from the ``access_token`` cookie.

    Args:
        access_token: JWT taken from the ``access_token`` cookie, optionally
            prefixed with ``"Bearer "``.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``User`` whose ``username`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the cookie is missing or empty, the token
            cannot be decoded, it carries no ``sub`` claim, or no matching
            user exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    if not access_token:
        raise credentials_exception

    # Strip only a leading "Bearer " prefix. str.replace() would also delete
    # any later occurrence of the substring inside the value.
    bearer_prefix = "Bearer "
    if access_token.startswith(bearer_prefix):
        token = access_token[len(bearer_prefix):]
    else:
        token = access_token

    # Keep the try body limited to the call that can raise JWTError.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise credentials_exception from exc

    username = payload.get("sub")
    if username is None:
        raise credentials_exception

    user = db.query(User).filter(User.username == username).first()
    if user is None:
        raise credentials_exception
    return user
def get_current_user_optional(
    access_token: Optional[str] = Cookie(None), db: Session = Depends(get_db)
):
    """Return the current user, or ``None`` when the request is not authenticated.

    Unlike ``get_current_user`` this never raises for a missing or invalid
    token.

    Args:
        access_token: JWT taken from the ``access_token`` cookie, optionally
            prefixed with ``"Bearer "``.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``User`` whose ``username`` equals the token's ``sub`` claim, or
        ``None`` when the cookie is missing or empty, the token cannot be
        decoded, it carries no ``sub`` claim, or no matching user exists.
    """
    if not access_token:
        return None

    # Strip only a leading "Bearer " prefix. str.replace() would also delete
    # any later occurrence of the substring inside the value.
    bearer_prefix = "Bearer "
    if access_token.startswith(bearer_prefix):
        token = access_token[len(bearer_prefix):]
    else:
        token = access_token

    # Keep the try body limited to the call that can raise JWTError.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None

    username = payload.get("sub")
    if username is None:
        return None

    # .first() yields None when no row matches, which is the "not
    # authenticated" result for this function.
    return db.query(User).filter(User.username == username).first()