-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
124 lines (100 loc) · 4.03 KB
/
auth.py
File metadata and controls
124 lines (100 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""用户认证和权限管理"""
from datetime import datetime, timedelta
from typing import Optional, List
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from database import get_db, User
from config import DEBUG
# 角色权限定义
ROLE_HIERARCHY = ["viewer", "developer", "curator", "reviewer", "admin"]
ELEVATED_ROLES = {"admin", "reviewer", "curator"}
KNOWLEDGE_MANAGER_ROLES = {"admin", "curator"}
APPROVER_ROLES = {"admin", "reviewer", "curator"}
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT配置
SECRET_KEY = "your-secret-key-change-in-production" # 生产环境应该使用环境变量
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24小时
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""生成密码哈希"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""创建访问令牌"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
"""验证用户"""
user = db.query(User).filter(User.username == username).first()
if not user:
return None
if not verify_password(password, user.password_hash):
return None
if user.is_active != 1:
return None
return user
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
"""获取当前用户"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.username == username).first()
if user is None:
raise credentials_exception
if user.is_active != 1:
raise HTTPException(status_code=400, detail="用户已被禁用")
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""获取当前活跃用户"""
if current_user.is_active != 1:
raise HTTPException(status_code=400, detail="用户已被禁用")
return current_user
def require_admin(current_user: User = Depends(get_current_active_user)) -> User:
"""要求管理员权限"""
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="需要管理员权限"
)
return current_user
def require_auth(current_user: User = Depends(get_current_active_user)) -> User:
"""要求登录(任何用户)"""
return current_user
def require_roles(roles: List[str]):
"""要求特定角色"""
async def role_checker(current_user: User = Depends(get_current_active_user)) -> User:
if current_user.role not in roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="无权限执行此操作"
)
return current_user
return role_checker