diff --git a/scripts/bdc/get_bdc_studies_from_gen3.py b/scripts/bdc/get_bdc_studies_from_gen3.py index e0f3c6b..2748407 100644 --- a/scripts/bdc/get_bdc_studies_from_gen3.py +++ b/scripts/bdc/get_bdc_studies_from_gen3.py @@ -192,6 +192,12 @@ def make_csv_dict_from_study_info(study_info): if not 'gen3_discovery' in study_info: return {} gen3_discovery = study_info['gen3_discovery'] + + # Check if study_id exists before accessing it + if 'study_id' not in gen3_discovery: + logging.warning("Skipping study_info without study_id: %s", + study_info.get('_guid', 'unknown')) + return None study_id = gen3_discovery['study_id'] (study_name, name, short_name) = get_study_name(gen3_discovery) @@ -340,6 +346,8 @@ def make_kgx_lists(study_info_list): if not 'gen3_discovery' in study_info: continue gen3_discovery = study_info['gen3_discovery'] + if 'study_id' not in gen3_discovery: + continue (study_id, consent) = get_id_and_consent(gen3_discovery['study_id']) if not consent: # Non-dbgap IDs not supported by Dug diff --git a/scripts/bdc/program_table/.DS_Store b/scripts/bdc/program_table/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/scripts/bdc/program_table/.DS_Store differ diff --git a/scripts/bdc/program_table/README.md b/scripts/bdc/program_table/README.md new file mode 100644 index 0000000..52a697b --- /dev/null +++ b/scripts/bdc/program_table/README.md @@ -0,0 +1,330 @@ +# BDC Program Table Ingestion Pipeline + +A modular data ingestion pipeline for BioData Catalyst (BDC) program table used in the program search/filter feature on the BDC portal. + +## Overview + +This pipeline fetches study data from BDC and Gen3 portals, merges them, updates program names using NHLBI Jira data, and processes the final program table for upload. The output is used to populate the program search table on the BDC portal. 
+ +## Pipeline Steps + +| Step | Script | Description | +|------|--------|-------------| +| 1 | `fetch_studies.py` | Fetch studies from BDC and Gen3 portals | +| 2 | `update_programs.py` | Update program names from Jira data | +| 3 | `process_program_table.py` | Post-process: set defaults and expand by community | + +## Directory Structure + +``` +program_table/ +├── bdc_utils.py # Shared utilities (logging, JSON I/O, validation) +├── bdc_data_manager.py # Core data management (BDC/Gen3 API interactions) +├── fetch_studies.py # Step 1: Fetch and merge studies +├── update_programs.py # Step 2: Update program names from Jira +├── process_program_table.py # Step 3: Post-process program table +├── run_pipeline.sh # Bash orchestration script +└── README.md # This file +``` + +## Quick Start + +### Run Full Pipeline (Steps 1-2) + +```bash +./run_pipeline.sh \ + --output-dir /path/to/output \ + --jira-file /path/to/jira.csv +``` + +### Run Post-Processing (Step 3) + +```bash +python3 process_program_table.py \ + --input-file /path/to/program_table_updated.json \ + --output-dir /path/to/output +``` + +## Data Flow + +``` +┌─────────────────┐ ┌──────────────┐ +│ BDC Portal │ │ Gen3 Portal │ +│ (API Fetch) │ │ (API Fetch) │ +└────────┬────────┘ └──────┬───────┘ + │ │ + └────────────┬─────────────┘ + │ + ▼ + ┌────────────────────────┐ + │ Step 1: Merge Studies │ + │ (fetch_studies.py) │ + └───────────┬────────────┘ + │ + ▼ + ┌────────────────────────┐ ┌──────────────┐ + │ Step 2: Update │◄────│ Jira Data │ + │ Programs │ │ (CSV/Excel) │ + │ (update_programs.py) │ └──────────────┘ + └───────────┬────────────┘ + │ + ▼ + ┌────────────────────────────────────────────┐ + │ Step 3: Post-Process Program Table │ + │ (process_program_table.py) │ + │ │ + │ 1. Set "Extramural Research" for studies │ + │ with missing descriptions │ + │ 2. Replace with Community values │ + │ 3. 
Create duplicates for multi-community │ + │ studies │ + └───────────┬────────────────────────────────┘ + │ + ▼ + ┌────────────────────────┐ + │ Final Program Table │ + │ (Ready for Upload) │ + └────────────────────────┘ +``` + +--- + +## Step 1: Fetch Studies + +Fetches studies from BDC portal and Gen3 portal, merges them, and prepares the program table. + +```bash +python3 fetch_studies.py \ + --output-dir /path/to/output \ + [--bdc-url URL] \ + [--gen3-url URL] \ + [--gen3-limit N] \ + [--show-details] +``` + +**Options:** +| Option | Description | Default | +|--------|-------------|---------| +| `--output-dir` | Output directory | Required | +| `--bdc-url` | BDC API URL | Production | +| `--gen3-url` | Gen3 API URL | Production | +| `--gen3-limit` | Pagination limit | 50 | +| `--show-details` | Show detailed logs | False | + +**Output:** `program_table/program_table.json` + +--- + +## Step 2: Update Program Names + +Updates program names based on NHLBI Jira data. + +```bash +python3 update_programs.py \ + --jira-file /path/to/jira.csv \ + --input-file /path/to/program_table.json \ + --output-dir /path/to/output +``` + +**Options:** +| Option | Description | +|--------|-------------| +| `--jira-file` | Path to Jira data file (CSV or Excel) | +| `--input-file` | Path to input program table JSON | +| `--output-dir` | Output directory | + +**Output:** +- `program_table_updated_TIMESTAMP.json` (readable) +- `program_table_updated_TIMESTAMP_minified.json` (for upload) + +--- + +## Step 3: Post-Process Program Table + +Processes the program table to set default programs for studies with missing descriptions and expands the table by community mapping. 
+ +```bash +python3 process_program_table.py \ + --input-file /path/to/program_table_updated.json \ + --output-dir /path/to/output +``` + +**Options:** +| Option | Description | Default | +|--------|-------------|---------| +| `--input-file` | Input program table JSON | Required | +| `--output-dir` | Output directory | Script directory | + +### What It Does + +**Phase 1: Set Default Program** +- Studies with empty descriptions get: + - Program = "Extramural Research" + - Description = "Various HLBS" +- Excludes: `TOPMed_Common_Exchange_Area` studies, studies with no valid name + +**Phase 2: Expand by Community** +- Replaces "Extramural Research" with Community value (when available) +- Creates duplicate records for studies belonging to multiple communities +- Updates Description based on Program-to-Description mapping + +### Program to Description Mapping + +| Program | Description | +|---------|-------------| +| Bench to Bassinet | Pediatric Cardiovascular | +| BioLINCC | Various HLBS | +| C4R | COVID-19 & Various HLBS | +| CONNECTS | COVID-19 | +| Extramural Research | Various HLBS | +| Heartshare | Cardiovascular | +| Longitudinal Epidemiology Observational Study (LEOS) | Longitudinal Observational | +| LungMAP | Pulmonary | +| NHLBI Intramural Research | Various HLBS | +| National Sleep Research Resource (NSRR) | Sleep and Circadian Rhythms | +| PETAL Network | Pulmonary & COVID-19 | +| Pediatric Heart Network (PHN) | Pediatric Cardiovascular | +| RECOVER | Long COVID & PASC | +| Sickle Cell Disease | Sickle Cell Disease | +| TOPMed | Precision Medicine | +| Training | Open-Access; Training | +| *(other)* | Various HLBS *(default)* | + +### Output Files + +| File | Description | +|------|-------------| +| `program_table_final_TIMESTAMP.json` | Formatted JSON (readable) | +| `program_table_final_TIMESTAMP.min.json` | Minified JSON (for upload) | +| `program_table_processing_TIMESTAMP.log` | Detailed processing log | +| `program_table_report_TIMESTAMP.txt` | 
Comprehensive report | + +### Input Format + +JSON array of study records: + +```json +[ + { + "Accession": "phs000123.v1.p1", + "Study Name": "Example Study", + "Program": "", + "Description": "", + "Community": "TOPMed", + "Community1": "CONNECTS", + "Community2": "" + } +] +``` + +--- + +## Output Directory Structure + +After running the full pipeline: + +``` +output_directory/ +├── program_table/ +│ ├── program_table.json # Step 1 output +│ ├── program_table_updated_*.json # Step 2 output (readable) +│ ├── program_table_updated_*_minified.json # Step 2 output (minified) +│ ├── program_table_final_*.json # Step 3 output (readable) +│ └── program_table_final_*.min.json # Step 3 output (FINAL for upload) +│ +├── studies_on_bdc_portal/ +│ └── bdc_studies.json +│ +└── studies_on_gen3_portal/ + ├── raw_studies_on_gen3.json + ├── missing_studies_comparing_gen3_and_bdc.log + ├── studies_updated_*.json + ├── program_names_updated_report_*.json + ├── studies_missing_description_*.json + └── jira_studies_not_in_gen3_report_*.log +``` + +--- + +## Module Documentation + +### bdc_utils.py + +Shared utilities for logging, JSON I/O, and validation. + +| Function | Description | +|----------|-------------| +| `setup_logging()` | Configure logging with file and console handlers | +| `load_json()` / `save_json()` | JSON file I/O with minify option | +| `extract_base_accession()` | Extract base accession from full ID | +| `validate_study()` | Validate study has required fields | +| `generate_timestamp()` | Generate timestamp for file naming | + +### bdc_data_manager.py + +Core data management for API interactions. 
+ +| Class | Description | +|-------|-------------| +| `BDCDataManager` | Manages BDC API interactions | +| `Gen3DataManager` | Manages Gen3 API interactions | +| `StudyMerger` | Merges and compares study data | + +--- + +## Examples + +### Full Pipeline Run + +```bash +# Step 1-2: Fetch and update +./run_pipeline.sh \ + --output-dir /data/bdc_$(date +%Y%m) \ + --jira-file /data/jira/latest.csv + +# Step 3: Post-process +python3 process_program_table.py \ + --input-file /data/bdc_$(date +%Y%m)/program_table/program_table_updated_*.json \ + --output-dir /data/bdc_$(date +%Y%m)/program_table +``` + +### Re-run Post-Processing Only + +```bash +python3 process_program_table.py \ + --input-file ./program_table_updated.json \ + --output-dir ./output +``` + +--- + +## Requirements + +- Python 3.9+ +- pandas +- requests + +```bash +pip install pandas requests +``` + +--- + +## Troubleshooting + +| Issue | Solution | +|-------|----------| +| Import errors | Run from the correct directory | +| Missing Jira file | Verify path and file format (CSV/Excel) | +| Permission denied | Run `chmod +x run_pipeline.sh` | +| API timeout | Re-run the pipeline; Gen3 API occasionally times out | + +--- + +## Version History + +| Version | Date | Changes | +|---------|------|---------| +| v3.0 | 2026-01 | Added `process_program_table.py` combining post-processing steps | +| v2.0 | 2025-10 | Clean directory structure with program_table directory | +| v1.0 | - | Initial modular refactoring with shared utilities | diff --git a/scripts/bdc/program_table/bdc_data_manager.py b/scripts/bdc/program_table/bdc_data_manager.py new file mode 100644 index 0000000..50cc978 --- /dev/null +++ b/scripts/bdc/program_table/bdc_data_manager.py @@ -0,0 +1,534 @@ +#!/usr/bin/env python3 +""" +Core data management for BDC and Gen3 data operations. 
+ +This module handles: +- API interactions with BDC and Gen3 +- Data downloading and processing +- Study validation and transformation +""" + +import requests +import urllib.parse +import re +from typing import Dict, List, Any, Optional +import bdc_utils + + +class BDCDataManager: + """Manages interactions with BDC API.""" + + def __init__(self, base_url: str, logger: Optional[Any] = None): + """ + Initialize BDC data manager. + + Args: + base_url: BDC API base URL + logger: Optional logger instance + """ + self.base_url = base_url + self.logger = logger + + def fetch_program_list(self) -> List[Dict[str, Any]]: + """ + Fetch list of all programs from BDC. + + Returns: + List of program dictionaries + """ + url = f"{self.base_url}/program_list" + response = requests.get(url) + response.raise_for_status() + return response.json().get("result", []) + + def fetch_program_studies(self, program_name: str) -> List[Dict[str, Any]]: + """ + Fetch all studies for a specific program. + + Args: + program_name: Name of the program + + Returns: + List of study dictionaries + """ + url = f"{self.base_url}/search_program" + params = {"program_name": program_name} + response = requests.get(url, params=params) + response.raise_for_status() + return response.json().get("result", []) + + def fetch_all_studies(self, show_detailed_studies: bool = False) -> tuple[List[Dict[str, str]], Dict[str, str]]: + """ + Fetch all programs and their studies from BDC. 
+ + Args: + show_detailed_studies: If True, log individual study details + + Returns: + Tuple of (studies_list, program_descriptions_dict) + """ + programs = self.fetch_program_list() + program_descriptions = {p["key"]: p.get("description", "") for p in programs} + + studies = [] + total_studies = 0 + + for program in programs: + program_name = program["key"] + program_studies = self.fetch_program_studies(program_name) + study_count = len(program_studies) + total_studies += study_count + + if self.logger: + self.logger.info(f"Program: {program_name}, Studies: {study_count}") + + for study in program_studies: + studies.append(bdc_utils.build_study_dict( + accession=study.get('collection_id', 'N/A'), + study_name=study.get('collection_name', 'N/A'), + program=program_name, + description=program_descriptions.get(program_name, "") + )) + + if self.logger: + self.logger.info(f"Total programs: {len(programs)}, Total studies: {total_studies}") + + return studies, program_descriptions + + +class Gen3DataManager: + """Manages interactions with Gen3 API.""" + + def __init__(self, base_url: str, download_limit: int = 50, logger: Optional[Any] = None): + """ + Initialize Gen3 data manager. + + Args: + base_url: Gen3 API base URL + download_limit: Number of records per API call + logger: Optional logger instance + """ + self.base_url = base_url + self.download_limit = download_limit + self.logger = logger + + def download_paginated_list(self, url: str) -> List[str]: + """ + Download a paginated list from Gen3 API. 
+ + Args: + url: API endpoint URL + + Returns: + Complete list of all items + """ + complete_list = [] + offset = 0 + + while True: + paginated_url = f"{url}&limit={self.download_limit}&offset={offset}" + + try: + response = requests.get(paginated_url) + response.raise_for_status() + partial_list = response.json() + complete_list.extend(partial_list) + + if len(partial_list) < self.download_limit: + break + + offset += self.download_limit + + except requests.exceptions.RequestException as e: + if self.logger: + self.logger.error(f"Error downloading from Gen3: {e}") + raise + + return complete_list + + def fetch_study_metadata(self, study_id: str) -> Dict[str, Any]: + """ + Fetch metadata for a single study. + + Args: + study_id: Study identifier + + Returns: + Study metadata dictionary + """ + url = urllib.parse.urljoin(self.base_url, f'/mds/metadata/{study_id}') + response = requests.get(url) + response.raise_for_status() + return response.json() + + def extract_study_info(self, study_id: str) -> Optional[Dict[str, str]]: + """ + Extract and normalize study information from Gen3 metadata. 
+ + Args: + study_id: Study identifier + + Returns: + Dictionary with normalized study information or None if error + """ + try: + study_metadata = self.fetch_study_metadata(study_id) + + if 'gen3_discovery' not in study_metadata: + return None + + gen3_discovery = study_metadata['gen3_discovery'] + + # Extract released status + released = gen3_discovery.get('released', '').strip() + + # Extract subjects count + subjects_count = str(gen3_discovery.get('_subjects_count', '')) + + # Extract DOI tombstone status + doi_tombstone = gen3_discovery.get('doi_tombstone', '') + + # Extract study name (try multiple fields) + study_name = ( + gen3_discovery.get('full_name') or + gen3_discovery.get('name') or + gen3_discovery.get('short_name') or + '(no name)' + ) + + # Extract program names from authz field + program_names = [] + authz = gen3_discovery.get('authz', '') + if authz: + try: + match = re.fullmatch(r'^/programs/(.*)/projects/(.*)$', authz) + if match: + program_names.append(match.group(1)) + except Exception: + pass + + return bdc_utils.build_study_dict( + accession=study_id, + study_name=study_name, + program='|'.join(sorted(set(filter(None, program_names)))), + description="", # Description will be filled from BDC data + released=released, + subjects_count=subjects_count, + doi_tombstone=doi_tombstone + ) + + except requests.exceptions.RequestException as e: + if self.logger: + self.logger.error(f"Error processing study {study_id}: {e}") + return None + + def fetch_all_studies(self) -> List[Dict[str, str]]: + """ + Fetch all studies from Gen3 using the metadata endpoint with data=True. 
+ + Returns: + List of study dictionaries + """ + # Use the new URL format with data=True to get all metadata in one call + mds_url = urllib.parse.urljoin( + self.base_url, + '/mds/metadata?data=True&_guid_type=discovery_metadata' + ) + + if self.logger: + self.logger.info(f"Fetching Gen3 metadata from: {mds_url}") + + # Download all metadata with pagination + all_metadata = {} + offset = 0 + + while True: + paginated_url = f"{mds_url}&limit={self.download_limit}&offset={offset}" + + try: + response = requests.get(paginated_url) + response.raise_for_status() + partial_data = response.json() + + if not partial_data: + break + + # The response is a dict with study IDs as keys + all_metadata.update(partial_data) + + if len(partial_data) < self.download_limit: + break + + offset += self.download_limit + + except requests.exceptions.RequestException as e: + if self.logger: + self.logger.error(f"Error downloading from Gen3: {e}") + raise + + if self.logger: + self.logger.info(f"Found {len(all_metadata)} total studies in Gen3") + + # Process metadata + studies = [] + + for study_id, metadata in sorted(all_metadata.items()): + if not metadata or not isinstance(metadata, dict): + continue + + gen3_discovery = metadata.get('gen3_discovery', {}) + if not gen3_discovery: + continue + + # Extract released status + released = gen3_discovery.get('released', '').strip() + + # Extract subjects count + subjects_count = str(gen3_discovery.get('_subjects_count', '')) + + # Extract DOI tombstone status + doi_tombstone = gen3_discovery.get('doi_tombstone', '') + + # Extract study name (try multiple fields) + study_name = ( + gen3_discovery.get('full_name') or + gen3_discovery.get('name') or + gen3_discovery.get('short_name') or + '(no name)' + ) + + # Extract program names from authz field + program_names = [] + authz = gen3_discovery.get('authz', '') + if authz: + try: + match = re.fullmatch(r'^/programs/(.*)/projects/(.*)$', authz) + if match: + program_names.append(match.group(1)) + 
except Exception: + pass + + study_info = bdc_utils.build_study_dict( + accession=study_id, + study_name=study_name, + program='|'.join(sorted(set(filter(None, program_names)))), + description="", # Description will be filled from BDC data + released=released, + subjects_count=subjects_count, + doi_tombstone=doi_tombstone + ) + studies.append(study_info) + + if self.logger: + # Count release statuses and subjects counts + released_yes = sum(1 for s in studies if s.get('Released', '').lower() == 'yes') + released_other = len(studies) - released_yes + + # Count subjects + with_subjects = sum(1 for s in studies if s.get('Subjects Count', '').strip() and s.get('Subjects Count', '').strip() != '0') + without_subjects = len(studies) - with_subjects + + # Count DOI tombstone statuses (checking for string 'True') + doi_tombstone_true = sum(1 for s in studies if s.get('DOI Tombstone') == 'True') + doi_tombstone_other = len(studies) - doi_tombstone_true + + self.logger.info(f"Downloaded {len(studies)} studies") + self.logger.info(f" Released=Yes: {released_yes}, Other: {released_other}") + self.logger.info(f" With subjects (>0): {with_subjects}, Without subjects (0 or empty): {without_subjects}") + self.logger.info(f" DOI Tombstone='True': {doi_tombstone_true}, Other/Empty: {doi_tombstone_other}") + + return studies + + +class StudyMerger: + """Handles merging and comparison of study data from different sources.""" + + def __init__(self, logger: Optional[Any] = None): + """ + Initialize study merger. + + Args: + logger: Optional logger instance + """ + self.logger = logger + + def merge_studies( + self, + reference_studies: List[Dict[str, str]], + target_studies: List[Dict[str, str]] + ) -> tuple[List[Dict[str, str]], List[str], List[str]]: + """ + Merge two study lists, updating target with info from reference. 
+ + Args: + reference_studies: Reference studies (e.g., from BDC) + target_studies: Target studies to update (e.g., from Gen3) + + Returns: + Tuple of: + - merged_studies: Combined and updated study list + - new_accessions: Accessions in target but not in reference + - missing_accessions: Accessions in reference but not in target + """ + # Create lookup dictionary for reference studies + reference_dict = {} + reference_accessions = set() + + for study in reference_studies: + base_acc = bdc_utils.extract_base_accession(study['Accession']) + if base_acc: + reference_dict[base_acc] = study + reference_accessions.add(base_acc) + + # Process target studies + existing_records = [] + new_records = [] + target_accessions = set() + + for study in target_studies: + base_acc = bdc_utils.extract_base_accession(study['Accession']) + if base_acc: + target_accessions.add(base_acc) + + if base_acc in reference_dict: + # Update with reference information (preserve Released, Subjects Count, DOI Tombstone from Gen3) + released = study.get('Released', '') + subjects_count = study.get('Subjects Count', '') + doi_tombstone = study.get('DOI Tombstone', '') + study['Program'] = reference_dict[base_acc]['Program'] + study['Description'] = reference_dict[base_acc]['Description'] + study['Released'] = released + study['Subjects Count'] = subjects_count + study['DOI Tombstone'] = doi_tombstone + existing_records.append(study) + else: + # New record not in reference (keep Released field from Gen3) + new_records.append(study) + + # Add Training program studies from reference (BDC) that are missing in target (Gen3) + # Iterate through original reference_studies list to handle duplicate accessions + training_from_bdc = [] + for study in reference_studies: + if study['Program'] == 'Training': + base_acc = bdc_utils.extract_base_accession(study['Accession']) + if base_acc and base_acc not in target_accessions: + training_from_bdc.append(study) + + # Combine: existing records first, new Gen3-only 
records, then Training from BDC + merged_studies = existing_records + new_records + training_from_bdc + + # Find missing accessions + missing_accessions = list(reference_accessions - target_accessions) + new_accessions = [rec['Accession'] for rec in new_records] + + if self.logger: + self.logger.info(f"Merged studies: {len(merged_studies)} total") + self.logger.info(f"New in target (Gen3 only): {len(new_accessions)}") + if training_from_bdc: + self.logger.info(f"Added Training from BDC: {len(training_from_bdc)}") + self.logger.info(f"Missing from target: {len(missing_accessions)}") + + return merged_studies, new_accessions, missing_accessions + + def filter_by_doi_tombstone( + self, + studies: List[Dict[str, str]], + return_excluded: bool = False + ) -> tuple: + """ + Filter out studies where DOI Tombstone is 'True' (string). + + Args: + studies: List of study dictionaries + return_excluded: If True, also return list of excluded studies + + Returns: + If return_excluded is False: Tuple of (filtered_studies, excluded_count) + If return_excluded is True: Tuple of (filtered_studies, excluded_count, excluded_studies) + """ + filtered_studies = [] + excluded_studies = [] + + for study in studies: + doi_tombstone = study.get('DOI Tombstone', '') + + # Exclude if DOI tombstone is the string 'True' + # The field is a string in Gen3 metadata, not a boolean + if doi_tombstone == 'True': + excluded_studies.append(study) + else: + filtered_studies.append(study) + + if self.logger: + self.logger.info(f"Filtered by DOI tombstone (exclude 'True'): {len(filtered_studies)} kept, {len(excluded_studies)} excluded") + + if return_excluded: + return filtered_studies, len(excluded_studies), excluded_studies + return filtered_studies, len(excluded_studies) + + def filter_by_subjects_count( + self, + studies: List[Dict[str, str]] + ) -> tuple[List[Dict[str, str]], int]: + """ + Filter studies to only include those with Subjects Count > 0 and not empty. 
+ + Args: + studies: List of study dictionaries + + Returns: + Tuple of (filtered_studies, excluded_count) + """ + filtered_studies = [] + excluded_count = 0 + + for study in studies: + subjects_count_str = study.get('Subjects Count', '').strip() + + # Keep if subjects count is not empty and not "0" + if subjects_count_str and subjects_count_str != '0': + filtered_studies.append(study) + else: + excluded_count += 1 + + if self.logger: + self.logger.info(f"Filtered by subjects count (>0): {len(filtered_studies)} kept, {excluded_count} excluded") + + return filtered_studies, excluded_count + + def filter_valid_studies( + self, + studies: List[Dict[str, str]], + required_fields: List[str] = None + ) -> tuple[List[Dict[str, str]], int]: + """ + Filter out studies that don't meet validation criteria. + + Args: + studies: List of study dictionaries + required_fields: Fields that must be non-empty (default: Accession, Study Name, Program) + + Returns: + Tuple of (valid_studies, skipped_count) + """ + if required_fields is None: + required_fields = ['Accession', 'Study Name', 'Program'] + + valid_studies = [] + skipped_count = 0 + + for study in studies: + is_valid, _ = bdc_utils.validate_study(study, required_fields) + if is_valid: + valid_studies.append(study) + else: + skipped_count += 1 + + if self.logger: + self.logger.info(f"Valid studies: {len(valid_studies)}, Skipped: {skipped_count}") + + return valid_studies, skipped_count + + +if __name__ == "__main__": + print("BDC Data Manager Module - Ready for import") diff --git a/scripts/bdc/program_table/bdc_utils.py b/scripts/bdc/program_table/bdc_utils.py new file mode 100644 index 0000000..c2233e7 --- /dev/null +++ b/scripts/bdc/program_table/bdc_utils.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python3 +""" +Shared utilities for BDC data ingestion scripts. 
+ +This module provides common functionality used across multiple scripts: +- File I/O operations (JSON loading/saving) +- Logging setup +- Data validation and transformation +- Path management +""" + +import json +import logging +import sys +import os +from datetime import datetime +from typing import Any, Dict, List, Optional + + +def setup_logging(log_file: str, log_level: int = logging.INFO) -> logging.Logger: + """ + Set up logging configuration with both file and console handlers. + + Args: + log_file: Path to the log file + log_level: Logging level (default: INFO) + + Returns: + Configured logger instance + """ + # Remove existing handlers + for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) + + logging_format = '%(asctime)s - %(levelname)s - %(message)s' + logging.basicConfig( + level=log_level, + format=logging_format, + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler(sys.stdout) + ] + ) + return logging.getLogger(__name__) + + +def load_json(file_path: str) -> Any: + """ + Load data from a JSON file. + + Args: + file_path: Path to the JSON file + + Returns: + Parsed JSON data + + Raises: + FileNotFoundError: If file doesn't exist + json.JSONDecodeError: If file is not valid JSON + """ + with open(file_path, 'r', encoding='utf-8') as f: + return json.load(f) + + +def save_json(data: Any, file_path: str, indent: Optional[int] = 2, minify: bool = False) -> None: + """ + Save data to a JSON file. + + Args: + data: Data to save + file_path: Output file path + indent: JSON indentation (default: 2, ignored if minify=True) + minify: If True, save without whitespace + """ + os.makedirs(os.path.dirname(file_path), exist_ok=True) + + with open(file_path, 'w', encoding='utf-8') as f: + if minify: + json.dump(data, f, separators=(',', ':')) + else: + json.dump(data, f, indent=indent) + + +def clean_json_newlines(data: Any) -> Any: + """ + Recursively remove newlines from all string values in JSON data. 
+ + Args: + data: JSON data (dict, list, str, or primitive) + + Returns: + Cleaned data with newlines removed from strings + """ + if isinstance(data, str): + return data.replace('\n', '') + elif isinstance(data, list): + return [clean_json_newlines(item) for item in data] + elif isinstance(data, dict): + return {key: clean_json_newlines(value) for key, value in data.items()} + else: + return data + + +def extract_base_accession(accession: str) -> Optional[str]: + """ + Extract base accession from full accession string. + + Examples: + phs000123.v1.p1.c1 -> phs000123 + phs000456.v2.p2 -> phs000456 + other_id -> other_id + + Args: + accession: Full accession string + + Returns: + Base accession or None if invalid + """ + if not accession or not isinstance(accession, str): + return None + return accession.split('.')[0] if accession.startswith('phs') else accession + + +def validate_study(study: Dict[str, Any], required_fields: List[str]) -> tuple[bool, Optional[str]]: + """ + Validate that a study has all required fields with non-empty values. + + Args: + study: Study dictionary + required_fields: List of required field names + + Returns: + Tuple of (is_valid, error_message) + """ + missing_fields = [] + + for field in required_fields: + if not study.get(field) or str(study[field]).strip() == '': + missing_fields.append(field) + + if missing_fields: + return False, f"Missing required fields: {', '.join(missing_fields)}" + return True, None + + +def generate_timestamp() -> str: + """ + Generate a timestamp string for file naming. + + Returns: + Timestamp in format YYYYMMDD_HHMMSS + """ + return datetime.now().strftime("%Y%m%d_%H%M%S") + + +def ensure_dir_exists(directory: str) -> None: + """ + Create directory if it doesn't exist. 
+ + Args: + directory: Directory path to create + """ + os.makedirs(directory, exist_ok=True) + + +def build_study_dict(accession: str, study_name: str, program: str, description: str = "", released: str = "", subjects_count: str = "", doi_tombstone: Any = "") -> Dict[str, Any]: + """ + Build a standardized study dictionary. + + Args: + accession: Study accession ID + study_name: Study name + program: Program name + description: Program description (optional) + released: Release status (optional) + subjects_count: Number of subjects (optional) + doi_tombstone: DOI tombstone status (optional) + + Returns: + Standardized study dictionary + """ + return { + 'Accession': accession, + 'Study Name': study_name, + 'Program': program, + 'Description': description, + 'Released': released, + 'Subjects Count': subjects_count, + 'DOI Tombstone': doi_tombstone + } + + +def sort_studies_by_description(studies: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Sort studies with empty descriptions at the bottom. + + Args: + studies: List of study dictionaries + + Returns: + Sorted list of studies + """ + return sorted(studies, key=lambda x: (x.get('Description', '') == '', x.get('Description', ''))) + + +def separate_studies_by_description(studies: List[Dict[str, Any]]) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + """ + Separate studies into those with and without descriptions. + + Args: + studies: List of study dictionaries + + Returns: + Tuple of (studies_with_desc, studies_without_desc) + """ + studies_with_desc = [s for s in studies if s.get('Description', '').strip()] + studies_without_desc = [s for s in studies if not s.get('Description', '').strip()] + return studies_with_desc, studies_without_desc + + +def write_report_header(file_handle, title: str, width: int = 100) -> None: + """ + Write a formatted header to a report file. 
+ + Args: + file_handle: Open file handle + title: Header title + width: Width of the separator line + """ + file_handle.write("=" * width + "\n") + file_handle.write(f"{title}\n") + file_handle.write("=" * width + "\n\n") + + +def write_report_section(file_handle, title: str, count: int, width: int = 100) -> None: + """ + Write a formatted section header to a report file. + + Args: + file_handle: Open file handle + title: Section title + count: Count to display + width: Width of the separator line + """ + file_handle.write("\n" + "=" * width + "\n") + file_handle.write(f"{title}\n") + file_handle.write("=" * width + "\n\n") + file_handle.write(f"Total: {count} studies\n\n") + + +class FilePathManager: + """Manages file paths for BDC data processing.""" + + def __init__(self, base_dir: str, timestamp: Optional[str] = None): + """ + Initialize file path manager. + + Args: + base_dir: Base directory for all data + timestamp: Optional timestamp string (generated if not provided) + """ + self.base_dir = base_dir + self.timestamp = timestamp or generate_timestamp() + self.bdc_dir = os.path.join(base_dir, "studies_on_bdc_portal") + self.gen3_dir = os.path.join(base_dir, "studies_on_gen3_portal") + self.program_table_dir = os.path.join(base_dir, "program_table") + + # Ensure directories exist + ensure_dir_exists(self.bdc_dir) + ensure_dir_exists(self.gen3_dir) + ensure_dir_exists(self.program_table_dir) + + def get_bdc_path(self, filename: str) -> str: + """Get path in BDC directory.""" + return os.path.join(self.bdc_dir, filename) + + def get_gen3_path(self, filename: str) -> str: + """Get path in Gen3 directory.""" + return os.path.join(self.gen3_dir, filename) + + def get_program_table_path(self, filename: str) -> str: + """Get path in program table directory.""" + return os.path.join(self.program_table_dir, filename) + + def get_timestamped_path(self, directory: str, base_name: str, extension: str) -> str: + """ + Get timestamped file path. 
+ + Args: + directory: Directory path + base_name: Base filename (without extension) + extension: File extension (with or without dot) + + Returns: + Full path with timestamp + """ + if not extension.startswith('.'): + extension = f'.{extension}' + filename = f"{base_name}_{self.timestamp}{extension}" + return os.path.join(directory, filename) + + +if __name__ == "__main__": + # Test utilities + print("BDC Utils Module - Test Functions") + print(f"Timestamp: {generate_timestamp()}") + print(f"Base accession: {extract_base_accession('phs000123.v1.p1.c1')}") + print(f"Validation: {validate_study({'Accession': 'phs001', 'Study Name': 'Test'}, ['Accession', 'Study Name'])}") diff --git a/scripts/bdc/program_table/fetch_studies.py b/scripts/bdc/program_table/fetch_studies.py new file mode 100644 index 0000000..891a1fa --- /dev/null +++ b/scripts/bdc/program_table/fetch_studies.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +""" +Fetch and merge study data from BDC and Gen3 portals for program table ingestion. + +PURPOSE: + Generates the program table file used by the BDC portal's program search/filter feature. + This table allows users to filter studies by program affiliation on the BDC portal. + +WORKFLOW: + 1. Fetches studies from BDC portal (with program associations) + 2. Fetches studies from Gen3 portal + 3. Merges and filters the datasets + 4. 
Generates program table file ready for chart ingestion + +OUTPUT: + - cleaned_studies_on_gen3_sortedfile_to_upload_chart.json (program table file) + +Usage: + python fetch_studies.py --output-dir /path/to/output [--bdc-url URL] [--gen3-url URL] +""" + +import argparse +import sys +from pathlib import Path + +import bdc_utils +import bdc_data_manager + + +def main(): + parser = argparse.ArgumentParser( + description='Fetch and merge study data from BDC and Gen3 portals', + formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument( + '--output-dir', + required=True, + help='Output directory for generated files' + ) + parser.add_argument( + '--bdc-url', + default='https://search-dev.biodatacatalyst.renci.org/search-api', + help='BDC API base URL (default: dev endpoint)' + ) + parser.add_argument( + '--gen3-url', + default='https://gen3.biodatacatalyst.nhlbi.nih.gov/', + help='Gen3 API base URL' + ) + parser.add_argument( + '--gen3-limit', + type=int, + default=50, + help='Gen3 API pagination limit (default: 50)' + ) + parser.add_argument( + '--show-details', + action='store_true', + help='Show detailed study information in logs' + ) + + args = parser.parse_args() + + # Setup paths + output_dir = Path(args.output_dir) + path_manager = bdc_utils.FilePathManager(str(output_dir)) + + # Setup logging + log_file = Path(output_dir) / "data_pipeline.log" + logger = bdc_utils.setup_logging(str(log_file)) + + logger.info("="*80) + logger.info("BDC Data Pipeline - Fetch Studies") + logger.info("="*80) + logger.info(f"Output directory: {output_dir}") + logger.info(f"BDC URL: {args.bdc_url}") + logger.info(f"Gen3 URL: {args.gen3_url}") + logger.info("") + + try: + # Step 1: Fetch BDC studies + logger.info("Step 1: Fetching studies from BDC portal...") + bdc_manager = bdc_data_manager.BDCDataManager(args.bdc_url, logger) + bdc_studies, program_descriptions = bdc_manager.fetch_all_studies(args.show_details) + + bdc_file = 
path_manager.get_bdc_path("bdc_studies.json") + bdc_utils.save_json(bdc_studies, bdc_file) + logger.info(f"Saved {len(bdc_studies)} BDC studies to: {bdc_file}") + + # Step 2: Fetch Gen3 studies + logger.info("\nStep 2: Fetching studies from Gen3 portal...") + gen3_manager = bdc_data_manager.Gen3DataManager(args.gen3_url, args.gen3_limit, logger) + gen3_raw_studies = gen3_manager.fetch_all_studies() + + gen3_raw_file = path_manager.get_gen3_path("raw_studies_on_gen3.json") + bdc_utils.save_json(gen3_raw_studies, gen3_raw_file) + logger.info(f"Saved {len(gen3_raw_studies)} Gen3 studies to: {gen3_raw_file}") + + # Step 3: Filter by DOI tombstone + logger.info("\nStep 3: Filtering by DOI tombstone...") + merger = bdc_data_manager.StudyMerger(logger) + gen3_no_tombstone, tombstone_filtered, tombstone_studies = merger.filter_by_doi_tombstone(gen3_raw_studies, return_excluded=True) + + # Step 4: Filter by subjects count (COMMENTED OUT FOR TESTING) + # logger.info("\nStep 4: Filtering by subjects count...") + # gen3_with_subjects, subjects_filtered = merger.filter_by_subjects_count(gen3_no_tombstone) + + # Step 5: Filter valid studies (COMMENTED OUT FOR TESTING) + # logger.info("\nStep 5: Filtering valid studies...") + # gen3_filtered_studies, skipped_count = merger.filter_valid_studies(gen3_with_subjects) + # logger.info(f"Filtered {len(gen3_filtered_studies)} valid Gen3 studies (skipped {skipped_count})") + + # Use DOI tombstone filtered studies directly + gen3_filtered_studies = gen3_no_tombstone + logger.info(f"Using {len(gen3_filtered_studies)} Gen3 studies after DOI tombstone filter only") + + # Step 6: Merge BDC and Gen3 data + logger.info("\nStep 6: Merging BDC and Gen3 data...") + merged_studies, new_accessions, missing_accessions = merger.merge_studies( + bdc_studies, gen3_filtered_studies + ) + logger.info(f"Merged {len(merged_studies)} studies") + + # Log missing studies + if missing_accessions: + missing_file = 
path_manager.get_gen3_path("missing_studies_comparing_gen3_and_bdc.log") + with open(missing_file, 'w') as f: + f.write(f"Studies in BDC but not in Gen3 ({len(missing_accessions)} total):\n\n") + for acc in sorted(missing_accessions): + f.write(f"{acc}\n") + logger.info(f"Logged {len(missing_accessions)} missing studies to: {missing_file}") + + # Generate report for BDC studies excluded from program table due to DOI tombstone + # Find BDC studies whose accessions match tombstone studies + tombstone_accessions = set() + for ts in tombstone_studies: + acc = ts.get('Accession', '') or ts.get('accession', '') or ts.get('study_id', '') + if acc: + base_acc = bdc_utils.extract_base_accession(acc) + if base_acc: + tombstone_accessions.add(base_acc) + + bdc_excluded = [] + for bdc_study in bdc_studies: + acc = bdc_study.get('Accession', '') + base_acc = bdc_utils.extract_base_accession(acc) + if base_acc and base_acc in tombstone_accessions: + bdc_excluded.append(bdc_study) + + if bdc_excluded: + excluded_file = path_manager.get_gen3_path("bdc_studies_excluded_doi_tombstone.json") + bdc_utils.save_json(bdc_excluded, excluded_file) + + # Also create a readable log + excluded_log = path_manager.get_gen3_path("bdc_studies_excluded_doi_tombstone.log") + with open(excluded_log, 'w') as f: + f.write("=" * 100 + "\n") + f.write("BDC STUDIES EXCLUDED FROM PROGRAM TABLE DUE TO DOI TOMBSTONE\n") + f.write("=" * 100 + "\n\n") + f.write("These studies exist in the BDC portal but are excluded from the program table\n") + f.write("because they have DOI Tombstone = 'True' in Gen3 (deprecated/removed studies).\n\n") + f.write(f"Total: {len(bdc_excluded)} study records\n") + f.write(f"Unique base accessions: {len(set(bdc_utils.extract_base_accession(s.get('Accession', '')) for s in bdc_excluded))}\n") + f.write("-" * 100 + "\n\n") + + # Group by base accession + from collections import defaultdict + by_acc = defaultdict(list) + for s in bdc_excluded: + base = 
bdc_utils.extract_base_accession(s.get('Accession', '')) + by_acc[base].append(s) + + for base_acc in sorted(by_acc.keys()): + studies_list = by_acc[base_acc] + f.write(f"Accession: {base_acc}\n") + f.write(f" Study Name: {studies_list[0].get('Study Name', 'N/A')}\n") + programs = list(set(s.get('Program', '') for s in studies_list if s.get('Program'))) + f.write(f" Programs in BDC: {', '.join(programs)}\n") + f.write(f" Reason: DOI Tombstone = True (study deprecated in Gen3)\n") + f.write("-" * 100 + "\n") + + logger.info(f"Logged {len(bdc_excluded)} BDC studies excluded due to DOI tombstone to: {excluded_log}") + + # Step 7: Sort by description and clean for program table + logger.info("\nStep 7: Preparing program table file...") + sorted_studies = bdc_utils.sort_studies_by_description(merged_studies) + cleaned_studies = bdc_utils.clean_json_newlines(sorted_studies) + + # Save program table file to dedicated program_table directory + program_table_file = path_manager.get_program_table_path("program_table.json") + bdc_utils.save_json(cleaned_studies, program_table_file, minify=False) + logger.info(f"Saved program table file to: {program_table_file}") + + # Summary + logger.info("\n" + "="*80) + logger.info("STEP 1 SUMMARY") + logger.info("="*80) + logger.info("") + logger.info("--- Detailed Breakdown ---") + logger.info(f"Gen3 studies fetched (raw): {len(gen3_raw_studies)}") + logger.info(f"Gen3 studies excluded by DOI tombstone: {tombstone_filtered}") + logger.info(f"Gen3 studies after filtering: {len(gen3_filtered_studies)}") + logger.info(f"Merged studies total: {len(merged_studies)}") + logger.info(f"New studies (Gen3 only, not in BDC): {len(new_accessions)}") + logger.info(f"Missing studies (BDC only, not in Gen3): {len(missing_accessions)}") + logger.info("") + logger.info("--- Reports ---") + logger.info(f"Excluded studies report: studies_on_gen3_portal/bdc_studies_excluded_doi_tombstone.log") + logger.info("="*80) + logger.info(f"\nProgram table file 
ready: {program_table_file}") + logger.info("Processing completed successfully!") + + return 0 + + except Exception as e: + logger.error(f"\nError: {str(e)}", exc_info=True) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/bdc/program_table/process_program_table.py b/scripts/bdc/program_table/process_program_table.py new file mode 100644 index 0000000..1c9d9ae --- /dev/null +++ b/scripts/bdc/program_table/process_program_table.py @@ -0,0 +1,803 @@ +#!/usr/bin/env python3 +""" +=============================================================================== +BDC Program Table Processor +=============================================================================== + +This script combines two processing steps for the BDC program table: + 1. Set "Extramural Research" program for studies with missing descriptions + 2. Expand the table by mapping communities to programs and creating duplicates + +=============================================================================== +INPUT +=============================================================================== +A JSON file containing an array of study records with the following fields: + - Accession: Study accession ID (e.g., "phs000123.v1.p1") + - Study Name: Name of the study + - Program: Current program assignment (may be empty) + - Description: Program description (may be empty) + - Community: Primary community classification + - Community1: Secondary community classification (optional) + - Community2: Tertiary community classification (optional) + - Released: Release status (will be removed from output) + - Subjects Count: Number of subjects (will be removed from output) + - DOI Tombstone: DOI tombstone info (will be removed from output) + +Example input record: +{ + "Accession": "phs000123.v1.p1", + "Study Name": "Example Study", + "Program": "", + "Description": "", + "Community": "TOPMed", + "Community1": "CONNECTS", + "Community2": "" +} + 
+=============================================================================== +OUTPUT +=============================================================================== +1. program_table_final_YYYYMMDD_HHMMSS.json + - Formatted JSON with processed records + - Easy to read and inspect + +2. program_table_final_YYYYMMDD_HHMMSS.min.json + - Minified JSON for production upload + - Smaller file size + +3. program_table_processing_YYYYMMDD_HHMMSS.log + - Detailed log of all processing steps and changes + +4. program_table_report_YYYYMMDD_HHMMSS.txt + - Comprehensive report with statistics and change details + +=============================================================================== +PROCESSING STEPS +=============================================================================== + +STEP 1: SET EXTRAMURAL RESEARCH FOR MISSING DESCRIPTIONS +--------------------------------------------------------- +For studies with empty or missing descriptions: + - Set Program = "Extramural Research" + - Set Description = "Various HLBS" + +Excluded from processing: + - Studies with Program = "TOPMed_Common_Exchange_Area" + - Studies with no valid Study Name + +STEP 2: EXPAND TABLE BY COMMUNITY MAPPING +--------------------------------------------------------- +For studies with Program = "Extramural Research": + - Replace Program with the Community value (if available) + - Update Description based on the new Program + +For ALL studies with communities that don't match their Program: + - Create duplicate records for each non-matching community + - Set the duplicate's Program to the community value + - Update Description based on the community + +=============================================================================== +PROGRAM TO DESCRIPTION MAPPING +=============================================================================== +Program Name -> Description +--------------------------------------------------------------------------- +Bench to Bassinet -> Pediatric 
Cardiovascular +BioLINCC -> Various HLBS +C4R -> COVID-19 & Various HLBS +CONNECTS -> COVID-19 +Extramural Research -> Various HLBS +Heartshare -> Cardiovascular +Longitudinal Epidemiology Observational Study (LEOS)-> Longitudinal Observational +LungMAP -> Pulmonary +NHLBI Intramural Research -> Various HLBS +National Sleep Research Resource (NSRR) -> Sleep and Circadian Rhythms +PETAL Network -> Pulmonary & COVID-19 +Pediatric Heart Network (PHN) -> Pediatric Cardiovascular +RECOVER -> Long COVID & PASC +Sickle Cell Disease -> Sickle Cell Disease +TOPMed -> Precision Medicine +Training -> Open-Access; Training +(Any other program) -> Various HLBS (default) + +=============================================================================== +USAGE +=============================================================================== +python process_program_table.py --input-file path/to/program_table.json +python process_program_table.py --input-file path/to/program_table.json --output-dir path/to/output + +=============================================================================== +""" + +import json +import argparse +import sys +import os +from pathlib import Path +from datetime import datetime +from typing import List, Dict, Any, Tuple +from collections import defaultdict + + +# ============================================================================= +# CONSTANTS +# ============================================================================= + +# Program to Description mapping - used when assigning descriptions to programs +PROGRAM_DESCRIPTIONS = { + "Bench to Bassinet": "Pediatric Cardiovascular", + "BioLINCC": "Various HLBS", + "C4R": "COVID-19 & Various HLBS", + "CONNECTS": "COVID-19", + "Extramural Research": "Various HLBS", + "Heartshare": "Cardiovascular", + "Longitudinal Epidemiology Observational Study (LEOS)": "Longitudinal Observational", + "LungMAP": "Pulmonary", + "NHLBI Intramural Research": "Various HLBS", + "National Sleep Research Resource 
(NSRR)": "Sleep and Circadian Rhythms", + "PETAL Network": "Pulmonary & COVID-19", + "Pediatric Heart Network (PHN)": "Pediatric Cardiovascular", + "RECOVER": "Long COVID & PASC", + "Sickle Cell Disease": "Sickle Cell Disease", + "TOPMed": "Precision Medicine", + "Training": "Open-Access; Training", +} + +# Fields to remove from final output (not needed for upload) +FIELDS_TO_REMOVE = ['Released', 'Subjects Count', 'DOI Tombstone'] + +# Programs to exclude from processing +EXCLUDED_PROGRAMS = ['TOPMed_Common_Exchange_Area'] + + +# ============================================================================= +# HELPER FUNCTIONS +# ============================================================================= + +def get_description_for_program(program: str) -> str: + """ + Get the appropriate Description for a given Program. + + Args: + program: The program name + + Returns: + The description for the program, or "Various HLBS" as default + """ + return PROGRAM_DESCRIPTIONS.get(program, "Various HLBS") + + +def load_json(file_path: str) -> List[Dict[str, Any]]: + """ + Load JSON file containing study records. + + Args: + file_path: Path to the JSON file + + Returns: + List of study dictionaries + + Raises: + FileNotFoundError: If the file doesn't exist + json.JSONDecodeError: If the file contains invalid JSON + """ + with open(file_path, 'r') as f: + return json.load(f) + + +def save_json(data: List[Dict[str, Any]], file_path: Path, minify: bool = False) -> None: + """ + Save data to JSON file, optionally removing specified fields. 
+ + Args: + data: List of study dictionaries to save + file_path: Output file path + minify: If True, save as minified JSON (no indentation) + """ + # Remove specified fields before saving + cleaned_data = [] + for study in data: + cleaned_study = {k: v for k, v in study.items() if k not in FIELDS_TO_REMOVE} + cleaned_data.append(cleaned_study) + + with open(file_path, 'w') as f: + if minify: + json.dump(cleaned_data, f, separators=(',', ':')) + else: + json.dump(cleaned_data, f, indent=2) + + +def count_studies_by_program(studies: List[Dict[str, Any]]) -> Dict[str, int]: + """ + Count studies by program for statistics. + + Args: + studies: List of study dictionaries + + Returns: + Dictionary mapping program names to study counts + """ + program_counts = defaultdict(int) + for study in studies: + program = study.get('Program', '(empty)') + program_counts[program] += 1 + return dict(program_counts) + + +# ============================================================================= +# STEP 1: SET EXTRAMURAL RESEARCH FOR MISSING DESCRIPTIONS +# ============================================================================= + +def step1_set_extramural_for_missing_descriptions( + studies: List[Dict[str, Any]], + log_func +) -> Tuple[List[Dict[str, Any]], List[Dict], List[Dict]]: + """ + STEP 1: Set Program to "Extramural Research" for studies with empty descriptions. 
+ + This step processes all studies and: + - Identifies studies with empty or missing descriptions + - Sets their Program to "Extramural Research" + - Sets their Description to "Various HLBS" + - Excludes studies from TOPMed_Common_Exchange_Area + - Excludes studies with no valid Study Name + + Args: + studies: List of study dictionaries from input file + log_func: Function to write log messages + + Returns: + Tuple of: + - processed_studies: Studies after processing + - changed_studies: List of studies that were changed + - excluded_studies: List of studies that were excluded + """ + log_func("=" * 100) + log_func("STEP 1: SET EXTRAMURAL RESEARCH FOR MISSING DESCRIPTIONS") + log_func("=" * 100) + log_func("") + + processed_studies = [] + changed_studies = [] + excluded_studies = [] + skipped_no_name = [] + + for study in studies: + study_copy = study.copy() + + # Skip studies with no valid name + study_name = study_copy.get('Study Name', '').strip() + if not study_name or study_name == '(no name)': + skipped_no_name.append({ + 'Accession': study_copy.get('Accession', ''), + 'Study Name': study_name if study_name else '(empty)', + 'Reason': 'No valid study name' + }) + continue + + # Get program name + program = study_copy.get('Program', '').strip() + + # Exclude studies from specified programs + if program in EXCLUDED_PROGRAMS: + excluded_studies.append({ + 'Accession': study_copy.get('Accession', ''), + 'Study Name': study_copy.get('Study Name', ''), + 'Program': program, + 'Description': study_copy.get('Description', ''), + 'Reason': f'Excluded program: {program}' + }) + log_func(f"[EXCLUDED] {study_copy.get('Accession', '')} - Program: {program}") + continue + + # Check if description is empty or missing + description = study_copy.get('Description', '').strip() + + if not description: + # Record original values + original_program = program + + # Update to "Extramural Research" with description "Various HLBS" + study_copy['Program'] = 'Extramural Research' + 
study_copy['Description'] = 'Various HLBS' + + # Record the change + changed_studies.append({ + 'Accession': study_copy.get('Accession', ''), + 'Study Name': study_copy.get('Study Name', ''), + 'Original Program': original_program, + 'New Program': 'Extramural Research', + 'Original Description': '(empty)', + 'New Description': 'Various HLBS' + }) + log_func(f"[CHANGED] {study_copy.get('Accession', '')} - Program: '{original_program}' -> 'Extramural Research'") + + processed_studies.append(study_copy) + + # Log skipped studies + if skipped_no_name: + log_func("") + log_func(f"Skipped {len(skipped_no_name)} studies with no valid name") + for s in skipped_no_name[:5]: # Show first 5 + log_func(f" - {s['Accession']}: {s['Study Name']}") + if len(skipped_no_name) > 5: + log_func(f" ... and {len(skipped_no_name) - 5} more") + + log_func("") + log_func(f"Step 1 Summary:") + log_func(f" - Studies processed: {len(processed_studies)}") + log_func(f" - Studies changed to Extramural Research: {len(changed_studies)}") + log_func(f" - Studies excluded: {len(excluded_studies)}") + log_func(f" - Studies skipped (no name): {len(skipped_no_name)}") + log_func("") + + return processed_studies, changed_studies, excluded_studies + + +# ============================================================================= +# STEP 2: EXPAND TABLE BY COMMUNITY MAPPING +# ============================================================================= + +def step2_expand_by_community( + studies: List[Dict[str, Any]], + log_func +) -> Tuple[List[Dict[str, Any]], List[Dict], List[Dict]]: + """ + STEP 2: Expand table by mapping communities to programs. 
+ + This step processes all studies and: + - Replaces "Extramural Research" with Community value when available + - Updates Description based on the new Program + - Creates duplicate records for communities that don't match the Program + + Args: + studies: List of study dictionaries from Step 1 + log_func: Function to write log messages + + Returns: + Tuple of: + - expanded_studies: Studies after expansion (may have more records) + - extramural_replacements: List of Extramural Research replacements + - duplicates_created: List of duplicate records created + """ + log_func("=" * 100) + log_func("STEP 2: EXPAND TABLE BY COMMUNITY MAPPING") + log_func("=" * 100) + log_func("") + + expanded_studies = [] + extramural_replacements = [] + duplicates_created = [] + + for study in studies: + program = study.get('Program', '').strip() + description = study.get('Description', '').strip() + + # Get all communities + community = study.get('Community', '').strip() + community1 = study.get('Community1', '').strip() + community2 = study.get('Community2', '').strip() + all_communities = [c for c in [community, community1, community2] if c] + + # Case 1: If Program is "Extramural Research", replace with Community value + if program == 'Extramural Research': + study_copy = study.copy() + + if community: + # Replace with primary community + new_program = community + new_description = get_description_for_program(new_program) + + extramural_replacements.append({ + 'accession': study.get('Accession', ''), + 'study_name': study.get('Study Name', ''), + 'old_program': program, + 'new_program': new_program, + 'old_description': description, + 'new_description': new_description, + 'communities': all_communities + }) + + log_func(f"[EXTRAMURAL -> COMMUNITY] {study.get('Accession', '')}") + log_func(f" Program: '{program}' -> '{new_program}'") + log_func(f" Description: '{description}' -> '{new_description}'") + + study_copy['Program'] = new_program + study_copy['Description'] = 
new_description + + expanded_studies.append(study_copy) + + # Create duplicates for other communities that don't match new program + current_program = study_copy['Program'] + non_matching = [c for c in all_communities if c and c != current_program] + + for nm_community in non_matching: + dup = study_copy.copy() + dup['Program'] = nm_community + dup['Description'] = get_description_for_program(nm_community) + expanded_studies.append(dup) + + duplicates_created.append({ + 'accession': study.get('Accession', ''), + 'study_name': study.get('Study Name', ''), + 'original_program': current_program, + 'new_program': nm_community, + 'new_description': dup['Description'], + 'reason': 'Additional community from Extramural Research study' + }) + log_func(f" [DUPLICATE] Program: '{nm_community}', Description: '{dup['Description']}'") + + # Case 2: Program exists and may differ from some communities + elif program: + expanded_studies.append(study.copy()) + + # Find communities that don't match the program + non_matching = [c for c in all_communities if c and c != program] + + if non_matching: + log_func(f"[COMMUNITY MISMATCH] {study.get('Accession', '')}") + log_func(f" Original Program: '{program}'") + log_func(f" Communities: {', '.join(all_communities)}") + + for nm_community in non_matching: + dup = study.copy() + dup['Program'] = nm_community + dup['Description'] = get_description_for_program(nm_community) + expanded_studies.append(dup) + + duplicates_created.append({ + 'accession': study.get('Accession', ''), + 'study_name': study.get('Study Name', ''), + 'original_program': program, + 'new_program': nm_community, + 'new_description': dup['Description'], + 'reason': 'Community does not match Program' + }) + log_func(f" [DUPLICATE] Program: '{nm_community}', Description: '{dup['Description']}'") + + # Case 3: No program, just add as is + else: + expanded_studies.append(study.copy()) + + log_func("") + log_func(f"Step 2 Summary:") + log_func(f" - Input studies: 
{len(studies)}") + log_func(f" - Extramural Research replacements: {len(extramural_replacements)}") + log_func(f" - Duplicates created: {len(duplicates_created)}") + log_func(f" - Output studies: {len(expanded_studies)}") + log_func(f" - Net increase: +{len(expanded_studies) - len(studies)}") + log_func("") + + return expanded_studies, extramural_replacements, duplicates_created + + +# ============================================================================= +# REPORTING +# ============================================================================= + +def generate_report( + output_dir: Path, + timestamp: str, + original_count: int, + step1_results: Tuple[List, List, List], + step2_results: Tuple[List, List, List], + final_data: List[Dict], + original_program_counts: Dict[str, int], + final_program_counts: Dict[str, int] +) -> Path: + """ + Generate a comprehensive report of all processing. + + Args: + output_dir: Directory for output files + timestamp: Timestamp string for filename + original_count: Original number of input records + step1_results: Tuple from step 1 (processed, changed, excluded) + step2_results: Tuple from step 2 (expanded, replacements, duplicates) + final_data: Final processed data + original_program_counts: Program distribution before processing + final_program_counts: Program distribution after processing + + Returns: + Path to the generated report file + """ + report_file = output_dir / f'program_table_report_{timestamp}.txt' + + _, step1_changed, step1_excluded = step1_results + _, extramural_replacements, duplicates_created = step2_results + + with open(report_file, 'w') as f: + # Header + f.write("=" * 100 + "\n") + f.write("BDC PROGRAM TABLE PROCESSING REPORT\n") + f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write("=" * 100 + "\n\n") + + # Executive Summary + f.write("=" * 100 + "\n") + f.write("EXECUTIVE SUMMARY\n") + f.write("=" * 100 + "\n") + f.write(f" Original record count: {original_count}\n") 
+ f.write(f" Records excluded (special programs): {len(step1_excluded)}\n") + f.write(f" Records set to Extramural Research: {len(step1_changed)}\n") + f.write(f" Extramural Research -> Community: {len(extramural_replacements)}\n") + f.write(f" Duplicate records created: {len(duplicates_created)}\n") + f.write(f" Final record count: {len(final_data)}\n") + f.write(f" Net change: +{len(final_data) - original_count}\n") + f.write("\n") + + # Program to Description mapping + f.write("=" * 100 + "\n") + f.write("PROGRAM TO DESCRIPTION MAPPING (Reference)\n") + f.write("=" * 100 + "\n") + f.write(f"{'Program':<55} {'Description':<35}\n") + f.write("-" * 90 + "\n") + for prog, desc in sorted(PROGRAM_DESCRIPTIONS.items()): + f.write(f"{prog:<55} {desc:<35}\n") + f.write("\n") + + # Program Distribution Comparison + f.write("=" * 100 + "\n") + f.write("PROGRAM DISTRIBUTION: BEFORE vs AFTER\n") + f.write("=" * 100 + "\n") + all_programs = sorted(set(original_program_counts.keys()) | set(final_program_counts.keys())) + f.write(f"{'Program':<55} {'Before':<10} {'After':<10} {'Change':<10}\n") + f.write("-" * 85 + "\n") + for prog in all_programs: + before = original_program_counts.get(prog, 0) + after = final_program_counts.get(prog, 0) + change = after - before + change_str = f"+{change}" if change > 0 else str(change) + f.write(f"{prog:<55} {before:<10} {after:<10} {change_str:<10}\n") + f.write("-" * 85 + "\n") + f.write(f"{'TOTAL':<55} {original_count:<10} {len(final_data):<10} +{len(final_data) - original_count}\n") + f.write("\n") + + # Step 1 Details: Extramural Research assignments + f.write("=" * 100 + "\n") + f.write(f"STEP 1 DETAILS: STUDIES SET TO EXTRAMURAL RESEARCH ({len(step1_changed)} studies)\n") + f.write("=" * 100 + "\n") + if step1_changed: + for i, rec in enumerate(step1_changed[:50], 1): # Show first 50 + f.write(f"{i:>3}. 
{rec['Accession']:<25} {rec['Original Program'] or '(empty)':<25} -> Extramural Research\n") + if len(step1_changed) > 50: + f.write(f"... and {len(step1_changed) - 50} more\n") + else: + f.write(" No studies were set to Extramural Research.\n") + f.write("\n") + + # Step 1 Details: Excluded studies + f.write("=" * 100 + "\n") + f.write(f"STEP 1 DETAILS: EXCLUDED STUDIES ({len(step1_excluded)} studies)\n") + f.write("=" * 100 + "\n") + if step1_excluded: + for i, rec in enumerate(step1_excluded, 1): + f.write(f"{i:>3}. {rec['Accession']:<25} Program: {rec['Program']:<30} Reason: {rec['Reason']}\n") + else: + f.write(" No studies were excluded.\n") + f.write("\n") + + # Step 2 Details: Extramural Research replacements + f.write("=" * 100 + "\n") + f.write(f"STEP 2 DETAILS: EXTRAMURAL RESEARCH -> COMMUNITY ({len(extramural_replacements)} studies)\n") + f.write("=" * 100 + "\n") + if extramural_replacements: + by_new_program = defaultdict(list) + for rec in extramural_replacements: + by_new_program[rec['new_program']].append(rec) + + f.write("Summary by New Program:\n") + for prog, recs in sorted(by_new_program.items(), key=lambda x: -len(x[1])): + f.write(f" {prog}: {len(recs)} studies\n") + f.write("\n") + + f.write("Detailed List:\n") + f.write("-" * 100 + "\n") + for i, rec in enumerate(extramural_replacements[:50], 1): + f.write(f"{i:>3}. {rec['accession']:<25} -> {rec['new_program']:<40} ({rec['new_description']})\n") + if len(extramural_replacements) > 50: + f.write(f"... 
def main():
    """
    Main execution function.

    Parses command line arguments and orchestrates the two-step processing:
    1. Set Extramural Research for studies with missing descriptions
    2. Expand table by community mapping

    Returns:
        int: process exit code (0 on success, 1 on any failure)
    """
    parser = argparse.ArgumentParser(
        description='Process BDC program table: set Extramural Research and expand by community',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Example usage:
    python process_program_table.py --input-file data/program_table.json
    python process_program_table.py --input-file data/program_table.json --output-dir output/
        """
    )
    parser.add_argument(
        '--input-file',
        required=True,
        help='Path to input program_table.json file'
    )
    parser.add_argument(
        '--output-dir',
        default=None,
        help='Output directory (default: same directory as script)'
    )

    args = parser.parse_args()

    # Setup output directory
    if args.output_dir:
        output_dir = Path(args.output_dir)
    else:
        output_dir = Path(__file__).parent

    output_dir.mkdir(parents=True, exist_ok=True)

    # Setup timestamp for output files
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Setup log file
    log_file = output_dir / f'program_table_processing_{timestamp}.log'

    def log(message, also_print=True):
        """Write message to log file and optionally print to console."""
        # Open in append mode on every call so partial logs survive a crash.
        with open(log_file, 'a') as f:
            f.write(message + "\n")
        if also_print:
            print(message)

    try:
        # Header
        log("=" * 100)
        log("BDC PROGRAM TABLE PROCESSOR")
        log(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        log("=" * 100)
        log("")

        # Load input file
        log(f"Loading input file: {args.input_file}")
        studies = load_json(args.input_file)
        original_count = len(studies)
        log(f"Loaded {original_count} studies")
        log("")

        # Get original program distribution (for the before/after report)
        original_program_counts = count_studies_by_program(studies)

        # =====================================================================
        # STEP 1: Set Extramural Research for missing descriptions
        # =====================================================================
        step1_processed, step1_changed, step1_excluded = step1_set_extramural_for_missing_descriptions(
            studies, log
        )

        # =====================================================================
        # STEP 2: Expand table by community mapping
        # =====================================================================
        final_data, extramural_replacements, duplicates_created = step2_expand_by_community(
            step1_processed, log
        )

        # Get final program distribution
        final_program_counts = count_studies_by_program(final_data)

        # =====================================================================
        # Save output files
        # =====================================================================
        log("=" * 100)
        log("SAVING OUTPUT FILES")
        log("=" * 100)
        log("")

        # Save formatted JSON
        output_file = output_dir / f'program_table_final_{timestamp}.json'
        save_json(final_data, output_file, minify=False)
        log(f"Saved formatted JSON: {output_file.name}")

        # Save minified JSON
        output_file_min = output_dir / f'program_table_final_{timestamp}.min.json'
        save_json(final_data, output_file_min, minify=True)

        # Get file sizes to report how much minification saved
        formatted_size = os.path.getsize(output_file)
        minified_size = os.path.getsize(output_file_min)
        reduction = ((formatted_size - minified_size) / formatted_size) * 100

        log(f"Saved minified JSON: {output_file_min.name}")
        log(f" Formatted size: {formatted_size:,} bytes")
        log(f" Minified size: {minified_size:,} bytes")
        log(f" Size reduction: {reduction:.1f}%")
        log("")

        # Generate report
        report_file = generate_report(
            output_dir=output_dir,
            timestamp=timestamp,
            original_count=original_count,
            step1_results=(step1_processed, step1_changed, step1_excluded),
            step2_results=(final_data, extramural_replacements, duplicates_created),
            final_data=final_data,
            original_program_counts=original_program_counts,
            final_program_counts=final_program_counts
        )
        log(f"Saved report: {report_file.name}")
        log(f"Saved log: {log_file.name}")
        log("")

        # =====================================================================
        # Final Summary
        # =====================================================================
        log("=" * 100)
        log("FINAL SUMMARY")
        log("=" * 100)
        log(f" Original record count: {original_count}")
        log(f" Records excluded (special programs): {len(step1_excluded)}")
        log(f" Records set to Extramural Research: {len(step1_changed)}")
        log(f" Extramural Research -> Community: {len(extramural_replacements)}")
        log(f" Duplicate records created: {len(duplicates_created)}")
        log(f" Final record count: {len(final_data)}")
        log(f" Net change: +{len(final_data) - original_count}")
        log("")
        log("=" * 100)
        log(f"Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        log("=" * 100)

        print(f"\nProcessing completed successfully!")
        print(f"Output files in: {output_dir}")

        return 0

    except FileNotFoundError:
        print(f"Error: Input file not found: {args.input_file}", file=sys.stderr)
        return 1
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in input file: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        # Top-level boundary: report any unexpected failure as a non-zero exit.
        print(f"Error: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())
#!/bin/bash
#
# BDC Data Ingestion Pipeline
#
# This script orchestrates the complete BDC data processing pipeline:
# 1. Fetch studies from BDC and Gen3 portals
# 2. Merge and clean the data
# 3. Update program names from Jira data
# 4. Generate all reports and output files
#
# Usage:
#   ./run_pipeline.sh [OPTIONS]
#
# Options:
#   -o, --output-dir DIR    Output directory (required)
#   -j, --jira-file FILE    Jira data file path (required for step 2)
#   -s, --step STEP         Run specific step only (1 or 2)
#   --bdc-url URL           BDC API URL (optional)
#   --gen3-url URL          Gen3 API URL (optional)
#   -h, --help              Show this help message
#
# Examples:
#   # Run full pipeline
#   ./run_pipeline.sh -o /path/to/output -j /path/to/jira.csv
#
#   # Run step 1 only (fetch studies)
#   ./run_pipeline.sh -o /path/to/output -s 1
#
#   # Run step 2 only (update programs)
#   ./run_pipeline.sh -o /path/to/output -j /path/to/jira.csv -s 2
#

set -e  # Exit on error
set -u  # Exit on undefined variable

# Default values
OUTPUT_DIR=""
JIRA_FILE=""
STEP="all"
#BDC_URL="https://search-dev.biodatacatalyst.renci.org/search-api"
BDC_URL="https://search.biodatacatalyst.renci.org/search-api"
GEN3_URL="https://gen3.biodatacatalyst.nhlbi.nih.gov/"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Helper functions
print_header() {
    echo -e "${BLUE}======================================================================================================${NC}"
    echo -e "${BLUE}$1${NC}"
    echo -e "${BLUE}======================================================================================================${NC}"
}

print_success() {
    echo -e "${GREEN}✓ $1${NC}"
}

print_error() {
    echo -e "${RED}✗ $1${NC}"
}

print_warning() {
    echo -e "${YELLOW}⚠ $1${NC}"
}

print_info() {
    echo -e "$1"
}

# NOTE(review): show_help prints the header comment by line number (lines 2-27
# of this file) — adding or removing lines in the header above breaks the help
# output; keep the range in sync if the header changes.
show_help() {
    sed -n '2,27p' "$0" | sed 's/^# //' | sed 's/^#//'
    exit 0
}

# Parse arguments
while [[ $# -gt 0 ]]; do
    case $1 in
        -o|--output-dir)
            OUTPUT_DIR="$2"
            shift 2
            ;;
        -j|--jira-file)
            JIRA_FILE="$2"
            shift 2
            ;;
        -s|--step)
            STEP="$2"
            shift 2
            ;;
        --bdc-url)
            BDC_URL="$2"
            shift 2
            ;;
        --gen3-url)
            GEN3_URL="$2"
            shift 2
            ;;
        -h|--help)
            show_help
            ;;
        *)
            print_error "Unknown option: $1"
            echo "Use -h or --help for usage information"
            exit 1
            ;;
    esac
