-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathrequest_validator.py
More file actions
108 lines (81 loc) · 4.23 KB
/
request_validator.py
File metadata and controls
108 lines (81 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import logging
import re
from collections.abc import Callable
from functools import wraps
from flask import request
from flask.typing import ResponseReturnValue
from eligibility_signposting_api.common.api_error_response import (
INVALID_CATEGORY_ERROR,
INVALID_CONDITION_FORMAT_ERROR,
INVALID_INCLUDE_ACTIONS_ERROR,
NHS_NUMBER_MISMATCH_ERROR,
)
from eligibility_signposting_api.config.constants import NHS_NUMBER_HEADER
logger = logging.getLogger(__name__)
condition_pattern = re.compile(r"^\s*[a-z0-9]+\s*$", re.IGNORECASE)
category_pattern = re.compile(r"^\s*(VACCINATIONS|SCREENING|ALL)\s*$", re.IGNORECASE)
include_actions_pattern = re.compile(r"^\s*([YN])\s*$", re.IGNORECASE)
def validate_query_params(query_params: dict[str, str]) -> tuple[bool, ResponseReturnValue | None]:
conditions = query_params.get("conditions", "ALL").split(",")
for condition in conditions:
search = re.search(condition_pattern, condition)
if not search:
return False, get_condition_error_response(condition)
category = query_params.get("category", "ALL")
if not re.search(category_pattern, category):
return False, get_category_error_response(category)
include_actions = query_params.get("includeActions", "Y")
if not re.search(include_actions_pattern, include_actions):
return False, get_include_actions_error_response(include_actions)
return True, None
def validate_nhs_number(path_nhs: str | None, header_nhs: str | None) -> bool:
logger.info("NHS numbers from the request", extra={"header_nhs": header_nhs, "path_nhs": path_nhs})
if not path_nhs:
logger.error("NHS number is not present in path", extra={"path_nhs": path_nhs})
return False
if not header_nhs: # Not a validation error
logger.info("NHS number is not present in header", extra={"header_nhs": header_nhs, "path_nhs": path_nhs})
return True
if header_nhs != path_nhs:
logger.error("NHS number mismatch", extra={"header_nhs": header_nhs, "path_nhs": path_nhs})
return False
return True
def validate_request_params() -> Callable:
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> ResponseReturnValue: # noqa:ANN002,ANN003
path_nhs_number = str(kwargs.get("nhs_number")) if kwargs.get("nhs_number") else None
header_nhs_no = str(request.headers.get(NHS_NUMBER_HEADER)) if request.headers.get(NHS_NUMBER_HEADER) else None
if not validate_nhs_number(path_nhs_number, header_nhs_no):
message = "You are not authorised to request information for the supplied NHS Number"
return NHS_NUMBER_MISMATCH_ERROR.log_and_generate_response(log_message=message, diagnostics=message)
query_params = request.args
if query_params:
is_valid, problem = validate_query_params(query_params)
if not is_valid and problem is not None:
return problem
return func(*args, **kwargs)
return wrapper
return decorator
def get_include_actions_error_response(include_actions: str) -> ResponseReturnValue:
diagnostics = f"{include_actions} is not a value that is supported by the API"
return INVALID_INCLUDE_ACTIONS_ERROR.log_and_generate_response(
log_message=f"Invalid include actions query param: '{include_actions}'",
diagnostics=diagnostics,
location_param="includeActions",
)
def get_category_error_response(category: str) -> ResponseReturnValue:
diagnostics = f"{category} is not a category that is supported by the API"
return INVALID_CATEGORY_ERROR.log_and_generate_response(
log_message=f"Invalid category query param: '{category}'", diagnostics=diagnostics, location_param="category"
)
def get_condition_error_response(condition: str) -> ResponseReturnValue:
diagnostics = (
f"{condition} should be a single or comma separated list of condition "
f"strings with no other punctuation or special characters"
)
return INVALID_CONDITION_FORMAT_ERROR.log_and_generate_response(
log_message=f"Invalid condition query param: '{condition}'",
diagnostics=diagnostics,
location_param="conditions",
)