|
| 1 | +import re |
| 2 | +from pathlib import Path |
| 3 | +from typing import Any, Dict, List |
| 4 | + |
| 5 | +import yaml |
| 6 | +from bs4 import BeautifulSoup |
| 7 | + |
| 8 | +from .context_url import CONTEXTFILE_URL |
| 9 | +from .jsonldutils import get_context_version |
| 10 | +from .models import Activity, Item, Protocol, write_obj_jsonld |
| 11 | + |
# Keys that must be present in the protocol YAML configuration;
# read_check_yaml_config raises ValueError when any of them is missing.
PROTOCOL_KEYS_REQUIRED = [
    "protocol_name",
    "protocol_display_name",
    "redcap_version",
]
| 17 | + |
| 18 | + |
def read_check_yaml_config(yaml_path: str) -> Dict[str, Any]:
    """Read the YAML configuration file and check its required keys.

    Args:
        yaml_path: Path of the YAML configuration file to read.

    Returns:
        The parsed configuration mapping.

    Raises:
        ValueError: If the file is not valid YAML, if its top-level value is
            not a mapping (for example an empty file), or if any key listed
            in ``PROTOCOL_KEYS_REQUIRED`` is missing.
    """
    try:
        with open(yaml_path, "r", encoding="utf-8") as f:
            protocol = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise ValueError(f"Invalid YAML file: {str(e)}") from e

    # safe_load yields None for an empty document and may yield a list or a
    # scalar; none of those can be checked for keys.
    if not isinstance(protocol, dict):
        raise ValueError(
            "Invalid YAML file: expected a mapping at the top level, "
            f"got {type(protocol).__name__}"
        )

    missing_keys = set(PROTOCOL_KEYS_REQUIRED) - set(protocol.keys())
    if missing_keys:
        raise ValueError(
            f"Missing required keys in YAML file: {missing_keys}"
        )
    return protocol
| 31 | + |
| 32 | + |
def normalize_condition(condition_str, field_type=None):
    """Normalize condition strings with specific handling for calc fields.

    Args:
        condition_str: The condition to normalize. Booleans and ``None`` are
            returned unchanged; the strings "true"/"false" (any case) are
            converted to booleans; any other non-string value is converted
            with ``str()``.
        field_type: When "sql" or "calc", only the square brackets around
            field references are removed. For any other value the
            branching-logic rewrites are applied.

    Returns:
        ``True``/``False`` for boolean-like input, ``None`` for ``None``,
        otherwise the normalized condition string.

    Raises:
        ValueError: If a non-string value cannot be converted to a string.
    """
    # Nothing to normalize for a missing condition or a real boolean.
    if condition_str is None or isinstance(condition_str, bool):
        return condition_str

    if isinstance(condition_str, str):
        lowered = condition_str.lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False
    else:
        try:
            condition_str = str(condition_str)
        except Exception as exc:
            raise ValueError(
                "Condition must be a string or boolean"
            ) from exc

    # Strip HTML markup, keeping only the text content.
    condition_str = BeautifulSoup(condition_str, "html.parser").get_text()
    condition_str = condition_str.strip()

    # Common operator normalizations for all types. Order matters: the
    # whitespace collapse runs last to clean up after the other rewrites.
    operator_replacements = [
        (r"\s*\+\s*", " + "),  # Normalize spacing around +
        (r"\s*-\s*", " - "),  # Normalize spacing around -
        (r"\s*\*\s*", " * "),  # Normalize spacing around *
        (r"\s*\/\s*", " / "),  # Normalize spacing around /
        (r"\s*\(\s*", "("),  # Remove spaces around opening parenthesis
        (r"\s*\)\s*", ")"),  # Remove spaces around closing parenthesis
        (r"\s*,\s*", ","),  # Remove spaces around commas
        (r"\s+", " "),  # Collapse runs of whitespace
    ]
    for pattern, repl in operator_replacements:
        condition_str = re.sub(pattern, repl, condition_str)

    if field_type in ["sql", "calc"]:
        # For calc fields, just remove brackets from field references.
        condition_str = re.sub(r"\[([^\]]+)\]", r"\1", condition_str)
    else:
        # For branching logic.
        replacements = [
            # "(123)" becomes "___123".
            (r"\(([0-9]*)\)", r"___\1"),
            # "=" becomes "==" unless preceded by ">", "|" or "<".
            # NOTE(review): this also turns "!=" into "!==" and an existing
            # "==" into "===" -- confirm that is intended.
            (r"([^>|<])=", r"\1=="),
            (r"\[([^\]]*)\]", r"\1"),  # Remove square brackets
            (r"\bor\b", "||"),
            (r"\band\b", "&&"),
            (r'"', "'"),
        ]
        for pattern, repl in replacements:
            condition_str = re.sub(pattern, repl, condition_str)

    return condition_str.strip()