done

# Validate required arguments
if [[ -z "$OUTPUT_DIR" ]]; then
    print_error "Output directory is required (-o or --output-dir)"
    exit 1
fi

# The Jira file is only needed when step 2 will run.
if [[ "$STEP" == "all" || "$STEP" == "2" ]]; then
    if [[ -z "$JIRA_FILE" ]]; then
        print_error "Jira file is required for step 2 (-j or --jira-file)"
        exit 1
    fi
    if [[ ! -f "$JIRA_FILE" ]]; then
        print_error "Jira file not found: $JIRA_FILE"
        exit 1
    fi
fi

# Create output directory
mkdir -p "$OUTPUT_DIR"

# Print configuration
print_header "BDC Data Ingestion Pipeline"
print_info "Configuration:"
print_info " Output directory: $OUTPUT_DIR"
print_info " BDC URL: $BDC_URL"
print_info " Gen3 URL: $GEN3_URL"
if [[ -n "$JIRA_FILE" ]]; then
    print_info " Jira file: $JIRA_FILE"
fi
if [[ "$STEP" != "all" ]]; then
    print_info " Step: $STEP"
fi
echo ""

# Step 1: Fetch studies from BDC and Gen3
if [[ "$STEP" == "all" || "$STEP" == "1" ]]; then
    print_header "STEP 1: Fetching Studies from BDC and Gen3"

    if python3 "$SCRIPT_DIR/fetch_studies.py" \
        --output-dir "$OUTPUT_DIR" \
        --bdc-url "$BDC_URL" \
        --gen3-url "$GEN3_URL"; then
        print_success "Step 1 completed successfully"
        echo ""
    else
        print_error "Step 1 failed"
        exit 1
    fi
