Skip to content

Commit 2be49ee

Browse files
authored
Merge pull request #23 from VACLab/ingest-real-data-into-duckdb
Added script for ingesting real data into duckdb and safe guard to close potentially lingering duckdb connections if any
2 parents b4fc5a7 + 1ad92f2 commit 2be49ee

3 files changed

Lines changed: 117 additions & 16 deletions

File tree

biasanalyzer/api.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,26 @@ def set_root_omop(self):
3939
if not self.config:
4040
notify_users('no valid configuration to set root OMOP CDM data. '
4141
'Call set_config(config_file_path) to specify configurations first.')
42+
return
43+
44+
self.cleanup()
45+
46+
db_type = self.config['root_omop_cdm_database']['database_type']
47+
if db_type == 'postgresql':
48+
user = self.config['root_omop_cdm_database']['username']
49+
password = self.config['root_omop_cdm_database']['password']
50+
host = self.config['root_omop_cdm_database']['hostname']
51+
port = self.config['root_omop_cdm_database']['port']
52+
db = self.config['root_omop_cdm_database']['database']
53+
db_url = f"postgresql://{user}:{password}@{host}:{port}/{db}"
54+
self.omop_cdm_db = OMOPCDMDatabase(db_url)
55+
self.bias_db = BiasDatabase(':memory:', omop_db_url=db_url)
56+
elif db_type == 'duckdb':
57+
db_path = self.config['root_omop_cdm_database'].get('database', ":memory:")
58+
self.omop_cdm_db = OMOPCDMDatabase(db_path)
59+
self.bias_db = BiasDatabase(':memory:', omop_db_url=db_path)
4260
else:
43-
db_type = self.config['root_omop_cdm_database']['database_type']
44-
if db_type == 'postgresql':
45-
user = self.config['root_omop_cdm_database']['username']
46-
password = self.config['root_omop_cdm_database']['password']
47-
host = self.config['root_omop_cdm_database']['hostname']
48-
port = self.config['root_omop_cdm_database']['port']
49-
db = self.config['root_omop_cdm_database']['database']
50-
db_url = f"postgresql://{user}:{password}@{host}:{port}/{db}"
51-
self.omop_cdm_db = OMOPCDMDatabase(db_url)
52-
self.bias_db = BiasDatabase(':memory:', omop_db_url=db_url)
53-
elif db_type == 'duckdb':
54-
db_path = self.config['root_omop_cdm_database'].get('database', ":memory:")
55-
self.omop_cdm_db = OMOPCDMDatabase(db_path)
56-
self.bias_db = BiasDatabase(':memory:', omop_db_url=db_path)
57-
else:
58-
notify_users(f"Unsupported database type: {db_type}")
61+
notify_users(f"Unsupported database type: {db_type}")
5962

6063
def _set_cohort_action(self):
6164
if self.omop_cdm_db is None:

biasanalyzer/database.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import duckdb
22
import pandas as pd
3+
import gc
34
from typing import Optional
45
from datetime import datetime
56
from tqdm.auto import tqdm
@@ -288,6 +289,14 @@ def __new__(cls, *args, **kwargs):
288289

289290
def _initialize(self, db_url):
290291
if db_url.endswith('.duckdb'):
292+
# close any potential global connections if any
293+
for obj in gc.get_objects(): # pragma: no cover
294+
if isinstance(obj, duckdb.DuckDBPyConnection):
295+
try:
296+
obj.close()
297+
except Exception as e:
298+
notify_users(f'failed to close the lingering duckdb connection before opening a new one: {e}')
299+
291300
# Handle DuckDB connection
292301
try:
293302
self.engine = duckdb.connect(db_url)
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
"""
2+
This script ingests both clinical and vocabulary OMOP CSV exports into a single DckDB database for
3+
downstream use of the core BiasAnalyzer python library.
4+
Example for running this script:
5+
python scripts/ingest_csvs_to_omop_duckdb.py \
6+
--clinical data/clinical \
7+
--vocab data/omop_vocabs \
8+
--output data/omop.duckdb
9+
"""
10+
11+
import duckdb
12+
import time
13+
import argparse
14+
import sys
15+
from pathlib import Path
16+
17+
18+
def load_csv_to_duckdb(con, csv_path: Path, table_name: str):
19+
"""Load a single CSV file into DuckDB."""
20+
t0 = time.time()
21+
print(f'loading {table_name} from {csv_path}')
22+
con.execute(f"""
23+
CREATE OR REPLACE TABLE {table_name} AS
24+
SELECT * FROM read_csv_auto('{csv_path}', header=True, quote='', parallel=True)
25+
""")
26+
row_count = con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
27+
elapsed = time.time() - t0
28+
print(f"Loaded {table_name} ({row_count} rows) in {elapsed:5.2f}s")
29+
return row_count, elapsed
30+
31+
32+
def ingest_directory(con, csv_dir: Path):
33+
"""Ingest all CSVs in a directory."""
34+
if not csv_dir.exists():
35+
print(f"directory not found: {csv_dir}")
36+
return []
37+
38+
results = []
39+
for csv_path in sorted(csv_dir.glob("*.csv")):
40+
table_name = csv_path.stem.lower()
41+
rc, t = load_csv_to_duckdb(con, csv_path, table_name)
42+
results.append((table_name, rc, t))
43+
return results
44+
45+
46+
def main():
47+
parser = argparse.ArgumentParser(description="Ingest OMOP CSVs into DuckDB")
48+
parser.add_argument("--clinical", type=Path, required=False,
49+
help="Directory containing OMOP clinical CSVs (person, condition_occurrence, etc.)")
50+
parser.add_argument("--vocab", type=Path, required=False,
51+
help="Directory containing OMOP vocabulary CSVs (concept, concept_relationship, etc.)")
52+
parser.add_argument("--output", type=Path, required=True,
53+
help="Output DuckDB file path")
54+
55+
args = parser.parse_args()
56+
57+
input_clinical = args.clinical
58+
input_vocab = args.vocab
59+
db_path = args.output
60+
61+
if input_clinical is None and input_vocab is None:
62+
print("Error: You must provide at least one of --clinical or --vocab for data ingestion.")
63+
sys.exit(1)
64+
65+
print(f"Creating DuckDB at: {db_path}")
66+
db_path.parent.mkdir(parents=True, exist_ok=True)
67+
68+
con = duckdb.connect(str(db_path))
69+
all_results = []
70+
71+
if input_clinical:
72+
if not input_clinical.exists():
73+
print(f"Clinical directory does not exist: {input_clinical}")
74+
sys.exit(1)
75+
all_results += ingest_directory(con, input_clinical)
76+
77+
if input_vocab:
78+
if not input_vocab.exists():
79+
print(f"Vocabulary directory does not exist: {input_vocab}")
80+
sys.exit(1)
81+
all_results += ingest_directory(con, input_vocab)
82+
83+
con.close()
84+
85+
print(f"Ingestion complete with {len(all_results)} tables loaded. Details shown below:")
86+
print(f"\n{all_results}")
87+
88+
if __name__ == "__main__":
89+
main()

0 commit comments

Comments
 (0)