Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/dve/core_engine/backends/base/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,3 +681,13 @@ def read_parquet(self, path: URI, **kwargs) -> EntityType:
def write_parquet(self, entity: EntityType, target_location: URI, **kwargs) -> URI:
    """Write an entity to parquet at ``target_location``.

    This base version has no implementation and always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError

def filter_data_contract_record_rejections(
    self,
    working_directory: URI,
    entity: EntityType,
    entity_name: EntityName,
    **kwargs,
):
    """Filter data contract record rejection errors out of the given entity.

    This base version has no implementation and always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
from pydantic import BaseModel
from typing_extensions import Annotated, get_args, get_origin, get_type_hints

from dve.common.error_utils import get_feedback_errors_uri
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.type_hints import URI
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling.service import LocalFilesystemImplementation, _get_implementation


Expand Down Expand Up @@ -100,7 +101,7 @@ def table_exists(connection: DuckDBPyConnection, table_name: str) -> bool:

def relation_is_empty(relation: DuckDBPyRelation) -> bool:
    """Check if a duckdb relation is empty.

    Args:
        relation: The relation to inspect.

    Returns:
        ``True`` when the relation holds no rows, ``False`` otherwise.
    """
    # Limit to a single row before reading the row count from ``shape`` so that
    # at most one row is ever counted. ``count("*")`` is deliberately not used:
    # it yields an aggregate relation rather than a number, so it cannot serve
    # as an emptiness test.
    return relation.limit(1).shape[0] == 0

Expand Down Expand Up @@ -256,6 +257,48 @@ def duckdb_write_parquet(cls):
return cls


def _ddb_filter_contract_errors(
    self,
    working_directory: URI,
    entity: DuckDBPyRelation,
    entity_name: EntityName,
) -> DuckDBPyRelation:
    """Drop rows of ``entity`` that have record-level data contract errors.

    Args:
        working_directory: Passed to ``get_feedback_errors_uri`` to locate the
            ``data_contract`` error file.
        entity: The relation to filter; matched on its ``__record_index__`` column.
        entity_name: Compared against the ``Entity`` field of each error.

    Returns:
        ``entity`` unchanged when the error file does not exist or contains no
        matching errors; otherwise ``entity`` anti-joined against the distinct
        ``RecordIndex`` values of the matching errors.
    """
    contract_error_location = get_feedback_errors_uri(working_directory, "data_contract")
    # NOTE(review): pathlib only understands local paths - confirm this URI can
    # never be remote or scheme-prefixed at this point.
    if not Path(contract_error_location).exists():
        return entity

    # Double any single quote so the name cannot terminate the SQL string literal.
    entity_literal = str(entity_name).replace("'", "''")
    rejected_record_indexes = (
        self._connection.read_json(
            contract_error_location,
            columns={
                "RecordIndex": "INTEGER",
                "FailureType": "STRING",
                "Status": "STRING",
                "Entity": "STRING",
            },
        )
        .filter(
            "FailureType == 'record' AND Status != 'informational' "
            f"AND Entity = '{entity_literal}'"
        )
        .select("RecordIndex")
        .distinct()
        .order("RecordIndex asc")
    )

    # Nothing to remove: skip the join entirely.
    if relation_is_empty(rejected_record_indexes):
        return entity

    # Anti join keeps only the rows with no matching rejected record index.
    return entity.join(
        rejected_record_indexes,
        condition="__record_index__ == RecordIndex",
        how="anti",
    )


def ddb_filter_contract_errors(cls):
    """Class decorator adding the DuckDB data contract record rejection filter.

    Binds ``_ddb_filter_contract_errors`` to the class as
    ``filter_data_contract_record_rejections`` and returns the same class.
    """
    setattr(cls, "filter_data_contract_record_rejections", _ddb_filter_contract_errors)
    return cls


@staticmethod # type: ignore
def _duckdb_get_entity_count(entity: DuckDBPyRelation) -> int:
"""Method to obtain entity count from a persisted parquet entity"""
Expand Down
2 changes: 2 additions & 0 deletions src/dve/core_engine/backends/implementations/duckdb/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dve.core_engine.backends.exceptions import ConstraintError
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
DDBStruct,
ddb_filter_contract_errors,
duckdb_read_parquet,
duckdb_record_index,
duckdb_rel_to_dictionaries,
Expand Down Expand Up @@ -61,6 +62,7 @@
@duckdb_record_index
@duckdb_write_parquet
@duckdb_read_parquet
@ddb_filter_contract_errors
class DuckDBStepImplementations(BaseStepImplementations[DuckDBPyRelation]):
"""An implementation of transformation steps in duckdb."""

