1- """Tests for parser error handling, schema compliance, and so on."""
1+ """
2+
3+ Tests for DataFrameValidator behavior:
4+ - empty dataframe handling
5+ - mocked validation flow
6+ - integration validation on real RefSeq CDM outputs
7+
8+ """
29
310from typing import Any
411from unittest .mock import MagicMock
512
613import pytest
714from pyspark .sql import DataFrame , SparkSession
815from pyspark .sql .types import StructField , StructType
16+ from pyspark .sql .functions import col , when , lit
917
1018from cdm_data_loader_utils .audit .schema import METRICS , REJECTS , ROW_ERRORS
1119from cdm_data_loader_utils .core .constants import INVALID_DATA_FIELD_NAME
1422from tests .audit .conftest import create_table
1523
1624
25+ # ------------------------------------------------------------------------------
26+ # Unit tests
27+ # ------------------------------------------------------------------------------
1728@pytest .mark .requires_spark
1829def test_validate_dataframe_empty_df (pipeline_run : PipelineRun , empty_df : DataFrame ) -> None :
1930 """Assert that an empty dataframe does not perform any validation."""
@@ -82,34 +93,36 @@ def test_validate_dataframe_no_validation( # noqa: PLR0913
8293 assert rejects .count () == output .records_invalid
8394
8495
96+ # ------------------------------------------------------------------------------
97+ # Integration-style test (real RefSeq CDM output)
98+ # ------------------------------------------------------------------------------
99+
100+
85101@pytest .mark .requires_spark
86- def test_validate_refseq_cdm_identifiers (
102+ def test_validate_refseq_cdm (
87103 spark : SparkSession ,
88104 pipeline_run : PipelineRun ,
89105) -> None :
90- """
91- Validate CDM identifiers produced by annotation_parse.py
92- """
93-
94- # Prepare the metrics / rejects form scratch
95- for t in [METRICS , REJECTS ]:
106+ # Prepare audit tables from scratch
107+ for t in (METRICS , REJECTS ):
96108 create_table (spark , t , add_default_data = False )
97109
98- # read annotation_parse.py output
110+ # Load real pipeline output
99111 df = spark .table (f"{ pipeline_run .namespace } .cdm_identifiers" )
100112
101- # sanity check
113+ # Sanity check: pipeline actually produced data
102114 assert df .count () > 0
115+ assert "identifier" in df .columns
103116
104- # identifier cannot be null
117+ # Simple validation rule: identifier cannot be null
105118 def validation_fn (df : DataFrame ) -> DataFrame :
106- from pyspark .sql .functions import when , col
107-
108- return df .withColumn (INVALID_DATA_FIELD_NAME , when (col ("identifier" ).isNull (), "identifier is null" ))
119+ return df .withColumn (
120+ INVALID_DATA_FIELD_NAME ,
121+ when (col ("identifier" ).isNull (), lit ("identifier is null" )),
122+ )
109123
110124 validator = Validator (validation_fn , {})
111125
112- # DataFrameValidator
113126 dfv = DataFrameValidator (spark )
114127 output = dfv .validate_dataframe (
115128 data_to_validate = df ,
@@ -119,11 +132,14 @@ def validation_fn(df: DataFrame) -> DataFrame:
119132 invalid_col = INVALID_DATA_FIELD_NAME ,
120133 )
121134
122- # assertions
135+ # Records accounting
123136 assert output .records_read == df .count ()
124137 assert output .records_valid + output .records_invalid == output .records_read
125138
126- # validate rejects / metrics form scratch
139+ # valid_df must not contain invalid rows
140+ assert output .valid_df .filter (col (INVALID_DATA_FIELD_NAME ).isNotNull ()).count () == 0
141+
142+ # Audit tables written
127143 metrics = spark .table (f"{ pipeline_run .namespace } .{ METRICS } " )
128144 rejects = spark .table (f"{ pipeline_run .namespace } .{ REJECTS } " )
129145