DC Notes
The meeting (led by Rakesh, apparently explaining to Vanshika) covered a DQ check table that uses an SCD-style active flag. The core problem: when a check passes on a re-run, the previous "failed + active" record needs to be deactivated. Two approaches were proposed to handle this in Spark/Oracle.
The data model (from Image 2):
cob_date | dwh_business_date | row_type | instance | status | active_flag | N (inactive?)
The active_flag = Y marks records that still need tracking (open failures). When reprocessed and passed, the old Y record must flip to N (inactive) and get an effective_end_timestamp.
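A minimal Python sketch of one Chk row helps pin down the flag semantics. It assumes the whiteboard columns plus the effective_end_timestamp mentioned above; the class name, method names and the simplified key (dwh_business_date, row_type, instance) are hypothetical, not the team's actual schema.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Tuple


@dataclass
class CheckRow:
    """One row of the Chk table (hypothetical Python model of the whiteboard columns)."""
    cob_date: str
    dwh_business_date: str
    row_type: int
    instance: str
    status: str                                  # "Pass" or "Fail"
    active_flag: str = "Y"                       # "Y" = current version, "N" = closed history
    effective_end_ts: Optional[datetime] = None  # stays None while the row is active

    def business_key(self) -> Tuple[str, int, str]:
        # Simplified key; the meeting's key is recon_engine + data_type + instance + dwh_business_date.
        return (self.dwh_business_date, self.row_type, self.instance)

    def is_open_failure(self) -> bool:
        # Rows that still need tracking: the current version of a failing check.
        return self.active_flag == "Y" and self.status == "Fail"

    def close(self, at: datetime) -> None:
        # Retire this version: flip Y to N and stamp the end of its validity.
        self.active_flag = "N"
        self.effective_end_ts = at


row = CheckRow("12 Jun", "11 Jun", 8, "ABC", "Fail")
print(row.is_open_failure())  # True
row.close(datetime.now())
print(row.is_open_failure())  # False
```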
Approach 1 — Stage table + Oracle MERGE
The Spark job writes check results to a stage (STG) table first using df.write(append). Then a custom Scala Oracle utility (JDBC, autocommit=false) fires a native Oracle MERGE statement — not a Spark operation. The MERGE logic:
If the business key (recon_engine + data_type + instance + dwh_business_date) matches an existing active record in the Check (Chk) table → update active_flag = N and set effective_end_timestamp
If no match → insert the new record from STG into Chk as a fresh active entry
Stage table rerun safety: Before each run, a DELETE + INSERT pattern is used on the stage table instead of a Spark overwrite. The DELETE goes through the JDBC utility and the INSERT is the df.write append, so STG only ever holds one version of each execution.
Approach 2 — No stage table, scan open failures directly
Skip the stage table entirely. Each day's run scans the Check table itself for all still-open (active_flag = Y, status = Fail) records across all previous dates, combines them with the current day's fresh check results, and appends everything via df.write(append). No MERGE needed. But the old failed records are never deactivated — they just accumulate, and the process always re-picks them up as long as they stay Y.
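A small in-memory simulation of Approach 2 makes the accumulation concrete. The rows, the recheck outcome and the helper names are hypothetical, and de-duplicating open keys before rechecking is an assumption (the meeting did not say how repeated open rows for one key are handled).

```python
def key(r):
    return (r["biz_date"], r["row_type"], r["instance"])


def approach2_run(chk, cob, fresh, recheck):
    """One daily run of Approach 2: append-only, nothing is ever deactivated."""
    # Scan CHK for still-open failures (de-duplicated by key, an assumption).
    open_keys = sorted({key(r) for r in chk
                        if r["active_flag"] == "Y" and r["status"] == "Fail"})
    rechecked = [{"cob": cob, "biz_date": d, "row_type": t, "instance": i,
                  "status": recheck(cob, (d, t, i)), "active_flag": "Y"}
                 for d, t, i in open_keys]
    # df.write(append): today's results and rechecks are added; old rows stay as they were.
    chk.extend(fresh + rechecked)


def open_failures(chk, k):
    return [r for r in chk
            if key(r) == k and r["active_flag"] == "Y" and r["status"] == "Fail"]


KEY = ("11 Jun", 8, "ABC")


def recheck(cob, k):
    # Hypothetical outcome: the break is fixed in time for the COB 15 Jun recheck.
    return "Pass" if cob == "15 Jun" else "Fail"


chk = []
approach2_run(chk, "12 Jun", [{"cob": "12 Jun", "biz_date": "11 Jun", "row_type": 8,
                                "instance": "ABC", "status": "Fail", "active_flag": "Y"}],
              recheck)
for cob in ["13 Jun", "14 Jun", "15 Jun"]:
    approach2_run(chk, cob, [], recheck)

print(len(chk))                      # 4 rows for one business key
print(len(open_failures(chk, KEY)))  # 3, still "open" although it passed on 15 Jun
```

Because the three Fail rows keep active_flag = Y, every later run picks the key up again, even after the Pass row has been appended.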
Which approach to pick?
Approach 1 (Stage + Oracle MERGE) is the right long-term choice. Here's why:
The problem at the heart of this design is that a DQ check result is not just an event — it's a state transition. When row_type=8, instance=ABC, dwh_business_date=11th June moves from Fail to Pass, the old "open failure" record should be retired, not left open forever. Approach 1 handles this correctly via the MERGE:
Previous active Y record → deactivated (active_flag=N, effective_end_ts stamped)
New result (pass or fail) → inserted fresh as the current truth
Approach 2 works but it's essentially a workaround for not having MERGE capability in Spark. You'd be carrying forward all historical open failures on every run, scanning more and more data as time goes on, with no clean audit trail of when a failure was resolved.
However, Rakesh explicitly said to try both and see what works in your environment first. Given you're still in the design/build phase of BreaksDQCJob, a pragmatic path would be:
Start with Approach 2 to unblock testing quickly (it's just a Spark scan + append, no new infra)
Implement Approach 1 properly once the Oracle JDBC utility pattern is confirmed working (Satadi's df.write utility is already available for the STG write — you just need the custom JDBC utility for the DELETE and MERGE calls)
The MERGE SQL itself runs entirely on Oracle. Spark just hands the SQL string to the JDBC utility, which executes it and commits the transaction; Spark never touches the merge logic.
You said: explain approach 1
15 Jun
Claude responded: Let me walk through Approach 1 step by step. There are four distinct phases, so I'll explain each and then show them together in an interactive stepper.
The core idea is that Spark alone cannot do an upsert. df.write's save modes (append, overwrite, ignore, errorifexists) have no concept of "find the previous record for this key and update it." So Approach 1 splits the work: Spark handles data reads and checks, and Oracle handles all transactional write logic through native SQL executed via a custom JDBC utility.
There are four phases per daily run:
Phase 1 — Clean the stage table. Before writing anything, fire a JDBC DELETE against the STG table for the current run's key (e.g. today's dwh_business_date). This ensures if the job reruns, you don't accumulate duplicate rows — the stage always holds exactly one copy of the latest execution.
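A minimal sketch of Phases 1 and 2 together, with Python's sqlite3 standing in for Oracle and an executemany INSERT standing in for df.write (both assumptions so the example runs locally). The DELETE is scoped to the run's business date, as in the Phase 1 description, and the two phases commit separately, mirroring a JDBC DELETE followed by a separate Spark write. The helper names are hypothetical.

```python
import sqlite3


def open_stage_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE stg (dwh_business_date TEXT, row_type INTEGER, "
                 "instance TEXT, status TEXT)")
    return conn


def load_stage(conn, business_date, rows):
    # Phase 1: scoped DELETE for this run's key only (its business date here).
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute("DELETE FROM stg WHERE dwh_business_date = ?", (business_date,))
    # Phase 2: append the fresh results, standing in for df.write(append).
    with conn:
        conn.executemany("INSERT INTO stg VALUES (?, ?, ?, ?)", rows)


def stage_count(conn, business_date=None):
    if business_date is None:
        return conn.execute("SELECT COUNT(*) FROM stg").fetchone()[0]
    return conn.execute("SELECT COUNT(*) FROM stg WHERE dwh_business_date = ?",
                        (business_date,)).fetchone()[0]


conn = open_stage_db()
rows = [("13 Jun", 8, "ABC", "Pass"), ("13 Jun", 10, "XYZ", "Pass")]
with conn:
    conn.execute("INSERT INTO stg VALUES ('12 Jun', 8, 'ABC', 'Fail')")  # another date's rows
load_stage(conn, "13 Jun", rows)
load_stage(conn, "13 Jun", rows)  # rerun of the same execution
print(stage_count(conn, "13 Jun"), stage_count(conn))  # 2 3
```

Rerunning the same execution leaves exactly one copy of that date's rows, while rows for other dates are untouched.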
Phase 2 — Write check results to STG. Now run your DQ checks in Spark, build the result dataframe, and use Satadi's df.write(append) to land it in the STG table. This is the only moment Spark writes to Oracle.
Phase 3 — Oracle MERGE: STG → Chk. Call the JDBC utility again, this time passing a MERGE SQL. Oracle executes it entirely on its own engine — Spark is just the trigger. The MERGE logic is:
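If a record in STG matches an existing active record in Chk on the business key (recon_engine + data_type + instance + dwh_business_date) → update that old record: set active_flag = N and stamp effective_end_timestamp = now
If no match exists → insert the STG record into Chk fresh, with active_flag = Y
Note that a MERGE applies only one of these two branches to each STG row. A key that matched is closed but does not also get its new version inserted in the same pass, so producing both the closed old row and a fresh active row (as the day-by-day examples assume) needs an extra INSERT from STG in the same transaction.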
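A minimal sketch of Phase 3, using Python's built-in sqlite3 as a stand-in for Oracle so it runs anywhere. Because a single MERGE cannot both close a matched key and insert its new version, this sketch swaps in a close-then-insert pair (an UPDATE, then an INSERT ... SELECT) inside one transaction; both statements are plain SQL of the kind Oracle also accepts. The business key is simplified to (dwh_business_date, row_type, instance), and effective_end_ts is a text stand-in for a timestamp.

```python
import sqlite3

SCHEMA = """
CREATE TABLE stg (dwh_business_date TEXT, row_type INTEGER, instance TEXT, status TEXT);
CREATE TABLE chk (dwh_business_date TEXT, row_type INTEGER, instance TEXT, status TEXT,
                  active_flag TEXT, effective_end_ts TEXT);
"""

# Step 1: close the active version of every key that has a new result in STG.
# The active_flag = 'Y' filter leaves already-closed history rows untouched.
CLOSE_ACTIVE_SQL = """
UPDATE chk
   SET active_flag = 'N', effective_end_ts = ?
 WHERE active_flag = 'Y'
   AND EXISTS (SELECT 1 FROM stg
                WHERE stg.dwh_business_date = chk.dwh_business_date
                  AND stg.row_type = chk.row_type
                  AND stg.instance = chk.instance)
"""

# Step 2: insert every STG row as the new active version, pass or fail.
INSERT_NEW_SQL = """
INSERT INTO chk (dwh_business_date, row_type, instance, status, active_flag, effective_end_ts)
SELECT dwh_business_date, row_type, instance, status, 'Y', NULL FROM stg
"""


def apply_stage(conn, end_ts):
    """Run both steps as one transaction: commit if both succeed, roll back otherwise."""
    with conn:
        conn.execute(CLOSE_ACTIVE_SQL, (end_ts,))
        conn.execute(INSERT_NEW_SQL)


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany("INSERT INTO chk VALUES (?, ?, ?, ?, ?, ?)", [
    ("11 Jun", 8, "ABC", "Fail", "N", "13 Jun 09:00"),  # older, already closed version
    ("11 Jun", 8, "ABC", "Fail", "Y", None),            # current open failure
    ("11 Jun", 10, "XYZ", "Pass", "Y", None),           # not in STG, stays as is
])
conn.executemany("INSERT INTO stg VALUES (?, ?, ?, ?)",
                 [("11 Jun", 8, "ABC", "Pass"), ("13 Jun", 8, "ABC", "Pass")])
conn.commit()
apply_stage(conn, "14 Jun 09:00")
```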
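Phase 4 — Commit or throw. If the MERGE succeeds, commit the transaction and exit cleanly. If anything fails, roll back explicitly and then throw, so the Chk table remains unchanged. The explicit rollback matters: Oracle's JDBC driver documents that closing a connection with auto-commit disabled and uncommitted changes performs an implicit commit.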
The JDBC utility itself is a simple reusable Scala class: it takes a JDBC URL, credentials, and a SQL string; sets autoCommit = false; executes the SQL; commits on success; and rolls back and throws on failure.
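The utility described here is Scala over JDBC. As a runnable stand-in, this is a hypothetical Python analogue using sqlite3 in autocommit mode with explicit BEGIN / COMMIT / ROLLBACK, which play the roles of setAutoCommit(false), commit() and rollback() on a java.sql.Connection. The name run_in_transaction is illustrative only.

```python
import sqlite3


def run_in_transaction(conn, statements):
    """Hypothetical analogue of the custom JDBC utility.

    Expects a connection opened with isolation_level=None, so nothing commits
    implicitly; BEGIN/COMMIT/ROLLBACK are issued explicitly instead.
    statements: list of (sql, params) pairs executed as one unit.
    """
    conn.execute("BEGIN")
    try:
        for sql, params in statements:
            conn.execute(sql, params)
    except Exception:
        # Roll back explicitly, then re-raise so the calling job fails loudly.
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE chk (business_key TEXT, active_flag TEXT)")
conn.execute("INSERT INTO chk VALUES ('11 Jun/8/ABC', 'Y')")

try:
    run_in_transaction(conn, [
        ("UPDATE chk SET active_flag = 'N'", ()),
        ("INSERT INTO missing_table VALUES (1)", ()),  # fails: no such table
    ])
except sqlite3.OperationalError as exc:
    print("rolled back:", exc)

print(conn.execute("SELECT active_flag FROM chk").fetchone()[0])  # Y
```

The first UPDATE had already run when the INSERT failed, yet the table still shows Y, which is the "Chk table remains unchanged" guarantee from Phase 4.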
Walk through each tab: each one shows the actual SQL/code and the table state before and after.
A few things worth calling out that the transcript makes clear:
The MERGE's match condition includes active_flag = 'Y'. This is intentional — you only want to deactivate the currently open record, not every historical record for that key. Old already-closed records (flag = N) are left untouched.
STG exists at all because df.write cannot merge: it can only append rows or overwrite the whole table. STG is a landing buffer that gives Oracle something to MERGE from. Without it, you'd have no clean way to tell Oracle "here is today's result, now reconcile it against what you already have."
The JDBC utility is called twice per run: once for the DELETE (Phase 1) and once for the MERGE (Phase 3). The df.write in Phase 2 uses Satadi's existing Spark-to-Oracle writer, so you're not reinventing that. Only the DELETE and MERGE SQL need the custom utility.
You said: explain approach 1 with example
15 Jun
Claude responded: Let me walk through Approach 1 with the exact data from Rakesh's whiteboard, tracking what happens day by day.
Click through each tab (Setup → COB 12 Jun → COB 13 Jun → COB 14 Jun) to see exactly how the STG and CHK tables evolve.
A few key insights the example surfaces:
The MERGE fires regardless of whether the recheck is a pass or a fail. On 14 Jun, 12 Jun / 10 / ABC still fails — but the MERGE still deactivates the previous version and inserts a fresh one. This gives you a clean versioned audit trail: every row in CHK is either a closed historical record (flag = N with an effective_end_ts) or the current active version (flag = Y). You never have stale ambiguous rows.
The "open failures" dashboard query is then trivially simple — WHERE active_flag = 'Y' AND status = 'Fail' — and it's always accurate regardless of how many reruns have happened.
The 11 Jun / 10 / XYZ Pass row from COB 12 Jun is never touched again because it passed on its first run and was never an open failure. It just sits there quietly with flag = Y as a clean pass record.
You said: how does he explain the delete into the STG table
15 Jun
Claude responded: From the transcript, Rakesh actually doesn't go deep on the DELETE into STG as a separate detailed explanation; he mentions it fairly briefly as part of the overall flow. Here's exactly what he says about it:
Around the 6:30–7:00 mark he says:
"I need to prepare the delete script to delete the data from that given combination... if you decide not to keep the multiple versions, we can go with one single one... your stage table, whatever you're doing, it should be like a once a copy of that given execution."
And then at 7:09:
"So it's like in your stored procedure, if you rerun the process, it's a delete and insert, right?"
That's essentially it. His point is simple and practical — the stage table should hold only one version of the current execution. So before you write (insert) fresh results via df.write, you delete whatever was already sitting in STG for that run's key combination. This way if the job reruns (due to failure, manual trigger, etc.), you don't accumulate duplicate rows in STG that would then confuse the MERGE.
He doesn't specify the exact DELETE SQL or the exact key columns to use for the delete — that's something you'd need to decide based on your actual partition key (likely dwh_business_date, or dwh_business_date + batch_version if you're tracking that). The design principle he's conveying is just: DELETE then INSERT on STG = always one clean copy per execution.
So the detailed DELETE implementation (which columns form the key, whether you scope it by date alone or by date + instance, etc.) is left for you to design based on your schema.
You said: where does he explain emptying the STG table
15 Jun
Claude responded: He doesn't explicitly say "empty the STG table" anywhere in the transcript. What he says is specifically about deleting for a given combination/key, not truncating the whole table.
The closest he gets is around 6:37:
"I need to prepare the delete script to delete the data from that given combination, let's say, for example, you are running multiple times in it."
And at 6:59:
"your stage table, whatever you're doing, it should be like a once a copy of that given execution."
So his intent is a scoped DELETE by key (the business date or run combination), not a full table wipe. The distinction matters because STG might hold rows for multiple dates simultaneously if you're rechecking historical open failures — you'd only want to delete and reinsert rows for the specific keys being processed in the current run, not blow away everything else.
The "empty STG" framing I used in the earlier explanation was my interpretation to simplify the concept. Strictly speaking, what Rakesh describes is a targeted delete for the current execution's key, followed by a fresh insert. Worth keeping that distinction in mind when you write the actual DELETE SQL.
You said: explain with many dates
15 Jun
Claude responded: Good point. Let me build a full multi-date example showing STG and CHK side by side across many days, including reruns and persistent failures.
Click through all 6 tabs. A few things this multi-date example makes very clear:
The scoped DELETE earns its value on COB 16 Jun. The job crashes after writing to STG but before the MERGE fires. On rerun, the DELETE clears those stale first-attempt rows before df.write runs again. Without it, STG would have 2 copies of each row, and the MERGE would insert duplicates into CHK. This is exactly what Rakesh means by "once a copy of that given execution."
11 Jun / 8 / ABC is the most instructive record. It fails on 12 Jun, 13 Jun, and 14 Jun rechecks — so CHK ends up with 3 closed (N) versions for that same business key, each with a different effective_end_ts. That's your full audit trail: you can see precisely how many days it took to resolve, and when each recheck happened.
The final CHK view tab shows the payoff: the dashboard query WHERE active_flag = 'Y' AND status = 'Fail' returns zero rows after COB 16 Jun. Everything is resolved. The closed history rows sit quietly below, untouched and queryable for audit at any time.
You said: what will be the STG value at COB 14 Jun
15 Jun
Claude responded: Looking at the COB 14 Jun tab in the stepper above, the STG table after df.write contains these 4 rows:
dwh_biz_date | row_type | instance | status | active_flag
13 Jun | 8 | ABC | Pass | Y
13 Jun | 10 | XYZ | Pass | Y
11 Jun | 8 | ABC | Fail | Y
12 Jun | 8 | ABC | Pass | Y
Breaking down why each row is there:
13 Jun rows (2 rows) — these are the fresh DQ check results for yesterday's business date (COB 14 Jun checks 13 Jun data). Both came back Pass.
11 Jun / 8 / ABC — this is a recheck of an open fail. It was still active_flag = Y, status = Fail in CHK from the previous run, so the job picks it up again. It still fails on this recheck.
12 Jun / 8 / ABC — also a recheck of an open fail carried forward from COB 13 Jun. This time it comes back Pass, so it will be resolved by the MERGE.
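Rakesh leaves the exact DELETE key open. One option, sketched here under that caveat, is to derive the DELETE directly from the rows the run is about to write and remove exactly those business keys, using bind parameters rather than inlined literals. The helper name build_scoped_delete is hypothetical, sqlite3 stands in for Oracle, and the table and column names follow the dq_stage_table example used in this conversation.

```python
import sqlite3


def build_scoped_delete(rows):
    """Build a DELETE that removes exactly the business keys present in `rows`.

    rows: (dwh_biz_date, row_type, instance, status) tuples about to be written.
    Returns (sql, params); one AND-clause per distinct key, joined with OR.
    """
    keys = sorted({(r[0], r[1], r[2]) for r in rows})
    if not keys:
        return None, []
    clause = " OR ".join(
        "(dwh_biz_date = ? AND row_type = ? AND instance = ?)" for _ in keys)
    params = [value for k in keys for value in k]
    return f"DELETE FROM dq_stage_table WHERE {clause}", params


# The four rows COB 14 Jun writes.
rows = [("13 Jun", 8, "ABC", "Pass"), ("13 Jun", 10, "XYZ", "Pass"),
        ("11 Jun", 8, "ABC", "Fail"), ("12 Jun", 8, "ABC", "Pass")]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dq_stage_table (dwh_biz_date TEXT, row_type INTEGER, "
             "instance TEXT, status TEXT)")
conn.executemany("INSERT INTO dq_stage_table VALUES (?, ?, ?, ?)", rows)  # earlier attempt
conn.execute("INSERT INTO dq_stage_table VALUES ('10 Jun', 8, 'ABC', 'Fail')")  # unrelated key

sql, params = build_scoped_delete(rows)
conn.execute(sql, params)
print(conn.execute("SELECT COUNT(*) FROM dq_stage_table").fetchone()[0])  # 1
```

An exact-key predicate never touches rows for other keys; the trade-off is a statement that grows with the number of keys in the run.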
The Phase 1 DELETE before this would have targeted all four of these date + key combinations (dwh_biz_date IN (13 Jun, 11 Jun, 12 Jun), scoped to the relevant row_type and instance combinations), clearing any prior run residue before the fresh df.write lands these rows.
You said: what value would be present before delete and after delete for COB date 14
15 Jun
Claude responded: Great question. This gets at the exact purpose of the scoped DELETE. Let me trace it precisely.
Before DELETE (STG contains residue from COB 13 Jun's run):
dwh_biz_date | row_type | instance | status | active_flag
12 Jun | 8 | ABC | Fail | Y
12 Jun | 10 | XYZ | Pass | Y
11 Jun | 8 | ABC | Fail | Y
11 Jun | 10 | ABC | Pass | Y
These are the exact rows that COB 13 Jun's df.write left behind in STG. STG is never wiped between runs — it just accumulates unless you DELETE first.
Phase 1 DELETE fires — scoped to the keys COB 14 Jun is about to process:
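```sql
-- Scoped delete: remove STG residue for the keys COB 14 Jun is about to write.
-- '11-Jun' etc. are shorthand for the business dates; bind real DATE values in practice.
DELETE FROM dq_stage_table
WHERE dwh_biz_date IN ('11-Jun', '12-Jun', '13-Jun')
  AND (row_type, instance) IN ((8, 'ABC'), (10, 'XYZ'), (10, 'ABC'));
```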
All 4 residue rows match → gone.
After DELETE (STG is now empty):
dwh_biz_date | row_type | instance | status | active_flag
(no rows)
Empty. Clean slate for this run.
After df.write (COB 14 Jun's fresh results land):
dwh_biz_date | row_type | instance | status | active_flag
13 Jun | 8 | ABC | Pass | Y
13 Jun | 10 | XYZ | Pass | Y
11 Jun | 8 | ABC | Fail | Y
12 Jun | 8 | ABC | Pass | Y
This is what the MERGE then reads from STG to act on CHK.
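A quick set-logic check of the cross-product DELETE against the COB 13 Jun residue, in plain Python (helper names are illustrative). The cross-product form removes all four residue rows, but partly because its lists are broader than COB 14 Jun's actual keys: (10, 'ABC') is not among the rows being written, and 12 Jun / 10 / XYZ is caught only through the cross product. An exact-key delete would leave two COB 13 Jun rows behind. As a design assumption rather than something said in the meeting: either the DELETE has to clear all earlier residue, or the MERGE has to read only this run's keys from STG.

```python
# COB 13 Jun residue sitting in STG (from the "Before DELETE" table).
residue = [("12 Jun", 8, "ABC"), ("12 Jun", 10, "XYZ"),
           ("11 Jun", 8, "ABC"), ("11 Jun", 10, "ABC")]

# Business keys COB 14 Jun actually writes (from the "After df.write" table).
run_keys = {("13 Jun", 8, "ABC"), ("13 Jun", 10, "XYZ"),
            ("11 Jun", 8, "ABC"), ("12 Jun", 8, "ABC")}

# Lists used by the cross-product DELETE.
dates = {"11 Jun", "12 Jun", "13 Jun"}
pairs = {(8, "ABC"), (10, "XYZ"), (10, "ABC")}


def hit_by_cross_product(row):
    d, row_type, instance = row
    return d in dates and (row_type, instance) in pairs


def hit_by_exact_key(row):
    return row in run_keys


left_after_cross = [r for r in residue if not hit_by_cross_product(r)]
left_after_exact = [r for r in residue if not hit_by_exact_key(r)]
print(left_after_cross)  # []
print(left_after_exact)  # [('12 Jun', 10, 'XYZ'), ('11 Jun', 10, 'ABC')]
```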
The key point here is that the "before DELETE" state is not today's data at all; it's the previous run's leftovers. If the DELETE didn't happen and df.write(append) ran directly, STG would have 8 rows going into the MERGE (4 stale + 4 fresh), and the MERGE would have to process duplicate keys, which either corrupts CHK or fails the run. The scoped DELETE is what keeps STG holding exactly one version per execution.
You said: why is the delete operation required
15 Jun
Claude responded: Great question. The answer comes down to one fundamental constraint: df.write in Spark only knows append or overwrite. It has no concept of "replace just these keys."
So think about what happens without the DELETE:
Scenario: No DELETE, job runs on COB 13 Jun
STG still has COB 12 Jun's leftovers:
dwh_biz_date | row_type | instance | status
11 Jun | 8 | ABC | Fail
11 Jun | 10 | XYZ | Pass
11 Jun | 10 | ABC | Fail
Now COB 13 Jun's df.write(append) lands on top:
dwh_biz_date | row_type | instance | status
11 Jun | 8 | ABC | Fail
11 Jun | 10 | XYZ | Pass
11 Jun | 10 | ABC | Fail
12 Jun | 8 | ABC | Fail
12 Jun | 10 | XYZ | Pass
11 Jun | 8 | ABC | Fail
11 Jun | 10 | ABC | Pass
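Now STG has two rows for the same business key: 11 Jun / 8 / ABC appears twice. That key already has an active row in CHK, so both STG copies match the same target row, and Oracle's MERGE does not allow updating the same target row more than once in a single statement, so the run fails. A duplicated key that is not yet in CHK is worse: WHEN NOT MATCHED THEN INSERT fires once per copy, CHK silently gets duplicate active rows, and your WHERE active_flag = 'Y' dashboard query starts returning duplicates.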
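One cheap safety net, sketched here as an assumption rather than part of the discussed design, is to refuse to run the MERGE when STG holds more than one row per business key. The query runs against the seven-row STG from this scenario; sqlite3 stands in for Oracle and check_one_row_per_key is a hypothetical helper.

```python
import sqlite3

DUP_CHECK_SQL = """
SELECT dwh_biz_date, row_type, instance, COUNT(*) AS copies
  FROM dq_stage_table
 GROUP BY dwh_biz_date, row_type, instance
HAVING COUNT(*) > 1
 ORDER BY dwh_biz_date, row_type, instance
"""


def check_one_row_per_key(conn):
    """Raise instead of merging when STG holds more than one row per business key."""
    dups = conn.execute(DUP_CHECK_SQL).fetchall()
    if dups:
        raise ValueError(f"duplicate business keys in STG, refusing to MERGE: {dups}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dq_stage_table (dwh_biz_date TEXT, row_type INTEGER, "
             "instance TEXT, status TEXT)")
conn.executemany("INSERT INTO dq_stage_table VALUES (?, ?, ?, ?)", [
    # COB 12 Jun leftovers
    ("11 Jun", 8, "ABC", "Fail"), ("11 Jun", 10, "XYZ", "Pass"), ("11 Jun", 10, "ABC", "Fail"),
    # COB 13 Jun append on top
    ("12 Jun", 8, "ABC", "Fail"), ("12 Jun", 10, "XYZ", "Pass"),
    ("11 Jun", 8, "ABC", "Fail"), ("11 Jun", 10, "ABC", "Pass"),
])

try:
    check_one_row_per_key(conn)
except ValueError as exc:
    print(exc)
```

The check also flags 11 Jun / 10 / ABC, whose two copies even disagree on status, so there is no way to tell which one the MERGE should trust.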
Scenario: No DELETE, job reruns on the same COB day
This is the even more dangerous case. Say COB 14 Jun's job crashes after df.write but before the MERGE. You rerun it. Without DELETE:
First attempt left in STG:
dwh_biz_date | row_type | instance | status
13 Jun | 8 | ABC | Pass
11 Jun | 8 | ABC | Fail
Rerun's df.write(append) adds on top:
dwh_biz_date | row_type | instance | status
13 Jun | 8 | ABC | Pass
11 Jun | 8 | ABC | Fail
13 Jun | 8 | ABC | Pass
11 Jun | 8 | ABC | Fail
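Every key is now doubled in STG. Either the MERGE fails outright (11 Jun / 8 / ABC, already active in CHK, is matched twice) or, for keys that are new to CHK such as 13 Jun / 8 / ABC, two active rows get inserted. Completely broken.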
So the DELETE solves three things:
1. Idempotency — you can rerun the job any number of times and always get the same clean result in CHK. DELETE ensures STG has exactly one copy before each write.
2. No cross-run contamination — yesterday's STG rows don't bleed into today's MERGE. Each run operates on a clean, fresh STG slice.
3. Correctness of the MERGE — the MERGE assumes one STG row per business key. The DELETE is what guarantees that assumption holds. Without it, the MERGE's WHEN NOT MATCHED THEN INSERT fires multiple times for the same key, producing duplicate CHK rows.
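The idempotency point can be checked with a tiny in-memory simulation of a crash between df.write and the MERGE, followed by a rerun. Everything here is a plain-Python stand-in: lists for STG and CHK, a close-then-insert step in place of the MERGE, and hypothetical COB 16 Jun rows.

```python
def key(r):
    return (r["biz_date"], r["row_type"], r["instance"])


def run_job(stg, chk, fresh, use_delete, crash_before_merge=False):
    run_keys = {key(r) for r in fresh}
    if use_delete:
        # Phase 1: scoped DELETE of this run's keys from STG.
        stg[:] = [r for r in stg if key(r) not in run_keys]
    # Phase 2: df.write(append) into STG.
    stg.extend(dict(r) for r in fresh)
    if crash_before_merge:
        raise RuntimeError("simulated crash after df.write, before MERGE")
    # Phase 3: close active CHK versions for keys in STG, then insert STG rows as active.
    stg_keys = {key(r) for r in stg}
    for r in chk:
        if r["active_flag"] == "Y" and key(r) in stg_keys:
            r["active_flag"] = "N"
    chk.extend({**r, "active_flag": "Y"} for r in stg)


def active_count(chk, k):
    return sum(1 for r in chk if key(r) == k and r["active_flag"] == "Y")


def simulate(use_delete):
    stg, chk = [], []
    fresh = [{"biz_date": "15 Jun", "row_type": 8, "instance": "ABC", "status": "Pass"},
             {"biz_date": "15 Jun", "row_type": 10, "instance": "XYZ", "status": "Pass"}]
    try:
        run_job(stg, chk, fresh, use_delete, crash_before_merge=True)
    except RuntimeError:
        pass  # first attempt dies: STG keeps its rows, CHK is untouched
    run_job(stg, chk, fresh, use_delete)  # rerun
    return len(stg), active_count(chk, ("15 Jun", 8, "ABC"))


print(simulate(use_delete=True))   # (2, 1)
print(simulate(use_delete=False))  # (4, 2)
```

With the DELETE, the rerun feeds the MERGE exactly one copy per key and CHK ends with one active row; without it, the same key ends up active twice.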
This is exactly what Rakesh means when he says "your stage table should be like a once a copy of that given execution." The DELETE is what enforces that "once a copy" guarantee.
You said: why is approach 2 not up to the mark
15 Jun
Claude responded: Good question. Let me break it down clearly.
Approach 2 works on a simple idea — instead of a MERGE, just scan CHK every day for all open failures (active_flag = Y, status = Fail) across all previous dates, combine them with today's fresh results, and df.write(append) everything into CHK directly.
On the surface it sounds simpler. But it breaks down in several important ways:
Problem 1 — CHK keeps growing with no deactivation
Every day you append new rows. The old failed rows from yesterday are never touched — they stay in CHK forever with active_flag = Y. So for a break that has been failing for 10 days, CHK has 10 open active rows for the same business key. There is no single source of truth for "what is the current state of this check."
Your dashboard query WHERE active_flag = 'Y' AND status = 'Fail' returns 10 rows for the same key instead of 1. You cannot tell which one is the latest. The table becomes meaningless for reporting.
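Running the same dashboard query over the two storage styles shows the difference; the rows are hypothetical and sqlite3 stands in for Oracle. The second query checks the invariant the dashboard depends on: at most one active row per business key.

```python
import sqlite3

OPEN_FAILURES_SQL = """
SELECT dwh_business_date, row_type, instance
  FROM chk
 WHERE active_flag = 'Y' AND status = 'Fail'
 ORDER BY dwh_business_date, row_type, instance
"""

# The dashboard query is only meaningful while at most one row per key is active.
DUPLICATE_ACTIVE_SQL = """
SELECT dwh_business_date, row_type, instance, COUNT(*)
  FROM chk
 WHERE active_flag = 'Y'
 GROUP BY dwh_business_date, row_type, instance
HAVING COUNT(*) > 1
"""


def dashboard(rows):
    """Load hypothetical CHK rows into an in-memory table and run both queries."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE chk (dwh_business_date TEXT, row_type INTEGER, "
                 "instance TEXT, status TEXT, active_flag TEXT)")
    conn.executemany("INSERT INTO chk VALUES (?, ?, ?, ?, ?)", rows)
    result = (conn.execute(OPEN_FAILURES_SQL).fetchall(),
              conn.execute(DUPLICATE_ACTIVE_SQL).fetchall())
    conn.close()
    return result


# The same break, failing on three consecutive runs, stored two ways.
approach1 = [("11 Jun", 8, "ABC", "Fail", "N"),
             ("11 Jun", 8, "ABC", "Fail", "N"),
             ("11 Jun", 8, "ABC", "Fail", "Y")]
approach2 = [("11 Jun", 8, "ABC", "Fail", "Y")] * 3

print(dashboard(approach1))
print(dashboard(approach2))
```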
Problem 2 — No audit trail of when something changed
In Approach 1, when a check moves from Fail to Pass, the MERGE stamps effective_end_ts on the old record and inserts a fresh Pass row. You know exactly when the break was resolved.
In Approach 2, the old Fail rows just keep sitting there with active_flag = Y forever. Even after the check eventually passes, the old Fail rows are never closed. There is no timestamp telling you when it was resolved. Your audit history is completely lost.
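In Approach 1 the "when was it resolved?" question becomes a plain aggregate over the closed versions of a key. This sketch uses sqlite3 in place of Oracle, hypothetical text timestamps in place of a real TIMESTAMP column, and the 11 Jun / 8 / ABC history from the multi-date example, assuming it passed on the COB 15 Jun recheck.

```python
import sqlite3

HISTORY_SQL = """
SELECT SUM(CASE WHEN active_flag = 'N' THEN 1 ELSE 0 END) AS closed_versions,
       MIN(effective_end_ts) AS first_closed,  -- MIN/MAX skip the active row's NULL
       MAX(effective_end_ts) AS last_closed,   -- resolution time when current_status is Pass
       MAX(CASE WHEN active_flag = 'Y' THEN status END) AS current_status
  FROM chk
 WHERE dwh_business_date = ? AND row_type = ? AND instance = ?
"""


def key_history(conn, key):
    return conn.execute(HISTORY_SQL, key).fetchone()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chk (dwh_business_date TEXT, row_type INTEGER, instance TEXT, "
             "status TEXT, active_flag TEXT, effective_end_ts TEXT)")
conn.executemany("INSERT INTO chk VALUES (?, ?, ?, ?, ?, ?)", [
    ("11 Jun", 8, "ABC", "Fail", "N", "13 Jun 09:00"),  # closed by the COB 13 Jun recheck
    ("11 Jun", 8, "ABC", "Fail", "N", "14 Jun 09:00"),  # closed by the COB 14 Jun recheck
    ("11 Jun", 8, "ABC", "Fail", "N", "15 Jun 09:00"),  # closed by the (assumed) passing recheck
    ("11 Jun", 8, "ABC", "Pass", "Y", None),            # current version
])
print(key_history(conn, ("11 Jun", 8, "ABC")))  # (3, '13 Jun 09:00', '15 Jun 09:00', 'Pass')
```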
Problem 3 — No way to distinguish current state from history
In Approach 1 the distinction is clean:
active_flag | meaning
Y | current version: this is the truth right now
N | historical: superseded, closed with end_ts
In Approach 2 everything is active_flag = Y — both the stale rows from 5 days ago and today's fresh result. There is no way to filter to just the current state without knowing which row was inserted last, which means you'd need to do a MAX(insert_ts) or ROW_NUMBER() trick on every query. And even that becomes unreliable if insert timestamps are not precisely tracked.
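The ROW_NUMBER() workaround for Approach 2 looks roughly like this, assuming a hypothetical insert_ts column. sqlite3 (version 3.25 or later, for window functions) stands in for Oracle, which also supports ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...). Every dashboard query has to carry this wrapper, and two rows with the same insert_ts for one key would make the "latest" row arbitrary.

```python
import sqlite3

LATEST_PER_KEY_SQL = """
SELECT dwh_business_date, row_type, instance, status
  FROM (SELECT c.*,
               ROW_NUMBER() OVER (
                   PARTITION BY dwh_business_date, row_type, instance
                   ORDER BY insert_ts DESC) AS rn
          FROM chk c
         WHERE active_flag = 'Y')
 WHERE rn = 1
 ORDER BY dwh_business_date, row_type, instance
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chk (dwh_business_date TEXT, row_type INTEGER, instance TEXT, "
             "status TEXT, active_flag TEXT, insert_ts TEXT)")
conn.executemany("INSERT INTO chk VALUES (?, ?, ?, ?, ?, ?)", [
    # Approach 2 style: every appended row stays active.
    ("11 Jun", 8, "ABC", "Fail", "Y", "12 Jun 09:00"),
    ("11 Jun", 8, "ABC", "Fail", "Y", "13 Jun 09:00"),
    ("11 Jun", 8, "ABC", "Pass", "Y", "14 Jun 09:00"),
    ("12 Jun", 8, "ABC", "Fail", "Y", "13 Jun 09:00"),
])

naive = conn.execute(
    "SELECT COUNT(*) FROM chk WHERE active_flag = 'Y' AND status = 'Fail'").fetchone()[0]
latest = conn.execute(LATEST_PER_KEY_SQL).fetchall()
current_failures = [r for r in latest if r[3] == "Fail"]
print(naive)             # 3
print(current_failures)  # [('12 Jun', 8, 'ABC', 'Fail')]
```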
Problem 4 — The scan gets heavier every day
Each day the job scans the entire CHK table to find all open failures across all previous dates. As days accumulate, that scan covers more and more rows — including rows that were already picked up and reprocessed many times before. There is no natural cutoff. By week 3 you are scanning 3 weeks of accumulated appended rows just to find which ones are still open.
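In Approach 1, the MERGE only touches the rows in STG (a small, precise, single-execution snapshot) and the CHK rows they match. The job still has to look up open failures in order to recheck them, but because each key has exactly one active row, that lookup is bounded by the number of currently open breaks rather than by how many days they have been open.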
Problem 5 — Rerun safety is broken
In Approach 1, if the job reruns, the DELETE on STG ensures CHK only ever gets one clean version per execution.
In Approach 2, if the job reruns, df.write(append) fires again on CHK directly. Now CHK has duplicate rows for today's run — two identical active rows for the same business key on the same date. There is no DELETE step protecting CHK, and since Spark cannot do a targeted delete on Oracle via df.write, you have no clean way to fix this. The table is now corrupted.
Summary — the core problem
Approach 2 treats CHK as a pure append log, not as a state table. That works fine if you only ever need to answer "did this check ever fail?" But your requirement is to answer "is this check currently failing?" — and that requires being able to deactivate old versions when new results come in. df.write(append) cannot do that. Only a MERGE can. And that is precisely why Approach 1 exists.
Rakesh hints at this directly when he says:
"Now, now, here we wanted to update the previous entries of the active flag, right? So what will happen is that if I keep everything as open... you have to scan only the previous day's data where the records are failed."
He is describing the workaround Approach 2 forces you into — and immediately flagging that it is a compromise, not a clean design.