-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathstdlib_tools.py
More file actions
172 lines (140 loc) · 6.21 KB
/
stdlib_tools.py
File metadata and controls
172 lines (140 loc) · 6.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import re
import csv
import logging
from mfr.extensions.tabular import utilities
from mfr.extensions.tabular.settings import MAX_FILE_SIZE, INIT_SNIFF_SIZE
from mfr.extensions.tabular.exceptions import EmptyTableError, TabularRendererError
logger = logging.getLogger(__name__)
def csv_stdlib(fp):
    """Read and convert a csv file to JSON format using the python standard library

    Quirk: ``csv.Sniffer().sniff()`` needs the FULL first row and ONLY one full row to be able to
    effectively detect the correct dialect of the file.

    Column ids are derived from the header row. A header that repeats is suffixed with its
    occurrence number (``name``, ``name-2``, ``name-3`` ...), and the suffix keeps growing until
    the id is not already taken, so every column id is unique even when the file itself contains
    a header that looks like a generated id (e.g. ``a, a-2, a``).

    :param fp: the file pointer object; it is read as text and must support ``seek(0)``
    :return: a dict mapping the sheet name ``'Sheet 1'`` to a tuple of table headers and data
    :raises TabularRendererError: if the first row is too large to sniff, a field exceeds the csv
        field size limit, or the csv module fails to parse the file
    :raises EmptyTableError: if the file yields neither headers nor rows
    """
    # Prepare the first row for sniffing
    data = fp.read(INIT_SNIFF_SIZE)
    data = _trim_or_append_data(fp, data, INIT_SNIFF_SIZE, 0)

    # Reset the file pointer so the reader below starts from the beginning of the file
    fp.seek(0)

    # Sniff the first row to find a matching format, falling back to the excel dialect
    try:
        dialect = csv.Sniffer().sniff(data)
    except csv.Error:
        dialect = csv.excel
    else:
        _set_dialect_quote_attrs(dialect, data)

    # Explicitly delete data when it is no longer used.
    del data

    # Create the CSV reader with the detected dialect
    reader = csv.DictReader(fp, dialect=dialect)

    # Update the reader field names to avoid duplicate column names when performing row extraction.
    # ``occurrences`` tracks the last suffix handed out per header name and ``used_ids`` tracks
    # every id already assigned, so a generated id can never collide with another column.
    columns = []
    occurrences = {}
    used_ids = set()
    for idx, fieldname in enumerate(reader.fieldnames or []):
        ordinal = occurrences.get(fieldname, 0) + 1
        if ordinal == 1:
            unique_fieldname = fieldname
        else:
            unique_fieldname = '{}-{}'.format(fieldname, ordinal)
        while unique_fieldname in used_ids:
            ordinal += 1
            unique_fieldname = '{}-{}'.format(fieldname, ordinal)
        occurrences[fieldname] = ordinal
        used_ids.add(unique_fieldname)

        if unique_fieldname != fieldname:
            # DictReader keys each row by these names, so the rename must happen on the reader
            reader.fieldnames[idx] = unique_fieldname

        columns.append({
            'id': unique_fieldname,
            'field': unique_fieldname,
            'name': fieldname,
            'sortable': True,
        })

    try:
        rows = list(reader)
    except csv.Error as e:
        if any("field larger than field limit" in errorMsg for errorMsg in e.args):
            raise TabularRendererError(
                'This file contains a field too large to render. '
                'Please download and view it locally.',
                code=400,
                extension='csv',
            ) from e
        else:
            raise TabularRendererError('csv.Error: {}'.format(e), extension='csv') from e

    if not columns and not rows:
        raise EmptyTableError('Table empty or corrupt.', extension='csv')

    del reader
    return {'Sheet 1': (columns, rows)}
def sav_stdlib(fp):
    """Read and convert a .sav file to .csv with pspp, then convert that to JSON format using
    the python standard library

    :param fp: File pointer object to a .sav file
    :return: the value returned by ``csv_stdlib`` for the converted csv file
    """
    csv_file = utilities.sav_to_csv(fp)
    try:
        # Open our own handle on the converted file BEFORE closing the object handed back by the
        # converter, preserving the original open-then-close ordering.
        converted = open(csv_file.name, 'r')
    finally:
        # Always release the converter's file object, including when ``open`` raises, so it is
        # not left open on the error path.
        csv_file.close()

    with converted:
        return csv_stdlib(converted)