fi

# Step 2: Update program names from Jira
if [[ "$STEP" == "all" || "$STEP" == "2" ]]; then
    print_header "STEP 2: Updating Program Names from Jira"

    # Find the input file from step 1 (now in program_table directory)
    INPUT_FILE="$OUTPUT_DIR/program_table/program_table.json"

    if [[ ! -f "$INPUT_FILE" ]]; then
        print_error "Input file not found: $INPUT_FILE"
        print_error "Please run step 1 first or provide the correct input file"
        exit 1
    fi

    if python3 "$SCRIPT_DIR/update_programs.py" \
        --jira-file "$JIRA_FILE" \
        --input-file "$INPUT_FILE" \
        --output-dir "$OUTPUT_DIR"; then
        print_success "Step 2 completed successfully"
        echo ""
    else
        print_error "Step 2 failed"
        exit 1
    fi
fi

# Final summary
print_header "Pipeline Completed Successfully!"
print_info ""
print_info "Generated files can be found in:"
print_info " $OUTPUT_DIR"
print_info ""
print_info "Key output files:"
if [[ "$STEP" == "all" || "$STEP" == "1" ]]; then
    print_info " Step 1 (Fetch):"
    print_info " - Program table: $OUTPUT_DIR/program_table/program_table.json"
    print_info " - Reference files: $OUTPUT_DIR/studies_on_bdc_portal/bdc_studies.json"
    print_info " $OUTPUT_DIR/studies_on_gen3_portal/raw_studies_on_gen3.json"
