Skip to content

Commit eab27c9

Browse files
committed
First draft of integration test
1 parent 8c656d5 commit eab27c9

4 files changed

Lines changed: 128 additions & 122 deletions

File tree

src/cdm_data_loader_utils/parsers/annotation_parse.py

Lines changed: 94 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -55,20 +55,6 @@ def init_spark_and_db(app_name: str, database: str) -> SparkSession:
5555
return spark
5656

5757

58-
# ---------------------------------------------------------------------
59-
# CDM TABLE SCHEMAS
60-
# ---------------------------------------------------------------------
61-
# Using centralized schemas
62-
IDENTIFIER_SCHEMA = cdm_schemas["Identifier"]
63-
NAME_SCHEMA = cdm_schemas["Name"]
64-
FEATURE_SCHEMA = cdm_schemas["Feature"]
65-
CONTIG_COLLECTION_X_FEATURE_SCHEMA = cdm_schemas["ContigCollection_x_Feature"]
66-
CONTIG_COLLECTION_X_PROTEIN_SCHEMA = cdm_schemas["ContigCollection_x_Protein"]
67-
FEATURE_X_PROTEIN_SCHEMA = cdm_schemas["Feature_x_Protein"]
68-
CONTIG_SCHEMA = cdm_schemas["Contig"]
69-
CONTIG_X_CONTIG_COLLECTION_SCHEMA = cdm_schemas["Contig_x_ContigCollection"]
70-
71-
7258
# ---------------------------------------------------------------------
7359
# CDM PREFIX NORMALIZATION
7460
# ---------------------------------------------------------------------
@@ -160,19 +146,21 @@ def load_feature_records(data: dict) -> list[tuple]:
160146
"minus": "negative",
161147
"unstranded": "unstranded",
162148
}.get(r.get("orientation"), "unknown")
163-
features.append((
164-
feature_id,
165-
None,
166-
None,
167-
None,
168-
to_int(r.get("end")),
169-
None,
170-
to_int(r.get("begin")),
171-
strand,
172-
"RefSeq",
173-
None,
174-
"gene",
175-
))
149+
features.append(
150+
(
151+
feature_id,
152+
None,
153+
None,
154+
None,
155+
to_int(r.get("end")),
156+
None,
157+
to_int(r.get("begin")),
158+
strand,
159+
"RefSeq",
160+
None,
161+
"gene",
162+
)
163+
)
176164
return features
177165

178166

@@ -270,10 +258,12 @@ def load_contig_x_contig_collection(data: dict) -> list[tuple[str, str]]:
270258
assembly = annotations[0].get("assembly_accession")
271259

272260
if contig and assembly:
273-
links.append((
274-
f"refseq:{contig}",
275-
apply_prefix(assembly),
276-
))
261+
links.append(
262+
(
263+
f"refseq:{contig}",
264+
apply_prefix(assembly),
265+
)
266+
)
277267

278268
return links
279269

