Skip to content

Commit 497a1f6

Browse files
Copilottmikula-dev
andcommitted
Add country field to run and EDLA schemas
- Added country field to conf/topic_runs.json (jobs.items.properties) - Added country field to conf/topic_dlchange.json (root properties) - Updated postgres_edla_write to include country column (after timestamp_event) - Updated postgres_run_write to include country column in jobs table (after catalog_id) - Updated tests to validate new country field with empty string default - All tests passing, black/pylint/mypy checks passing Co-authored-by: tmikula-dev <72911271+tmikula-dev@users.noreply.github.com>
1 parent a7257ed commit 497a1f6

4 files changed

Lines changed: 28 additions & 11 deletions

File tree

conf/topic_dlchange.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
"type": "number",
2626
"description": "Timestamp of the event in epoch milliseconds"
2727
},
28+
"country": {
29+
"type": "string",
30+
"description": "The country the data is related to."
31+
},
2832
"catalog_id": {
2933
"type": "string",
3034
"description": "Identifier for the data definition (Glue/Hive) database and table name for example "

conf/topic_runs.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@
4343
"type": "string",
4444
"description": "Identifier for the data definition (Glue/Hive) database and table name for example"
4545
},
46+
"country": {
47+
"type": "string",
48+
"description": "The country the data is related to."
49+
},
4650
"status": {
4751
"type": "string",
4852
"enum": ["succeeded", "failed", "killed", "skipped"],

src/writers/writer_postgres.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
9696
source_app_version,
9797
environment,
9898
timestamp_event,
99+
country,
99100
catalog_id,
100101
operation,
101102
"location",
@@ -116,6 +117,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
116117
%s,
117118
%s,
118119
%s,
120+
%s,
119121
%s
120122
)""",
121123
(
@@ -125,6 +127,7 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None:
125127
message["source_app_version"],
126128
message["environment"],
127129
message["timestamp_event"],
130+
message.get("country", ""),
128131
message["catalog_id"],
129132
message["operation"],
130133
message.get("location"),
@@ -188,6 +191,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s
188191
(
189192
event_id,
190193
catalog_id,
194+
country,
191195
status,
192196
timestamp_start,
193197
timestamp_end,
@@ -202,11 +206,13 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s
202206
%s,
203207
%s,
204208
%s,
209+
%s,
205210
%s
206211
)""",
207212
(
208213
message["event_id"],
209214
job["catalog_id"],
215+
job.get("country", ""),
210216
job["status"],
211217
job["timestamp_start"],
212218
job["timestamp_end"],

tests/writers/test_writer_postgres.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,13 @@ def test_postgres_edla_write_with_optional_fields():
5757
writer_postgres.postgres_edla_write(cur, "table_a", message)
5858
assert len(cur.executions) == 1
5959
_sql, params = cur.executions[0]
60-
assert len(params) == 12
60+
assert len(params) == 13
6161
assert params[0] == "e1"
62-
assert params[8] == "s3://bucket/path"
63-
assert params[9] == "parquet"
64-
assert json.loads(params[10]) == {"compression": "snappy"}
65-
assert json.loads(params[11]) == {"foo": "bar"}
62+
assert params[6] == "" # country (default empty string)
63+
assert params[9] == "s3://bucket/path"
64+
assert params[10] == "parquet"
65+
assert json.loads(params[11]) == {"compression": "snappy"}
66+
assert json.loads(params[12]) == {"foo": "bar"}
6667

6768

6869
def test_postgres_edla_write_missing_optional():
@@ -80,10 +81,11 @@ def test_postgres_edla_write_missing_optional():
8081
}
8182
writer_postgres.postgres_edla_write(cur, "table_a", message)
8283
_sql, params = cur.executions[0]
83-
assert params[8] is None
84-
assert params[9] == "delta"
85-
assert params[10] is None
86-
assert params[11] is None
84+
assert params[6] == "" # country (default empty string)
85+
assert params[9] is None # location
86+
assert params[10] == "delta"
87+
assert params[11] is None # format_options
88+
assert params[12] is None # additional_info
8789

8890

8991
def test_postgres_run_write():
@@ -115,8 +117,9 @@ def test_postgres_run_write():
115117
assert "source_app_version" in run_sql
116118
assert run_params[3] == "runapp"
117119
_job2_sql, job2_params = cur.executions[2]
118-
assert job2_params[5] == "err"
119-
assert json.loads(job2_params[6]) == {"k": "v"}
120+
assert job2_params[2] == "" # country (default empty string)
121+
assert job2_params[6] == "err"
122+
assert json.loads(job2_params[7]) == {"k": "v"}
120123

121124

122125
def test_postgres_test_write():

0 commit comments

Comments
 (0)