-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
142 lines (123 loc) · 4.21 KB
/
auth.py
File metadata and controls
142 lines (123 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.concurrency import run_in_threadpool
from sqlalchemy.orm import Session

from app.core.deps import get_current_active_user, get_current_admin, get_auth_service
from app.db.models.user import User
from app.db.session import get_db
from app.schemas.token import TokenResponse, TokenRefresh
from app.schemas.user import UserLogin, UserOut
from app.services.auth import AuthService
# Router on which the authentication endpoints defined below register themselves.
router = APIRouter()
@router.post("/login", response_model=TokenResponse)
async def login(
    login_data: UserLogin,
    db: Session = Depends(get_db)
):
    """
    Authenticate user and return access/refresh tokens.

    - Validates username and password via the auth service
    - Responds with 401 "Incorrect username or password" when authentication fails
    - Creates access and refresh tokens via the auth service

    Account-status checks, refresh-token storage and last-login updates are
    expected to happen inside the auth service; they are not visible in this
    handler.
    """
    auth_service = AuthService(db)

    # AuthService is driven by a synchronous SQLAlchemy Session. Calling it
    # directly from this coroutine would stall the event loop (and every other
    # in-flight request) until the call returns, so the blocking work is
    # handed to the worker threadpool instead.
    user = await run_in_threadpool(auth_service.authenticate_user, login_data)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password"
        )

    # Token creation goes through the same synchronous service, so it is
    # offloaded for the same reason.
    return await run_in_threadpool(auth_service.create_tokens, user)
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(
    refresh_data: TokenRefresh,
    db: Session = Depends(get_db)
):
    """
    Refresh access token using a valid refresh token.

    - Passes the submitted refresh token to the auth service
    - Returns the token response produced by the auth service
    - HTTP errors raised by the auth service are passed through unchanged
    - Any other failure is reported as 401 "Could not refresh token"

    Token validation and revocation checks are expected to happen inside the
    auth service; they are not visible in this handler.
    """
    auth_service = AuthService(db)
    try:
        # The service call is synchronous (sync SQLAlchemy Session); run it in
        # the worker threadpool so the event loop is not blocked meanwhile.
        return await run_in_threadpool(
            auth_service.refresh_access_token, refresh_data.refresh_token
        )
    except HTTPException:
        # Already a well-formed HTTP error from the service: do not rewrap it.
        raise
    except Exception as exc:
        # Keep the client-facing answer generic, but chain the original
        # exception so the real cause stays visible in tracebacks and logs.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not refresh token"
        ) from exc
@router.post("/logout")
async def logout(
    refresh_data: TokenRefresh,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Session = Depends(get_db)
):
    """
    Logout user by revoking their refresh token.

    - Requires an authenticated, active user
    - Hands the refresh token and the current user's id to the auth service
    - Responds with 400 "Invalid refresh token" when the service reports failure
    - Returns a confirmation message on success

    Ownership checks and the actual revocation bookkeeping are expected to
    happen inside the auth service; they are not visible in this handler.
    """
    auth_service = AuthService(db)

    # The service uses a synchronous SQLAlchemy Session; offload the call to
    # the worker threadpool so this coroutine does not block the event loop.
    success = await run_in_threadpool(
        auth_service.logout, refresh_data.refresh_token, current_user.id
    )
    if not success:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid refresh token"
        )
    return {"message": "Successfully logged out"}
@router.post("/logout-all")
async def logout_all_sessions(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Session = Depends(get_db)
):
    """
    Logout user from all sessions by revoking all their refresh tokens.

    - Requires an authenticated, active user
    - Asks the auth service to end every session of the current user
    - Reports how many sessions the service says it revoked
    - Useful for security (password change, suspicious activity)
    """
    auth_service = AuthService(db)

    # The service uses a synchronous SQLAlchemy Session; offload the call to
    # the worker threadpool so this coroutine does not block the event loop.
    revoked_count = await run_in_threadpool(
        auth_service.logout_all_sessions, current_user.id
    )
    return {
        "message": f"Successfully logged out from {revoked_count} sessions",
        "sessions_revoked": revoked_count
    }
@router.get("/me", response_model=UserOut)
async def get_current_user_info(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """
    Return the profile of the currently authenticated user.

    - The user is resolved by the `get_current_active_user` dependency
    - The resolved user is returned as-is and serialized as `UserOut`
    """
    # No work is done here: the dependency has already produced the user.
    return current_user
'''
TODO: run this cleanup from a cron job instead of an endpoint -- refer to the project issues for assistance
@router.post("/cleanup-expired")
async def cleanup_expired_tokens(
current_admin: Annotated[User, Depends(get_current_admin)],
auth_service: Annotated[AuthService, Depends(get_auth_service)]
):
"""
Clean up expired refresh tokens from database (Admin only).
- Removes expired refresh tokens
- Helps maintain database performance
- Admin access required
"""
cleaned_count = auth_service.cleanup_expired_tokens()
return {
"message": f"Cleaned up {cleaned_count} expired tokens",
"tokens_removed": cleaned_count
}
'''