-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsample_data_validator.py
More file actions
1185 lines (1007 loc) · 53.7 KB
/
sample_data_validator.py
File metadata and controls
1185 lines (1007 loc) · 53.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Sample Data Validator
Leader Agent + 10 Sub-Agents
Goal: Ensure sample_data test fixtures are complete, clean, and cover all
scenarios needed to test the Informatica PowerCenter code conversion pipeline.
Usage:
python3 sample_data_validator.py [--fix] [--verbose]
"""
import os
import re
import sys
import json
import shutil
import argparse
import textwrap
import xml.etree.ElementTree as ET
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional, Set
from collections import defaultdict
import datetime
# ─────────────────────────────────────────────────────────────────────────────
# Config
# ─────────────────────────────────────────────────────────────────────────────
# All fixture paths are resolved relative to the directory holding this script.
BASE_DIR = Path(__file__).parent
SAMPLE_DATA_DIR = BASE_DIR / "sample_data"

# Project folders looked up under SAMPLE_DATA_DIR, and the complexity tiers
# that each project's mappings/ and workflows/ directories are split into.
PROJECTS = [
    "apex_insurance",
    "firstbank",
    "meridian_am",
    "nexus_scm",
]
TIERS = [
    "simple",
    "medium",
    "complex",
]

# Transformation TYPE attribute values regarded as supported.
SUPPORTED_TRANSFORM_TYPES = {
    "Aggregator",
    "Expression",
    "Filter",
    "Joiner",
    "Lookup Procedure",
    "Normalizer",
    "Rank",
    "Router",
    "Sequence Generator",
    "Sorter",
    "Source Qualifier",
    "Stored Procedure",
    "Update Strategy",
}

# Transformation types required per tier
TIER_REQUIRED_TYPES = dict(
    simple={"Expression", "Source Qualifier"},
    medium={"Aggregator", "Lookup Procedure", "Source Qualifier"},
    complex={"Joiner", "Router", "Source Qualifier"},
)
# Scenario coverage: what the pipeline stages need to see in test data
def _tf_types(root):
    """Return the set of TYPE attribute values of every <TRANSFORMATION> under *root*."""
    return {tf.get("TYPE") for tf in root.iter("TRANSFORMATION")}


def _has_tf(root, *type_names):
    """Return True when some <TRANSFORMATION> has one of *type_names* as its TYPE."""
    return not _tf_types(root).isdisjoint(type_names)


def _tag_count(root, tag):
    """Count the elements named *tag* found by ``root.iter``."""
    return sum(1 for _ in root.iter(tag))


def _has_table_attribute(transformations, names):
    """Return True when a TABLEATTRIBUTE named in *names* carries a non-blank VALUE."""
    for tf in transformations:
        for attr in tf.iter("TABLEATTRIBUTE"):
            if attr.get("NAME") in names and attr.get("VALUE", "").strip():
                return True
    return False


def _xml_text_matches(root, pattern):
    """Return True when *pattern* occurs anywhere in the serialised XML of *root*."""
    return re.search(pattern, ET.tostring(root, encoding="unicode")) is not None


# Each entry pairs a human-readable description with a "detect" callable that
# takes a parsed mapping root element and returns a bool.
REQUIRED_SCENARIOS = {
    "scd2_pattern": {
        "description": "SCD2 change detection (Lookup Procedure + Router or Update Strategy)",
        "detect": lambda root: (
            _has_tf(root, "Lookup Procedure")
            and _has_tf(root, "Router", "Update Strategy")
        ),
    },
    "multi_target": {
        "description": "Multi-target routing (Router → multiple targets)",
        "detect": lambda root: _tag_count(root, "TARGET") >= 2 and _has_tf(root, "Router"),
    },
    "multi_source_join": {
        "description": "Multi-source joiner pattern",
        "detect": lambda root: _tag_count(root, "SOURCE") >= 2 and _has_tf(root, "Joiner"),
    },
    "aggregation_rollup": {
        "description": "Aggregation rollup (Aggregator transform)",
        "detect": lambda root: _has_tf(root, "Aggregator"),
    },
    "sql_override": {
        "description": "SQL override in Source Qualifier (verification_agent trigger)",
        "detect": lambda root: _has_table_attribute(
            (tf for tf in root.iter("TRANSFORMATION") if tf.get("TYPE") == "Source Qualifier"),
            ("Sql Query",),
        ),
    },
    "filter_condition": {
        "description": "Source filter condition on Source Qualifier",
        # The transformation TYPE is not checked here: any transformation with
        # one of these attributes counts.
        "detect": lambda root: _has_table_attribute(
            root.iter("TRANSFORMATION"),
            ("Source Filter", "Filter Condition"),
        ),
    },
    "parameter_usage": {
        "description": "$$PARAM variable usage in mapping",
        "detect": lambda root: _xml_text_matches(root, r'\$\$\w+'),
    },
    "lookup_connected": {
        "description": "Connected lookup transformation",
        "detect": lambda root: _has_tf(root, "Lookup Procedure"),
    },
    "sequence_generator": {
        "description": "Sequence Generator transformation (surrogate key generation)",
        "detect": lambda root: _has_tf(root, "Sequence Generator"),
    },
    "update_strategy": {
        "description": "Update Strategy transformation (CDC insert/update/delete pattern)",
        "detect": lambda root: _has_tf(root, "Update Strategy"),
    },
    "reusable_transform": {
        "description": "Reusable transformation (REUSABLE=YES — shared across mappings)",
        "detect": lambda root: "YES" in {
            tf.get("REUSABLE") for tf in root.iter("TRANSFORMATION")
        },
    },
    "unconnected_lookup": {
        "description": "Unconnected lookup (:LKP. invocation syntax in expression)",
        "detect": lambda root: _xml_text_matches(root, r':LKP\.'),
    },
    "sorter": {
        "description": "Sorter transformation (explicit sort before Rank or Aggregator)",
        "detect": lambda root: _has_tf(root, "Sorter"),
    },
    "rank": {
        "description": "Rank transformation (top-N or bottom-N filter)",
        "detect": lambda root: _has_tf(root, "Rank"),
    },
}
# ─────────────────────────────────────────────────────────────────────────────
# Data Structures
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class Finding:
    """A single issue reported by a validation agent."""
    # Name of the agent that raised the finding (agents pass their own `name`).
    agent: str
    # Project the finding belongs to; the scenario auditor passes "ALL".
    project: str
    # File name, or a directory-style label such as "simple/", the finding refers to.
    file: str
    severity: str # CRITICAL | HIGH | MEDIUM | LOW | INFO
    # Short label grouping similar findings, e.g. "Missing INSTANCE".
    category: str
    # Human-readable explanation of the problem.
    message: str
    # True when the agent repaired the problem during this run.
    fixed: bool = False
    # What the repair did; left empty when nothing was fixed.
    fix_description: str = ""
@dataclass
class AgentReport:
    """Collects one agent's findings together with its check/pass counters."""

    agent_name: str
    findings: List[Finding] = field(default_factory=list)
    files_checked: int = 0
    passed: int = 0

    def add_finding(self, f: Finding):
        """Append finding *f* to this report."""
        self.findings.append(f)

    def add_pass(self):
        """Count one successful check."""
        self.passed = self.passed + 1

    @property
    def failed(self):
        """Total number of findings recorded, fixed or not."""
        return len(self.findings)

    @property
    def fixed(self):
        """Number of recorded findings that are flagged as fixed."""
        return len([entry for entry in self.findings if entry.fixed])

    def summary(self):
        """Return a one-line text summary; CRIT/HIGH count unfixed findings only."""

        def still_open(level):
            return len([
                entry for entry in self.findings
                if entry.severity == level and not entry.fixed
            ])

        criticals = still_open("CRITICAL")
        highs = still_open("HIGH")
        return f"checked={self.files_checked} pass={self.passed} findings={self.failed} fixed={self.fixed} CRIT={criticals} HIGH={highs}"
