Skip to content

Commit 50600fb

Browse files
committed
Normalize TOTP validation and unify invalid TOTP error handling
1 parent fb1f0e8 commit 50600fb

1 file changed

Lines changed: 20 additions & 23 deletions

File tree

vertica_python/vertica/connection.py

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -99,34 +99,31 @@ class TotpValidationResult(NamedTuple):
9999
INVALID_TOTP_MSG = 'Invalid TOTP: Please enter a valid 6-digit numeric code.'
100100

101101

102-
def validate_totp_code(raw_code: str, totp_is_valid=None) -> TotpValidationResult:
102+
def validate_totp_code(raw_code: str) -> TotpValidationResult:
103103
"""Validate and normalize a user-supplied TOTP value.
104104
105105
Precedence:
106-
1) Trim & normalize input (strip spaces and separators; normalize full-width digits)
107-
2) Check emptiness, length == 6, and numeric-only
106+
1) Trim & normalize input (normalize full-width digits; strip leading/trailing whitespace only)
107+
2) Empty check
108+
3) Length check (must be exactly 6)
109+
4) Numeric-only check (digits 0–9 only; do not remove internal separators)
108110
109111
Returns TotpValidationResult(ok, code, message).
110112
- Success: `ok=True`, `code` is a 6-digit ASCII string, `message=''`.
111113
- Failure: `ok=False`, `code=''`, `message` is always the generic INVALID_TOTP_MSG.
112-
`totp_is_valid` is reserved for optional server-side checks and ignored here.
113114
"""
114115
try:
115116
s = raw_code if raw_code is not None else ''
116117
# Normalize Unicode (convert full-width digits etc. to ASCII)
117118
s = unicodedata.normalize('NFKC', s)
118119
# Strip leading/trailing whitespace
119120
s = s.strip()
120-
# Remove common separators inside the code
121-
# Spaces, hyphens, underscores, dots, and common dash-like characters
122-
separators = {' ', '\t', '\n', '\r', '\f', '\v', '-', '_', '.',
123-
'\u2012', '\u2013', '\u2014', '\u2212', '\u00B7', '\u2027', '\u30FB'}
124-
# Replace all occurrences of separators
125-
for sep in list(separators):
126-
s = s.replace(sep, '')
127-
128-
# Empty / length / numeric checks
129-
if s == '' or len(s) != 6 or not s.isdigit():
121+
# Empty / length / numeric checks (do not remove internal separators)
122+
if s == '':
123+
return TotpValidationResult(False, '', INVALID_TOTP_MSG)
124+
if len(s) != 6:
125+
return TotpValidationResult(False, '', INVALID_TOTP_MSG)
126+
if not s.isdigit():
130127
return TotpValidationResult(False, '', INVALID_TOTP_MSG)
131128

132129
# All good
@@ -362,11 +359,11 @@ def __init__(self, options: Optional[Dict[str, Any]] = None) -> None:
362359
if not isinstance(self.totp, str):
363360
raise TypeError('The value of connection option "totp" should be a string')
364361
# Validate using local validator
365-
result = validate_totp_code(self.totp, totp_is_valid=None)
362+
result = validate_totp_code(self.totp)
366363
if not result.ok:
367-
msg = result.message or INVALID_TOTP_MSG
368-
self._logger.error(f'Authentication failed: {msg}')
369-
raise errors.ConnectionError(f'Authentication failed: {msg}')
364+
msg = INVALID_TOTP_MSG
365+
self._logger.error(msg)
366+
raise errors.ConnectionError(msg)
370367
# normalized digits-only code
371368
self.totp = result.code
372369
self._logger.info('TOTP received in connection options')
@@ -1030,9 +1027,9 @@ def send_startup(totp_value=None):
10301027
short_msg = match.group(1).strip() if match else error_msg.strip()
10311028

10321029
if "Invalid TOTP" in short_msg:
1033-
self._logger.error(f"Authentication failed: {INVALID_TOTP_MSG}")
1030+
self._logger.error(INVALID_TOTP_MSG)
10341031
self.close_socket()
1035-
raise errors.ConnectionError(f"Authentication failed: {INVALID_TOTP_MSG}")
1032+
raise errors.ConnectionError(INVALID_TOTP_MSG)
10361033

10371034
# Generic error fallback
10381035
self._logger.error(short_msg)
@@ -1055,11 +1052,11 @@ def send_startup(totp_value=None):
10551052
totp_input = sys.stdin.readline().strip()
10561053

10571054
# Validate using local precedence-based validator
1058-
result = validate_totp_code(totp_input, totp_is_valid=None)
1055+
result = validate_totp_code(totp_input)
10591056
if not result.ok:
10601057
msg = INVALID_TOTP_MSG
1061-
self._logger.error(f"Authentication failed: {msg}")
1062-
raise errors.ConnectionError(f"Authentication failed: {msg}")
1058+
self._logger.error(msg)
1059+
raise errors.ConnectionError(msg)
10631060
totp_input = result.code
10641061
# ✅ Valid TOTP — retry connection
10651062
totp = totp_input

0 commit comments

Comments
 (0)