-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathcore.py
More file actions
280 lines (253 loc) · 13.1 KB
/
core.py
File metadata and controls
280 lines (253 loc) · 13.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import re
import os
import uuid
from pydbml import PyDBML
from pydbml.classes import Enum
from pathlib import Path
from itertools import chain
def toSQLite(dbml=".", emulation="full", tableExists=True, indexExists=True, join=True):
"""
Given a DBML file, convert contents to valid SQLite.
Parameters:
dbml (str): a valid string for converting to a Path object. Should point to a `.dbml` file containing valid DBML *or* a directory containing such files. Default is a period, in which case current working directory will be searched and all such files will be parsed.
emulation (str): specifies emulation mode for enum functionality since it is not directly supported by SQLite. Default is "full", and the other option is "half".
tableExists (bool): Default is True. If True, all generated `CREATE TABLE` SQLite statements will have `IF NOT EXISTS` language included.
indexExists (bool): Default is True. If True, all generated `CREATE INDEX` SQLite statements will have `IF NOT EXISTS` language included.
join (bool): Default is True. If True, function will `join` the result list of string segments with an empty string and return the resulting string to you. Otherwise, the one-dimensional list of string segments will be returned to you directly.
Returns:
str or list of str: a valid sequence of SQLite syntax.
"""
results = []
p = Path(dbml)
p.resolve()
if not p.exists():
raise ValueError(f'Argument "{dbml}" does not refer to an existing file or directory.')
if p.is_file():
if validDBMLFile(dbml):
results.append(processFile(p, emulation, tableExists=tableExists, indexExists=indexExists, join=False))
results = list(chain.from_iterable(results))
if join:
results = "".join(results)
return results
else:
raise ValueError(f'Argument "{dbml}" is a path to a file, but it does not have a `.dbml` extension.')
elif p.is_dir():
targets = [f for f in p.glob('*.dbml')]
for target in targets:
results.append(processFile(target, emulation, tableExists=tableExists, indexExists=indexExists, join=False))
results = list(chain.from_iterable(results))
if join:
results = "".join(results)
return results
def validDBMLFile(s):
"""
Return a boolean indicating whether passed string has valid `.dbml` file extension. Case-sensitive (i.e. `.DBML` not accepted).
Parameters:
s (str): name of file.
Returns:
bool: True if s ends with '.dbml', else False.
"""
if isinstance(s, Path):
return s.suffix == '.dbml'
else:
return s.endswith('.dbml')
def processFile(target, emulationMode, tableExists=True, indexExists=True, idxNameFunc=uuid.uuid4, join=True):
"""
Given a target `.dbml` file, parse and generate a valid SQLite string.
Parameters:
target (Path): File with contents to convert to SQLite.
emulationMode (str): Specifies "half" or "full" emulation for enum functionality in SQLite.
tableExists (bool): Default is True. If True, all generated `CREATE TABLE` SQLite statements will have `IF NOT EXISTS` language included.
indexExists (bool): Default is True. If True, all generated `CREATE INDEX` SQLite statements will have `IF NOT EXISTS` language included.
join (bool): Default is True. If True, function will `join` the result list of string segments with an empty string and return the resulting string to you. Otherwise, the one-dimensional list of string segments will be returned to you directly.
Returns:
str: A valid SQLite string.
"""
parsed = PyDBML.parse_file(str(target))
statements = []
if emulationMode == 'full':
for enum in parsed.enums:
statements.append(processEnum(enum, tableExists, False))
for table in parsed.tables:
statements.append(processTable(table, emulationMode, tableExists, False))
for table in parsed.tables:
for index in table.indexes:
statements.append(processIndex(table, index, idxNameFunc, indexExists=indexExists, join=False))
statements = list(chain.from_iterable(statements))
if join:
statements = "".join(statements)
return statements
def processIndex(table, index, idxNameFunc=uuid.uuid4, indexExists=True, join=True):
"""
Given objects produced by the PyDBML library (or appropriately mocked), generate valid SQLite DDL for creating indexes.
Parameters:
table (Table): a Table object generated by the PyDBML library. This object should represent the SQLite table relevant to the index you want to create.
index (Index): an Index object generated by the PyDBML library. This object should represent the SQLite index you want to create.
idxNameFunc (function): defaults to `uuid.uuid4`. Can mock that function by passing a different function that returns a more predictable result. The result of calling this argument in either case is used as the name of an index if one is not provided for any `CREATE INDEX` statements.
indexExists (bool): Default is True. If True, the generated `CREATE INDEX` SQLite statement will have `IF NOT EXISTS` language included.
join (bool): Default is True. If True, function will `join` the result list of string segments with an empty string and return the resulting string to you. otherwise, the one-dimensional list of string segments will be returned to you directly.
Returns:
str or list of str: SQLite DDL for creating an index.
"""
parts = []
parts.append(f'CREATE{" UNIQUE" if index.unique else ""} INDEX ')
if indexExists:
parts.append('IF NOT EXISTS ')
if index.name != "" and index.name != None:
parts.append(index.name)
else:
parts.append('_' + ''.join(str(idxNameFunc()).split('-')))
parts.append(f' ON {table.name} (')
for i, col in enumerate(index.subjects):
parts.append(col.name)
if i < len(index.subjects) - 1:
parts.append(', ')
parts.append(');\n')
if join:
parts = "".join(parts)
return parts
def processEnum(enum, tableExists=True, join=True):
"""
Take an Enum object generated by the PyDBML library and use it to generate SQLite DDL for creating an enum table for "full" enum emulation mode only.
Parameters:
enum (Enum): Enum object generated by PyDBML library representing an SQL enum.
tableExists (bool): Default is True. If True, all generated `CREATE TABLE` SQLite statements will have `IF NOT EXISTS` language included.
join (bool): Default is True. If True, function will `join` the result list of string segments with an empty string and return the resulting string to you. Otherwise, the one-dimensional list of string segments will be returned to you directly.
Returns:
str or list of str: SQLite DDL for creating a table to emulate SQL enum functionality.
"""
segments = []
segments.append(f'CREATE TABLE {"IF NOT EXISTS" if tableExists else ""} {enum.name} (\n id INTEGER PRIMARY KEY,\n type TEXT NOT NULL UNIQUE,\n seq INTEGER NOT NULL UNIQUE\n);\n')
for i, v in enumerate(enum.items):
segments.append(f'INSERT INTO {enum.name}(type, seq) VALUES (\'{v.name}\', {i + 1});\n')
if join:
segments = "".join(segments)
return segments
def processTable(table, emulationMode, tableExists=True, join=True):
"""
Generate SQLite DDL for creating a table.
Parameters:
table (Table): Table object generated by PyDBML, representing SQLite table you want to make.
emulationMode (str): if SQL enums are defined by dbml parsed by PyDBML, there are two ways to emulate them. Passing "full" for this parameter emulates enum by making a separate enum table. Passing "half" simply uses SQLite CHECK statements within column definitions utilizing enum types.
tableExists (bool): Default is True. If True, all generated `CREATE TABLE` SQLite statements will have `IF NOT EXISTS` language included.
join (bool): Default is True. If True, function will `join` the result list of string segments with an empty string and return the resulting string to you. Otherwise, the one-dimensional list of string segments will be returned to you directly.
Return:
str or list of str: SQLite DDL for generating a table.
"""
segments = []
segments.append('CREATE TABLE ')
if tableExists:
segments.append('IF NOT EXISTS ')
segments.append(f'{table.name} (\n')
for i, col in enumerate(table.columns):
segments.append(processColumn(col, emulationMode, False))
if i < len(table.columns) - 1:
segments.append(',\n')
for j, ref in enumerate(table.refs):
if j == 0:
segments.append(',\n')
segments.append(processRef(ref, False))
if j < len(table.refs) - 1:
segments.append(',\n')
segments.append('\n);\n')
segments = list(chain.from_iterable(segments))
if join:
segments = "".join(segments)
return segments
def processRef(ref, join=True):
"""
Convert a Ref object parsed by PyDBML from dbml into SQLite DDL.
Parameters:
ref (Ref): Ref object generated by PyDBML.
join (bool): Default is True. If True, function will `join` the result list of string segments with an empty string and return the resulting string to you. Otherwise, the one-dimensional list of string segments will be returned to you directly.
Returns:
str or list of str: SQLite DDL for defining a foreign key within a `CREATE TABLE` statement.
"""
segments = []
segments.append(' FOREIGN KEY(')
segments.append(f'{ref.col.name}) REFERENCES {ref.ref_table.name}({ref.ref_col.name})')
if ref.on_update:
segments.append(f' ON UPDATE {ref.on_update.upper()}')
if ref.on_delete:
segments.append(f' ON DELETE {ref.on_delete.upper()}')
if join:
segments = "".join(segments)
return segments
def processColumn(column, emulationMode, join=True):
"""
Generate SQLite DDL for creating a column.
Parameters:
column (Column): the Column object generated by PyDBML library.
emulationMode (str): "half" or "full" emulation of SQL enums for SQLite. The former uses `CHECK` statements within column definitions, and the latter uses separate tables.
join (bool): Default is True. If True, function will `join` the result list of string segments with an empty string and return the resulting string to you. Otherwise, the one-dimensional list of string segments will be returned to you directly.
Returns:
str or list of str: SQLite DDL for creating a column.
"""
segments = []
segments.append(f' {column.name}')
if isinstance(column.type, str):
segments.append(f' {coerceColType(column.type)}')
if column.pk:
segments.append(' PRIMARY KEY')
if column.autoinc:
segments.append(' AUTOINCREMENT')
if column.not_null:
segments.append(' NOT NULL')
if column.unique:
segments.append(' UNIQUE')
if column.default != None:
segments.append(' DEFAULT ')
if isinstance(column.default, str):
segments.append("'")
segments.append(column.default)
else:
segments.append(str(column.default))
if isinstance(column.default, str):
segments.append("'")
elif isinstance(column.type, Enum):
if emulationMode == 'full':
segments.append(f' TEXT NOT NULL REFERENCES {column.type.name}(type)')
else:
segments.append(f' TEXT CHECK( {column.name} IN ( ')
enums = []
for e in column.type.items:
enums.append(f"'{e.name}'")
segments.append(", ".join(enums))
segments.append(' ) ) NOT NULL')
else:
raise TypeError('Data type of column specification unknown.')
if join:
segments = "".join(segments)
return segments
def coerceColType(colType):
"""
Given a colType, coerce to closest native SQLite type and return that, otherwise raise a ValueError.
Parameters:
colType (str): column type from DBML specification.
Returns:
str: valid SQLite column type.
"""
colType = colType.upper()
nativeTypes = ('NULL', 'INTEGER', 'REAL', 'TEXT', 'BLOB')
if colType in nativeTypes:
return colType
nils = ('NONE', 'NIL')
if colType in nils:
return 'NULL'
integers = ('BOOL', 'BOOLEAN', 'INT', 'TINYINT', 'SMALLINT', 'MEDIUMINT', 'LONGINT', 'BIGINT', 'YEAR')
if colType in integers:
return 'INTEGER'
reals = ('FLOAT', 'DOUBLE', 'DECIMAL', 'NUMERIC')
if colType in reals:
return 'REAL'
texts = ('STR', 'DATE', 'DATETIME', 'TIMESTAMP', 'TIME', 'VARCHAR', 'TINYTEXT', 'SMALLTEXT', 'MEDIUMTEXT', 'LONGTEXT')
if colType in texts:
return 'TEXT'
blobs = ('TINYBLOB', 'SMALLBLOB', 'MEDIUMBLOB', 'LONGBLOB', 'BYTE', 'BYTES', 'UUID')
if colType in blobs:
return 'BLOB'
res = re.search(r'VARCHAR\([0-9]+\)', colType)
if res:
return 'TEXT'
else:
raise ValueError(f'Could not figure out how to coerce "{colType}" to valid SQLite type.')