# ─────────────────────────────────────────────────────────────────────────────
# Base Agent
# ─────────────────────────────────────────────────────────────────────────────
class BaseAgent:
    """Shared base for the validation agents: XML parsing and fixture discovery."""

    name = "BaseAgent"

    def run(self, fix: bool = False, verbose: bool = False) -> AgentReport:
        """Run the agent's checks; subclasses must override this method."""
        raise NotImplementedError

    def parse_xml(self, path: Path) -> Optional[ET.Element]:
        """Parse *path* and return its root element, or None when that fails.

        Malformed XML and files that cannot be read are both reported as None,
        so that one bad fixture cannot abort the whole validation run.
        """
        try:
            return ET.parse(path).getroot()
        except (ET.ParseError, OSError):
            return None

    def get_folder(self, root: ET.Element) -> Optional[ET.Element]:
        """Return the first REPOSITORY/FOLDER element under *root*, or None."""
        repo = root.find("REPOSITORY")
        if repo is None:
            return None
        return repo.find("FOLDER")

    def _collect_xml_files(self, kind: str) -> List[Tuple[str, str, Path]]:
        """Return (project, tier, path) for each *.xml in <project>/<kind>/<tier>/.

        Projects and tiers are visited in PROJECTS / TIERS order, and files
        inside a tier directory are sorted. Missing directories are skipped.
        """
        results: List[Tuple[str, str, Path]] = []
        for project in PROJECTS:
            for tier in TIERS:
                d = SAMPLE_DATA_DIR / project / kind / tier
                if d.exists():
                    results.extend((project, tier, f) for f in sorted(d.glob("*.xml")))
        return results

    def all_mapping_files(self) -> List[Tuple[str, str, Path]]:
        """Return a list of (project, tier, path) for all mapping XML files."""
        return self._collect_xml_files("mappings")

    def all_workflow_files(self) -> List[Tuple[str, str, Path]]:
        """Return a list of (project, tier, path) for all workflow XML files."""
        return self._collect_xml_files("workflows")
# ─────────────────────────────────────────────────────────────────────────────
# Agent 1: XML Schema Validator
# ─────────────────────────────────────────────────────────────────────────────
class Agent01_XMLSchema(BaseAgent):
    """Agent 1: checks that mapping and workflow files have the expected XML skeleton."""

    name = "01_XMLSchemaValidator"

    def run(self, fix=False, verbose=False) -> AgentReport:
        """Validate every mapping file, then every workflow file.

        A file earns one pass when it produces no issue; otherwise each issue
        becomes a finding. ``fix`` and ``verbose`` are accepted but unused.
        """
        report = AgentReport(self.name)

        for project, _tier, path in self.all_mapping_files():
            report.files_checked += 1
            root = self.parse_xml(path)
            if root is None:
                issues = [("CRITICAL", "XML Parse Error", "File cannot be parsed as valid XML")]
            else:
                issues = self._mapping_issues(root)
            self._record(report, project, path, issues)

        for project, _tier, path in self.all_workflow_files():
            report.files_checked += 1
            root = self.parse_xml(path)
            if root is None:
                issues = [("CRITICAL", "XML Parse Error", "Workflow file cannot be parsed")]
            else:
                issues = self._workflow_issues(root)
            self._record(report, project, path, issues)

        return report

    def _record(self, report, project, path, issues):
        """Turn *issues* into findings on *report*, or count a pass when there are none."""
        if not issues:
            report.add_pass()
            return
        for severity, category, message in issues:
            report.add_finding(Finding(self.name, project, path.name,
                                       severity, category, message))

    @staticmethod
    def _mapping_issues(root):
        """List (severity, category, message) problems in a parsed mapping document."""
        issues = []
        if root.tag != "POWERMART":
            issues.append(("CRITICAL", "Wrong Root", f"Expected <POWERMART>, got <{root.tag}>"))

        repo = root.find("REPOSITORY")
        if repo is None:
            issues.append(("CRITICAL", "Missing REPOSITORY", "No <REPOSITORY> element"))
            return issues

        folder = repo.find("FOLDER")
        if folder is None:
            issues.append(("CRITICAL", "Missing FOLDER", "No <FOLDER> inside REPOSITORY"))
            return issues

        for req in ("SOURCE", "TARGET", "TRANSFORMATION", "MAPPING"):
            if folder.find(req) is None:
                issues.append(("HIGH", f"Missing {req}", f"No <{req}> element in FOLDER"))
        return issues

    @staticmethod
    def _workflow_issues(root):
        """List (severity, category, message) problems in a parsed workflow document."""
        issues = []
        if root.find(".//TASK") is None:
            issues.append(("HIGH", "Missing TASK", "No <TASK> element in workflow"))
        if root.find(".//WORKFLOW") is None:
            issues.append(("HIGH", "Missing WORKFLOW", "No <WORKFLOW> element"))
        return issues
