-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathvalidate_ol_events.py
More file actions
306 lines (250 loc) · 13.8 KB
/
validate_ol_events.py
File metadata and controls
306 lines (250 loc) · 13.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import argparse
import traceback
import jsonc
import logging
import os
import re
from jsonschema.exceptions import best_match, by_relevance
from os import listdir
from os.path import isfile, join, isdir
from jsonschema import Draft202012Validator
from report import Test, Scenario, Component, Report
from compare_releases import release_between
from compare_events import diff
from jsonschema import RefResolver
class OLSyntaxValidator:
    """Validate OpenLineage events against the core spec and per-facet JSON schemas.

    Attributes (plain attributes, callers may reassign them):
        schema_validators: mapping of facet name (or ``'core'``) to an object
            exposing ``iter_errors(instance)``.
        unchecked_facets: mapping of facet name to the collection of
            OpenLineage versions for which that facet is not validated.
        openlineage_version: version tag this validator was built for, or None.
    """

    # A `_schemaURL` that points at one of these base definitions marks a custom facet.
    _BASE_FACET_TYPES = ('Run', 'Job', 'Dataset', 'InputDataset', 'OutputDataset')
    # Facet schema files from the common package that can be used for syntax validation.
    _COMMON_FACETS = ('dbt-node-job-facet.json', 'dbt-version-run-facet.json', 'dbt-run-run-facet.json')

    def __init__(self, schema_validators, unchecked_facets=None, openlineage_version=None):
        self.schema_validators = schema_validators
        self.unchecked_facets = unchecked_facets or {}
        self.openlineage_version = openlineage_version

    @staticmethod
    def is_custom_facet(facet, schema_type):
        """Return True when the facet's `_schemaURL` references a base facet definition.

        A facet that is not a dict, or has no string `_schemaURL`, is never custom.
        """
        schema_url = facet.get('_schemaURL') if isinstance(facet, dict) else None
        if not isinstance(schema_url, str):
            return False
        if any(f'defs/{facet_type}Facet' in schema_url for facet_type in OLSyntaxValidator._BASE_FACET_TYPES):
            print(f"facet {schema_type} seems to be custom facet, validation skipped")
            return True
        return False

    @staticmethod
    def is_facet(path):
        """Return True when the file name denotes a facet schema usable for validation."""
        return 'Facet.json' in path or path in OLSyntaxValidator._COMMON_FACETS

    @classmethod
    def get_validators(cls, spec_path, tags):
        """Build one validator per tag, keyed by tag."""
        return {tag: cls.get_validator(spec_path, tag) for tag in tags}

    @classmethod
    def get_validator(cls, spec_path, tag, unchecked_facets=None):
        """Build a validator from the schema files found in ``spec_path/tag``.

        Raises:
            FileNotFoundError: when no file containing 'OpenLineage.json' in
                its name exists in that directory.
        """
        spec_dir = join(spec_path, tag)
        file_paths = listdir(spec_dir)
        facet_schemas = {path: load_json(join(spec_dir, path)) for path in file_paths if cls.is_facet(path)}
        spec_file = next((path for path in file_paths if 'OpenLineage.json' in path), None)
        if spec_file is None:
            # A bare StopIteration here would give no hint about what is missing.
            raise FileNotFoundError(f"no OpenLineage.json spec file found in '{spec_dir}'")
        spec_schema = load_json(join(spec_dir, spec_file))

        schema_validators = {}
        for path, schema in facet_schemas.items():
            try:
                # The first property of a facet schema is used as the facet name.
                name = next(iter(schema['properties']))
                store = {
                    spec_schema['$id']: spec_schema,
                    schema['$id']: schema,
                }
                resolver = RefResolver(base_uri="", referrer=spec_schema, store=store)
                schema_validators[name] = Draft202012Validator(schema, resolver=resolver)
            except KeyError as e:
                print(f"WARNING: Cannot create schema validator for '{path}' due to missing key:", e)
        schema_validators['core'] = Draft202012Validator(spec_schema)
        return cls(schema_validators, unchecked_facets, tag)

    def validate_entity(self, instance, schema_type, name):
        """Validate ``instance`` with the validator registered under ``schema_type``.

        Returns:
            A list of error strings; empty when the instance is valid, skipped,
            or a custom facet. Always a list, also when validation itself raises.
        """
        # Check if this facet should be skipped for the current version
        if self.should_skip_facet(schema_type):
            print(f"Skipping validation for facet '{schema_type}' in version {self.openlineage_version} (unchecked_facets)")
            return []
        try:
            schema_validator = self.schema_validators.get(schema_type)
            if schema_validator is not None:
                messages = []
                for error in schema_validator.iter_errors(instance):
                    best = best_match([error], by_relevance())
                    messages.append(f"{best.json_path}: {best.message}")
                return messages
            if self.is_custom_facet(instance.get(schema_type), schema_type):
                # facet type may be custom facet without available schema json file (defined only as class)
                return []
            return [f"$.{schema_type} facet type {schema_type} not recognized"]
        except Exception as exc:
            print(f"when validating {schema_type}, for instance of {name} following exception occurred \n {traceback.format_exc()}")
            # Callers iterate the result, so report the failure instead of returning None.
            root = '$' if schema_type == 'core' else f'$.{schema_type}'
            return [f"{root}: validation could not be completed ({type(exc).__name__}: {exc})"]

    def should_skip_facet(self, facet_name):
        """Return True when ``facet_name`` is listed as unchecked for this validator's version."""
        if not self.unchecked_facets or not self.openlineage_version:
            return False
        if facet_name not in self.unchecked_facets:
            return False
        return self.openlineage_version in self.unchecked_facets[facet_name]

    def validate(self, event, name):
        """Validate the whole event plus every run, job, input and output facet.

        Returns:
            A flat list of error strings, core errors first.
        """
        validation_result = []
        validation_result.extend(self.validate_entity(event, 'core', name))
        validation_result.extend(self.validate_entity_map(event, 'run', name))
        validation_result.extend(self.validate_entity_map(event, 'job', name))
        validation_result.extend(self.validate_entity_array(event, 'inputs', 'facets', name))
        validation_result.extend(self.validate_entity_array(event, 'inputs', 'inputFacets', name))
        validation_result.extend(self.validate_entity_array(event, 'outputs', 'facets', name))
        validation_result.extend(self.validate_entity_array(event, 'outputs', 'outputFacets', name))
        return validation_result

    def validate_entity_array(self, data, entity, generic_facet_type, name):
        """Validate the ``generic_facet_type`` facets of every item in ``data[entity]``.

        A missing or empty array, and items without that facet mapping, yield
        no errors here; structural problems are left to the 'core' validation.
        """
        messages = []
        for ind, item in enumerate(data.get(entity) or []):
            facets = item.get(generic_facet_type) if isinstance(item, dict) else None
            if not isinstance(facets, dict):
                continue
            prefix = f'$.{entity}[{ind}].{generic_facet_type}'
            for facet_name, facet in facets.items():
                for e in self.validate_entity({facet_name: facet}, facet_name, name):
                    # Only the leading JSON-path root is rewritten; a '$' inside the message stays.
                    messages.append(e.replace('$', prefix, 1))
        return messages

    def validate_entity_map(self, data, entity, name):
        """Validate every facet under ``data[entity]['facets']``.

        A missing entity or missing ``facets`` mapping yields no errors here;
        structural problems are left to the 'core' validation.
        """
        entity_data = data.get(entity)
        facets = entity_data.get('facets') if isinstance(entity_data, dict) else None
        if not isinstance(facets, dict):
            return []
        prefix = f'$.{entity}.facets'
        return [e.replace('$', prefix, 1)
                for facet_name, facet in facets.items()
                for e in self.validate_entity({facet_name: facet}, facet_name, name)]
