Skip to content

Commit cde35ee

Browse files
committed
update real data ingestion script
1 parent 441eb04 commit cde35ee

1 file changed

Lines changed: 216 additions & 6 deletions

File tree

scripts/ingest_csvs_to_omop_duckdb.py

Lines changed: 216 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,224 @@
1212

1313
import argparse
1414
import sys
15+
import csv
1516
import time
1617
from pathlib import Path
1718

1819
import duckdb
1920

2021

22+
FILENAME_STEM_TO_TABLE_NAME_MAPPING = {
23+
# 'demographics': 'person'
24+
# 'conditions': 'condition_occurrence'
25+
# 'drugs': 'drug_exposure'
26+
# 'procedures': 'procedure_occurrence'
27+
# 'visits': 'visit_occurrence'
28+
'observations': 'observation'
29+
}
30+
31+
COLUMN_MAPPINGS = {
32+
"person": {
33+
"deid_pat_id": "person_id"
34+
},
35+
"condition_occurrence": {
36+
"deid_pat_id": "person_id"
37+
},
38+
"drug_exposure": {
39+
"deid_pat_id": "person_id"
40+
},
41+
"procedure_occurrence": {
42+
"deid_pat_id": "person_id"
43+
},
44+
"visit_occurrence": {
45+
"deid_pat_id": "person_id"
46+
},
47+
"observation": {
48+
"deid_pat_id": "person_id"
49+
},
50+
"measurement": {
51+
"deid_pat_id": "person_id"
52+
},
53+
}
54+
55+
OMOP_TABLE_SCHEMAS = {
56+
"person": [
57+
"person_id",
58+
"gender_concept_id",
59+
"year_of_birth",
60+
"month_of_birth",
61+
"day_of_birth",
62+
"birth_datetime",
63+
"race_concept_id",
64+
"ethnicity_concept_id",
65+
"location_id",
66+
"provider_id",
67+
"care_site_id",
68+
"person_source_value",
69+
"gender_source_value",
70+
"gender_source_concept_id",
71+
"race_source_value",
72+
"race_source_concept_id",
73+
"ethnicity_source_value",
74+
"ethnicity_source_concept_id"
75+
],
76+
"condition_occurrence": [
77+
"condition_occurrence_id",
78+
"person_id",
79+
"condition_concept_id",
80+
"condition_start_date",
81+
"condition_start_datetime",
82+
"condition_end_date",
83+
"condition_end_datetime",
84+
"condition_type_concept_id",
85+
"condition_status_concept_id",
86+
"stop_reason",
87+
"provider_id",
88+
"visit_occurrence_id",
89+
"visit_detail_id",
90+
"condition_source_value",
91+
"condition_source_concept_id",
92+
"condition_status_source_value"
93+
],
94+
'drug_exposure': [
95+
"drug_exposure_id",
96+
"person_id",
97+
"drug_concept_id",
98+
"drug_exposure_start_date",
99+
"drug_exposure_start_datetime",
100+
"drug_exposure_end_date",
101+
"drug_exposure_end_datetime",
102+
"verbatim_end_date",
103+
"drug_type_concept_id",
104+
"stop_reason",
105+
"refills",
106+
"quantity",
107+
"days_supply",
108+
"sig",
109+
"route_concept_id",
110+
"lot_number",
111+
"provider_id",
112+
"visit_occurrence_id",
113+
"visit_detail_id",
114+
"drug_source_value",
115+
"drug_source_concept_id",
116+
"route_source_value",
117+
"dose_unit_source_value"
118+
],
119+
'procedure_occurrence': [
120+
"procedure_occurrence_id",
121+
"person_id",
122+
"procedure_concept_id",
123+
"procedure_date",
124+
"procedure_datetime",
125+
"procedure_end_date",
126+
"procedure_end_datetime",
127+
"procedure_type_concept_id",
128+
"modifier_concept_id",
129+
"quantity",
130+
"provider_id",
131+
"visit_occurrence_id",
132+
"visit_detail_id",
133+
"procedure_source_value",
134+
"procedure_source_concept_id",
135+
"modifier_source_value"
136+
],
137+
'visit_occurrence': [
138+
"visit_occurrence_id",
139+
"person_id",
140+
"visit_concept_id",
141+
"visit_start_date",
142+
"visit_start_datetime",
143+
"visit_end_date",
144+
"visit_end_datetime",
145+
"visit_type_concept_id",
146+
"provider_id",
147+
"care_site_id",
148+
"visit_source_value",
149+
"visit_source_concept_id",
150+
"admitted_from_concept_id",
151+
"admitted_from_source_value",
152+
"discharged_to_concept_id",
153+
"discharged_to_source_value",
154+
"preceding_visit_occurrence_id"
155+
],
156+
'observation': [
157+
"observation_id",
158+
"person_id",
159+
"observation_concept_id",
160+
"observation_date",
161+
"observation_datetime",
162+
"observation_type_concept_id",
163+
"value_as_number",
164+
"value_as_string",
165+
"value_as_concept_id",
166+
"qualifier_concept_id",
167+
"unit_concept_id",
168+
"provider_id",
169+
"visit_occurrence_id",
170+
"visit_detail_id",
171+
"observation_source_value",
172+
"observation_source_concept_id",
173+
"unit_source_value",
174+
"qualifier_source_value",
175+
"value_source_value",
176+
"observation_event_id",
177+
"obs_event_field_concept_id"
178+
]
179+
}
180+
21181
def load_csv_to_duckdb(con, csv_path: Path, table_name: str):
22182
"""Load a single CSV file into DuckDB."""
23183
t0 = time.time()
24184
print(f"loading {table_name} from {csv_path}")
185+
186+
# read and normalize header
187+
with open(csv_path, "r", newline="") as f:
188+
reader = csv.reader(f)
189+
raw_header = next(reader)
190+
191+
# normalize: lower case + strip quotes/spaces
192+
raw_header = [h.strip().replace('"', '') for h in raw_header]
193+
header = [h.lower() for h in raw_header]
194+
print(f'normalized header: {header}')
195+
196+
mapping = COLUMN_MAPPINGS.get(table_name, {})
197+
final_cols = [mapping.get(col, col) for col in header]
198+
print(f'mapped header: {final_cols}')
199+
200+
expected = OMOP_TABLE_SCHEMAS.get(table_name, [])
201+
final_set = set(final_cols)
202+
203+
missing = set(expected) - final_set
204+
if missing:
205+
print(f"Missing expected OMOP columns in {table_name}: {sorted(missing)} - cannot ingest")
206+
elapsed = time.time() - t0
207+
return None, elapsed
208+
209+
extra = final_set - set(expected)
210+
if extra:
211+
print(f"WARNING: Extra columns in CSV for {table_name}: {sorted(extra)}")
212+
print(f"Extra columns will NOT be ingested.")
213+
214+
select_clauses = []
215+
for orig, new in zip(raw_header, final_cols):
216+
if new not in expected:
217+
# skip extra columns entirely
218+
continue
219+
if orig != new:
220+
select_clauses.append(f'{orig} AS {new}')
221+
else:
222+
select_clauses.append(orig)
223+
224+
select_clause = ", ".join(select_clauses)
225+
print(f"Final SELECT clause: {select_clause}")
226+
25227
con.execute(f"""
26-
CREATE OR REPLACE TABLE {table_name} AS
27-
SELECT * FROM read_csv_auto('{csv_path}', header=True, quote='', parallel=True)
28-
""")
228+
CREATE OR REPLACE TABLE {table_name} AS
229+
SELECT {select_clause}
230+
FROM read_csv_auto('{csv_path}', header=True, parallel=True)
231+
""")
232+
29233
row_count = con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
30234
elapsed = time.time() - t0
31235
print(f"Loaded {table_name} ({row_count} rows) in {elapsed:5.2f}s")
@@ -41,7 +245,10 @@ def ingest_directory(con, csv_dir: Path):
41245
results = []
42246
for csv_path in sorted(csv_dir.glob("*.csv")):
43247
table_name = csv_path.stem.lower()
44-
rc, t = load_csv_to_duckdb(con, csv_path, table_name)
248+
if table_name not in FILENAME_STEM_TO_TABLE_NAME_MAPPING:
249+
continue
250+
mapped_table_name = FILENAME_STEM_TO_TABLE_NAME_MAPPING[table_name]
251+
rc, t = load_csv_to_duckdb(con, csv_path, mapped_table_name)
45252
results.append((table_name, rc, t))
46253
return results
47254

@@ -51,6 +258,7 @@ def main():
51258
parser.add_argument(
52259
"--clinical",
53260
type=Path,
261+
default=Path("Y:/"),
54262
required=False,
55263
help="Directory containing OMOP clinical CSVs (person, condition_occurrence, etc.)",
56264
)
@@ -60,7 +268,9 @@ def main():
60268
required=False,
61269
help="Directory containing OMOP vocabulary CSVs (concept, concept_relationship, etc.)",
62270
)
63-
parser.add_argument("--output", type=Path, required=True, help="Output DuckDB file path")
271+
parser.add_argument("--output", type=Path,
272+
default=Path("Y:/OMOP_duckdb/omop.duckdb"),
273+
required=False, help="Output DuckDB file path")
64274

65275
args = parser.parse_args()
66276

@@ -73,7 +283,7 @@ def main():
73283
sys.exit(1)
74284

75285
print(f"Creating DuckDB at: {db_path}")
76-
db_path.parent.mkdir(parents=True, exist_ok=True)
286+
# db_path.parent.mkdir(parents=True, exist_ok=True)
77287

78288
con = duckdb.connect(str(db_path))
79289
all_results = []

0 commit comments

Comments
 (0)