# ─────────────────────────────────────────────────────────────────────────────
# Agent 2: INSTANCE / CONNECTOR Completeness
# ─────────────────────────────────────────────────────────────────────────────
class Agent02_InstanceConnector(BaseAgent):
    """Agent 2: checks that INSTANCE declarations and CONNECTOR references agree.

    With ``fix=True``, an <INSTANCE> element is added to the mapping for every
    SOURCE/TARGET/TRANSFORMATION that lacks one, and the file is rewritten.
    """

    name = "02_InstanceConnector"

    def run(self, fix=False, verbose=False) -> AgentReport:
        """Check every mapping file and return the collected report.

        Files that cannot be parsed or have no REPOSITORY/FOLDER are skipped
        silently; they only count towards ``files_checked``.
        """
        report = AgentReport(self.name)
        for project, tier, path in self.all_mapping_files():
            report.files_checked += 1
            root = self.parse_xml(path)
            if root is None:
                continue
            folder = self.get_folder(root)
            if folder is None:
                continue
            mapping = folder.find("MAPPING")
            if mapping is None:
                report.add_finding(Finding(self.name, project, path.name,
                    "HIGH", "Missing MAPPING", "No <MAPPING> element"))
                continue

            # Collect declared instance names
            instances = {inst.get("NAME") for inst in mapping.findall("INSTANCE") if inst.get("NAME")}
            # Collect source/target/transformation names (should all be instances)
            sources = {s.get("NAME") for s in folder.findall("SOURCE") if s.get("NAME")}
            targets = {t.get("NAME") for t in folder.findall("TARGET") if t.get("NAME")}
            # Name -> TYPE of the first transformation with that name. A dict
            # lookup avoids building XPath predicates from names, which breaks
            # as soon as a name contains a quote character.
            transform_types = {}
            for t in folder.findall("TRANSFORMATION"):
                if t.get("NAME"):
                    transform_types.setdefault(t.get("NAME"), t.get("TYPE", ""))
            all_expected = sources | targets | set(transform_types)
            ok = True

            # Every instance name mentioned on either end of a connector
            connector_refs = set()
            for conn in mapping.findall("CONNECTOR"):
                for attr in ("FROMINSTANCE", "TOINSTANCE"):
                    ref = conn.get(attr)
                    if ref:
                        connector_refs.add(ref)

            # Missing instances referenced in connectors. These count as fixed
            # only when the name is a known object, because the loop below is
            # what actually creates the INSTANCE element.
            missing_in_instances = connector_refs - instances
            for m in sorted(missing_in_instances):
                fixable = bool(fix and m in all_expected)
                report.add_finding(Finding(self.name, project, path.name,
                    "CRITICAL", "Missing INSTANCE",
                    f"CONNECTOR references '{m}' but no INSTANCE with that name exists",
                    fixed=fixable,
                    fix_description=f"Would add <INSTANCE NAME='{m}'/>" if fixable else ""))
                ok = False

            # Expected objects not declared as instances
            missing_from_expected = all_expected - instances
            for m in sorted(missing_from_expected):
                was_fixed = False
                if fix:
                    itype = "SOURCE" if m in sources else ("TARGET" if m in targets else "TRANSFORMATION")
                    self._add_instance(mapping, m, itype, transform_types.get(m, ""))
                    was_fixed = True
                report.add_finding(Finding(self.name, project, path.name,
                    "HIGH", "Missing INSTANCE",
                    f"'{m}' defined as SOURCE/TARGET/TRANSFORMATION but not declared as INSTANCE",
                    fixed=was_fixed,
                    fix_description=f"Added INSTANCE element for '{m}'" if was_fixed else ""))
                ok = False

            # Orphan instances (declared but not referenced in any connector).
            # NOTE(review): "s_m_" and "sq_" look like name prefixes, but set
            # subtraction only removes instances with exactly those names.
            # Confirm the intent before changing this.
            orphan_instances = instances - connector_refs - {"StartTask", "s_m_", "sq_"}
            for o in sorted(orphan_instances):
                if not o.startswith("StartTask"):
                    report.add_finding(Finding(self.name, project, path.name,
                        "LOW", "Orphan INSTANCE",
                        f"INSTANCE '{o}' declared but not referenced in any CONNECTOR"))
                    ok = False

            if fix and (missing_from_expected or missing_in_instances):
                self._write_mapping(path, root)
            if ok:
                report.add_pass()
        return report

    @staticmethod
    def _add_instance(mapping, name, kind, tf_type=""):
        """Append an <INSTANCE> for *name* to *mapping* and return it.

        *kind* is "SOURCE", "TARGET" or "TRANSFORMATION"; *tf_type* is used
        only for transformations. The attributes are set unconditionally. The
        previous code guarded the SOURCE branch with ``if src:``, which is
        False for an Element without children, so a childless <SOURCE> lost
        attributes on its generated INSTANCE.
        """
        elem = ET.SubElement(mapping, "INSTANCE")
        elem.set("DESCRIPTION", "")
        elem.set("NAME", name)
        if kind == "SOURCE":
            elem.set("SOURCENAME", name)
            elem.set("TRANSFORMATION_NAME", name)
            elem.set("TRANSFORMATION_TYPE", "Source Definition")
        elif kind == "TARGET":
            elem.set("TARGETNAME", name)
            elem.set("TRANSFORMATION_NAME", name)
            elem.set("TRANSFORMATION_TYPE", "Target Definition")
        else:
            elem.set("TRANSFORMATION_NAME", name)
            elem.set("TRANSFORMATION_TYPE", tf_type)
        elem.set("TYPE", kind)
        return elem

    @staticmethod
    def _write_mapping(path, root):
        """Re-indent *root* and overwrite *path* with an XML declaration and DOCTYPE."""
        _indent_xml(root)
        # NOTE(review): kept from the original; it appears to have no effect
        # on documents that do not use namespaces.
        ET.register_namespace("", "")
        tree = ET.ElementTree(root)
        with open(path, "wb") as fh:
            fh.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
            fh.write(b'<!DOCTYPE POWERMART SYSTEM "powrmart.dtd">\n')
            tree.write(fh, encoding="utf-8", xml_declaration=False)
