-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpoly_result.py
More file actions
104 lines (81 loc) · 3.28 KB
/
poly_result.py
File metadata and controls
104 lines (81 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import json
from typing import Union, List
from prettytable import PrettyTable, PLAIN_COLUMNS
def build_result(raw_result: Union[str, dict, List[dict]]):
    """Wrap a raw result in the matching result object.

    The raw result is first normalised with ``get_result_dict``. The wrapper
    is then chosen from the keys of the resulting dict:

    * an 'error' key            -> ``ErrorPolyResult``
    * no 'header' key           -> ``InfoPolyResult``
    * otherwise                 -> ``QueryPolyResult``

    If no dict could be obtained (for example because the raw text was not
    valid JSON), an ``ErrorPolyResult`` describing the problem is returned
    instead of failing with a ``TypeError`` on the membership test.

    :param raw_result: JSON text, a result-set dict, or a list of result-set dicts.
    :return: an ``ErrorPolyResult``, ``InfoPolyResult`` or ``QueryPolyResult``.
    """
    result_set = get_result_dict(raw_result)

    if not isinstance(result_set, dict):
        # The query text is unknown at this point, so 'query' is None; both keys
        # are present because ErrorPolyResult.__repr__ reads them.
        return ErrorPolyResult({
            'query': None,
            'error': 'Could not interpret the result: expected a JSON object.',
        })

    if 'error' in result_set:
        return ErrorPolyResult(result_set)
    if 'header' not in result_set:
        return InfoPolyResult(result_set)
    return QueryPolyResult(result_set)
def get_result_dict(raw_result: Union[str, dict, List[dict]]) -> Union[None, dict]:
    """Normalise a raw result into a single result-set dict.

    * A string is decoded as JSON; ``None`` is returned if it is not valid JSON.
    * A list yields its last element (the last result set when several are
      present); an empty list yields ``None`` because there is nothing to pick.
    * Anything else is returned unchanged.

    :param raw_result: JSON text, a result-set dict, or a list of result-set dicts.
    :return: the selected result set, or ``None`` if none is available.
    """
    if isinstance(raw_result, str):
        try:
            raw_result = json.loads(raw_result)
        except json.JSONDecodeError:
            return None

    if isinstance(raw_result, list):
        if not raw_result:
            # Indexing [-1] on an empty list would raise IndexError.
            return None
        return raw_result[-1]  # the last result set if several results are present

    return raw_result
def get_type_from_string(type_string):
    """Return a converter callable for a column type name, or ``None``.

    The name is compared case-insensitively after stripping surrounding
    whitespace:

    * INTEGER, BIGINT, SMALLINT, TINYINT          -> ``int``
    * BOOLEAN                                     -> a boolean parser (see below)
    * DECIMAL, DOUBLE, REAL                       -> ``float``
    * ARRAY, JSON, DOCUMENT, DOCUMENT NOT NULL    -> ``json.loads``
    * names starting with PATH or NODE            -> ``json.loads``

    Any other name yields ``None``, meaning "leave the value as it is".

    BOOLEAN does not map to the builtin ``bool`` because ``bool('false')`` is
    ``True`` (any non-empty string is truthy); the returned parser understands
    the textual forms instead.

    :param type_string: the type name as a string.
    :return: a one-argument callable, or ``None`` if no conversion applies.
    """
    cleaned = type_string.strip().upper()

    if cleaned in ('INTEGER', 'BIGINT', 'SMALLINT', 'TINYINT'):
        return int
    if cleaned == 'BOOLEAN':
        return _parse_bool
    if cleaned in ('DECIMAL', 'DOUBLE', 'REAL'):
        return float
    if cleaned in ('ARRAY', 'JSON', 'DOCUMENT', 'DOCUMENT NOT NULL'):
        return json.loads
    if cleaned.startswith(('PATH', 'NODE')):
        return json.loads
    return None


def _parse_bool(value):
    """Convert a boolean cell to ``bool``, reading 'true'/'false' text correctly.

    ``None`` is returned unchanged so that a missing value does not turn into
    ``False``. Strings other than true/false/1/0 and all non-string values fall
    back to ordinary truthiness.
    """
    if value is None:
        return None
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in ('true', '1'):
            return True
        if lowered in ('false', '0'):
            return False
    return bool(value)
def cast_data(raw_data, header):
    """Return a copy of *raw_data* with each column converted to its Python type.

    For every column, the converter is looked up with ``get_type_from_string``
    from the 'dataType' entry of the matching header element. Columns without
    a converter are left untouched. Conversion is best effort: a cell that
    cannot be converted keeps its original value.

    :param raw_data: sequence of rows; each row is copied with ``row[:]``, so
        the input rows are not modified.
    :param header: sequence of dicts, each with a 'dataType' key, one per column.
    :return: a new list of rows with converted values.
    """
    data = [row[:] for row in raw_data]  # copy each row so the caller's rows stay intact
    converters = [get_type_from_string(col['dataType']) for col in header]

    for col_idx, convert in enumerate(converters):
        if convert is None:
            continue
        for row in data:
            try:
                row[col_idx] = convert(row[col_idx])
            except (TypeError, ValueError):
                # TypeError: e.g. a None cell passed to int()/float()/json.loads().
                # ValueError: text that cannot be parsed, e.g. int('abc'); this
                # also covers json.JSONDecodeError, which subclasses ValueError.
                pass
    return data
class QueryPolyResult(list):
    """Tabular query result that is itself a list of rows.

    The rows are kept as a nested Python list (one inner list per row); plain
    lists are used instead of NumPy for simplicity.
    """

    def __init__(self, result_set):
        """Build the result from a result-set dict.

        Reads the 'namespaceType', 'header' and 'data' entries of
        *result_set*. Column names come from the 'name' entry of each header
        element, and the data rows are converted with ``cast_data``.
        """
        namespace_type = result_set['namespaceType']
        header = result_set['header']
        column_names = [column['name'] for column in header]
        rows = cast_data(result_set['data'], header)

        # Pre-render helper used by __repr__ and _repr_html_.
        table = PrettyTable(column_names)
        table.add_rows(rows)
        table.set_style(PLAIN_COLUMNS)

        self.result_set = result_set
        self.type = namespace_type
        self._header = header
        self.keys = column_names
        self._data = rows
        self._pretty = table
        super().__init__(rows)

    def __repr__(self):
        """Return the rows rendered as a plain-text table."""
        return self._pretty.get_string()

    def _repr_html_(self):
        """Return the rows rendered as an HTML table."""
        return self._pretty.get_html_string()

    def dicts(self):
        """Return the rows as a list of dicts keyed by column name."""
        names = self.keys
        return [{name: value for name, value in zip(names, row)} for row in self._data]

    def as_df(self):
        """Return the rows as a pandas DataFrame with the column names as columns."""
        import pandas as pd  # imported lazily so pandas is only needed when used
        frame = pd.DataFrame.from_records(self._data, columns=self.keys)
        return frame
class InfoPolyResult:
    """Result wrapper that only reports the executed query.

    ``build_result`` creates it for result sets that have neither an 'error'
    nor a 'header' entry.
    """

    def __init__(self, result_set):
        """Keep a reference to the result-set dict."""
        self.result_set = result_set

    def __repr__(self):
        """Return a two-line summary; reads the 'query' entry of the result set."""
        query_line = 'Query:'.ljust(30) + str(self.result_set['query'])
        return "\n".join(('Successfully executed:', query_line))
class ErrorPolyResult:
    """Result wrapper for a result set that contains an 'error' entry."""

    def __init__(self, result_set):
        """Keep a reference to the result-set dict."""
        self.result_set = result_set

    def __repr__(self):
        """Return a three-line report; reads the 'query' and 'error' entries."""
        query_line = 'Query:'.ljust(30) + str(self.result_set['query'])
        message_line = 'Message:'.ljust(30) + str(self.result_set['error'])
        return "\n".join(('ERROR:', query_line, message_line))