-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauthentication.py
More file actions
117 lines (91 loc) · 3.75 KB
/
authentication.py
File metadata and controls
117 lines (91 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from argon2 import exceptions
from flask import Blueprint, current_app, jsonify, request
from pymysql import MySQLError
from config.ratelimit import limiter
from jwt_helper import (
TokenError,
extract_token_from_header,
generate_access_token,
generate_refresh_token,
verify_token,
)
from utility.database import database_cursor
from utility.encryption import encrypt_email, hash_email, hash_password, verify_password
from utility.validation import validate_email, validate_password
authentication_blueprint = Blueprint("authentication", __name__)
def login_person_by_email(email):
email_hash = hash_email(email)
with database_cursor() as cursor:
cursor.callproc("login_person_by_email", (email_hash,))
return cursor.fetchone()
def update_last_login(person_id):
with database_cursor() as cursor:
cursor.callproc("update_last_login", (person_id,))
@authentication_blueprint.route("/register", methods=["POST"])
@limiter.limit("5 per minute")
def register():
data = request.get_json()
name = data.get("username")
email = data.get("email")
password = data.get("password")
language_code = data.get("language_code", "en")
if not name or not email or not password:
return jsonify(message="Username, email, and password are required"), 400
if not validate_email(email):
return jsonify(message="Invalid email address"), 400
if not validate_password(password):
return jsonify(message="Password does not meet security requirements"), 400
hashed_password = hash_password(password)
email = hash_email(email), encrypt_email(email)
try:
with database_cursor() as cursor:
cursor.callproc(
"register_person", (name, *email, hashed_password, language_code)
)
except MySQLError as e:
if "User name already exists" in str(e):
return jsonify(message="User name already exists"), 400
elif "Email already exists" in str(e):
return jsonify(message="Email already exists"), 400
else:
current_app.logger.error(f"Database error: {e}")
return jsonify(message="An error occurred during registration"), 500
return jsonify(message="User created successfully"), 201
@authentication_blueprint.route("/login", methods=["POST"])
@limiter.limit("10 per minute")
def login():
data = request.get_json()
email = data.get("email")
password = data.get("password")
if not email or not password:
return jsonify(message="Email and password are required"), 400
person = login_person_by_email(email)
if not person:
return jsonify(message="Invalid credentials"), 401
try:
verify_password(password, person["hashed_password"])
except exceptions.VerifyMismatchError:
return jsonify(message="Invalid credentials"), 401
except Exception as e:
current_app.logger.error(f"Database error: {e}")
return jsonify(message="An internal error occurred"), 500
person_id = person["person_id"]
access_token = generate_access_token(person_id)
refresh_token = generate_refresh_token(person_id)
update_last_login(person_id)
return jsonify(
message="Login successful",
access_token=access_token,
refresh_token=refresh_token,
)
@authentication_blueprint.route("/refresh", methods=["POST"])
@limiter.limit("5 per hour")
def refresh_token():
try:
token = extract_token_from_header()
decoded = verify_token(token, required_type="refresh")
person_id = decoded["person_id"]
new_access_token = generate_access_token(person_id)
return jsonify(access_token=new_access_token), 200
except TokenError as e:
return jsonify(message=e.message), e.status_code