# ─────────────────────────────────────────────────────────────────────────────
# Agent 3: Mapping-Workflow Pairing
# ─────────────────────────────────────────────────────────────────────────────
class Agent03_MappingWorkflowPairing(BaseAgent):
    """Agent 3: pairs each mapping file with a workflow file named wf_<mapping>."""

    name = "03_MappingWorkflowPairing"

    def run(self, fix=False, verbose=False) -> AgentReport:
        """Compare mapping and workflow file stems per project and tier.

        With ``fix=True`` a workflow is generated for each unpaired mapping.
        Tiers without a mappings directory are skipped.
        """
        report = AgentReport(self.name)
        tier_dirs = [(project, tier) for project in PROJECTS for tier in TIERS]

        for project, tier in tier_dirs:
            m_dir = SAMPLE_DATA_DIR / project / "mappings" / tier
            if not m_dir.exists():
                continue
            w_dir = SAMPLE_DATA_DIR / project / "workflows" / tier

            mapping_names = {xml_path.stem for xml_path in m_dir.glob("*.xml")}
            workflow_names = set()
            if w_dir.exists():
                workflow_names = {
                    xml_path.stem.removeprefix("wf_") for xml_path in w_dir.glob("*.xml")
                }
            report.files_checked += len(mapping_names)

            unpaired_mappings = sorted(mapping_names - workflow_names)
            unpaired_workflows = sorted(workflow_names - mapping_names)
            paired = mapping_names & workflow_names

            # Mappings without workflows
            for missing in unpaired_mappings:
                was_fixed = bool(fix)
                if was_fixed:
                    _generate_workflow(project, tier, missing, w_dir)
                report.add_finding(Finding(
                    self.name, project, f"{missing}.xml",
                    "HIGH", "Missing Workflow",
                    f"No workflow found for mapping '{missing}'",
                    fixed=was_fixed,
                    fix_description=f"Generated wf_{missing}.xml" if was_fixed else "",
                ))

            # Workflows without mappings (orphan workflows)
            for orphan in unpaired_workflows:
                report.add_finding(Finding(
                    self.name, project, f"wf_{orphan}.xml",
                    "MEDIUM", "Orphan Workflow",
                    f"Workflow 'wf_{orphan}.xml' has no corresponding mapping",
                ))

            report.passed += len(paired)

        return report
# ─────────────────────────────────────────────────────────────────────────────
# Agent 4: Parameter Reference Coverage
# ─────────────────────────────────────────────────────────────────────────────
class Agent04_ParameterCoverage(BaseAgent):
    """Agent 4: checks that every $$PARAM used in mappings is declared per environment.

    With ``fix=True`` missing parameters are appended to the parameter file
    with a generated default value, and the file is rewritten once.
    """

    name = "04_ParameterCoverage"

    def run(self, fix=False, verbose=False) -> AgentReport:
        """Check the dev/uat/prod parameter files of every project."""
        report = AgentReport(self.name)
        for project in PROJECTS:
            all_params = self._collect_param_refs(project, report)
            report.files_checked += 3  # dev/uat/prod
            param_dir = SAMPLE_DATA_DIR / project / "parameter_files"
            for env in ("dev", "uat", "prod"):
                param_file = param_dir / f"params_{project}_{env}.xml"
                if not param_file.exists():
                    report.add_finding(Finding(self.name, project, f"params_{project}_{env}.xml",
                        "HIGH", "Missing Param File",
                        f"Parameter file for {env} environment not found"))
                    continue
                root = self.parse_xml(param_file)
                if root is None:
                    report.add_finding(Finding(self.name, project, param_file.name,
                        "HIGH", "Parse Error", "Cannot parse parameter file"))
                    continue

                # Only <PARAM> elements directly under the root are considered.
                declared = {p.get("NAME", "").lstrip("$") for p in root.findall("PARAM")}
                missing = sorted(all_params - declared)
                if not missing:
                    report.add_pass()
                    continue

                was_fixed = False
                if fix:
                    for m in missing:
                        elem = ET.SubElement(root, "PARAM")
                        elem.set("NAME", f"$${m}")
                        elem.set("VALUE", _default_param_value(m, env))
                    # Write once after all additions rather than once per parameter.
                    self._write_param_file(param_file, root)
                    was_fixed = True

                for m in missing:
                    report.add_finding(Finding(self.name, project, param_file.name,
                        "MEDIUM", "Missing Param Declaration",
                        f"'$${m}' used in mappings but not declared in {env} param file",
                        fixed=was_fixed,
                        fix_description=f"Added $${m} with default value" if was_fixed else ""))
        return report

    def _collect_param_refs(self, project, report) -> Set[str]:
        """Return the names (without ``$$``) referenced in *project*'s mapping files.

        Only upper-case identifiers are matched. A mapping that cannot be read
        is recorded as a LOW finding instead of being skipped silently.
        """
        all_params: Set[str] = set()
        for tier in TIERS:
            m_dir = SAMPLE_DATA_DIR / project / "mappings" / tier
            if not m_dir.exists():
                continue
            for xml_file in m_dir.glob("*.xml"):
                try:
                    text = xml_file.read_text(encoding="utf-8")
                except (OSError, UnicodeDecodeError) as exc:
                    report.add_finding(Finding(self.name, project, xml_file.name,
                        "LOW", "Unreadable Mapping",
                        f"Could not read file while scanning for parameter references: {exc}"))
                    continue
                all_params.update(re.findall(r'\$\$([A-Z_][A-Z0-9_]*)', text))
        return all_params

    @staticmethod
    def _write_param_file(param_file, root):
        """Re-indent *root* and overwrite *param_file*, prefixed by an XML declaration."""
        _indent_xml(root)
        tree = ET.ElementTree(root)
        with open(param_file, "wb") as fh:
            fh.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
            tree.write(fh, encoding="utf-8", xml_declaration=False)