class OLSemanticValidator:
    """Compare produced events against the expected events of a scenario.

    ``expected_events`` is an iterable of ``(name, event, tags)`` tuples.
    """

    # Expected fields may hold a jinja-style matcher: {{ match(result, '<regex>') }}
    _MATCH_TEMPLATE = re.compile(r"^\{\{\s*match\(result,\s*(['\"])(.*?)\1\s*\)\s*\}\}$")

    def __init__(self, expected_events):
        self.expected_events = expected_events

    def validate(self, events):
        """Return a mapping of expected-event name to its semantic Test result."""
        tests = {}
        for name, event, tags in self.expected_events:
            details = self.validate_event(event, events)
            if details is None:
                details = ['one or more of .eventType, .job.name, .job.namespace not defined in expected event']
            named_details = [f"'{name}' {detail}" for detail in details]
            tests[name] = Test.simplified(name, 'semantics', 'openlineage', named_details, tags)
        return tests

    def validate_event(self, ee, events):
        """Find the result event matching expected event ``ee`` and diff against it.

        Returns:
            None when ``ee`` lacks eventType, job.name or job.namespace;
            a one-element "not found" list when no result event matches;
            otherwise the result of ``diff(ee, e)`` — an empty diff from any
            matching event wins, else the diff of the last matching event.
        """
        if not ('job' in ee and 'eventType' in ee and 'name' in ee['job'] and 'namespace' in ee['job']):
            return None
        found = [
            f"event with .eventType: {ee['eventType']}, .job.name: {ee['job']['name']} and .job.namespace: {ee['job']['namespace']} not found in result events"]
        for e in events.values():
            # Result events without a job object or eventType cannot match;
            # skip them instead of failing on a missing key.
            job = e.get('job') if isinstance(e, dict) else None
            if not isinstance(job, dict):
                continue
            if (self.fields_match(e.get('eventType'), ee['eventType'])
                    and self.fields_match(job.get('name'), ee['job']['name'])
                    and self.fields_match(job.get('namespace'), ee['job']['namespace'])):
                found = diff(ee, e)
                if len(found) == 0:
                    # An exact match was found; later events cannot improve on it.
                    break
        return found

    @staticmethod
    def fields_match(r, e) -> bool:
        """Return True when result value ``r`` satisfies expected value ``e``.

        ``e`` matches when it equals ``r``, or when it is a
        ``{{ match(result, '<regex>') }}`` template whose regex fully matches
        the string ``r``. Non-string values only match by equality.
        """
        if e == r:
            return True
        if not isinstance(e, str) or not isinstance(r, str):
            return False
        # if the expected field is jinja
        pattern_match = OLSemanticValidator._MATCH_TEMPLATE.match(e)
        if pattern_match:
            # Check if r matches the regex pattern extracted from e
            return re.fullmatch(pattern_match.group(2), r) is not None
        return False
