Skip to content

Commit f90fa25

Browse files
authored
Merge pull request #117 from NHSDigital/fix/gr-ndit-1535-ensure_record_level_rejections_are_working
Ensure record level rejections are working
2 parents 150d3fa + c1c5f3e commit f90fa25

18 files changed

Lines changed: 501 additions & 40 deletions

File tree

src/dve/core_engine/backends/base/rules.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,3 +681,13 @@ def read_parquet(self, path: URI, **kwargs) -> EntityType:
681681
def write_parquet(self, entity: EntityType, target_location: URI, **kwargs) -> URI:
682682
"""Method to write parquet files"""
683683
raise NotImplementedError()
684+
685+
def filter_data_contract_record_rejections(
686+
self,
687+
working_directory: URI,
688+
entity: EntityType,
689+
entity_name: EntityName,
690+
**kwargs,
691+
):
692+
"""Method to filter out record rejection errors from the data contract for a given entity"""
693+
raise NotImplementedError()

src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
from pydantic import BaseModel
1919
from typing_extensions import Annotated, get_args, get_origin, get_type_hints
2020

21+
from dve.common.error_utils import get_feedback_errors_uri
2122
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
2223
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
23-
from dve.core_engine.type_hints import URI
24+
from dve.core_engine.type_hints import URI, EntityName
2425
from dve.parser.file_handling.service import LocalFilesystemImplementation, _get_implementation
2526

2627

@@ -100,7 +101,7 @@ def table_exists(connection: DuckDBPyConnection, table_name: str) -> bool:
100101

101102
def relation_is_empty(relation: DuckDBPyRelation) -> bool:
102103
"""Check if a duckdb relation is empty"""
103-
if relation.limit(1).count("*"):
104+
if relation.limit(1).shape[0] > 0:
104105
return False
105106
return True
106107

@@ -256,6 +257,48 @@ def duckdb_write_parquet(cls):
256257
return cls
257258

258259

260+
def _ddb_filter_contract_errors(
261+
self,
262+
working_directory: URI,
263+
entity: DuckDBPyRelation,
264+
entity_name: EntityName,
265+
) -> DuckDBPyRelation:
266+
contract_error_location = get_feedback_errors_uri(working_directory, "data_contract")
267+
if not Path(contract_error_location).exists():
268+
return entity
269+
relevant_record_rejection_codes_rel = (
270+
self._connection.read_json(
271+
contract_error_location,
272+
columns={
273+
"RecordIndex": "INTEGER",
274+
"FailureType": "STRING",
275+
"Status": "STRING",
276+
"Entity": "STRING",
277+
},
278+
)
279+
.filter(f"FailureType == 'record' AND Status != 'informational' AND Entity = '{entity_name}'") # pylint: disable=C0301
280+
.select("RecordIndex")
281+
.distinct()
282+
.order("RecordIndex asc")
283+
)
284+
285+
if relation_is_empty(relevant_record_rejection_codes_rel):
286+
return entity
287+
288+
filtered_entity = entity.join(
289+
relevant_record_rejection_codes_rel,
290+
condition="__record_index__ == RecordIndex",
291+
how="anti"
292+
)
293+
return filtered_entity
294+
295+
296+
def ddb_filter_contract_errors(cls):
297+
"""Class decorator to filter out records that failed casting and have record rejection scope"""
298+
cls.filter_data_contract_record_rejections = _ddb_filter_contract_errors
299+
return cls
300+
301+
259302
@staticmethod # type: ignore
260303
def _duckdb_get_entity_count(entity: DuckDBPyRelation) -> int:
261304
"""Method to obtain entity count from a persisted parquet entity"""

src/dve/core_engine/backends/implementations/duckdb/rules.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from dve.core_engine.backends.exceptions import ConstraintError
2323
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
2424
DDBStruct,
25+
ddb_filter_contract_errors,
2526
duckdb_read_parquet,
2627
duckdb_record_index,
2728
duckdb_rel_to_dictionaries,
@@ -61,6 +62,7 @@
6162
@duckdb_record_index
6263
@duckdb_write_parquet
6364
@duckdb_read_parquet
65+
@ddb_filter_contract_errors
6466
class DuckDBStepImplementations(BaseStepImplementations[DuckDBPyRelation]):
6567
"""An implementation of transformation steps in duckdb."""
6668

