-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdeploy.py
More file actions
152 lines (118 loc) · 4.21 KB
/
deploy.py
File metadata and controls
152 lines (118 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import argparse
import importlib.util
import sys
from glob import glob
import sqlalchemy as sa
import yaml
from sqlalchemy import Engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.exc import ProgrammingError
from helpers.cnxns_helper import get_cnxns
def _deploy_tables(
    cnxn: Engine,
    *instances: str
) -> None:
    """
    Execute CREATE TABLE SQL Scripts.

    For every requested instance, load ``definitions/<instance>.py``, call
    its ``get_ddl()`` function, merge the returned dictionaries and execute
    each script against the supplied SQL ALCHEMY engine. Errors reporting
    that the object already exists are ignored so the deploy can be re-run.

    Args:
        cnxn (Engine): SQL ALCHEMY engine object for database.
        *instances (String): One or more instances to run deploy for.
            When none are given, no scripts are executed.

    Returns:
        None

    Raises:
        AssertionError: If an instance has no matching definitions file.
        ImportError: If a definitions file cannot be loaded as a module.
        Exception: If a script fails with a ProgrammingError that is not
            an "already exists" error.
    """
    modules = []
    exceptions = []
    for instance in instances:
        matches = glob(f"definitions/{instance}.py")
        if matches:
            modules.extend(matches)
        else:
            exceptions.append(instance)

    if exceptions:
        # Raised explicitly rather than through ``assert`` so the check
        # still runs under ``python -O``; the exception type is unchanged.
        raise AssertionError(
            f"{', '.join(exceptions)} not valid instance(s) for deploy"
        )

    data_definition_libraries = {}
    for module in modules:
        # The file path doubles as the module name registered in sys.modules.
        spec = importlib.util.spec_from_file_location(module, module)
        if not spec or not spec.loader:
            raise ImportError(f"Cannot load module '{module}'")
        definitions = importlib.util.module_from_spec(spec)
        sys.modules[module] = definitions
        spec.loader.exec_module(definitions)
        data_definition_libraries.update(definitions.get_ddl())

    # NOTE(review): no explicit commit is issued here; this presumably relies
    # on the engine being configured for autocommit - confirm in get_cnxns.
    # The ``with`` block closes the connection on exit.
    with cnxn.connect() as c:
        for ddl in data_definition_libraries.values():
            try:
                c.execute(sa.text(ddl))
            except ProgrammingError as e:
                error = repr(e)
                if "There is already an object named" in error:
                    continue  # the table already exists
                if "already exists" in error:
                    continue  # the database already exists
                # Keep the original exception as the cause for debugging.
                raise Exception(error) from e
def _populate_entity_params(
    cnxn: Engine,
    *instances: str
) -> None:
    """
    Execute INSERT TABLE SQL Scripts.

    Load parameter modules from the entity_params subdirectory, call each
    module's ``populate_entity_list()`` function, merge the returned
    dictionaries and execute every script against the supplied SQL ALCHEMY
    engine. Duplicate-key errors are ignored so the step can be re-run.

    Args:
        cnxn (Engine): SQL ALCHEMY engine object for database.
        *instances (String): instances to populate. Each one selects
            ``entity_params/<instance>_params.py``; when none are given,
            every ``entity_params/*.py`` file is used.

    Returns:
        None

    Raises:
        ImportError: If a params file cannot be loaded as a module.
        Exception: If a script fails with an IntegrityError that is not a
            duplicate-key error.
    """
    if instances:
        modules = []
        for instance in instances:
            # Unlike _deploy_tables, an instance without a matching params
            # file is skipped rather than rejected.
            modules.extend(glob(f"entity_params/{instance}_params.py"))
        # Show which params files were selected for this run.
        print(modules)
    else:
        modules = glob("entity_params/*.py")

    entity_list = {}
    for module in modules:
        # The file path doubles as the module name registered in sys.modules.
        spec = importlib.util.spec_from_file_location(module, module)
        if not spec or not spec.loader:
            raise ImportError(f"Cannot load module '{module}'")
        entities = importlib.util.module_from_spec(spec)
        sys.modules[module] = entities
        spec.loader.exec_module(entities)
        entity_list.update(entities.populate_entity_list())

    # NOTE(review): no explicit commit is issued here; this presumably relies
    # on the engine being configured for autocommit - confirm in get_cnxns.
    # The ``with`` block closes the connection on exit.
    with cnxn.connect() as c:
        for insert_sql in entity_list.values():
            try:
                c.execute(sa.text(insert_sql))
            except IntegrityError as e:
                error = repr(e)
                if "Cannot insert duplicate key" in error:
                    continue  # the table has already been populated
                # Keep the original exception as the cause for debugging.
                raise Exception(error) from e
def run(
    config: dict,
    *instances: str,
) -> None:
    """
    Deploy tables and then populate entity parameters.

    Looks up the "ods" entry in the result of ``get_cnxns(config)`` and
    passes it, together with the instance names, first to
    ``_deploy_tables`` and then to ``_populate_entity_params``.

    Args:
        config (dict): Configuration handed to ``get_cnxns``.
        *instances (String): Instance names forwarded to both steps.

    Returns:
        None
    """
    ods_engine = get_cnxns(config)["ods"]
    # Order matters: tables are created before they are populated.
    for step in (_deploy_tables, _populate_entity_params):
        step(ods_engine, *instances)
if __name__ == "__main__":
    # Command line: optional list of instance names (-i / --instances).
    cli = argparse.ArgumentParser()
    cli.add_argument("-i", "--instances", type=str, nargs="*", default=[])
    selected = cli.parse_args().instances

    # Settings are read from config.yaml in the current working directory.
    with open("config.yaml", "r") as config_file:
        settings = yaml.safe_load(config_file)

    run(settings, *selected)