# ─────────────────────────────────────────────────────────────────────────────
# Agent 5: Tier Coverage Validator
# ─────────────────────────────────────────────────────────────────────────────
class Agent05_TierCoverage(BaseAgent):
    """Agent 5: checks required transformation types and file counts per tier."""

    name = "05_TierCoverageValidator"

    def run(self, fix=False, verbose=False) -> AgentReport:
        """Inspect each existing <project>/mappings/<tier>/ directory.

        One pass is counted per tier directory that contains every required
        transformation type. The file-count check is reported independently.
        """
        report = AgentReport(self.name)
        # Check minimum file counts
        expected_min = {"simple": 15, "medium": 20, "complex": 15}

        for project in PROJECTS:
            for tier in TIERS:
                m_dir = SAMPLE_DATA_DIR / project / "mappings" / tier
                if not m_dir.exists():
                    continue

                xml_files = list(m_dir.glob("*.xml"))
                file_count = len(xml_files)
                report.files_checked += file_count

                # Collect all transformation types present in this tier;
                # unparseable files contribute nothing but are still counted.
                types_found: Set[str] = set()
                for xml_file in xml_files:
                    root = self.parse_xml(xml_file)
                    if root is None:
                        continue
                    types_found.update(
                        tf_type
                        for tf_type in (t.get("TYPE") for t in root.iter("TRANSFORMATION"))
                        if tf_type
                    )

                missing_types = TIER_REQUIRED_TYPES.get(tier, set()) - types_found
                if not missing_types:
                    report.add_pass()
                for mt in sorted(missing_types):
                    report.add_finding(Finding(
                        self.name, project, f"{tier}/",
                        "HIGH", "Missing Transform Type",
                        f"Tier '{tier}' has no mapping with '{mt}' transformation (required for pipeline coverage)",
                    ))

                if file_count < expected_min.get(tier, 0):
                    report.add_finding(Finding(
                        self.name, project, f"{tier}/",
                        "MEDIUM", "Low File Count",
                        f"Tier '{tier}' has {file_count} files, expected >= {expected_min[tier]}",
                    ))
        return report
# ─────────────────────────────────────────────────────────────────────────────
# Agent 6: Session / Workflow Structure
# ─────────────────────────────────────────────────────────────────────────────
class Agent06_SessionWorkflow(BaseAgent):
    """Agent 6: checks that each workflow has a Session task linked to a mapping."""

    name = "06_SessionWorkflowStructure"

    def run(self, fix=False, verbose=False) -> AgentReport:
        """Validate the task structure of every workflow file.

        Unparseable files are skipped silently. ``fix`` and ``verbose`` are
        accepted but unused.
        """
        report = AgentReport(self.name)
        for project, tier, path in self.all_workflow_files():
            report.files_checked += 1
            root = self.parse_xml(path)
            if root is None:
                continue

            tasks = root.findall(".//TASK")
            if not tasks:
                report.add_finding(Finding(self.name, project, path.name,
                    "HIGH", "Missing TASK", "No SESSION task found in workflow"))
                continue

            # Prefer the first task whose TYPE is "Session". Looking only at
            # the first <TASK> falsely flagged workflows where another task
            # type precedes the session. With no Session task at all, fall
            # back to the first task so the findings below are still raised.
            task = next((t for t in tasks if t.get("TYPE", "") == "Session"), tasks[0])

            ok = True
            task_type = task.get("TYPE", "")
            if task_type != "Session":
                report.add_finding(Finding(self.name, project, path.name,
                    "MEDIUM", "Wrong TASK TYPE",
                    f"TASK TYPE is '{task_type}', expected 'Session'"))
                ok = False

            # Check session has Mapping Name attribute (direct ATTRIBUTE children only)
            task_attrs = {a.get("NAME"): a.get("VALUE") for a in task.findall("ATTRIBUTE")}
            if "Mapping Name" not in task_attrs:
                report.add_finding(Finding(self.name, project, path.name,
                    "HIGH", "Missing Mapping Name",
                    "SESSION task has no 'Mapping Name' attribute linking to a mapping"))
                ok = False

            # Check TASKINSTANCE exists; a missing WORKFLOW element is not
            # reported by this agent.
            wf = root.find(".//WORKFLOW")
            if wf is not None and wf.find("TASKINSTANCE") is None:
                report.add_finding(Finding(self.name, project, path.name,
                    "MEDIUM", "Missing TASKINSTANCE",
                    "WORKFLOW element has no TASKINSTANCE child"))
                ok = False

            if ok:
                report.add_pass()
        return report
