-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathzip_dcm_ds_test.py
More file actions
138 lines (109 loc) · 3.54 KB
/
zip_dcm_ds_test.py
File metadata and controls
138 lines (109 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import logging
import sys
import pytest
from pyspark.sql import SparkSession
from zip_dcm_ds import RangePartition, ZipDCMDataSource, ZipDCMDataSourceReader
logger = logging.getLogger(__file__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(filename)s:%(lineno)s - %(levelname)s - %(message)s"
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
@pytest.fixture(scope="session")
def spark():
spark_session = (
SparkSession.builder.appName("PySpark DCM Zips Datasource Tester")
.master("local[*]")
.config("spark.memory.offHeap.enabled", "true")
.config("spark.memory.offHeap.size", "1g")
.getOrCreate()
)
spark_session.dataSource.register(ZipDCMDataSource)
yield spark_session
spark_session.stop()
def test_ZipDCMDataSourceReader():
zip_file_path = "./resources/dcms"
r = ZipDCMDataSourceReader(
schema="rowid INT, path STRING, meta STRING",
options={"path": zip_file_path, "numPartitions": 32},
)
partitions = r.partitions()
logger.debug([_ for _ in partitions])
for part in partitions:
results = r.read(part)
logger.debug([_ for _ in results])
def test_wrongfile(spark):
from pyspark.errors import AnalysisException
with pytest.raises(AnalysisException):
df = (
spark.read.option("numPartitions", "1")
.format("zipdcm")
.load("./resources/wrongpath.zip")
)
result = df.collect()
def test_wrongpath(spark):
from pyspark.errors import AnalysisException
with pytest.raises(AnalysisException):
df = (
spark.read.option("numPartitions", "1")
.format("zipdcm")
.load("./resources/wrongpath")
)
df.collect()
def test_dcm(spark):
df = (
spark.read.option("numPartitions", "1")
.format("zipdcm")
.load("./resources/dcms/y/1-1.dcm")
)
result = df.collect()
assert len(result) == 1
assert result[0]["path"] == "resources/dcms/y/1-1.dcm"
logger.debug(f"test_single result: {result}")
def test_dcm_glob(spark):
df = (
spark.read.option("numPartitions", "2")
.format("zipdcm")
.option("pathGlobFilter", "*.dcm")
.load("./resources/dcms")
)
result = df.orderBy(df.path, ascending=False).collect()
assert len(result) == 2
assert result[0]["path"] == "resources/dcms/y/1-1.dcm"
logger.debug(f"test_dcm_glob result: {result}")
def test_single(spark):
df = (
spark.read.option("numPartitions", "1")
.format("zipdcm")
.load("./resources/dcms/3.5.574.1.3.9030958.6.376.2860280475000825621.zip")
)
result = df.collect()
assert len(result) == 1
logger.debug(f"test_single result: {result}")
def test_folder(spark, tmp_path):
df = (
spark.read.option("numPartitions", "2")
.format("zipdcm")
.load("./resources/dcms")
)
df.limit(20).show()
assert df.isEmpty() == False
save_path = tmp_path / "saves"
df.write.format("csv").mode("overwrite").save(str(save_path))
assert save_path.exists()
def test_rowid(spark):
df = (
spark.read.option("numPartitions", "2")
.format("zipdcm")
.load("./resources/dcms")
)
df.limit(20).show()
df.registerTempTable("dicoms")
assert (
spark.sql("""select count(distinct rowid) from dicoms""").collect()[0][0] == 5
)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main([__file__]))