Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[run]
omit =
*/module_test.py
biasanalyzer/background/threading_utils.py
[report]
exclude_lines =
pragma: no cover
if __name__ == .__main__.:

[html]
directory = coverage_html_report

2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ jobs:
# Step 5: Run Tests
- name: Run tests
run: |
poetry run pytest -s --cov=biasanalyzer
poetry run pytest -s --cov=biasanalyzer --cov-config=.coveragerc
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ to install the python package from this github repo.
- Run `bias = BIAS()` to create an object of the imported BIAS class.
- Create a config.yaml file that specifies the OMOP database connection configuration.
The config.yaml file must include the `root_omop_cdm_database` key.
- [A test OMOP database configuration yaml file](https://github.com/VACLab/BiasAnalyzer/blob/main/tests/assets/test_config.yaml)
- [A test OMOP database configuration yaml file](https://github.com/VACLab/BiasAnalyzer/blob/main/tests/assets/config/test_config.yaml)
can serve as an example. Another config.yaml example, for connecting to an OMOP PostgreSQL database,
is also copied below for reference.
```angular2html
Expand Down
108 changes: 42 additions & 66 deletions biasanalyzer/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,41 @@
from biasanalyzer.cohort import CohortAction
from biasanalyzer.config import load_config
from ipywidgets import VBox, Label
from ipytree import Tree, Node
from ipytree import Tree
from IPython.display import display
from biasanalyzer.utils import get_direction_arrow
from biasanalyzer.utils import get_direction_arrow, notify_users, build_concept_tree


class BIAS:
_instance = None

def __init__(self):
self.config = {}
def __init__(self, config_file_path=None):
self.bias_db = None
self.omop_cdm_db = None
self.cohort_action = None

def __new__(cls, config_file_path=None):
if cls._instance is None:
cls._instance = super(BIAS, cls).__new__(cls)
cls._instance.set_config(config_file_path)
return cls._instance
if config_file_path is None:
self.config = {}
else:
self.set_config(config_file_path)

def set_config(self, config_file_path: str):
if config_file_path is None:
print('no configuration file specified. '
'Call set_config(config_file_path) next to specify configurations')
if not config_file_path:
notify_users('no configuration file specified. '
'Call set_config(config_file_path) next to specify configurations')
else:
try:
self.config = load_config(config_file_path)
print(f'configuration specified in {config_file_path} loaded successfully')
notify_users(f'configuration specified in {config_file_path} loaded successfully')
except FileNotFoundError:
print('specified configuration file does not exist. '
'Call set_config(config_file_path) next to specify a valid '
'configuration file')
notify_users('specified configuration file does not exist. '
'Call set_config(config_file_path) next to specify a valid configuration file',
level='error')
except ValidationError as ex:
print(f'configuration yaml file is not valid with validation error: {ex}')
notify_users(f'configuration yaml file is not valid with validation error: {ex}', level='error')

def set_root_omop(self):
if not self.config:
print('no valid configuration to set root OMOP CDM data. '
'Call set_config(config_file_path) to specify configurations first.')
elif 'root_omop_cdm_database' in self.config:
notify_users('no valid configuration to set root OMOP CDM data. '
'Call set_config(config_file_path) to specify configurations first.')
else:
db_type = self.config['root_omop_cdm_database']['database_type']
if db_type == 'postgresql':
user = self.config['root_omop_cdm_database']['username']
Expand All @@ -65,64 +60,42 @@ def set_root_omop(self):
self.bias_db = BiasDatabase(db_path)
self.bias_db.omop_cdm_db_url = db_path
else:
print(f"Unsupported database type: {db_type}")
else:
print('Configuration file must include configuration values for root_omop_cdm_database key.')
notify_users(f"Unsupported database type: {db_type}")

def _set_cohort_action(self):
if self.omop_cdm_db is None:
print('A valid OMOP CDM must be set before creating a cohort. '
'Call set_root_omop first to set a valid root OMOP CDM')
notify_users('A valid OMOP CDM must be set before creating a cohort. '
'Call set_root_omop first to set a valid root OMOP CDM')
return None
if self.cohort_action is None:
self.cohort_action = CohortAction(self.omop_cdm_db, self.bias_db)
return self.cohort_action

def get_domains_and_vocabularies(self):
print(f'self.omop_cdm_db: {self.omop_cdm_db}')
if self.omop_cdm_db is None:
print('A valid OMOP CDM must be set before getting domains. '
'Call set_root_omop first to set a valid root OMOP CDM')
notify_users('A valid OMOP CDM must be set before getting domains. '
'Call set_root_omop first to set a valid root OMOP CDM')
return None
return self.omop_cdm_db.get_domains_and_vocabularies()

def get_concepts(self, search_term, domain=None, vocabulary=None):
if self.omop_cdm_db is None:
print('A valid OMOP CDM must be set before getting concepts. '
'Call set_root_omop first to set a valid root OMOP CDM')
notify_users('A valid OMOP CDM must be set before getting concepts. '
'Call set_root_omop first to set a valid root OMOP CDM')
return None
if domain is None and vocabulary is None:
print('either domain or vocabulary must be set to constrain the number of returned concepts')
notify_users('either domain or vocabulary must be set to constrain the number of returned concepts')
return None
return self.omop_cdm_db.get_concepts(search_term, domain, vocabulary)

def get_concept_hierarchy(self, concept_id):
if self.omop_cdm_db is None:
print('A valid OMOP CDM must be set before getting concepts. '
'Call set_root_omop first to set a valid root OMOP CDM')
notify_users('A valid OMOP CDM must be set before getting concepts. '
'Call set_root_omop first to set a valid root OMOP CDM')
return None
return self.omop_cdm_db.get_concept_hierarchy(concept_id)

def _build_concept_tree(self, concept_tree: dict, tree_type: str) -> Node:
"""
Recursively builds an ipytree Node for a given concept tree.
"""
# Extract concept details
details = concept_tree.get("details", {})
concept_name = details.get("concept_name", "Unknown Concept")
concept_id = details.get("concept_id", "")
concept_code = details.get("concept_code", "")
direction_arrow = get_direction_arrow(tree_type)
# Create a label for the current concept
label_text = f"{direction_arrow} {concept_name} (ID: {concept_id}, Code: {concept_code})"
node = Node(label_text)

# Recursively add child nodes
for child in concept_tree.get(tree_type, []):
child_node = self._build_concept_tree(child, tree_type)
node.add_node(child_node)

return node

def display_concept_tree(self, concept_tree: dict, level: int = 0, show_in_text_format=True, tree_type=None):
"""
Recursively prints the concept hierarchy tree in an indented format for display.
Expand All @@ -134,7 +107,7 @@ def display_concept_tree(self, concept_tree: dict, level: int = 0, show_in_text_
elif 'children' in concept_tree:
tree_type = 'children'
else:
print('The input concept tree must contain parents or children key as the type of the tree.')
notify_users('The input concept tree must contain parents or children key as the type of the tree.')
return ''

if show_in_text_format:
Expand All @@ -152,12 +125,12 @@ def display_concept_tree(self, concept_tree: dict, level: int = 0, show_in_text_
else:
# Extract concept details
# Build the root tree node
root_node = self._build_concept_tree(concept_tree, tree_type)
root_node = build_concept_tree(concept_tree, tree_type)
tree = Tree()
tree.add_node(root_node)
tree.opened = True
display(VBox([Label("Concept Hierarchy"), tree]))
return None
return root_node


def create_cohort(self, cohort_name: str, cohort_desc: str, query_or_yaml_file: str, created_by: str,
Expand All @@ -178,12 +151,12 @@ def create_cohort(self, cohort_name: str, cohort_desc: str, query_or_yaml_file:
created_cohort = c_action.create_cohort(cohort_name, cohort_desc, query_or_yaml_file, created_by)
if created_cohort is not None:
if delay > 0:
print(f"[DEBUG] Simulating long-running task with {delay} seconds delay...")
notify_users(f"[DEBUG] Simulating long-running task with {delay} seconds delay...")
time.sleep(delay)
print('cohort created successfully')
notify_users('cohort created successfully')
return created_cohort
else:
print('failed to create a valid cohort action object')
notify_users('failed to create a valid cohort action object')
return None


Expand All @@ -192,11 +165,14 @@ def compare_cohorts(self, cohort_id1, cohort_id2):
if c_action:
return c_action.compare_cohorts(cohort_id1, cohort_id2)
else:
print('failed to create a valid cohort action object')
notify_users('failed to create a valid cohort action object')
return None


def cleanup(self):
self.bias_db.close()
self.omop_cdm_db.close()
del self.cohort_action
if self.bias_db:
self.bias_db.close()
if self.omop_cdm_db:
self.omop_cdm_db.close()
if self.cohort_action:
del self.cohort_action
18 changes: 9 additions & 9 deletions biasanalyzer/cohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from datetime import datetime
from tqdm.auto import tqdm
from pydantic import ValidationError
from biasanalyzer.models import CohortDefinition, Cohort
from biasanalyzer.models import CohortDefinition
from biasanalyzer.config import load_cohort_creation_config
from biasanalyzer.database import OMOPCDMDatabase, BiasDatabase
from biasanalyzer.utils import hellinger_distance, clean_string
from biasanalyzer.utils import hellinger_distance, clean_string, notify_users
from biasanalyzer.cohort_query_builder import CohortQueryBuilder


Expand Down Expand Up @@ -99,12 +99,11 @@ def create_cohort(self, cohort_name: str, description: str, query_or_yaml_file:
cohort_config = load_cohort_creation_config(query_or_yaml_file)
tqdm.write(f'configuration specified in {query_or_yaml_file} loaded successfully')
except FileNotFoundError:
print('specified cohort creation configuration file does not exist. Make sure '
'the configuration file name with path is specified correctly.')
notify_users('specified cohort creation configuration file does not exist. Make sure '
'the configuration file name with path is specified correctly.')
return None
except ValidationError as ex:
print(f'cohort creation configuration yaml file is not valid with '
f'validation error: {ex}')
notify_users(f'cohort creation configuration yaml file is not valid with validation error: {ex}')
return None

query = self._query_builder.build_query(cohort_config)
Expand Down Expand Up @@ -139,11 +138,12 @@ def create_cohort(self, cohort_name: str, description: str, query_or_yaml_file:
tqdm.write(f"Cohort {cohort_name} successfully created.")
return CohortData(cohort_id=cohort_def_id, bias_db=self.bias_db, omop_db=self.omop_db)
except duckdb.Error as e:
print(f"Error executing query: {e}")
notify_users(f"Error executing query: {e}")
return None
except SQLAlchemyError as e:
print(f"Error executing query: {e}")
omop_session.close()
notify_users(f"Error executing query: {e}")
if omop_session is not None:
omop_session.close()
return None

def compare_cohorts(self, cohort_id_1: int, cohort_id_2: int):
Expand Down
38 changes: 23 additions & 15 deletions biasanalyzer/cohort_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ class CohortQueryBuilder:
def __init__(self):
"""Get the path to SQL templates, whether running from source or installed."""
try:
if sys.version_info >= (3, 9):
if sys.version_info >= (3, 9): # pragma: no cover
# Python 3.9+: Use importlib.resources.files()
template_path = importlib.resources.files("biasanalyzer").joinpath("sql_templates")
else:
# Python 3.8: Use importlib.resources.path() (context manager)
with importlib.resources.path("biasanalyzer", "sql_templates") as p:
template_path = str(p)
except ModuleNotFoundError:
except ModuleNotFoundError: # pragma: no cover
template_path = os.path.join(os.path.dirname(__file__), "sql_templates")

print(f'template_path: {template_path}')
Expand Down Expand Up @@ -117,7 +117,7 @@ def render_event_group(event_group, alias_prefix="evt"):
event_sql = CohortQueryBuilder.render_event_group(event, f"{alias_prefix}_{i}")
if event_sql:
queries.append(event_sql)
if not queries:
if not queries: # pragma: no cover
return ""

if event_group["operator"] == "AND":
Expand Down Expand Up @@ -150,9 +150,6 @@ def render_event_group(event_group, alias_prefix="evt"):
elif event_group["operator"] == "OR":
return f"SELECT person_id, event_start_date, event_end_date FROM ({' UNION '.join(queries)}) AS {alias_prefix}_or"
elif event_group["operator"] == "NOT":
if len(queries) != 1:
raise ValueError("NOT operator expects exactly one event subquery")
# Keep the full subquery with dates for consistency, but use it as a filter
not_query = queries[0]
# Return a query that selects all persons from a base table (e.g., person),
# excluding those in the NOT subquery, while allowing dates from other criteria
Expand Down Expand Up @@ -187,10 +184,6 @@ def render_event_group(event_group, alias_prefix="evt"):
FROM ({queries[0]}) AS {alias_prefix}_0
WHERE event_start_date < DATE '{timestamp}'
"""
else:
print(f"Error: event_group: {event_group} with BEFORE operator only "
f"has one query event {queries}")
return ''
elif len(queries) == 2:
event_group = TemporalEventGroup(**event_group)
e1_alias = f"e1_{alias_prefix}"
Expand All @@ -213,7 +206,7 @@ def render_event_group(event_group, alias_prefix="evt"):
AND {e1_alias}.event_start_date < {e2_alias}.event_start_date
{interval_sql}
"""
return ""
return "" # pragma: no cover

def temporal_event_filter(self, event_groups, alias='c'):
"""
Expand All @@ -236,15 +229,30 @@ def temporal_event_filter(self, event_groups, alias='c'):
filters.append(f"AND {alias}.person_id IN (SELECT person_id FROM ({group_sql}) AS ex_subquery_{i})")
else:
filters.append(f"({group_sql})")
if not filters:
if not filters: # pragma: no cover
return ""
if alias == 'ex':
# For exclusion, combine with AND as filters
return " ".join(filters)
else:
# For inclusion, combine as a single subquery (assuming one event group for simplicity)
# If multiple groups, may need UNION or further logic
# For inclusion, handle both single event group case with operator defined and multiple event group
# case with no operator defined
if len(filters) > 1:
# For multiple temporal event group case with no operator defined, use "OR" operator by default
# An example YAML block for multiple temporal event group is shown below for reference, in which
# case, patients who satisfy either group (condition 37311061 or drug 67890) will be included:
# inclusion_criteria:
# temporal_events:
# - operator: AND
# events:
# - event_type: condition_occurrence
# event_concept_id: 37311061
# - operator: AND
# events:
# - event_type: drug_exposure
# event_concept_id: 67890
return (f"SELECT person_id, event_start_date, event_end_date FROM "
f"({' UNION ALL '.join(filters)}) AS combined_events")
return filters[0] # Single event group case

# Single event group case with operator defined
return filters[0]
Loading