# ─────────────────────────────────────────────────────────────────────────────
# Agent 7: Cross-Reference Integrity
# ─────────────────────────────────────────────────────────────────────────────
class Agent07_CrossReference(BaseAgent):
    """Agent 7: checks that CONNECTOR and INSTANCE references resolve to declared names."""

    name = "07_CrossReferenceIntegrity"

    def run(self, fix=False, verbose=False) -> AgentReport:
        """Cross-check names in every mapping file.

        Files without a parseable REPOSITORY/FOLDER/MAPPING chain are skipped.
        A file earns a pass when it adds no finding.
        """
        report = AgentReport(self.name)
        for project, _tier, path in self.all_mapping_files():
            report.files_checked += 1
            root = self.parse_xml(path)
            folder = self.get_folder(root) if root is not None else None
            mapping = folder.find("MAPPING") if folder is not None else None
            if mapping is None:
                continue

            findings_before = len(report.findings)

            # Build name registries
            source_names = self._names(folder, "SOURCE")
            target_names = self._names(folder, "TARGET")
            tf_names = self._names(folder, "TRANSFORMATION")
            instance_names = self._names(mapping, "INSTANCE")
            all_valid = source_names | target_names | tf_names | instance_names

            for conn in mapping.findall("CONNECTOR"):
                from_inst = conn.get("FROMINSTANCE", "")
                if from_inst and from_inst not in all_valid:
                    report.add_finding(Finding(
                        self.name, project, path.name,
                        "CRITICAL", "Dangling FROMINSTANCE",
                        f"CONNECTOR FROMINSTANCE='{from_inst}' not defined anywhere in the mapping",
                    ))
                to_inst = conn.get("TOINSTANCE", "")
                if to_inst and to_inst not in all_valid:
                    report.add_finding(Finding(
                        self.name, project, path.name,
                        "CRITICAL", "Dangling TOINSTANCE",
                        f"CONNECTOR TOINSTANCE='{to_inst}' not defined anywhere in the mapping",
                    ))

            # Check INSTANCE TRANSFORMATION_NAME exists in the registry that
            # matches the instance TYPE; other TYPE values are not checked.
            registry_for = {
                "TRANSFORMATION": tf_names,
                "SOURCE": source_names,
                "TARGET": target_names,
            }
            for inst in mapping.findall("INSTANCE"):
                tf_name = inst.get("TRANSFORMATION_NAME", "")
                inst_type = inst.get("TYPE", "")
                registry = registry_for.get(inst_type)
                if registry is None or not tf_name or tf_name in registry:
                    continue
                if inst_type == "TRANSFORMATION":
                    message = f"INSTANCE references TRANSFORMATION_NAME='{tf_name}' which doesn't exist"
                elif inst_type == "SOURCE":
                    message = f"SOURCE INSTANCE references '{tf_name}' which isn't a declared SOURCE"
                else:
                    message = f"TARGET INSTANCE references '{tf_name}' which isn't a declared TARGET"
                report.add_finding(Finding(
                    self.name, project, path.name,
                    "HIGH", "Broken INSTANCE ref", message,
                ))

            if len(report.findings) == findings_before:
                report.add_pass()
        return report

    @staticmethod
    def _names(parent, tag):
        """Return the non-empty NAME attributes of *parent*'s direct *tag* children."""
        return {
            value
            for value in (child.get("NAME") for child in parent.findall(tag))
            if value
        }
# ─────────────────────────────────────────────────────────────────────────────
# Agent 8: Scenario Coverage Auditor
# ─────────────────────────────────────────────────────────────────────────────
class Agent08_ScenarioCoverage(BaseAgent):
    """Agent 8: checks that every entry in REQUIRED_SCENARIOS is exercised by some mapping."""

    name = "08_ScenarioCoverageAuditor"

    def run(self, fix=False, verbose=False) -> AgentReport:
        """Run every scenario detector against every mapping file.

        Each (file, scenario) hit counts as a pass. A scenario with no hit
        yields a HIGH finding and a scenario with exactly one hit a LOW
        finding. With ``verbose=True``, scenarios covered by two or more
        files are printed.
        """
        report = AgentReport(self.name)
        # Track which scenarios are covered across all projects
        scenario_hits: Dict[str, List[str]] = defaultdict(list)
        # Detectors that already produced an error finding, so that a broken
        # detector is reported once rather than once per file.
        failed_detectors: Set[str] = set()

        for project, tier, path in self.all_mapping_files():
            report.files_checked += 1
            root = self.parse_xml(path)
            if root is None:
                continue
            for scenario_key, scenario_def in REQUIRED_SCENARIOS.items():
                try:
                    matched = scenario_def["detect"](root)
                except Exception as exc:
                    # Surface detector failures. Swallowing them silently made
                    # a broken detector look like a missing scenario.
                    if scenario_key not in failed_detectors:
                        failed_detectors.add(scenario_key)
                        report.add_finding(Finding(self.name, project, path.name,
                            "LOW", "Scenario Detector Error",
                            f"Detector for scenario '{scenario_key}' raised "
                            f"{type(exc).__name__}: {exc}"))
                    continue
                if matched:
                    scenario_hits[scenario_key].append(f"{project}/{tier}/{path.name}")
                    report.add_pass()

        # Report missing scenarios
        for scenario_key, scenario_def in REQUIRED_SCENARIOS.items():
            hits = scenario_hits.get(scenario_key, [])
            if not hits:
                report.add_finding(Finding(self.name, "ALL", "—",
                    "HIGH", "Missing Pipeline Scenario",
                    f"No mapping covers scenario '{scenario_key}': {scenario_def['description']}. "
                    f"This means pipeline agents may not be tested for this case."))
            elif len(hits) == 1:
                report.add_finding(Finding(self.name, "ALL", hits[0],
                    "LOW", "Single Coverage",
                    f"Scenario '{scenario_key}' covered by only 1 file — consider adding a second test case"))
            elif verbose:
                print(f" ✓ {scenario_key}: {len(hits)} files cover this scenario")
        return report