def load_json(path):
    """Parse the file at ``path`` with ``jsonc.load`` and return the result.

    The file is read as UTF-8 rather than the platform default encoding, so
    the same file parses identically on every operating system.
    """
    with open(path, encoding='utf-8') as f:
        return jsonc.load(f)
def extract_pattern(identifier, patterns):
    """Return the text matched by the first pattern that hits ``identifier``.

    Patterns are tried in order with ``re.search``; when ``patterns`` is None
    or nothing matches, ``identifier`` is returned unchanged.
    """
    if patterns is None:
        return identifier
    # Lazy generators: searching stops at the first pattern that matches.
    attempts = (re.search(candidate, identifier) for candidate in patterns)
    hits = (hit.group(0) for hit in attempts if hit)
    return next(hits, identifier)
def get_event_identifier(event, default, patterns):
    """Build a ``namespace:name:eventType`` identifier for ``event``.

    The job name is first reduced with ``extract_pattern``. ``default`` is
    returned when the event lacks eventType, job.name or job.namespace.
    """
    if 'job' not in event or 'eventType' not in event:
        return default
    job = event['job']
    if 'name' not in job or 'namespace' not in job:
        return default
    return f"{job['namespace']}:{extract_pattern(job['name'], patterns)}:{event['eventType']}"
