-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtablevalidator.py
More file actions
86 lines (65 loc) · 2.79 KB
/
tablevalidator.py
File metadata and controls
86 lines (65 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# Copyright (c) 2020, Caltech IPAC.
# This code is released with a BSD 3-clause license. License information is at
# https://github.com/Caltech-IPAC/nexsciTAP/blob/master/LICENSE
import logging
class TableValidator:
    """
    Validates that table names in an ADQL query are registered in
    TAP_SCHEMA.tables, preventing access to unauthorized database objects.

    Every name (registered or queried) is stripped of surrounding
    whitespace and lower-cased before comparison, so matching is
    case-insensitive.

    Attributes populated at construction time:

    allowed_tables       full registered names exactly as listed
                         (e.g. "tap_schema.columns", "ps")
    allowed_bare         table part of every registered name
                         (e.g. "columns", "ps")
    allowed_schemas      schema part of every schema-qualified
                         registered name (e.g. "tap_schema")
    allowed_unqualified  registered names that carry no schema prefix
                         (e.g. "ps")
    """

    def __init__(self, conn, debug=0):
        """
        Build the whitelist by querying TAP_SCHEMA.tables through conn.

        conn   database connection; only conn.cursor() is used, and the
               returned cursor must provide execute(), fetchall() and
               close()
        debug  when truthy, progress messages are sent to logging.debug
        """
        self.conn = conn
        self.debug = debug

        self.allowed_tables = set()
        self.allowed_bare = set()
        self.allowed_schemas = set()
        self.allowed_unqualified = set()

        self._load_allowed_tables()

    def _load_allowed_tables(self):
        """
        Read every table_name from TAP_SCHEMA.tables and fill the
        whitelist sets.

        Rows whose table_name is NULL or blank are skipped so they can
        neither crash the loader nor whitelist the empty string.
        """
        cursor = self.conn.cursor()

        # Release the cursor even when the query itself fails.
        try:
            cursor.execute('SELECT table_name FROM TAP_SCHEMA.tables')
            rows = cursor.fetchall()
        finally:
            cursor.close()

        for row in rows:
            raw_name = row[0]
            if raw_name is None:
                continue

            full_name = raw_name.strip().lower()
            if not full_name:
                continue

            self.allowed_tables.add(full_name)

            # Split on the first dot only: everything after it is kept
            # together as the table part.
            schema, dot, bare = full_name.partition('.')

            if dot:
                self.allowed_schemas.add(schema)
                self.allowed_bare.add(bare)
            else:
                self.allowed_bare.add(full_name)
                self.allowed_unqualified.add(full_name)

        if self.debug:
            logging.debug('')
            logging.debug(
                f'TableValidator: loaded {len(self.allowed_tables)} '
                f'allowed tables: {self.allowed_tables}')

    def _is_allowed(self, name):
        """
        Return True when the normalized (stripped, lower-case) table
        name is covered by the whitelist.
        """
        # Exact match against full table names (e.g. "tap_schema.columns")
        if name in self.allowed_tables:
            return True

        schema, dot, bare = name.partition('.')

        if not dot:
            # Unqualified query table: bare-name match is fine
            # (e.g. query says "columns", whitelist has
            # "tap_schema.columns")
            return name in self.allowed_bare

        # Schema-qualified query table that is not itself registered:
        # accept it only when the schema is known AND the table was
        # registered without any schema.  Pairing a known schema with a
        # table registered under a *different* schema is refused, so
        # "public.columns" does not pass merely because
        # "tap_schema.columns" and "public.stars" are both registered,
        # and "information_schema.tables" does not pass just because
        # "tables" is a bare name in TAP_SCHEMA.
        return (schema in self.allowed_schemas
                and bare in self.allowed_unqualified)

    def validate(self, table_names):
        """
        Check that every table in table_names may be queried.

        table_names  iterable of table-name strings taken from the
                     query; a single string is treated as one name

        Returns None when all names are allowed.

        Raises Exception when table_names is empty, or for the first
        name that is not covered by the whitelist.
        """
        if not table_names:
            raise Exception('No table names to validate.')

        # A lone string would otherwise be iterated character by
        # character and rejected with a misleading message.
        if isinstance(table_names, str):
            table_names = [table_names]

        for tname in table_names:
            if self._is_allowed(tname.strip().lower()):
                continue

            raise Exception(
                f'Table \'{tname}\' is not available for querying. '
                f'Use TAP_SCHEMA.tables to see available tables.')

        if self.debug:
            logging.debug('')
            logging.debug(
                f'TableValidator: all tables validated: {table_names}')