fi
if [[ "$STEP" == "all" || "$STEP" == "2" ]]; then
    print_info " Step 2 (Update):"
    print_info " - Program table (final): $OUTPUT_DIR/program_table/program_table_updated_*_minified.json"
    print_info " - Program table (readable): $OUTPUT_DIR/program_table/program_table_updated_*.json"
    print_info " - Reports: $OUTPUT_DIR/studies_on_gen3_portal/*_report_*.{json,log}"
    print_info " - Missing NHLBI program names: $OUTPUT_DIR/studies_on_gen3_portal/studies_in_gen3_and_jira_missing_NHLBI_program_name_*.json"
fi
print_info ""
print_success "All done!"
#!/usr/bin/env python3
"""
Update program names for studies using NHLBI Jira data for program table ingestion.

PURPOSE:
    Updates the program table with correct program names from NHLBI Jira,
    ensuring the BDC portal's program search/filter feature has accurate data.

WORKFLOW:
    1. Reads Gen3 studies data (from fetch_studies.py output)
    2. Loads Jira data (CSV/Excel) with approved program associations
    3. Updates program names based on Jira
    4. Fills missing descriptions from same program studies
    5. Generates final program table and comprehensive reports

OUTPUT:
    - cleaned_studies_on_gen3_updated_*_upload_chart_minified.json (final program table)

Usage:
    python update_programs.py --jira-file path/to/jira.csv --input-file path/to/gen3_studies.json --output-dir path/to/output
"""