def all_tests_succeeded(syntax_tests):
    """Return True unless at least one test in the mapping has status "FAILURE"."""
    for outcome in syntax_tests.values():
        if outcome.status == "FAILURE":
            return False
    return True
def get_expected_events(producer_dir, component, scenario_name, config, component_version, openlineage_version):
    """Load the expected events of a scenario that apply to the given versions.

    Every entry of ``config['tests']`` accepted by ``check_versions`` is read
    from ``<producer_dir>/<component>/scenarios/<scenario_name>/<path>``.

    Returns:
        A list of ``(name, body, tags)`` tuples in config order.
    """
    applicable = (
        spec for spec in config['tests']
        if check_versions(component_version, openlineage_version, spec)
    )
    expected = []
    for spec in applicable:
        location = join(producer_dir, component, 'scenarios', scenario_name, spec['path'])
        payload = load_json(location)
        expected.append((spec['name'], payload, spec['tags']))
    return expected
def validate_scenario_syntax(result_events, validator, config):
    """Run syntax validation on every result event and build one Test per identifier.

    Several events can share an identifier (same namespace, pattern-reduced
    job name and event type). Their error details are accumulated, so a
    failing event is never hidden by a later passing one with the same key.

    Args:
        result_events: mapping of file name to parsed event.
        validator: object exposing ``validate(event, name)`` returning a list of error strings.
        config: scenario config; its optional 'patterns' entry is passed to ``get_event_identifier``.

    Returns:
        Mapping of identifier to Test, "FAILURE" when any detail was collected.
    """
    patterns = config.get('patterns')
    details_by_identifier = {}
    for name, event in result_events.items():
        identification = get_event_identifier(event, name, patterns)
        print(f"syntax validation for {identification}")
        details_by_identifier.setdefault(identification, []).extend(validator.validate(event, name))
    return {
        identification: Test(identification, "FAILURE" if len(details) > 0 else "SUCCESS",
                             'syntax', 'openlineage', details, {})
        for identification, details in details_by_identifier.items()
    }
def get_config(producer_dir, component, scenario_name):
    """Load the ``config.json`` of a scenario.

    For the 'scenarios' component the file lives under
    ``<producer_dir>/scenarios/<scenario_name>``; for any other component
    under ``<producer_dir>/<component>/scenarios/<scenario_name>``.
    """
    if component == 'scenarios':
        scenario_dir = join(producer_dir, 'scenarios', scenario_name)
    else:
        scenario_dir = join(producer_dir, component, 'scenarios', scenario_name)
    # Reuse the shared loader so every JSON file in this module is read the same way.
    return load_json(join(scenario_dir, 'config.json'))
def get_arguments():
    """Parse the command line.

    Returns:
        The tuple ``(event_base_dir, producer_dir, target, spec_base_dir,
        component, component_version, openlineage_version)``; options that
        were not supplied are None.
    """
    options = (
        ('--event_base_dir', "directory containing the reports"),
        ('--spec_base_dir', "directory containing specs and facets"),
        ('--producer_dir', "directory storing producers"),
        ('--component', "component producing the validated events"),
        ('--component_version', "component release used in generating events"),
        ('--openlineage_version', "Comma separated list of Openlineage versions"),
        ('--target', "target file"),
    )
    parser = argparse.ArgumentParser(description="")
    for flag, help_text in options:
        parser.add_argument(flag, type=str, help=help_text)
    parsed = parser.parse_args()
    return (
        parsed.event_base_dir,
        parsed.producer_dir,
        parsed.target,
        parsed.spec_base_dir,
        parsed.component,
        parsed.component_version,
        parsed.openlineage_version,
    )
def check_versions(component_version, openlineage_version, config):
    """Check both versions against the optional min/max bounds in ``config``.

    Bounds are read from ``config['component_versions']`` and
    ``config['openlineage_versions']``; a missing section or bound is passed
    to ``release_between`` as None. The OpenLineage check only runs when the
    component check passes.
    """
    component_bounds = config.get("component_versions", {})
    openlineage_bounds = config.get("openlineage_versions", {})
    component_ok = release_between(component_version, component_bounds.get("min"), component_bounds.get("max"))
    if not component_ok:
        return component_ok
    return release_between(openlineage_version, openlineage_bounds.get("min"), openlineage_bounds.get("max"))