| 94 | + |
| 95 | + |
def parse_html(input_string, default_language="en"):
    """
    Parse HTML content and extract language-specific text.

    Text of every element carrying a ``lang`` attribute is stored under that
    (lower-cased) language code; when several elements share a code the last
    one wins. If no such element yields text, the text of the whole input is
    stored under ``default_language``.

    Args:
        input_string: The HTML string to parse. Non-string values are
            converted with ``str()``; ``None`` is treated as missing.
        default_language: Default language code (default: "en")

    Returns:
        dict: Dictionary of language codes to text content, or None if the
        input is ``None``, blank, or contains no text.
    """
    if input_string is None:
        return None

    # Handle non-string input
    if not isinstance(input_string, str):
        try:
            input_string = str(input_string)
        except Exception:
            return None

    # Clean input string
    input_string = input_string.strip()
    if not input_string:
        return None

    try:
        soup = BeautifulSoup(input_string, "html.parser")

        result = {}
        # Collect text from elements that declare a language.
        for element in soup.find_all(True, {"lang": True}):
            lang = element.get("lang", default_language).lower()
            text = element.get_text(strip=True)
            if text:
                result[lang] = text

        # No language-tagged text found: use the whole text as default.
        if not result:
            text = soup.get_text(strip=True)
            if text:
                result[default_language] = text

        return result if result else None

    except Exception as e:
        # Best effort: if parsing fails, return the stripped raw text.
        print(f"Error parsing HTML: {str(e)}, trying plain text")
        return {default_language: input_string}
| 157 | + |
| 158 | + |
def create_activity_schema(
    activity_name: str,
    activity_data: Dict[str, Any],
    output_path: Path,
    redcap_version: str,
    contextfile_url: str = CONTEXTFILE_URL,
):
    """Write the schema of one activity and of each of its items.

    The activity schema is written to
    ``output_path/activities/<activity_name>/<activity_name>_schema`` and
    every item to ``.../<activity_name>/items/<item id>``.

    Args:
        activity_name: Name of the activity; used for the schema id, the
            English ``prefLabel`` and the output directory name.
        activity_data: Mapping with the keys "order", "addProperties" and
            "items", and optionally "compute" and "preamble" (both are
            only added to the schema when truthy).
        output_path: Root directory under which "activities" is created.
        redcap_version: Value stored as the schema "version".
        contextfile_url: Context file URL used for the schema version and
            passed on when writing the activity and its items.
    """
    json_ld = {
        "category": "reproschema:Activity",
        "id": f"{activity_name}_schema",
        "prefLabel": {"en": activity_name},
        "schemaVersion": get_context_version(contextfile_url),
        "version": redcap_version,
        "ui": {
            # TODO: check whether this is the 'clean order' and
            # 'clean bl list'?
            "order": activity_data["order"],
            "addProperties": activity_data["addProperties"],
            "shuffle": False,
        },
    }

    # Optional sections: only included when present and non-empty.
    if activity_data.get("compute"):
        json_ld["compute"] = activity_data["compute"]
    if activity_data.get("preamble"):
        json_ld["preamble"] = activity_data["preamble"]

    act = Activity(**json_ld)
    path = output_path / "activities" / activity_name
    path.mkdir(parents=True, exist_ok=True)
    write_obj_jsonld(
        act,
        path / f"{activity_name}_schema",
        contextfile_url=contextfile_url,
    )

    items_path = path / "items"
    items_path.mkdir(parents=True, exist_ok=True)

    for item in activity_data["items"]:
        item_path = items_path / item["id"]
        # An item id may contain path separators, so make sure its own
        # parent directory exists as well.
        item_path.parent.mkdir(parents=True, exist_ok=True)
        # Use the caller-supplied context URL, as for the activity itself.
        write_obj_jsonld(
            Item(**item), item_path, contextfile_url=contextfile_url
        )
    print(f"{activity_name} Instrument schema created")
| 204 | + |
| 205 | + |
def create_protocol_schema(
    protocol_data: Dict[str, Any],
    activities: List[str],
    output_path: Path,
    contextfile_url: str = CONTEXTFILE_URL,
):
    """Build the protocol schema and write it under ``output_path``.

    Args:
        protocol_data: Mapping with "protocol_name",
            "protocol_display_name" and "redcap_version", and optionally
            "protocol_description" (defaults to an empty string).
        activities: Names of the activities to reference from the protocol.
        output_path: Directory in which the ``<protocol_name>`` folder is
            created.
        contextfile_url: Context file URL used for the schema version and
            passed on when writing the schema.
    """
    # Directory and id are derived from the name with spaces replaced.
    protocol_name = protocol_data["protocol_name"].strip().replace(" ", "_")

    def activity_uri(activity):
        # Relative location of an activity schema as seen from the protocol.
        return f"../activities/{activity}/{activity}_schema"

    pref_label = {"en": protocol_data["protocol_display_name"]}
    description = {"en": protocol_data.get("protocol_description", "")}
    schema_version = get_context_version(contextfile_url)
    version = protocol_data["redcap_version"]

    add_properties = []
    for activity in activities:
        add_properties.append(
            {
                "isAbout": activity_uri(activity),
                "variableName": f"{activity}_schema",
                "prefLabel": {"en": activity.replace("_", " ").title()},
                "isVis": True,
            }
        )
    order = [activity_uri(activity) for activity in activities]

    protocol_schema = {
        "category": "reproschema:Protocol",
        "id": f"{protocol_name}_schema",
        "prefLabel": pref_label,
        "description": description,
        "schemaVersion": schema_version,
        "version": version,
        "ui": {
            "addProperties": add_properties,
            "order": order,
            "shuffle": False,
        },
    }

    protocol_dir = output_path / protocol_name
    protocol_dir.mkdir(parents=True, exist_ok=True)
    target = protocol_dir / f"{protocol_name}_schema"
    write_obj_jsonld(
        Protocol(**protocol_schema),
        target,
        contextfile_url=contextfile_url,
    )
    print(f"Protocol schema created in {protocol_dir}")
0 commit comments