# ─────────────────────────────────────────────────────────────────────────────
# Agent 9: Symmetry Enforcer
# ─────────────────────────────────────────────────────────────────────────────
class Agent09_SymmetryEnforcer(BaseAgent):
    """Agent 9: compares file counts across projects, tiers and all_mappings/."""

    name = "09_SymmetryEnforcer"

    def run(self, fix=False, verbose=False) -> AgentReport:
        """Count XML files per project and report every asymmetry.

        The first entry of PROJECTS is the reference the other projects are
        compared against. With ``fix=True`` an out-of-sync all_mappings/
        directory is re-synced.
        """
        report = AgentReport(self.name)

        counts: Dict[str, Dict[str, int]] = {}
        for project in PROJECTS:
            report.files_checked += 1
            project_dir = SAMPLE_DATA_DIR / project
            per_key = {}
            for tier in TIERS:
                per_key[f"m_{tier}"] = self._xml_count(project_dir / "mappings" / tier)
                per_key[f"w_{tier}"] = self._xml_count(project_dir / "workflows" / tier)
            # Check all_mappings sync
            per_key["all_mappings"] = self._xml_count(project_dir / "all_mappings")
            per_key["total_m"] = sum(per_key[f"m_{t}"] for t in TIERS)
            counts[project] = per_key

        # Compare across projects
        ref_project = PROJECTS[0]
        for project in PROJECTS[1:]:
            for key, ref_val in counts[ref_project].items():
                actual = counts[project].get(key, 0)
                if actual == ref_val:
                    report.add_pass()
                    continue
                # A difference of up to two files is MEDIUM, anything larger HIGH.
                severity = "HIGH" if abs(actual - ref_val) > 2 else "MEDIUM"
                report.add_finding(Finding(
                    self.name, project, f"{key}/",
                    severity, "Asymmetric Count",
                    f"'{key}' count={actual} vs {ref_project} count={ref_val} (diff={actual-ref_val:+d})",
                ))

        # Check mapping count == workflow count per project
        for project in PROJECTS:
            for tier in TIERS:
                mc = counts[project].get(f"m_{tier}", 0)
                wc = counts[project].get(f"w_{tier}", 0)
                if mc == wc:
                    continue
                report.add_finding(Finding(
                    self.name, project, f"{tier}/",
                    "HIGH", "Mapping/Workflow Mismatch",
                    f"Tier '{tier}': {mc} mappings but {wc} workflows",
                ))

        # Check all_mappings count == total mappings
        for project in PROJECTS:
            total = counts[project]["total_m"]
            all_count = counts[project]["all_mappings"]
            if all_count == total:
                report.add_pass()
                continue
            was_fixed = bool(fix)
            if was_fixed:
                _sync_all_mappings(project)
            report.add_finding(Finding(
                self.name, project, "all_mappings/",
                "MEDIUM", "all_mappings Out of Sync",
                f"all_mappings/ has {all_count} files but total from tiers is {total}",
                fixed=was_fixed,
                fix_description="Synced all_mappings/ from tier directories" if was_fixed else "",
            ))
        return report

    @staticmethod
    def _xml_count(directory):
        """Return how many *.xml files *directory* holds, or 0 when it does not exist."""
        if not directory.exists():
            return 0
        return sum(1 for _ in directory.glob("*.xml"))