def _load_result_events(scenario_path):
    """Load every regular file directly under ``scenario_path``, keyed by file name."""
    events = {}
    # Sorted so the report content does not depend on directory listing order.
    for file in sorted(listdir(scenario_path)):
        path = join(scenario_path, file)
        if isfile(path):
            events[file] = load_json(path)
    return events


def main():
    """Validate produced events for every scenario and write the report to the target file.

    For the 'scenarios' component each scenario is syntax-validated against
    the OpenLineage version named in its own config. For any other component
    every scenario is syntax-validated against the single given version and,
    when all syntax tests pass, compared with its expected events.

    Raises:
        ValueError: when a scenario config names an OpenLineage version for
            which no validator was built.
    """
    base_dir, producer_dir, target, spec_dirs, component, component_version, openlineage_version = get_arguments()
    scenarios = {}
    if component == 'scenarios':
        # Tolerate whitespace around the commas of the version list.
        tags = [tag.strip() for tag in openlineage_version.split(',') if tag.strip()]
        validators = OLSyntaxValidator.get_validators(spec_path=spec_dirs, tags=tags)
        for scenario_name in sorted(listdir(base_dir)):
            scenario_path = get_path(base_dir, component, scenario_name)
            if not isdir(scenario_path):
                continue
            config = get_config(producer_dir, component, scenario_name)
            version = config.get('openlineage_version')
            validator = validators.get(version)
            if validator is None:
                raise ValueError(
                    f"scenario {scenario_name} requires OpenLineage version {version!r}, "
                    f"but validators exist only for {sorted(validators)}")
            # Validators are shared between scenarios of the same version, so always
            # reset the exclusions instead of inheriting those of a previous scenario.
            validator.unchecked_facets = config.get('unchecked_facets') or {}
            print(f"for scenario {scenario_name} validation version is {config.get('openlineage_version')}")
            result_events = _load_result_events(scenario_path)
            tests = validate_scenario_syntax(result_events, validator, config)
            scenarios[scenario_name] = Scenario.simplified(scenario_name, tests)
        report = Report({component: Component(component, 'producer', scenarios, "", "")})
    else:
        validator = None
        for scenario_name in sorted(listdir(base_dir)):
            scenario_path = get_path(base_dir, component, scenario_name)
            # Skip stray files next to the scenario directories, as the 'scenarios' branch does.
            if not isdir(scenario_path):
                continue
            config = get_config(producer_dir, component, scenario_name)
            if validator is None:
                # The schemas depend only on the version, so load them once and reuse.
                validator = OLSyntaxValidator.get_validator(spec_path=spec_dirs, tag=openlineage_version)
            validator.unchecked_facets = config.get('unchecked_facets') or {}
            expected = get_expected_events(producer_dir, component, scenario_name, config, component_version, openlineage_version)
            result_events = _load_result_events(scenario_path)
            tests = validate_scenario_syntax(result_events, validator, config)
            # Semantic comparison is only meaningful for syntactically valid events.
            if all_tests_succeeded(tests) and expected:
                for name, res in OLSemanticValidator(expected).validate(result_events).items():
                    tests[name] = res
            scenarios[scenario_name] = Scenario.simplified(scenario_name, tests)
        report = Report({component: Component(component, 'producer', scenarios, component_version, openlineage_version)})
    with open(target, 'w', encoding='utf-8') as f:
        jsonc.dump(report.to_dict(), f, indent=2)
def get_path(base_dir, component, scenario_name):
    """Return the directory holding the result events of a scenario.

    For the 'scenarios' component this is the ``events`` subdirectory of the
    scenario directory; for any other component the scenario directory itself.
    """
    segments = [base_dir, scenario_name]
    if component == 'scenarios':
        segments.append('events')
    return join(*segments)
# Run the validation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()