Expand Down
2 changes: 2 additions & 0 deletions src/dve/core_engine/backends/implementations/spark/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
spark_read_parquet,
spark_record_index,
spark_write_parquet,
spark_filter_contract_errors,
)
from dve.core_engine.backends.implementations.spark.types import (
Joined,
Expand Down Expand Up @@ -53,6 +54,7 @@
@spark_record_index
@spark_write_parquet
@spark_read_parquet
@spark_filter_contract_errors
class SparkStepImplementations(BaseStepImplementations[DataFrame]):
"""An implementation of transformation steps in Apache Spark."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from dataclasses import dataclass, is_dataclass
from decimal import Decimal
from functools import wraps
from pathlib import Path
from typing import Any, ClassVar, Optional, TypeVar, Union, overload

from delta.exceptions import ConcurrentAppendException, DeltaConcurrentModificationException
Expand All @@ -26,8 +27,9 @@
from typing_extensions import Annotated, Protocol, TypedDict, get_args, get_origin, get_type_hints

from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
from dve.common.error_utils import get_feedback_errors_uri
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
from dve.core_engine.type_hints import URI
from dve.core_engine.type_hints import URI, EntityName

# It would be really nice if there was a more parameterisable
# way of doing this.
Expand Down Expand Up @@ -365,6 +367,51 @@
return cls


def _spark_filter_contract_errors(
    self,
    working_directory: URI,
    entity: DataFrame,
    entity_name: EntityName,
) -> DataFrame:
    """Drop rows of ``entity`` that have record-level data contract errors.

    Args:
        working_directory: Passed to ``get_feedback_errors_uri`` to locate the
            ``data_contract`` error file.
        entity: The dataframe to filter; matched on its ``__record_index__`` column.
        entity_name: Compared against the ``Entity`` field of each error.

    Returns:
        ``entity`` unchanged when the error file does not exist or contains no
        matching errors; otherwise ``entity`` anti-joined against the distinct
        ``RecordIndex`` values of the matching errors.
    """
    contract_error_location = get_feedback_errors_uri(working_directory, "data_contract")
    # NOTE(review): pathlib only understands local paths - confirm this URI can
    # never be remote or scheme-prefixed at this point.
    if not Path(contract_error_location).exists():
        return entity

    error_schema = st.StructType(
        [
            st.StructField("RecordIndex", st.IntegerType()),
            st.StructField("FailureType", st.StringType()),
            st.StructField("Status", st.StringType()),
            st.StructField("Entity", st.StringType()),
        ]
    )
    rejected_record_indexes_df = (
        self.spark_session.read.json(path=contract_error_location, schema=error_schema)
        .filter(
            (sf.col("FailureType") == sf.lit("record"))
            & (sf.col("Status") != sf.lit("informational"))
            & (sf.col("Entity") == sf.lit(entity_name))
        )
        # Project to the join key before de-duplicating, as the DuckDB variant
        # does; otherwise rows differing only in Status survive distinct().
        .select("RecordIndex")
        .distinct()
        # NOTE(review): the ordering is not needed for the anti join to be
        # correct; kept to mirror the DuckDB variant - consider dropping it.
        .orderBy(sf.asc(sf.col("RecordIndex")))
    )

    # Nothing to remove: skip the join entirely.
    if df_is_empty(rejected_record_indexes_df):
        return entity

    # Bracket lookup is used for the dunder-named column so the name is always
    # resolved as a column rather than via attribute access.
    return entity.join(
        rejected_record_indexes_df,
        on=entity["__record_index__"] == rejected_record_indexes_df["RecordIndex"],
        how="anti",
    )


def spark_filter_contract_errors(cls):
    """Class decorator adding the Spark data contract record rejection filter.

    Binds ``_spark_filter_contract_errors`` to the class as
    ``filter_data_contract_record_rejections`` and returns the same class.
    """
    setattr(cls, "filter_data_contract_record_rejections", _spark_filter_contract_errors)
    return cls


@staticmethod # type: ignore
def _spark_get_entity_count(entity: DataFrame) -> int:
"""Method to obtain entity count from a persisted parquet entity"""
Expand Down
2 changes: 2 additions & 0 deletions src/dve/core_engine/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@

record_count: Optional[int]
"""Count of records in the submitted file"""
number_submission_rejections: Optional[int]

Check failure on line 108 in src/dve/core_engine/models.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add an explicit default value to this optional field.

See more on https://sonarcloud.io/project/issues?id=NHSDigital_data-validation-engine&issues=AZ8TQf6-G0HmfgRSgg1Q&open=AZ8TQf6-G0HmfgRSgg1Q&pullRequest=117
"""Number of submission rejections raised following validation"""
number_record_rejections: Optional[int]
"""Number of record rejections raised following validation"""
number_warnings: Optional[int]
Expand Down
39 changes: 21 additions & 18 deletions src/dve/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,6 @@
failed.append((submission_info, submission_status))
else:
success.append((submission_info, submission_status))
except AttributeError as exc:
self._logger.error(f"File transformation raised exception: {exc}")
raise exc
except PERMISSIBLE_EXCEPTIONS as exc:
self._logger.warning(
f"File transformation raised exception: {exc}. Will be retried later."
Expand Down Expand Up @@ -509,9 +506,6 @@
submission_info: SubmissionInfo
submission_status: SubmissionStatus
submission_info, submission_status = future.result()
except AttributeError as exc:
self._logger.error(f"Data Contract raised exception: {exc}")
raise exc
except PERMISSIBLE_EXCEPTIONS as exc:
self._logger.warning(
f"Data Contract raised exception: {exc}. Will be retried later."
Expand Down Expand Up @@ -616,8 +610,19 @@
submission_status.processing_failed = True

for entity_name, entity in entity_manager.entities.items():
# Note BI filtering done within the apply_rules
self._logger.info(f"applying data contract filter to {entity_name}.")
if not entity_name.startswith("Original"):
filtered_entity = self._step_implementations.filter_data_contract_record_rejections(
working_directory,
entity,
entity_name,
)
else:
self._logger.info(f"Skipping {entity_name}. Marked original.")
filtered_entity = entity
projected = self._step_implementations.write_parquet( # type: ignore
entity,
filtered_entity,
fh.joinuri(
self.processed_files_path,
submission_info.submission_id,
Expand All @@ -629,6 +634,7 @@
projected
)

# TODO: record on submission_status how many records passed record-level validation and how many were rejected

Check warning on line 637 in src/dve/pipeline/pipeline.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this "TODO" comment.

See more on https://sonarcloud.io/project/issues?id=NHSDigital_data-validation-engine&issues=AZ8TQf7iG0HmfgRSgg1S&open=AZ8TQf7iG0HmfgRSgg1S&pullRequest=117
submission_status.number_of_records = self.get_entity_count(
entity=entity_manager.entities[
f"""Original{rules.global_variables.get(
Expand Down Expand Up @@ -682,9 +688,6 @@
unsucessful_files.append((submission_info, submission_status)) # type: ignore
else:
successful_files.append((submission_info, submission_status)) # type: ignore
except AttributeError as exc:
self._logger.error(f"Business Rules raised exception: {exc}")
raise exc
except PERMISSIBLE_EXCEPTIONS as exc:
self._logger.warning(
f"Business Rules raised exception: {exc}. Will be retried later."
Expand Down Expand Up @@ -758,10 +761,12 @@

df = pl.DataFrame(errors, schema={key: pl.Utf8() for key in errors[0]}) # type: ignore
df = df.with_columns(
pl.when(pl.col("Status") == pl.lit("error")) # type: ignore
pl.when(pl.col("Status") == pl.lit("informational"))
.then(pl.lit("Warning"))
.when(pl.col("FailureType") == pl.lit("submission")) # type: ignore
.then(pl.lit("Submission Failure")) # type: ignore
.otherwise(pl.lit("Warning")) # type: ignore
.alias("error_type")
.otherwise(pl.lit("Record Rejection")) # type: ignore

Check failure on line 768 in src/dve/pipeline/pipeline.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "Record Rejection" 3 times.

See more on https://sonarcloud.io/project/issues?id=NHSDigital_data-validation-engine&issues=AZ8TQf7hG0HmfgRSgg1R&open=AZ8TQf7hG0HmfgRSgg1R&pullRequest=117
.alias("error_type") # type: ignore
)
df = df.select(
pl.col("Entity").alias("Table"), # type: ignore
Expand Down Expand Up @@ -823,7 +828,8 @@
sub_stats = SubmissionStatisticsRecord(
submission_id=submission_info.submission_id,
record_count=submission_status.number_of_records,
number_record_rejections=err_types.get("Submission Failure", 0),
number_submission_rejections=err_types.get("Submission Failure", 0),
number_record_rejections=err_types.get("Record Rejection", 0),
number_warnings=err_types.get("Warning", 0),
)

Expand All @@ -835,7 +841,7 @@
summary_items = er.SummaryItems(
submission_status=submission_status,
summary_dict=summary_dict,
row_headings=["Submission Failure", "Warning"],
row_headings=["Submission Failure", "Record Rejection", "Warning"],
)

workbook = er.ExcelFormat(
Expand Down Expand Up @@ -894,9 +900,6 @@
try:
submission_info, submission_status, submission_stats, feedback_uri = future.result()
reports.append((submission_info, submission_status, submission_stats, feedback_uri))
except AttributeError as exc:
self._logger.error(f"Error reports raised exception: {exc}")
raise exc
except PERMISSIBLE_EXCEPTIONS as exc:
self._logger.warning(
f"Error reports raised exception: {exc}. Will be retried later."
Expand Down
5 changes: 5 additions & 0 deletions src/dve/reporting/excel_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ def _add_submission_info(self, status: str, summary: Worksheet):
for key, value in self.summary_dict.items():
summary.append(["", _key_renames.get(key, key), str(value)])

summary.append([
"",
"Total Number of Records Processed",
self.submission_status.number_of_records if self.submission_status.number_of_records else 0 # pylint: disable=C0301
])
summary.append(["", ""])


Expand Down
59 changes: 59 additions & 0 deletions tests/features/animals.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
Feature: Pipeline tests using the animal dataset
Tests record rejection, ensuring that records are correctly removed from the entity and that
the correct validation feedback is raised in the error report.

Scenario: Validate XML data with just record level rejections (duckdb)
Given I submit the animals file animals.xml for processing
And A duckdb pipeline is configured with schema file 'animals.dischema.json'
And I add initial audit entries for the submission
Then the latest audit record for the submission is marked with processing status file_transformation
When I run the file transformation phase
Then the animals entity is stored as a parquet after the file_transformation phase
And the latest audit record for the submission is marked with processing status data_contract
When I run the data contract phase
Then there are no record rejections from the data_contract phase
And the animals entity is stored as a parquet after the data_contract phase
And the latest audit record for the submission is marked with processing status business_rules
When I run the business rules phase
Then there are errors with the following details and associated error_count from the business_rules phase
| ErrorType | ErrorCode | error_count |
| record | ANE01 | 2 |
And The rules restrict "animals" to 3 qualifying records
When I run the error report phase
Then An error report is produced
And The statistics entry for the submission shows the following information
| parameter | value |
| record_count | 5 |
| number_record_rejections | 2 |
| number_warnings | 0 |

Scenario: Validate XML data with a mixture of error types in (duckdb)
Given I submit the animals file animals_mixture.xml for processing
And A duckdb pipeline is configured with schema file 'animals.dischema.json'
And I add initial audit entries for the submission
Then the latest audit record for the submission is marked with processing status file_transformation
When I run the file transformation phase
Then the animals entity is stored as a parquet after the file_transformation phase
And the latest audit record for the submission is marked with processing status data_contract
When I run the data contract phase
Then there are no record rejections from the data_contract phase
# Then there are errors with the following details and associated error_count from the data_contract phase
# | FailureType | Status | ErrorCode | error_count |
# | record | error | FieldBlank | 1 |
And the animals entity is stored as a parquet after the data_contract phase
And the latest audit record for the submission is marked with processing status business_rules
When I run the business rules phase
Then there are errors with the following details and associated error_count from the business_rules phase
| FailureType | Status | ErrorCode | error_count |
| record | error | ANE01 | 2 |
| submission | error | ANE02 | 1 |
| record | informational | ANE03 | 1 |
And The rules restrict "animals" to 5 qualifying records
When I run the error report phase
Then An error report is produced
And The statistics entry for the submission shows the following information
| parameter | value |
| record_count | 7 |
| number_submission_rejections | 1 |
| number_record_rejections | 2 |
| number_warnings | 1 |
4 changes: 2 additions & 2 deletions tests/features/demographics.feature
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Feature: Pipeline tests using the ambsys dataset
And the demographics entity is stored as a parquet after the data_contract phase
And the latest audit record for the submission is marked with processing status business_rules
When I run the business rules phase
Then The rules restrict "demographics" to 6 qualifying records
Then The rules restrict "demographics" to 2 qualifying records
And At least one row from "demographics" has generated error code "BAD_NHS"
And the demographics entity is stored as a parquet after the business_rules phase
And The entity "demographics" does not contain an entry for "FALSE" in column "NHS_Number_Valid"
Expand All @@ -43,7 +43,7 @@ Feature: Pipeline tests using the ambsys dataset
And the demographics entity is stored as a parquet after the data_contract phase
And the latest audit record for the submission is marked with processing status business_rules
When I run the business rules phase
Then The rules restrict "demographics" to 6 qualifying records
Then The rules restrict "demographics" to 2 qualifying records
And At least one row from "demographics" has generated error code "BAD_NHS"
And the demographics entity is stored as a parquet after the business_rules phase
And The entity "demographics" does not contain an entry for "FALSE" in column "NHS_Number_Valid"
Expand Down
Loading
Loading