-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathperson.py
More file actions
102 lines (76 loc) · 2.95 KB
/
person.py
File metadata and controls
102 lines (76 loc) · 2.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from argon2 import exceptions
from flask import Blueprint, jsonify, request
from utility import (
database_cursor,
decrypt_email,
encrypt_email,
hash_email,
hash_password,
mask_email,
validate_password,
verify_password,
)
# Blueprint on which the person routes defined in this module are registered.
person_blueprint = Blueprint("person", __name__)
def login_person_by_id(person_id: int) -> dict:
    """Fetch one row from the ``login_person_by_id`` stored procedure.

    Args:
        person_id: Identifier passed as the procedure's only argument.

    Returns:
        Whatever ``cursor.fetchone()`` yields for the call. The caller in
        this module reads ``hashed_password`` from the result and treats a
        falsy result as "no such person".
    """
    procedure_args = (person_id,)
    with database_cursor() as db:
        db.callproc("login_person_by_id", procedure_args)
        row = db.fetchone()
    return row
def mask_person_email(person: dict) -> None:
    """Replace a person's stored credential fields with a masked ``email``.

    Mutates ``person`` in place:

    * ``encrypted_email`` and ``hashed_password`` are always removed, so they
      can never end up in a serialized response.
    * ``email`` is set to the masked, decrypted address, to ``"Unknown"``
      when no encrypted email is present, or to ``"Decryption Error"`` when
      decrypting or masking fails.

    Args:
        person: Mapping describing one person row; modified in place.
    """
    # Strip the sensitive fields first so that no early exit below can leave
    # them in the dict (previously the "Unknown" path returned before the
    # removal and leaked ``hashed_password``).
    encrypted_email = person.pop("encrypted_email", None)
    person.pop("hashed_password", None)

    if not encrypted_email:
        person["email"] = "Unknown"
        return

    try:
        person["email"] = mask_email(decrypt_email(encrypted_email))
    except Exception:
        # Deliberate best effort: one undecryptable row must not break the
        # whole response, so a placeholder is shown instead.
        person["email"] = "Decryption Error"
def update_person_in_db(person_id, name, email, hashed_password, locale_code):
    """Persist a person's fields through the ``update_person`` procedure.

    The plain email is never passed to the database; the procedure receives
    the results of ``hash_email(email)`` and ``encrypt_email(email)`` instead.

    Args:
        person_id: Identifier of the person to update.
        name: New display name.
        email: Plain-text email address.
        hashed_password: Already-hashed password.
        locale_code: New locale code.
    """
    # NOTE(review): the update route passes None for fields absent from the
    # request; confirm hash_email/encrypt_email and the procedure accept None.
    email_digest = hash_email(email)
    email_ciphertext = encrypt_email(email)
    procedure_args = (
        person_id,
        name,
        email_digest,
        email_ciphertext,
        hashed_password,
        locale_code,
    )
    with database_cursor() as db:
        db.callproc("update_person", procedure_args)
@person_blueprint.route("/all", methods=["GET"])
def get_all_persons():
    """Return every person as JSON, with email addresses masked.

    Rows come from the ``get_all_persons`` stored procedure; each one is
    passed through ``mask_person_email`` before serialization.
    """
    with database_cursor() as db:
        db.callproc("get_all_persons")
        rows = db.fetchall()

    for row in rows:
        mask_person_email(row)

    return jsonify(rows)
@person_blueprint.route("/<int:person_id>", methods=["GET"])
def get_person_by_id(person_id):
    """Return a single person as JSON, with the email address masked.

    Args:
        person_id: Identifier taken from the URL path.

    Returns:
        The person row as JSON, or a 404 JSON message when the
        ``get_person_by_id`` procedure yields no row.
    """
    with database_cursor() as cursor:
        cursor.callproc("get_person_by_id", (person_id,))
        person = cursor.fetchone()

    # Without this guard a missing row reaches mask_person_email as None and
    # surfaces as an unhandled AttributeError (HTTP 500).
    if not person:
        return jsonify(message="Person not found"), 404

    mask_person_email(person)
    return jsonify(person)
@person_blueprint.route("/<int:person_id>", methods=["PUT", "PATCH"])
def update_person(person_id):
    """Update a person's name, email, password and/or locale.

    Reads a JSON object with the optional keys ``username``, ``email``,
    ``current_password``, ``new_password`` and ``locale_code``. A password
    change is only performed when ``new_password`` is supplied, and it
    requires ``current_password`` to verify against the stored hash.

    Args:
        person_id: Identifier taken from the URL path.

    Returns:
        A JSON message with status:
        * 200 when the update was written,
        * 400 when the body is not a JSON object or the new password fails
          ``validate_password``,
        * 401 when the person is unknown or the current password is missing
          or does not match,
        * 500 when password verification fails for any other reason.
    """
    data = request.get_json()
    # get_json() can yield None (body "null") or a non-object such as a list;
    # reject those explicitly instead of crashing on .get() below.
    if not isinstance(data, dict):
        return jsonify(message="Request body must be a JSON object"), 400

    name = data.get("username")
    email = data.get("email")
    current_password = data.get("current_password")
    new_password = data.get("new_password")
    locale_code = data.get("locale_code")

    if new_password and not validate_password(new_password):
        return jsonify(message="Password does not meet security requirements"), 400

    hashed_password = None
    if new_password:
        # A missing current password is a credentials problem, not a server
        # error; answer before verification can raise on a None value.
        if not current_password:
            return jsonify(message="Invalid credentials"), 401

        person = login_person_by_id(person_id)
        if not person:
            return jsonify(message="Invalid credentials"), 401

        try:
            verify_password(current_password, person["hashed_password"])
        except exceptions.VerifyMismatchError:
            return jsonify(message="Invalid credentials"), 401
        except Exception:
            return jsonify(message="An unknown error occurred"), 500

        hashed_password = hash_password(new_password)

    update_person_in_db(person_id, name, email, hashed_password, locale_code)
    return jsonify(message="Person updated successfully"), 200