@@ -285,27 +275,27 @@ def write_to_table(
285275
spark: SparkSession,
286276
records: list[tuple],
287277
table_name: str,
288-
schema: StructType,
289278
database: str = "default",
290279
) -> None:
291280
if records:
292-
spark.createDataFrame(records, schema).write.format("delta").mode("overwrite").option(
281+
spark.createDataFrame(records, cdm_schemas[table_name]).write.format("delta").mode("overwrite").option(
293282
"overwriteSchema", "true"
294283
).saveAsTable(f"{database}.{table_name}")
295284

296285

297286
# ---------------------------------------------------------------------
298287
# SQL PREVIEW
299288
# ---------------------------------------------------------------------
289+
300290
CDM_TABLES = [
301-
"cdm_identifiers",
302-
"cdm_names",
303-
"cdm_features",
304-
"cdm_contig_collection_x_feature",
305-
"cdm_contig_collection_x_protein",
306-
"cdm_feature_x_protein",
307-
"cdm_contigs",
308-
"cdm_contig_x_contig_collection",
291+
"Identifier",
292+
"Name",
293+
"Feature",
294+
"ContigCollection_x_Feature",
295+
"ContigCollection_x_Protein",
296+
"Feature_x_Protein",
297+
"Contig",
298+
"Contig_x_ContigCollection",
309299
]
310300

311301

@@ -316,6 +306,68 @@ def run_sql_query(spark: SparkSession, database: str = "default") -> None:
316306
spark.sql(f"SELECT * FROM {table} LIMIT 20").show(truncate=False)
317307

318308

309+
def parse_annotation_data(spark: SparkSession, datasets: list[dict], namespace: str) -> None:
    """Parse annotation datasets and write each CDM table into ``namespace``.

    Every loader is run over every dataset and the records are pooled per
    table before a single ``write_to_table`` call.  ``write_to_table`` saves
    in ``overwrite`` mode, so writing once per dataset would leave each table
    holding only the records of the last non-empty dataset.

    Args:
        spark: Spark session used for the writes.
        datasets: JSON-decoded annotation payloads, one dict per input.
        namespace: Database that the tables are saved into.
    """
    # (table name, loader) pairs; the table name is also the ``cdm_schemas``
    # key that write_to_table uses to pick the schema.
    table_loaders = (
        ("Identifier", load_identifiers),
        ("Name", load_names),
        ("Feature", load_feature_records),
        ("ContigCollection_x_Feature", load_contig_collection_x_feature),
        ("ContigCollection_x_Protein", load_contig_collection_x_protein),
        ("Feature_x_Protein", load_feature_x_protein),
        ("Contig", load_contigs),
        ("Contig_x_ContigCollection", load_contig_x_contig_collection),
    )

    for table_name, loader in table_loaders:
        # Pool the records from all datasets so the overwrite keeps them all.
        records = [record for data in datasets for record in loader(data)]
        # write_to_table is a no-op when there are no records.
        write_to_table(spark, records, table_name, namespace)
369+
370+
319371
# ---------------------------------------------------------------------
320372
# CLI ENTRY
321373
# ---------------------------------------------------------------------
@@ -383,73 +435,7 @@ def main():
383435
with open(path) as f:
384436
datasets.append(json.load(f))
385437

386-
# -----------------------------------------
387-
# Parse and write CDM tables
388-
# -----------------------------------------
389-
for data in datasets:
390-
write_to_table(
391-
spark,
392-
load_identifiers(data),
393-
"cdm_identifiers",
394-
IDENTIFIER_SCHEMA,
395-
args.namespace,
396-
)
397-
398-
write_to_table(
399-
spark,
400-
load_names(data),
401-
"cdm_names",
402-
NAME_SCHEMA,
403-
args.namespace,
404-
)
405-
406-
write_to_table(
407-
spark,
408-
load_feature_records(data),
409-
"cdm_features",
410-
FEATURE_SCHEMA,
411-
args.namespace,
412-
)
413-
414-
write_to_table(
415-
spark,
416-
load_contig_collection_x_feature(data),
417-
"cdm_contig_collection_x_feature",
418-
CONTIG_COLLECTION_X_FEATURE_SCHEMA,
419-
args.namespace,
420-
)
421-
422-
write_to_table(
423-
spark,
424-
load_contig_collection_x_protein(data),
425-
"cdm_contig_collection_x_protein",
426-
CONTIG_COLLECTION_X_PROTEIN_SCHEMA,
427-
args.namespace,
428-
)
429-
430-
write_to_table(
431-
spark,
432-
load_feature_x_protein(data),
433-
"cdm_feature_x_protein",
434-
FEATURE_X_PROTEIN_SCHEMA,
435-
args.namespace,
436-
)
437-
438-
write_to_table(
439-
spark,
440-
load_contigs(data),
441-
"cdm_contigs",
442-
CONTIG_SCHEMA,
443-
args.namespace,
444-
)
445-
446-
write_to_table(
447-
spark,
448-
load_contig_x_contig_collection(data),
449-
"cdm_contig_x_contig_collection",
450-
CONTIG_X_CONTIG_COLLECTION_SCHEMA,
451-
args.namespace,
452-
)
438+
parse_annotation_data(spark, datasets, args.namespace)
453439

454440
# -----------------------------------------
455441
# SQL preview

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from typing import Any
88

99
import pytest
10-
from pyspark.sql import SparkSession, DataFrame
10+
from pyspark.sql import DataFrame, SparkSession
1111
from pyspark.sql.types import (
1212
ArrayType,
1313
BooleanType,

tests/parsers/test_annotation_parse.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import json
22
from pathlib import Path
3-
import pytest
43

4+
import pytest
5+
from pyspark.sql import SparkSession
56

67
from src.cdm_data_loader_utils.parsers.annotation_parse import (
8+
apply_prefix,
79
load_contig_collection_x_feature,
810
load_contig_collection_x_protein,
911
load_contig_x_contig_collection,
@@ -12,9 +14,10 @@
1214
load_feature_x_protein,
1315
load_identifiers,
1416
load_names,
15-
apply_prefix,
17+
parse_annotation_data,
1618
to_int,
1719
)
20+
from tests.conftest import TEST_NS
1821

1922

2023
@pytest.mark.parametrize(
@@ -709,3 +712,31 @@ def test_apply_prefix(input_id, expected):
709712
@pytest.mark.parametrize("val, expected", [("123", 123), ("abc", None), ("", None)])
710713
def test_to_int(val, expected):
711714
assert to_int(val) == expected
715+
716+
717+
@pytest.mark.requires_spark
def test_parse_annotation_data(spark: SparkSession, test_data_dir: Path) -> None:
    """Parse a sample annotation report and compare every written table to the expected rows."""
    # The written tables are looked up under the lower-cased namespace.
    test_ns = TEST_NS.lower()
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {TEST_NS}")

    refseq_dir = test_data_dir / "refseq"
    # a single dataset; read_text() closes the file, unlike a bare open()
    dataset = json.loads((refseq_dir / "annotation_report.json").read_text())
    expected = json.loads((refseq_dir / "annotation_report.parsed.json").read_text())

    parse_annotation_data(spark, [dataset], TEST_NS)

    def row_key(row: dict) -> str:
        """Return a deterministic sort key for a row dictionary."""
        return json.dumps(row, sort_keys=True, default=str)

    for table_name, rows in expected.items():
        # Collect once and reuse the result for both checks.
        actual = [r.asDict() for r in spark.table(f"{test_ns}.{table_name}").collect()]
        # correct number of rows
        assert len(actual) == len(rows)
        # The collected row order need not match the fixture order, so both
        # sides are sorted before comparing the data in dictionary form.
        assert sorted(actual, key=row_key) == sorted(rows, key=row_key)

    # TODO: alternatively, build a DataFrame from the expected rows and the
    # table's CDM schema and compare schema and data with the DataFrame
    # equality helpers instead of comparing dictionaries.

tests/utils/test_spark_delta.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,6 @@
2929
TENANT_NAME = "The_Breakers"
3030

3131

32-
<<<<<<< HEAD
33-
=======
34-
@pytest.fixture
35-
def spark(tmp_path: Path) -> Generator[SparkSession, Any]:
36-
"""Generate a spark session with spark.sql.warehouse.dir set to the pytest temporary directory."""
37-
spark = get_spark("test_delta_app", local=True, delta_lake=True, override={SAVE_DIR: tmp_path})
38-
yield spark
39-
spark.stop()
40-
41-
42-
>>>>>>> 01a355b (Restore accidentally deleted files)
4332
def gen_ns_save_dir(current_save_dir: str, namespace: str, tenant_name: str | None) -> tuple[str, str]:
4433
"""Generate the projected namespace and save directory, given a file path, a namespace, and a tenant name."""
4534
db_location = f"tenant/{tenant_name}/{namespace}.db" if tenant_name else f"user/some_user/{namespace}.db"

0 commit comments

Comments
 (0)