-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdescriptor.py
More file actions
164 lines (126 loc) · 5.02 KB
/
descriptor.py
File metadata and controls
164 lines (126 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import json
import os
from hashlib import md5
from data_resource_api.logging import LogFactory
# Module-level logger used by the descriptor helpers in this file; obtained
# from the project's LogFactory under the name "descriptor-utils".
logger = LogFactory.get_console_logger("descriptor-utils")
class DescriptorsLoader:
    """Yield Descriptor objects from schema directories and/or in-memory dicts.

    Use iter_descriptors() to yield.
    """

    def __init__(self, directories: list = None, dict_descriptors: list = None):
        """Remember where descriptors should be loaded from.

        Args:
            directories: Directory paths to scan for descriptor files.
                Defaults to a fresh empty list.
            dict_descriptors: Descriptor dictionaries to wrap directly.
                Defaults to a fresh empty list.
        """
        # ``None`` sentinels instead of ``[]`` defaults: a literal list default
        # is created once and would be shared by every instance that relies on
        # it, so mutating one loader's list would silently affect the others.
        self.directories = [] if directories is None else directories
        self.dict_descriptors = (
            [] if dict_descriptors is None else dict_descriptors
        )

    def iter_descriptors(self):
        """Yield descriptors from the directories first, then from the dicts."""
        yield from DescriptorsFromDirectory(self.directories).iter_files()

        for descriptor in self.dict_descriptors:
            yield Descriptor(descriptor)
class DescriptorsFromDirectory:
    """Walk schema directories and produce a Descriptor for each JSON file.

    Call iter_files() to obtain the generator of Descriptor objects.
    """

    def __init__(self, directories: list):
        self.directories = directories

    def iter_files(self):
        """Yield a Descriptor for every loadable ``.json`` file found."""
        yield from self._get_from_dir()

    def _get_from_dir(self):
        """Visit each directory in order and yield its descriptors.

        A directory that cannot be located raises RuntimeError; a single file
        that fails to load is logged and skipped.
        """
        for schema_dir in self.directories:
            self._check_if_path_exists(schema_dir)
            for json_name in self._get_files_from_dir(schema_dir):
                try:
                    yield DescriptorFromFile(schema_dir, json_name).get_descriptor_obj()
                except Exception as err:
                    # ``Exception`` already covers ValueError and RuntimeError;
                    # log the failure and move on to the next file.
                    logger.error(err)

    def _check_if_path_exists(self, dir_path):
        """Raise RuntimeError unless ``dir_path`` is an existing directory."""
        if os.path.exists(dir_path) and os.path.isdir(dir_path):
            return
        raise RuntimeError(f"Unable to locate schema directory '{dir_path}'")

    def _get_files_from_dir(self, directory):
        """Yield the ``.json`` entry names of ``directory`` in sorted order."""
        json_names = [
            entry for entry in os.listdir(directory) if entry.endswith(".json")
        ]
        json_names.sort()
        yield from json_names
class DescriptorFromFile:
    """Helper class that creates a Descriptor from a file in a schema directory."""

    def __init__(self, schema_dir: str, file_name: str):
        """Load the descriptor stored at ``schema_dir/file_name``.

        Raises:
            RuntimeError: If the path is a directory, cannot be opened, or
                does not contain valid JSON.
        """
        self._check_if_the_file_is_a_directory(schema_dir, file_name)
        self.descriptor_obj = self._create_descriptor(schema_dir, file_name)

    def get_descriptor_obj(self):
        """Return the Descriptor built during construction."""
        return self.descriptor_obj

    def _check_if_the_file_is_a_directory(self, schema_dir: str, file_name: str):
        """Raise RuntimeError when the joined path points at a directory."""
        if os.path.isdir(os.path.join(schema_dir, file_name)):
            raise RuntimeError(
                f"Cannot open a directory '{file_name}' as a descriptor."
            )

    def _create_descriptor(self, schema_dir: str, file_name: str):
        """Read the JSON file and wrap its contents in a Descriptor.

        Raises:
            RuntimeError: "Failed to load JSON file..." when the content is
                not valid JSON, or "Error opening schema ..." for any other
                failure while opening/reading. The original exception is
                attached as ``__cause__`` in both cases.
        """
        file_path = os.path.join(schema_dir, file_name)
        try:
            # JSON text is UTF-8; do not depend on the platform's default
            # locale encoding when decoding the file.
            with open(file_path, encoding="utf-8") as fh:
                try:
                    descriptor_dict = json.load(fh)
                except ValueError as err:
                    raise RuntimeError(
                        f"Failed to load JSON file. JSON is probably invalid in file '{file_path}'"
                    ) from err
        except RuntimeError:
            # Already the specific error raised above; pass it through
            # instead of re-wrapping it as a generic open failure.
            raise
        except Exception as err:
            raise RuntimeError(f"Error opening schema {file_name}") from err

        return Descriptor(descriptor_dict, file_name)
class Descriptor:
    """Stores all of the procedures for extracting data from descriptors.

    Wraps a descriptor dictionary and exposes the pieces read from it
    (table name, table schema, API schema, resource name, restricted fields)
    as read-only properties.
    """

    _INVALID_DESCRIPTOR_MSG = (
        "Error finding data in descriptor. Descriptor file may not be valid."
    )

    def __init__(self, descriptor: dict, file_name: str = ""):
        """Wrap ``descriptor`` and decide on its file name.

        Args:
            descriptor: The descriptor dictionary.
            file_name: Name of the file the descriptor came from. When empty,
                ``"<table_name>.json"`` is used instead.

        Raises:
            RuntimeError: If the descriptor has no
                ``["datastore"]["tablename"]`` entry (the table name is read
                even when ``file_name`` is supplied).
        """
        self._descriptor = descriptor
        self._set_file_name(file_name, self.table_name)

    def _lookup(self, *path):
        """Follow ``path`` through the descriptor, one key/index at a time.

        Raises:
            RuntimeError: If any key is missing or a list index is out of
                range; the underlying error is attached as ``__cause__``.
        """
        node = self._descriptor
        try:
            for step in path:
                node = node[step]
        except (KeyError, IndexError) as err:
            # IndexError covers an empty "methods" list, which previously
            # escaped as a raw IndexError instead of the documented error.
            raise RuntimeError(self._INVALID_DESCRIPTOR_MSG) from err
        return node

    @property
    def table_name(self):
        """Value of ``["datastore"]["tablename"]``."""
        return self._lookup("datastore", "tablename")

    @property
    def table_schema(self):
        """Value of ``["datastore"]["schema"]``."""
        return self._lookup("datastore", "schema")

    @property
    def api_schema(self):
        """First entry of ``["api"]["methods"]``."""
        return self._lookup("api", "methods", 0)

    @property
    def data_resource_name(self):
        """Value of ``["api"]["resource"]``."""
        return self._lookup("api", "resource")

    @property
    def restricted_fields(self):
        """Value of ``["datastore"]["restricted_fields"]``, or ``[]`` if absent."""
        try:
            return self._descriptor["datastore"]["restricted_fields"]
        except KeyError:
            return []

    @property
    def descriptor(self):
        """The wrapped descriptor dictionary itself."""
        return self._descriptor

    @property
    def file_name(self):
        """File name chosen at construction time."""
        return self._file_name

    def get_checksum(self) -> str:
        """Return the MD5 hex digest of the table schema.

        The schema is serialised with sorted keys so that key order does not
        change the digest.
        """
        serialised = json.dumps(self.table_schema, sort_keys=True)
        return md5(serialised.encode("utf-8")).hexdigest()  # nosec

    def _set_file_name(self, file_name: str, table_name: str):
        """Store ``file_name``, falling back to ``"<table_name>.json"`` when empty."""
        if file_name == "":
            self._file_name = f"{table_name}.json"
        else:
            self._file_name = file_name