# ─────────────────────────────────────────────────────────────────────────────
# Agent 10: Manifest / Index Sync
# ─────────────────────────────────────────────────────────────────────────────
class Agent10_ManifestSync(BaseAgent):
    """Agent 10: checks each project's manifest JSON and INDEX.txt against the files on disk."""

    name = "10_ManifestIndexSync"

    def run(self, fix=False, verbose=False) -> AgentReport:
        """Check manifest and index for every project.

        With ``fix=True`` an out-of-sync manifest is regenerated from the
        tier directories. INDEX.txt is only reported on, never rewritten.
        """
        report = AgentReport(self.name)
        for project in PROJECTS:
            report.files_checked += 2  # manifest + index
            # Collect actual mapping filenames from the tier directories. The
            # same file name in two tiers is counted once, as this is a set.
            actual_files: Set[str] = set()
            for tier in TIERS:
                m_dir = SAMPLE_DATA_DIR / project / "mappings" / tier
                if m_dir.exists():
                    actual_files.update(f.name for f in m_dir.glob("*.xml"))
            self._check_manifest(report, project, actual_files, fix)
            self._check_index(report, project, actual_files)
        return report

    def _check_manifest(self, report, project, actual_files, fix):
        """Compare <project>_full.manifest.json with *actual_files*, repairing it when *fix* is set."""
        manifest_path = SAMPLE_DATA_DIR / project / f"{project}_full.manifest.json"
        if not manifest_path.exists():
            report.add_finding(Finding(self.name, project, manifest_path.name,
                "HIGH", "Missing Manifest", "Manifest JSON file not found"))
            return

        try:
            manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
            # Valid JSON does not have to be an object; reject other shapes
            # here instead of failing later on .get().
            if not isinstance(manifest, dict):
                raise TypeError(
                    f"expected a JSON object at top level, got {type(manifest).__name__}")
            # set() raises TypeError for null or for unhashable entries.
            manifest_files = set(manifest.get("mappings", []))
            declared_total = manifest.get("total_mappings", -1)
        except (json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
            report.add_finding(Finding(self.name, project, manifest_path.name,
                "HIGH", "Manifest Parse Error", str(e)))
            return

        missing_from_manifest = actual_files - manifest_files
        extra_in_manifest = manifest_files - actual_files
        if not (missing_from_manifest or extra_in_manifest
                or declared_total != len(actual_files)):
            report.add_pass()
            return

        was_fixed = False
        if fix:
            manifest["mappings"] = sorted(actual_files)
            manifest["total_mappings"] = len(actual_files)
            dist = {}
            for tier in TIERS:
                m_dir = SAMPLE_DATA_DIR / project / "mappings" / tier
                dist[tier] = len(list(m_dir.glob("*.xml"))) if m_dir.exists() else 0
            manifest["mapping_distribution"] = dist
            manifest_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
            was_fixed = True

        details = []
        if missing_from_manifest:
            details.append(f"{len(missing_from_manifest)} files on disk not in manifest")
        if extra_in_manifest:
            details.append(f"{len(extra_in_manifest)} entries in manifest not on disk")
        if declared_total != len(actual_files):
            details.append(f"total_mappings={declared_total} but actual={len(actual_files)}")
        report.add_finding(Finding(self.name, project, manifest_path.name,
            "MEDIUM", "Manifest Out of Sync",
            "; ".join(details),
            fixed=was_fixed,
            fix_description="Regenerated manifest from disk" if was_fixed else ""))

    def _check_index(self, report, project, actual_files):
        """Report mapping files that INDEX.txt does not mention.

        A file counts as mentioned when its name, with or without ".xml",
        occurs anywhere in the index text (plain substring match).
        """
        index_path = SAMPLE_DATA_DIR / project / "INDEX.txt"
        if not index_path.exists():
            report.add_finding(Finding(self.name, project, "INDEX.txt",
                "LOW", "Missing INDEX", "INDEX.txt not found"))
            return

        index_text = index_path.read_text(encoding="utf-8")
        missing_from_index = [f for f in sorted(actual_files)
                              if f.replace(".xml", "") not in index_text and f not in index_text]
        if missing_from_index:
            report.add_finding(Finding(self.name, project, "INDEX.txt",
                "LOW", "INDEX Out of Sync",
                f"{len(missing_from_index)} mapping files not referenced in INDEX.txt"))
        else:
            report.add_pass()
# ─────────────────────────────────────────────────────────────────────────────
# Helper Functions
# ─────────────────────────────────────────────────────────────────────────────
def _indent_xml(elem: ET.Element, level: int = 0):
"""Add pretty-print indentation to an XML tree in-place."""
indent = "\n" + " " * level
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = indent + " "
if not elem.tail or not elem.tail.strip():
elem.tail = indent
for child in elem:
_indent_xml(child, level + 1)
if not child.tail or not child.tail.strip():
child.tail = indent
else:
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = indent
def _default_param_value(param_name: str, env: str) -> str:
"""Return a sensible default value for a missing parameter."""
name_lower = param_name.lower()
schema_map = {"dev": "DEV", "uat": "UAT", "prod": "PROD"}
suffix = schema_map.get(env, "DEV")
if "src_schema" in name_lower or "source_schema" in name_lower:
return f"OLTP_{suffix}"
if "tgt_schema" in name_lower or "target_schema" in name_lower:
return f"DWH_{suffix}"
if "load_date" in name_lower or "date" in name_lower:
return "TRUNC(SYSDATE)"
if "environment" in name_lower or "env" in name_lower:
return env.upper()
if "batch" in name_lower:
return "1"
if "conn" in name_lower or "connection" in name_lower:
return f"$$SRC_CONN_{suffix}"
return f"DEFAULT_{suffix}"
def _generate_workflow(project: str, tier: str, mapping_stem: str, w_dir: Path):
"""Generate a minimal workflow XML for a mapping that has no workflow."""
w_dir.mkdir(parents=True, exist_ok=True)
session_name = f"s_{mapping_stem}"
wf_name = f"wf_{mapping_stem}"
xml_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE POWERMART SYSTEM "powrmart.dtd">
<POWERMART CREATION_DATE="{datetime.date.today().strftime('%m/%d/%Y')}" REPOSITORY_VERSION="112">
<REPOSITORY CODEPAGE="UTF-8" DATABASETYPE="Oracle" NAME="{project.upper()}_DWH" VERSION="112">
<FOLDER DESCRIPTION="{project} folder" GROUP="" NAME="{project}" OWNER="Administrator" PERMISSIONS="rwx---r--" SHARED="NOTSHARED">
<CONFIG DESCRIPTION="" ISDEFAULT="YES" NAME="default" VERSIONNUMBER="1">
<ATTRIBUTE NAME="Enable Test Load" VALUE="NO"/>
<ATTRIBUTE NAME="Commit Interval" VALUE="10000"/>
<ATTRIBUTE NAME="Commit Type" VALUE="Source"/>
<ATTRIBUTE NAME="Rollback Segment" VALUE=""/>
<ATTRIBUTE NAME="Recovery Strategy" VALUE="Restart from beginning"/>
</CONFIG>
<TASK DESCRIPTION="Session for {mapping_stem}" ISVALID="YES" NAME="{session_name}" REUSABLE="NO" TYPE="Session" VERSIONNUMBER="1">
<ATTRIBUTE NAME="Mapping Name" VALUE="{mapping_stem}"/>
<ATTRIBUTE NAME="Session Log File" VALUE="{session_name}.log"/>
<ATTRIBUTE NAME="Enable Test Load" VALUE="NO"/>
<ATTRIBUTE NAME="Error Threshold" VALUE="0"/>
<ATTRIBUTE NAME="Treat Source Rows As" VALUE="Data Driven"/>
</TASK>
<WORKFLOW DESCRIPTION="Workflow for {mapping_stem}" ISENABLED="YES" ISRUNNABLEINDEPENDENTLY="YES" ISSERVICE="NO" ISSESSIONBASEDREPO="YES" ISVALID="YES" NAME="{wf_name}" REUSABLE="NO" SCHEDULERTYPE="0" VERSIONNUMBER="1">
<TASKINSTANCE DESCRIPTION="" ISOVERRIDABLE="NO" NAME="{session_name}" TASKTYPENAME="Session" TASKNAME="{session_name}"/>
</WORKFLOW>
</FOLDER>
</REPOSITORY>
</POWERMART>
"""
out_path = w_dir / f"wf_{mapping_stem}.xml"
out_path.write_text(xml_content, encoding="utf-8")
def _sync_all_mappings(project: str):
"""Sync all_mappings/ directory from the tier directories."""
all_dir = SAMPLE_DATA_DIR / project / "all_mappings"
all_dir.mkdir(exist_ok=True)
# Remove stale files
for f in all_dir.glob("*.xml"):
f.unlink()