import argparse
import sys
import pandas as pd
from pathlib import Path
from typing import Dict, List, Any, Tuple

import bdc_utils


class JiraProgramUpdater:
    """Handles updating program names from Jira data."""

    def __init__(self, jira_file: str, input_file: str, output_dir: str, logger: Any):
        """
        Initialize Jira program updater.

        Args:
            jira_file: Path to Jira data file (CSV or Excel)
            input_file: Path to input Gen3 studies JSON
            output_dir: Output directory for results
            logger: Logger instance
        """
        self.jira_file = jira_file
        self.input_file = input_file
        self.logger = logger

        # Setup output paths.
        # NOTE: output_dir is consumed only by FilePathManager and is not stored
        # on self; all later path lookups go through self.path_manager.
        self.path_manager = bdc_utils.FilePathManager(output_dir)
        timestamp = self.path_manager.timestamp

        # All timestamped output file paths, keyed by a short role name.
        self.output_files = {
            # Final program table files in program_table directory
            'upload_chart': self.path_manager.get_program_table_path(f"program_table_updated_{timestamp}.json"),
            'upload_mini': self.path_manager.get_program_table_path(f"program_table_updated_{timestamp}_minified.json"),
            # Reports and intermediate files in gen3 directory
            'updated': self.path_manager.get_gen3_path(f"studies_updated_{timestamp}.json"),
            'update_report': self.path_manager.get_gen3_path(f"program_names_updated_report_{timestamp}.json"),
            'missing_desc': self.path_manager.get_gen3_path(f"studies_missing_NHLBI_program_name_{timestamp}.json"),
            'both_missing': self.path_manager.get_gen3_path(f"studies_in_gen3_and_jira_Which_are_missing_NHLBI_program_name_{timestamp}.json"),
            'comprehensive': self.path_manager.get_gen3_path(f"jira_studies_not_in_gen3_report_{timestamp}.log")
        }
    def _find_column(self, df: pd.DataFrame, *names: str) -> str:
        """
        Find a column by trying multiple possible names.

        Handles both old format (e.g., 'Accession') and new Jira export format
        (e.g., 'Custom field (Accession)').

        Args:
            df: DataFrame to search
            *names: Possible column name variations to try

        Returns:
            The actual column name found, or empty string if not found
        """
        for name in names:
            # Try exact match
            if name in df.columns:
                return name
            # Try with 'Custom field (...)' prefix (new Jira export format)
            custom_field_name = f'Custom field ({name})'
            if custom_field_name in df.columns:
                return custom_field_name
        # No variation matched; caller decides whether this is an error.
        return ''

    def _find_community_columns(self, df: pd.DataFrame) -> List[str]:
        """
        Find all Community columns in the DataFrame.

        Handles both old format and new Jira export format with duplicate columns.

        Args:
            df: DataFrame to search

        Returns:
            List of community column names found
        """
        community_cols = []
        for col in df.columns:
            col_lower = col.lower()
            # Match 'Community', 'Community.1', 'Custom field (Community)', etc.
            # ('.1' suffixes presumably come from pandas de-duplicating repeated
            # column names on read — TODO confirm against the Jira export)
            if col_lower == 'community' or col_lower.startswith('community.'):
                community_cols.append(col)
            elif 'custom field (community)' in col_lower:
                community_cols.append(col)
        return community_cols
    def load_jira_data(self) -> Tuple[Dict[str, Dict[str, str]], set]:
        """
        Load and parse Jira data file.

        Supports both old CSV format and new Jira export format with
        'Custom field (...)' columns.

        Returns:
            Tuple of (jira_dict, jira_accessions) where jira_dict maps base
            accession -> per-study field dict and jira_accessions is the set
            of base accessions seen in the file.

        Raises:
            FileNotFoundError: if the Jira file does not exist
            ValueError: if the file extension is unsupported or no Accession
                column can be found
        """
        # Validate file
        if not Path(self.jira_file).exists():
            raise FileNotFoundError(f"Jira file not found: {self.jira_file}")

        file_ext = Path(self.jira_file).suffix.lower()
        if file_ext not in ['.csv', '.xlsx', '.xls']:
            raise ValueError(f"Unsupported file format: {file_ext}")

        self.logger.info(f"Loading Jira data from: {self.jira_file}")

        # Load data
        df = pd.read_csv(self.jira_file) if file_ext == '.csv' else pd.read_excel(self.jira_file)

        # Find column names (handles both old and new Jira formats)
        accession_col = self._find_column(df, 'Accession')
        program_col = self._find_column(df, 'Program(s)')
        gen3_program_col = self._find_column(df, 'Gen3 Program Name')
        status_col = self._find_column(df, 'Status')
        summary_col = self._find_column(df, 'Summary')

        if not accession_col:
            raise ValueError("Could not find Accession column in Jira file")

        self.logger.info(f"Column mapping: Accession='{accession_col}', Program(s)='{program_col}', "
                         f"Gen3 Program Name='{gen3_program_col}', Status='{status_col}', Summary='{summary_col}'")

        # Drop rows without an accession — they cannot be keyed.
        df_valid = df.dropna(subset=[accession_col])

        jira_dict = {}
        jira_accessions = set()

        # Find all Community columns
        community_cols = self._find_community_columns(df)
        self.logger.info(f"Found {len(community_cols)} Community columns in Jira data: {community_cols}")

        for _, row in df_valid.iterrows():
            accession = str(row[accession_col]).strip()
            base_acc = bdc_utils.extract_base_accession(accession)

            if base_acc:
                jira_accessions.add(base_acc)

                # Extract community values from all Community columns
                community_values = []
                for col in community_cols:
                    val = row.get(col, '')
                    community_values.append(str(val).strip() if pd.notna(val) else '')

                # Get program name from Program(s) field, or fall back to first
                # Community value (in some Jira exports, the Community column
                # contains the program names).
                program_name = ''
                if program_col:
                    program_name = str(row.get(program_col, '')).strip() if pd.notna(row.get(program_col, '')) else ''

                # If no Program(s) value, use first Community value as program name
                if not program_name and community_values:
                    first_community = community_values[0]
                    if first_community:
                        program_name = first_community

                jira_dict[base_acc] = {
                    'full_accession': accession,
                    # Program(s) field with Community fallback; never taken
                    # from 'Gen3 Program Name'.
                    'program_name': program_name,
                    'gen3_program_name': str(row.get(gen3_program_col, '')).strip() if gen3_program_col and pd.notna(row.get(gen3_program_col, '')) else '',
                    'status': str(row.get(status_col, '')).strip() if status_col and pd.notna(row.get(status_col, '')) else '',
                    # NOTE(review): unlike the other fields, summary is not
                    # NaN-guarded, so a missing cell becomes the string 'nan'
                    # — confirm whether that is intended.
                    'summary': str(row.get(summary_col, '')) if summary_col else '',
                    'community': community_values[0] if len(community_values) > 0 else '',
                    'community1': community_values[1] if len(community_values) > 1 else '',
                    'community2': community_values[2] if len(community_values) > 2 else ''
                }

        self.logger.info(f"Loaded {len(jira_dict)} studies from Jira")
        return jira_dict, jira_accessions
    def get_program_info(self, studies: List[Dict[str, str]], program_name: str) -> Tuple[str, str]:
        """
        Find correct casing and description for a program (case-insensitive).

        Args:
            studies: List of study dictionaries
            program_name: Program name to search for

        Returns:
            Tuple of (program_name_with_correct_case, description); falls back
            to the input name and an empty description when no study matches.
        """
        if not program_name:
            return program_name, ''

        program_lower = program_name.lower()
        for study in studies:
            study_prog = study.get('Program', '')
            if study_prog and study_prog.lower() == program_lower:
                desc = study.get('Description', '')
                # First match wins; blank/whitespace descriptions count as empty.
                return study_prog, desc if desc and desc.strip() else ''
        return program_name, ''

    def update_study_programs(
        self,
        studies: List[Dict[str, str]],
        jira_dict: Dict[str, Dict[str, str]],
        valid_programs_map: Dict[str, str] = None
    ) -> Tuple[List[Dict[str, str]], List[Dict[str, str]], List[Dict[str, str]], List[Dict[str, str]]]:
        """
        Update program names for studies based on Jira data.

        Creates duplicate entries for studies with multiple communities.
        Only uses program names that exist in BDC portal.

        NOTE: mutates `studies` in place — excluded entries are removed and
        Program/Community/Description fields are rewritten on the survivors.

        Args:
            studies: List of Gen3 studies
            jira_dict: Jira data dictionary
            valid_programs_map: Dict mapping lowercase program names to original case

        Returns:
            Tuple of (additional_studies, updated_records, not_in_jira_records,
            excluded_studies).
        """
        if valid_programs_map is None:
            valid_programs_map = {}

        # Programs to exclude entirely (not map to Extramural Research)
        excluded_program_names = {'tutorial', 'no program', ''}

        def get_valid_program(prog_name: str) -> tuple:
            """
            Get valid BDC program name.

            Returns (program_name, should_exclude) tuple.
            - If valid BDC program: returns (program_name, False)
            - If 'tutorial' or empty: returns (None, True) - exclude
            - If other invalid program: returns ('Extramural Research', False) - map to Extramural
            """
            if not prog_name or prog_name.lower() in excluded_program_names:
                return None, True  # Exclude tutorial and No Program
            # Check if program exists in BDC (case-insensitive)
            prog_lower = prog_name.lower()
            if prog_lower in valid_programs_map:
                return valid_programs_map[prog_lower], False
            # Not a valid BDC program - map to Extramural Research
            self.logger.info(f"Program '{prog_name}' not in BDC, using 'Extramural Research'")
            return 'Extramural Research', False

        additional_studies = []
        updated_records = []
        not_in_jira = []
        excluded_studies = []  # Studies with excluded programs (tutorial, No Program)

        # Iterate over a snapshot because excluded studies are removed from
        # `studies` mid-loop.
        for study in list(studies):
            accession = study.get('Accession', '')
            base_acc = bdc_utils.extract_base_accession(accession)
            current_prog = study.get('Program', '')
            current_desc = study.get('Description', '')

            # Check if accession is in phs_id format
            is_phs_id = accession.startswith('phs')

            # Validate current program
            if valid_programs_map:
                validated_prog, should_exclude = get_valid_program(current_prog)
                # For non-phs_id studies, exclude if they would be mapped to Extramural Research
                if not is_phs_id and validated_prog == 'Extramural Research':
                    should_exclude = True
                if should_exclude:
                    # Exclude tutorial, No Program, and non-phs_id studies defaulting to Extramural Research
                    reason = 'Non-phs_id study excluded from Extramural Research' if not is_phs_id else 'Program excluded (tutorial or No Program)'
                    excluded_studies.append({
                        'Accession': accession,
                        'Study Name': study.get('Study Name', ''),
                        'Invalid Program': current_prog or 'No Program',
                        'Reason': reason
                    })
                    self.logger.info(f"Excluding {accession}: {reason}")
                    studies.remove(study)
                    continue
                elif validated_prog != current_prog:
                    study['Program'] = validated_prog
                    current_prog = validated_prog

            if base_acc and base_acc in jira_dict:
                jira_info = jira_dict[base_acc]

                # Get all non-empty community values (community names are program names)
                communities = [c for c in [
                    jira_info.get('community', ''),
                    jira_info.get('community1', ''),
                    jira_info.get('community2', '')
                ] if c]

                # Get program name: first from Program(s), then fall back to first Community
                program_name = jira_info.get('program_name', '').strip()
                if not program_name and communities:
                    program_name = communities[0]

                # Validate program name against BDC programs
                if program_name:
                    validated_prog, should_exclude = get_valid_program(program_name)
                    # For non-phs_id studies, exclude if they would be mapped to Extramural Research
                    if not is_phs_id and validated_prog == 'Extramural Research':
                        should_exclude = True
                    if should_exclude:
                        program_name = None  # Don't update program if it's tutorial/empty
                    else:
                        program_name = validated_prog

                # Set all community fields from Jira as-is
                study['Community'] = communities[0] if len(communities) > 0 else ''
                study['Community1'] = communities[1] if len(communities) > 1 else ''
                study['Community2'] = communities[2] if len(communities) > 2 else ''

                if program_name:
                    # Update program name from Jira (validated against BDC programs)
                    first_prog, first_desc = self.get_program_info(studies, program_name)
                    if not first_desc and current_desc:
                        first_desc = current_desc

                    if first_prog.lower() != current_prog.lower():
                        study['Program'] = first_prog
                        if first_desc:
                            study['Description'] = first_desc

                        updated_records.append({
                            'Accession': accession,
                            'Study Name': study.get('Study Name', ''),
                            'Old Program': current_prog,
                            'New Program': first_prog,
                            'Old Description': current_desc,
                            'New Description': first_desc,
                            'Jira Status': jira_info['status'],
                            'Jira Summary': jira_info['summary'],
                            'Had Description': 'Yes' if current_desc.strip() else 'No',
                            'Multi Program': 'Yes' if len(communities) > 1 else 'No'
                        })
                        self.logger.info(f"Updated {accession}: Program '{current_prog}' -> '{first_prog}' (primary community: {communities[0] if communities else 'N/A'})")

                    # Handle additional communities (create duplicates)
                    # This duplicates the study for each additional community
                    # Community values are validated as program names - skip if excluded (tutorial/empty)
                    if len(communities) > 1:
                        for idx, community in enumerate(communities[1:], start=2):
                            # Validate community as program name
                            validated_community_prog, should_exclude = get_valid_program(community)
                            if should_exclude:
                                self.logger.info(f"Skipping community duplicate {accession}: '{community}' (tutorial/empty)")
                                continue
                            new_study = study.copy()
                            new_study['Program'] = validated_community_prog
                            # Get description for this program
                            _, prog_desc = self.get_program_info(studies, validated_community_prog)
                            if prog_desc:
                                new_study['Description'] = prog_desc
                            additional_studies.append(new_study)
                            self.logger.info(f"Added {accession}: Program '{current_prog}' -> '{validated_community_prog}' (community {idx}/{len(communities)}: {community})")

            else:
                # Study not in Jira - Community stays empty (Community comes from Jira only)
                study['Community'] = ''
                study['Community1'] = ''
                study['Community2'] = ''

                if not current_desc or not current_desc.strip():
                    # Track new study not in Jira
                    not_in_jira.append({
                        'Accession': accession,
                        'Study Name': study.get('Study Name', ''),
                        'Current Program': current_prog
                    })

        return additional_studies, updated_records, not_in_jira, excluded_studies
    def fill_descriptions(self, studies: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """
        Fill missing descriptions from studies with same program.

        Mutates each description-less study in place when another study with
        the same program has a non-empty description.

        Args:
            studies: List of study dictionaries

        Returns:
            List of record dicts (Accession, Study Name, Program,
            Description Added), one per study whose description was filled
        """
        self.logger.info("\nFilling missing descriptions from same program studies...")
        filled = []

        for study in studies:
            prog = study.get('Program', '')
            desc = study.get('Description', '')

            # Only studies with a program but a blank/whitespace description
            # are candidates for filling.
            if prog and (not desc or not desc.strip()):
                _, found_desc = self.get_program_info(studies, prog)
                if found_desc:
                    study['Description'] = found_desc
                    filled.append({
                        'Accession': study.get('Accession', ''),
                        'Study Name': study.get('Study Name', ''),
                        'Program': prog,
                        'Description Added': found_desc
                    })
                    self.logger.info(f"Filled description for {study.get('Accession', '')}: Program '{prog}'")

        if filled:
            self.logger.info(f"Filled descriptions for {len(filled)} studies")

        return filled
    def identify_both_missing_desc(
        self,
        studies: List[Dict[str, str]],
        jira_dict: Dict[str, Dict[str, str]]
    ) -> List[Dict[str, str]]:
        """
        Identify studies in both Gen3 and Jira but missing descriptions.

        Read-only: does not modify `studies` or `jira_dict`.

        Args:
            studies: List of study dictionaries
            jira_dict: Jira data dictionary

        Returns:
            List of studies missing descriptions
        """
        self.logger.info("\nIdentifying studies in both Gen3 and Jira but missing descriptions...")
        both_missing = []

        for study in studies:
            accession = study.get('Accession', '')
            base_acc = bdc_utils.extract_base_accession(accession)
            desc = study.get('Description', '')

            if base_acc and base_acc in jira_dict:
                # Blank or whitespace-only descriptions count as missing.
                if not desc or not desc.strip():
                    jira_info = jira_dict[base_acc]
                    both_missing.append({
                        'Accession': accession,
                        'Study Name': study.get('Study Name', ''),
                        'Program': study.get('Program', ''),
                        'Jira Program': jira_info['program_name'],
                        'Jira Status': jira_info['status'],
                        'Jira Summary': jira_info['summary']
                    })
                    self.logger.info(f"Found {accession}: In both Gen3 and Jira but missing description")

        if both_missing:
            self.logger.info(f"Found {len(both_missing)} studies in both Gen3 and Jira but missing description")

        return both_missing
    def save_comprehensive_report(
        self,
        updated_studies: List[Dict[str, str]],
        not_in_jira: List[Dict[str, str]],
        missing_from_gen3: List[Dict[str, str]]
    ) -> None:
        """
        Save comprehensive categorized report.

        Writes a plain-text report to self.output_files['comprehensive'] with
        three categories: studies updated from Jira, Gen3-only studies, and
        Jira-only studies not yet ingested by Gen3.

        Args:
            updated_studies: records produced by update_study_programs
            not_in_jira: Gen3 studies with no matching Jira entry
            missing_from_gen3: Jira entries with no matching Gen3 study
        """
        with open(self.output_files['comprehensive'], 'w') as f:
            bdc_utils.write_report_header(f, "COMPREHENSIVE STUDY STATUS REPORT")

            # Category 1: NEW studies
            bdc_utils.write_report_section(f, "CATEGORY 1: NEW STUDIES (In both Gen3 and Jira - Program names updated)", len(updated_studies))
            for study in updated_studies:
                f.write(f"Accession: {study['Accession']}\n")
                f.write(f"Study Name: {study['Study Name']}\n")
                f.write(f"Old Program: {study['Old Program']}\n")
                f.write(f"New Program: {study['New Program']}\n")
                f.write(f"Old Description: {study['Old Description']}\n")
                f.write(f"New Description: {study['New Description']}\n")
                f.write(f"Had Description Before: {study['Had Description']}\n")
                f.write(f"Jira Status: {study['Jira Status']}\n")
                f.write(f"Jira Summary: {study['Jira Summary']}\n")
                f.write("-" * 100 + "\n")

            # Category 2: OLD studies
            bdc_utils.write_report_section(f, "CATEGORY 2: OLD STUDIES (In Gen3 only - Not in Jira)", len(not_in_jira))
            for study in not_in_jira:
                f.write(f"Accession: {study['Accession']}\n")
                f.write(f"Study Name: {study['Study Name']}\n")
                f.write(f"Current Program: {study['Current Program']}\n")
                f.write("-" * 100 + "\n")

            # Category 3: NOT INGESTED
            bdc_utils.write_report_section(f, "CATEGORY 3: NOT INGESTED (In Jira only - Yet to be ingested by Gen3)", len(missing_from_gen3))
            for study in missing_from_gen3:
                f.write(f"Accession: {study['Accession']}\n")
                f.write(f"Program Name: {study['Gen3 Program Name']}\n")
                f.write(f"Status: {study['Status']}\n")
                f.write(f"Summary: {study['Summary']}\n")
                f.write("-" * 100 + "\n")

        self.logger.info(f"Comprehensive report saved to: {self.output_files['comprehensive']}")
Update programs + self.logger.info("\nUpdating program names from Jira...") + additional, updated, not_in_jira, excluded = self.update_study_programs(gen3_studies, jira_dict, valid_programs_map) + gen3_studies.extend(additional) + if excluded: + self.logger.info(f"Excluded {len(excluded)} studies with invalid programs") + + # Save program_table.json with Community fields (before filtering) + program_table_with_community = self.path_manager.get_program_table_path("program_table.json") + bdc_utils.save_json(gen3_studies, program_table_with_community) + self.logger.info(f"Saved program_table.json with Community fields: {len(gen3_studies)} studies") + + # Fill descriptions + filled = self.fill_descriptions(gen3_studies) + + # Identify both missing descriptions + both_missing = self.identify_both_missing_desc(gen3_studies, jira_dict) + + # Separate studies with/without descriptions + studies_with_desc, studies_missing_desc = bdc_utils.separate_studies_by_description(gen3_studies) + self.logger.info(f"\nStudies with description: {len(studies_with_desc)}") + self.logger.info(f"Studies missing description: {len(studies_missing_desc)}") + + # Find missing from Gen3 + missing_from_gen3 = [ + { + 'Accession': jira_dict[base_acc]['full_accession'], + 'Gen3 Program Name': jira_dict[base_acc].get('gen3_program_name', '') or jira_dict[base_acc]['program_name'], + 'Program(s)': jira_dict[base_acc]['program_name'], + 'Status': jira_dict[base_acc]['status'], + 'Summary': jira_dict[base_acc]['summary'] + } + for base_acc in jira_accessions if base_acc not in gen3_accessions + ] + + # Save results + self.logger.info("\nSaving results...") + bdc_utils.save_json(studies_with_desc, self.output_files['updated']) + bdc_utils.save_json(studies_with_desc, self.output_files['upload_chart']) + bdc_utils.save_json(studies_with_desc, self.output_files['upload_mini'], minify=True) + + # Save YAML format + import yaml + yaml_file = 
self.path_manager.get_program_table_path(f"program_table_updated_{self.path_manager.timestamp}.yaml") + yaml_data = {'program_study_mappings': studies_with_desc} + with open(yaml_file, 'w') as f: + yaml.dump(yaml_data, f, default_flow_style=False, allow_unicode=True, sort_keys=False) + self.logger.info(f"Saved YAML format to: {yaml_file}") + + # Save minified YAML format + yaml_mini_file = self.path_manager.get_program_table_path(f"program_table_updated_{self.path_manager.timestamp}_minified.yaml") + with open(yaml_mini_file, 'w') as f: + yaml.dump(yaml_data, f, default_flow_style=True, allow_unicode=True, sort_keys=False) + self.logger.info(f"Saved minified YAML format to: {yaml_mini_file}") + + if studies_missing_desc: + bdc_utils.save_json(studies_missing_desc, self.output_files['missing_desc']) + + if both_missing: + bdc_utils.save_json(both_missing, self.output_files['both_missing']) + + if updated: + bdc_utils.save_json(updated, self.output_files['update_report']) + + self.save_comprehensive_report(updated, not_in_jira, missing_from_gen3) + + # Load BDC studies for final comparison + bdc_studies_file = self.path_manager.get_bdc_path("bdc_studies.json") + bdc_studies = bdc_utils.load_json(bdc_studies_file) + + # Calculate unique accessions + from collections import Counter + bdc_unique_accessions = set() + for s in bdc_studies: + base = bdc_utils.extract_base_accession(s.get('Accession', '')) + if base: + bdc_unique_accessions.add(base) + + program_table_unique_accessions = set() + for s in gen3_studies: + base = bdc_utils.extract_base_accession(s.get('Accession', '')) + if base: + program_table_unique_accessions.add(base) + + # Count studies by program + bdc_program_counts = Counter() + for s in bdc_studies: + program = s.get('Program', '') or 'No Program' + bdc_program_counts[program] += 1 + + program_table_counts = Counter() + for s in gen3_studies: + program = s.get('Program', '') or 'No Program' + program_table_counts[program] += 1 + + # Final Summary 
with comparison + self.logger.info("\n" + "="*80) + self.logger.info("FINAL SUMMARY") + self.logger.info("="*80) + self.logger.info("") + self.logger.info("--- Processing Stats ---") + self.logger.info(f"Total studies in final program table: {len(gen3_studies)}") + self.logger.info(f"Studies excluded (invalid program): {len(excluded)}") + self.logger.info(f"Studies with program names updated: {len(updated)}") + self.logger.info(f"Descriptions filled from same program: {len(filled)}") + self.logger.info(f"Studies still missing description: {len(studies_missing_desc)}") + self.logger.info(f"Studies in both Gen3 and Jira but missing description: {len(both_missing)}") + self.logger.info(f"New studies (empty description) not in Jira: {len(not_in_jira)}") + self.logger.info(f"Studies in Jira but not in Gen3: {len(missing_from_gen3)}") + self.logger.info("") + self.logger.info("--- BDC Portal vs Program Table Comparison ---") + self.logger.info(f"BDC Portal: {len(bdc_studies)} study records ({len(bdc_unique_accessions)} unique accessions)") + self.logger.info(f"Program Table: {len(gen3_studies)} study records ({len(program_table_unique_accessions)} unique accessions)") + self.logger.info("") + self.logger.info("--- Studies by Program Comparison ---") + self.logger.info(f"{'Program':<60} {'BDC':>8} {'PrgTbl':>8} {'Change':>10}") + self.logger.info("-" * 88) + + all_programs = sorted(set(bdc_program_counts.keys()) | set(program_table_counts.keys())) + for program in all_programs: + bdc_count = bdc_program_counts.get(program, 0) + pt_count = program_table_counts.get(program, 0) + diff = pt_count - bdc_count + if diff > 0: + change = f"+{diff}" + elif diff < 0: + change = str(diff) + else: + change = "0" + self.logger.info(f"{program:<60} {bdc_count:>8} {pt_count:>8} {change:>10}") + + self.logger.info("-" * 88) + total_diff = len(gen3_studies) - len(bdc_studies) + total_change = f"+{total_diff}" if total_diff > 0 else str(total_diff) + self.logger.info(f"{'TOTAL':<60} 
{len(bdc_studies):>8} {len(gen3_studies):>8} {total_change:>10}") + + # --- Record Count Reconciliation (explains the TOTAL change) --- + # Load DOI tombstone data + doi_tombstone_accessions = set() + doi_tombstone_records = [] + doi_tombstone_file = self.path_manager.get_gen3_path("bdc_studies_excluded_doi_tombstone.json") + if Path(doi_tombstone_file).exists(): + doi_tombstone_records = bdc_utils.load_json(doi_tombstone_file) + for s in doi_tombstone_records: + base = bdc_utils.extract_base_accession(s.get('Accession', '')) + if base: + doi_tombstone_accessions.add(base) + + # Count BDC records removed due to DOI tombstone + bdc_doi_removed = [s for s in bdc_studies + if bdc_utils.extract_base_accession(s.get('Accession', '')) in doi_tombstone_accessions] + + # Count BDC records whose base accession is NOT in Gen3 at all (not in program table and not tombstoned) + bdc_not_in_gen3 = [s for s in bdc_studies + if bdc_utils.extract_base_accession(s.get('Accession', '')) not in program_table_unique_accessions + and bdc_utils.extract_base_accession(s.get('Accession', '')) not in doi_tombstone_accessions] + + # Count records added: new Gen3 studies not in BDC + pt_new_records = [s for s in gen3_studies + if bdc_utils.extract_base_accession(s.get('Accession', '')) not in bdc_unique_accessions] + + self.logger.info("") + self.logger.info("--- Record Count Reconciliation ---") + self.logger.info(f" BDC Portal records (starting point): {len(bdc_studies):>6}") + self.logger.info(f" (-) BDC records removed: DOI Tombstone (deprecated): {len(bdc_doi_removed):>6}") + self.logger.info(f" (-) BDC records removed: Not in Gen3: {len(bdc_not_in_gen3):>6}") + self.logger.info(f" (-) Records excluded: Non-phs_id / invalid program: {len(excluded):>6}") + self.logger.info(f" (+) New records added: Gen3 studies not in BDC: {len(pt_new_records):>6}") + self.logger.info(f" (+) Records added: Community duplicates from Jira: {len(additional):>6}") + net = len(bdc_studies) - 
len(bdc_doi_removed) - len(bdc_not_in_gen3) - len(excluded) + len(pt_new_records) + len(additional) + self.logger.info(f" (=) Expected program table records: {net:>6}") + self.logger.info(f" (=) Actual program table records: {len(gen3_studies):>6}") + if net != len(gen3_studies): + self.logger.info(f" (*) Unaccounted difference: {len(gen3_studies) - net:>6}") + + # --- Removed: BDC studies removed due to DOI Tombstone --- + if bdc_doi_removed: + # Group by base accession for cleaner display + from collections import defaultdict + doi_by_acc = defaultdict(list) + for s in bdc_doi_removed: + base = bdc_utils.extract_base_accession(s.get('Accession', '')) + doi_by_acc[base].append(s) + + self.logger.info("") + self.logger.info(f"--- Removed: DOI Tombstone Studies ({len(bdc_doi_removed)} records, {len(doi_by_acc)} unique accessions) ---") + self.logger.info(f" Note: These studies are deprecated in Gen3 (source of truth) and removed from program table.") + self.logger.info(f" {'Accession':<45} {'Records':>8} {'Program in BDC'}") + self.logger.info(f" {'-'*90}") + for acc in sorted(doi_by_acc.keys()): + records = doi_by_acc[acc] + programs = sorted(set(s.get('Program', '') for s in records if s.get('Program'))) + self.logger.info(f" {acc:<45} {len(records):>8} {', '.join(programs)}") + + # --- Removed: BDC studies not found in Gen3 --- + if bdc_not_in_gen3: + not_in_gen3_by_acc = defaultdict(list) + for s in bdc_not_in_gen3: + base = bdc_utils.extract_base_accession(s.get('Accession', '')) + not_in_gen3_by_acc[base].append(s) + + self.logger.info("") + self.logger.info(f"--- Removed: BDC Studies Not in Gen3 ({len(bdc_not_in_gen3)} records, {len(not_in_gen3_by_acc)} unique accessions) ---") + self.logger.info(f" Note: These are on BDC portal but not in Gen3 (source of truth).") + self.logger.info(f" {'Accession':<45} {'Records':>8} {'Program in BDC'}") + self.logger.info(f" {'-'*90}") + for acc in sorted(not_in_gen3_by_acc.keys()): + records = not_in_gen3_by_acc[acc] + 
programs = sorted(set(s.get('Program', '') for s in records if s.get('Program'))) + self.logger.info(f" {acc:<45} {len(records):>8} {', '.join(programs)}") + + # --- Removed: Excluded non-phs_id / invalid program studies --- + if excluded: + self.logger.info("") + self.logger.info(f"--- Removed: Excluded Studies ({len(excluded)} records) ---") + self.logger.info(f" {'Accession':<45} {'Program':<25} {'Reason'}") + self.logger.info(f" {'-'*100}") + for ex in excluded: + self.logger.info(f" {ex['Accession']:<45} {ex['Invalid Program']:<25} {ex['Reason']}") + + # --- Added: New Gen3 studies not in BDC --- + pt_not_in_bdc = program_table_unique_accessions - bdc_unique_accessions + if pt_not_in_bdc: + self.logger.info("") + self.logger.info(f"--- Added: New Studies from Gen3 ({len(pt_new_records)} records, {len(pt_not_in_bdc)} unique accessions) ---") + self.logger.info(f" Note: These are in Gen3 (source of truth) but not yet on the BDC portal.") + self.logger.info(f" {'Accession':<45} {'Program'}") + self.logger.info(f" {'-'*70}") + for acc in sorted(pt_not_in_bdc): + prog = '' + for s in gen3_studies: + if bdc_utils.extract_base_accession(s.get('Accession', '')) == acc: + prog = s.get('Program', '') + break + self.logger.info(f" {acc:<45} {prog}") + + # --- Updated: Program names changed from Jira --- + if updated: + self.logger.info("") + self.logger.info(f"--- Updated: Program Names Changed from Jira ({len(updated)} records) ---") + self.logger.info(f" {'Accession':<45} {'Old Program':<30} {'New Program'}") + self.logger.info(f" {'-'*100}") + for u in updated: + self.logger.info(f" {u['Accession']:<45} {u['Old Program']:<30} {u['New Program']}") + + self.logger.info("") + self.logger.info("="*80) + self.logger.info("Processing completed successfully!") + + return 0 + + except Exception as e: + self.logger.error(f"Error: {str(e)}", exc_info=True) + return 1 + + +def main(): + parser = argparse.ArgumentParser( + description='Update program names for Gen3 studies from 
NHLBI Jira data', + formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument('--jira-file', required=True, help='Path to Jira data file (CSV or Excel)') + parser.add_argument('--input-file', required=True, help='Path to input Gen3 studies JSON file') + parser.add_argument('--output-dir', required=True, help='Output directory for results') + + args = parser.parse_args() + + # Setup logging + output_dir = Path(args.output_dir) + bdc_utils.ensure_dir_exists(str(output_dir / "studies_on_gen3_portal")) + log_file = output_dir / "update_programs.log" + logger = bdc_utils.setup_logging(str(log_file)) + + logger.info("="*80) + logger.info("BDC Data Pipeline - Update Program Names") + logger.info("="*80) + logger.info(f"Jira file: {args.jira_file}") + logger.info(f"Input file: {args.input_file}") + logger.info(f"Output directory: {args.output_dir}") + logger.info("") + + updater = JiraProgramUpdater(args.jira_file, args.input_file, args.output_dir, logger) + return updater.run() + + +if __name__ == "__main__": + sys.exit(main())