src/dve/core_engine/backends/implementations/spark/rules.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
spark_read_parquet,
1818
spark_record_index,
1919
spark_write_parquet,
20+
spark_filter_contract_errors,
2021
)
2122
from dve.core_engine.backends.implementations.spark.types import (
2223
Joined,
@@ -53,6 +54,7 @@
5354
@spark_record_index
5455
@spark_write_parquet
5556
@spark_read_parquet
57+
@spark_filter_contract_errors
5658
class SparkStepImplementations(BaseStepImplementations[DataFrame]):
5759
"""An implementation of transformation steps in Apache Spark."""
5860

src/dve/core_engine/backends/implementations/spark/spark_helpers.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from dataclasses import dataclass, is_dataclass
1313
from decimal import Decimal
1414
from functools import wraps
15+
from pathlib import Path
1516
from typing import Any, ClassVar, Optional, TypeVar, Union, overload
1617

1718
from delta.exceptions import ConcurrentAppendException, DeltaConcurrentModificationException
@@ -26,8 +27,9 @@
2627
from typing_extensions import Annotated, Protocol, TypedDict, get_args, get_origin, get_type_hints
2728

2829
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
30+
from dve.common.error_utils import get_feedback_errors_uri
2931
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
30-
from dve.core_engine.type_hints import URI
32+
from dve.core_engine.type_hints import URI, EntityName
3133

3234
# It would be really nice if there was a more parameterisable
3335
# way of doing this.
@@ -365,6 +367,51 @@ def spark_write_parquet(cls):
365367
return cls
366368

367369

370+
def _spark_filter_contract_errors(
371+
self,
372+
working_directory: URI,
373+
entity: DataFrame,
374+
entity_name: EntityName,
375+
) -> DataFrame:
376+
contract_error_location = get_feedback_errors_uri(working_directory, "data_contract")
377+
if not Path(contract_error_location).exists():
378+
return entity
379+
380+
relevant_record_rejections_codes_df = (
381+
self.spark_session.read.json(
382+
path=contract_error_location,
383+
schema=st.StructType([
384+
st.StructField("RecordIndex", st.IntegerType()),
385+
st.StructField("FailureType", st.StringType()),
386+
st.StructField("Status", st.StringType()),
387+
st.StructField("Entity", st.StringType()),
388+
]),
389+
)
390+
.filter(
391+
(sf.col("FailureType") == sf.lit("record"))
392+
& (sf.col("Status") != sf.lit("informational"))
393+
& (sf.col("Entity") == sf.lit(entity_name))
394+
)
395+
.distinct()
396+
.orderBy(sf.asc(sf.col("RecordIndex")))
397+
# todo - ^^ possibly relook at join strat. Does this help? Over prescriptive?
398+
)
399+
if df_is_empty(relevant_record_rejections_codes_df):
400+
return entity
401+
filtered_entity = entity.join(
402+
relevant_record_rejections_codes_df,
403+
on=entity.__record_index__ == relevant_record_rejections_codes_df.RecordIndex,
404+
how="anti",
405+
)
406+
return filtered_entity
407+
408+
409+
def spark_filter_contract_errors(cls):
410+
"""Class decorator to filter out records that failed casting and have record rejection scope"""
411+
cls.filter_data_contract_record_rejections = _spark_filter_contract_errors
412+
return cls
413+
414+
368415
@staticmethod # type: ignore
369416
def _spark_get_entity_count(entity: DataFrame) -> int:
370417
"""Method to obtain entity count from a persisted parquet entity"""

src/dve/core_engine/models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ class SubmissionStatisticsRecord(AuditRecord):
105105

106106
record_count: Optional[int]
107107
"""Count of records in the submitted file"""
108+
number_submission_rejections: Optional[int]
109+
"""Number of submission rejections raised following validation"""
108110
number_record_rejections: Optional[int]
109111
"""Number of record rejections raised following validation"""
110112
number_warnings: Optional[int]

src/dve/pipeline/pipeline.py

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -379,9 +379,6 @@ def file_transformation_step(
379379
failed.append((submission_info, submission_status))
380380
else:
381381
success.append((submission_info, submission_status))
382-
except AttributeError as exc:
383-
self._logger.error(f"File transformation raised exception: {exc}")
384-
raise exc
385382
except PERMISSIBLE_EXCEPTIONS as exc:
386383
self._logger.warning(
387384
f"File transformation raised exception: {exc}. Will be retried later."
@@ -509,9 +506,6 @@ def data_contract_step(
509506
submission_info: SubmissionInfo
510507
submission_status: SubmissionStatus
511508
submission_info, submission_status = future.result()
512-
except AttributeError as exc:
513-
self._logger.error(f"Data Contract raised exception: {exc}")
514-
raise exc
515509
except PERMISSIBLE_EXCEPTIONS as exc:
516510
self._logger.warning(
517511
f"Data Contract raised exception: {exc}. Will be retried later."
@@ -616,8 +610,19 @@ def apply_business_rules( # pylint: disable=R0914
616610
submission_status.processing_failed = True
617611

618612
for entity_name, entity in entity_manager.entities.items():
613+
# Note BI filtering done within the apply_rules
614+
self._logger.info(f"applying data contract filter to {entity_name}.")
615+
if not entity_name.startswith("Original"):
616+
filtered_entity = self._step_implementations.filter_data_contract_record_rejections(
617+
working_directory,
618+
entity,
619+
entity_name,
620+
)
621+
else:
622+
self._logger.info(f"Skipping {entity_name}. Marked original.")
623+
filtered_entity = entity
619624
projected = self._step_implementations.write_parquet( # type: ignore
620-
entity,
625+
filtered_entity,
621626
fh.joinuri(
622627
self.processed_files_path,
623628
submission_info.submission_id,
@@ -629,6 +634,7 @@ def apply_business_rules( # pylint: disable=R0914
629634
projected
630635
)
631636

637+
# todo - add to submission_status around records that have passed record validations/rejected
632638
submission_status.number_of_records = self.get_entity_count(
633639
entity=entity_manager.entities[
634640
f"""Original{rules.global_variables.get(
@@ -682,9 +688,6 @@ def business_rule_step(
682688
unsucessful_files.append((submission_info, submission_status)) # type: ignore
683689
else:
684690
successful_files.append((submission_info, submission_status)) # type: ignore
685-
except AttributeError as exc:
686-
self._logger.error(f"Business Rules raised exception: {exc}")
687-
raise exc
688691
except PERMISSIBLE_EXCEPTIONS as exc:
689692
self._logger.warning(
690693
f"Business Rules raised exception: {exc}. Will be retried later."
@@ -758,10 +761,12 @@ def _get_error_dataframes(self, submission_id: str):
758761

759762
df = pl.DataFrame(errors, schema={key: pl.Utf8() for key in errors[0]}) # type: ignore
760763
df = df.with_columns(
761-
pl.when(pl.col("Status") == pl.lit("error")) # type: ignore
764+
pl.when(pl.col("Status") == pl.lit("informational"))
765+
.then(pl.lit("Warning"))
766+
.when(pl.col("FailureType") == pl.lit("submission")) # type: ignore
762767
.then(pl.lit("Submission Failure")) # type: ignore
763-
.otherwise(pl.lit("Warning")) # type: ignore
764-
.alias("error_type")
768+
.otherwise(pl.lit("Record Rejection")) # type: ignore
769+
.alias("error_type") # type: ignore
765770
)
766771
df = df.select(
767772
pl.col("Entity").alias("Table"), # type: ignore
@@ -823,7 +828,8 @@ def error_report(
823828
sub_stats = SubmissionStatisticsRecord(
824829
submission_id=submission_info.submission_id,
825830
record_count=submission_status.number_of_records,
826-
number_record_rejections=err_types.get("Submission Failure", 0),
831+
number_submission_rejections=err_types.get("Submission Failure", 0),
832+
number_record_rejections=err_types.get("Record Rejection", 0),
827833
number_warnings=err_types.get("Warning", 0),
828834
)
829835

@@ -835,7 +841,7 @@ def error_report(
835841
summary_items = er.SummaryItems(
836842
submission_status=submission_status,
837843
summary_dict=summary_dict,
838-
row_headings=["Submission Failure", "Warning"],
844+
row_headings=["Submission Failure", "Record Rejection", "Warning"],
839845
)
840846

841847
workbook = er.ExcelFormat(
@@ -894,9 +900,6 @@ def error_report_step(
894900
try:
895901
submission_info, submission_status, submission_stats, feedback_uri = future.result()
896902
reports.append((submission_info, submission_status, submission_stats, feedback_uri))
897-
except AttributeError as exc:
898-
self._logger.error(f"Error reports raised exception: {exc}")
899-
raise exc
900903
except PERMISSIBLE_EXCEPTIONS as exc:
901904
self._logger.warning(
902905
f"Error reports raised exception: {exc}. Will be retried later."

src/dve/reporting/excel_report.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,11 @@ def _add_submission_info(self, status: str, summary: Worksheet):
141141
for key, value in self.summary_dict.items():
142142
summary.append(["", _key_renames.get(key, key), str(value)])
143143

144+
summary.append([
145+
"",
146+
"Total Number of Records Processed",
147+
self.submission_status.number_of_records if self.submission_status.number_of_records else 0 # pylint: disable=C0301
148+
])
144149
summary.append(["", ""])
145150

146151

tests/features/animals.feature

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
Feature: Pipeline tests using the animal dataset
2+
Test record rejection and ensuring that records are correctly removed from the entity and that
3+
the correct validation feedback is raised in the error report.
4+
5+
Scenario: Validate XML data with just record level rejections (duckdb)
6+
Given I submit the animals file animals.xml for processing
7+
And A duckdb pipeline is configured with schema file 'animals.dischema.json'
8+
And I add initial audit entries for the submission
9+
Then the latest audit record for the submission is marked with processing status file_transformation
10+
When I run the file transformation phase
11+
Then the animals entity is stored as a parquet after the file_transformation phase
12+
And the latest audit record for the submission is marked with processing status data_contract
13+
When I run the data contract phase
14+
Then there are no record rejections from the data_contract phase
15+
And the animals entity is stored as a parquet after the data_contract phase
16+
And the latest audit record for the submission is marked with processing status business_rules
17+
When I run the business rules phase
18+
Then there are errors with the following details and associated error_count from the business_rules phase
19+
| ErrorType | ErrorCode | error_count |
20+
| record | ANE01 | 2 |
21+
And The rules restrict "animals" to 3 qualifying records
22+
When I run the error report phase
23+
Then An error report is produced
24+
And The statistics entry for the submission shows the following information
25+
| parameter | value |
26+
| record_count | 5 |
27+
| number_record_rejections | 2 |
28+
| number_warnings | 0 |
29+
30+
Scenario: Validate XML data with a mixture of error types in (duckdb)
31+
Given I submit the animals file animals_mixture.xml for processing
32+
And A duckdb pipeline is configured with schema file 'animals.dischema.json'
33+
And I add initial audit entries for the submission
34+
Then the latest audit record for the submission is marked with processing status file_transformation
35+
When I run the file transformation phase
36+
Then the animals entity is stored as a parquet after the file_transformation phase
37+
And the latest audit record for the submission is marked with processing status data_contract
38+
When I run the data contract phase
39+
Then there are no record rejections from the data_contract phase
40+
# Then there are errors with the following details and associated error_count from the data_contract phase
41+
# | FailureType | Status | ErrorCode | error_count |
42+
# | record | error | FieldBlank | 1 |
43+
And the animals entity is stored as a parquet after the data_contract phase
44+
And the latest audit record for the submission is marked with processing status business_rules
45+
When I run the business rules phase
46+
Then there are errors with the following details and associated error_count from the business_rules phase
47+
| FailureType | Status | ErrorCode | error_count |
48+
| record | error | ANE01 | 2 |
49+
| submission | error | ANE02 | 1 |
50+
| record | informational | ANE03 | 1 |
51+
And The rules restrict "animals" to 5 qualifying records
52+
When I run the error report phase
53+
Then An error report is produced
54+
And The statistics entry for the submission shows the following information
55+
| parameter | value |
56+
| record_count | 7 |
57+
| number_submission_rejections | 1 |
58+
| number_record_rejections | 2 |
59+
| number_warnings | 1 |

tests/features/demographics.feature

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ Feature: Pipeline tests using the ambsys dataset
1717
And the demographics entity is stored as a parquet after the data_contract phase
1818
And the latest audit record for the submission is marked with processing status business_rules
1919
When I run the business rules phase
20-
Then The rules restrict "demographics" to 6 qualifying records
20+
Then The rules restrict "demographics" to 2 qualifying records
2121
And At least one row from "demographics" has generated error code "BAD_NHS"
2222
And the demographics entity is stored as a parquet after the business_rules phase
2323
And The entity "demographics" does not contain an entry for "FALSE" in column "NHS_Number_Valid"
@@ -43,7 +43,7 @@ Feature: Pipeline tests using the ambsys dataset
4343
And the demographics entity is stored as a parquet after the data_contract phase
4444
And the latest audit record for the submission is marked with processing status business_rules
4545
When I run the business rules phase
46-
Then The rules restrict "demographics" to 6 qualifying records
46+
Then The rules restrict "demographics" to 2 qualifying records
4747
And At least one row from "demographics" has generated error code "BAD_NHS"
4848
And the demographics entity is stored as a parquet after the business_rules phase
4949
And The entity "demographics" does not contain an entry for "FALSE" in column "NHS_Number_Valid"

0 commit comments

Comments
 (0)