def _set_dialect_quote_attrs(dialect, data):
    """Set quote-related dialect attributes based on up to 2kb of csv data.

    The regular expressions search for things that look like the beginning of
    a list, wrapped in a quotation mark that is not dialect.quotechar, with
    list items wrapped in dialect.quotechar and separated by commas.

    Example matches include:
        "['1', '2', '3'  for quotechar == '
        '{"a", "b", "c"  for quotechar == "

    When such a match is found, ``dialect.quotechar`` is switched to the other quotation mark.
    When the wrapping quotation mark appears tripled, ``dialect.doublequote`` is also set to
    ``True``. Dialects whose quotechar is neither ``'`` nor ``"`` are left untouched.

    :param dialect: the dialect object whose ``quotechar``/``doublequote`` may be modified in place
    :param data: the csv text to inspect
    """
    quotechar = dialect.quotechar
    if quotechar == '"':
        wrapper = "'"
    elif quotechar == "'":
        wrapper = '"'
    else:
        return

    # An opening bracket, then an item wrapped in the current quotechar followed by a comma.
    # The ``[`` inside the character class is escaped: an unescaped ``[[`` makes ``re`` emit a
    # "Possible nested set" FutureWarning, while the escaped form matches the same characters.
    list_start = r'[\[({]' + quotechar + '.+' + quotechar + ','

    # Both patterns are built from the quotechar seen on entry, so swapping it below does not
    # affect the doublequote check.
    if re.search(wrapper + list_start, data):
        dialect.quotechar = wrapper
    if re.search(wrapper * 3 + list_start, data):
        dialect.doublequote = True
def _trim_or_append_data(fp, text, read_size, size_to_sniff, max_render_size=MAX_FILE_SIZE):
    """Read from a file until its first row is complete and return that row. The file starts with
    ``text`` and the file pointer points to the next character immediately after ``text``.

    Each time the current chunk holds no new line character, a chunk twice the size of the
    previous read is fetched and checked in turn.

    :param fp: the file pointer from which data is read
    :param text: the current text chunk to check for the new line character
    :param read_size: the last read size when ``fp.read()`` is called
    :param size_to_sniff: the accumulated size of the text to sniff
    :param max_render_size: the max file size for render
    :return: the first row of the file in string
    :raises TabularRendererError: if the text read so far reaches ``max_render_size``
    """
    collected = []
    chunk = text

    # An empty chunk ends the scan. This covers an empty CSV as well as a file that ends before
    # any new line character shows up.
    while len(chunk) != 0:
        newline_at = _find_new_line(chunk)
        if newline_at != -1:
            # The row ends inside this chunk: keep only the part before the new line
            collected.append(chunk[:newline_at])
            break

        # No new line yet: keep the whole chunk, account for it, and read twice as much next
        collected.append(chunk)
        size_to_sniff += read_size
        read_size *= 2
        chunk = fp.read(read_size)

        # There is no need to keep sniffing once the text already exceeds what can be rendered
        if size_to_sniff + len(chunk) >= max_render_size:
            raise TabularRendererError(
                'The first row of this file is too large for the sniffer to detect the dialect. '
                'Please download and view it locally.',
                code=400,
                extension='csv'
            )

    return ''.join(collected)
def _find_new_line(text):
    """In the given text string, find the index of the first occurrence of any of the three types
    of new line character. Note: '\\n\\r' is not a new line character but two, one LF and one CR.

    1. \\r\\n   Carriage Return (CR) and Line Feed (LF)
    2. \\n     LF
    3. \\r     CR

    The earliest position wins regardless of the new line type, so a lone LF or CR that appears
    before a later CRLF is the one reported. A CRLF pair starts at its CR, so looking for the
    earliest CR or LF covers all three types.

    :param text: the text string to check
    :return: the index of the first new line character if found. Otherwise, return -1.
    """
    positions = [pos for pos in (text.find('\n'), text.find('\r')) if pos != -1]
    return min(positions, default=-1)