Merged

update #1359

Changes from 18 commits
13 changes: 13 additions & 0 deletions README.md
@@ -152,6 +152,19 @@ Run `python core.py validate --help` to see the list of validation options.
-rr, --raw-report Report in a raw format as it is generated by
the engine. This flag must be used only with
--output-format JSON.
-mr, --max-report-rows INTEGER Maximum rows per Excel sheet. When exceeded,
creates multiple report files (report_part1.xlsx,
report_part2.xlsx, etc.). Default: 1,000,000 rows per sheet.
Can be set via the MAX_REPORT_ROWS env variable;
if both .env and -mr are specified, the larger value is used.
If set to 0, no maximum is enforced.
-me, --max-errors-per-rule INTEGER Soft maximum number of errors to record per rule.
After each dataset is validated for a rule, the limit is checked;
if it has been met or exceeded, validation for that rule stops.
Defaults to 1000 if not specified.
Can be set via the MAX_REPORT_ERRORS env variable;
if both .env and -me are specified, the larger value is used.
If set to 0, no maximum is enforced.
-dv, --define-version TEXT Define-XML version used for validation
-dxp, --define-xml-path Path to define-xml file.
-vx, --validate-xml This flag enables XML validation against a Define-XML schema.
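Both flags share the same precedence rule. A minimal sketch of that resolution logic, mirroring the diff below (the helper name `resolve_limit` is illustrative, not part of the PR):

```python
import os
from typing import Optional

def resolve_limit(env_var: str, cli_value: Optional[int], default: int) -> Optional[int]:
    """Sketch of the precedence rule: the larger of env var and CLI flag wins;
    0 disables the cap (returned as None); negatives fall back to the default."""
    raw = os.getenv(env_var)
    env_value = int(raw) if raw else None
    if env_value is not None and cli_value is not None:
        result = max(env_value, cli_value)
    elif env_value is not None:
        result = env_value
    elif cli_value is not None:
        result = cli_value
    else:
        result = default
    if result == 0:
        return None  # no maximum enforced
    return default if result < 0 else result

# e.g. MAX_REPORT_ROWS=2000000 together with -mr 500000 resolves to 2000000
```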
2 changes: 2 additions & 0 deletions cdisc_rules_engine/models/validation_args.py
@@ -23,5 +23,7 @@
"progress",
"define_xml_path",
"validate_xml",
"max_report_rows",
"max_errors_per_rule",
],
)
27 changes: 24 additions & 3 deletions cdisc_rules_engine/rules_engine.py
@@ -92,6 +92,7 @@ def __init__(
self.external_dictionaries = external_dictionaries
self.define_xml_path: str = kwargs.get("define_xml_path")
self.validate_xml: bool = kwargs.get("validate_xml")
self.max_errors_per_rule: int = kwargs.get("max_errors_per_rule")

def get_schema(self):
return export_rule_data(DatasetVariable, COREActions)
@@ -101,16 +102,37 @@ def validate_single_rule(self, rule: dict, datasets: Iterable[SDTMDatasetMetadata
rule["conditions"] = ConditionCompositeFactory.get_condition_composite(
rule["conditions"]
)
total_errors = 0
for dataset_metadata in datasets:
if self.max_errors_per_rule and total_errors >= self.max_errors_per_rule:
logger.info(
f"Rule {rule.get('core_id')}: Error limit ({self.max_errors_per_rule}) reached. "
f"Skipping remaining datasets."
)
break
if dataset_metadata.unsplit_name in results and "domains" in rule:
include_split = rule["domains"].get("include_split_datasets", False)
if not include_split:
continue # handling split datasets
results[dataset_metadata.unsplit_name] = self.validate_single_dataset(
dataset_results = self.validate_single_dataset(
rule,
datasets,
dataset_metadata,
)
results[dataset_metadata.unsplit_name] = dataset_results
for result in dataset_results:
if result.get("executionStatus") == "success":
total_errors += len(result.get("errors"))
if (
self.max_errors_per_rule
and total_errors >= self.max_errors_per_rule
):
logger.info(
f"Rule {rule.get('core_id')}: Error limit ({self.max_errors_per_rule}) "
f"reached after processing {dataset_metadata.name}. "
f"Execution halted at {total_errors} total errors."
)
break
return results

def validate_single_dataset(
@@ -176,8 +198,7 @@ def validate_single_dataset(
Error Message: {str(e)}
Dataset Name: {dataset_metadata.name}
Rule ID: {rule.get("core_id", "unknown")}
Full traceback:
{traceback.format_exc()}
Full traceback: {traceback.format_exc()}
"""
)
error_obj: ValidationErrorContainer = self.handle_validation_exceptions(
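Worth noting for reviewers: the cap is deliberately soft. It is checked between datasets, so a single dataset can overshoot the limit before execution halts. A small illustration with made-up counts:

```python
max_errors = 1000            # default cap
per_dataset = [1500, 200]    # hypothetical error counts, in validation order
total = 0
processed = []
for i, n in enumerate(per_dataset):
    if max_errors and total >= max_errors:
        break  # mirrors the check at the top of the dataset loop
    total += n
    processed.append(i)
assert total == 1500 and processed == [0]  # first dataset overshoots; second is skipped
```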
201 changes: 190 additions & 11 deletions cdisc_rules_engine/services/reporting/excel_report.py
@@ -1,6 +1,7 @@
import logging
from datetime import datetime
from typing import BinaryIO, List, Optional, Iterable
import os

from openpyxl import Workbook

@@ -28,6 +29,8 @@ class ExcelReport(BaseReport):
Generates an excel report for a given set of validation results.
"""

DEFAULT_MAX_ROWS = 1000000

def __init__(
self,
datasets: Iterable[SDTMDatasetMetadata],
@@ -41,27 +44,184 @@ def __init__(
datasets, dataset_paths, validation_results, elapsed_time, args, template
)
self._item_type = "list"
env_max_rows = (
int(os.getenv("MAX_REPORT_ROWS")) if os.getenv("MAX_REPORT_ROWS") else None
)
if env_max_rows is not None and args.max_report_rows is not None:
result = max(env_max_rows, args.max_report_rows)
elif env_max_rows is not None:
result = env_max_rows
elif args.max_report_rows is not None:
result = args.max_report_rows
else:
result = self.DEFAULT_MAX_ROWS
if result == 0:
result = None
elif result < 0:
result = self.DEFAULT_MAX_ROWS
self.max_rows_per_sheet = result

@property
def _file_format(self):
return ReportTypes.XLSX.value.lower()

def _chunk_data(self, data: List[List], chunk_size: int) -> List[List[List]]:
chunks = []
for i in range(0, len(data), chunk_size):
chunks.append(data[i : i + chunk_size])
return chunks
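As a quick check of the chunking behavior (row counts here are illustrative, not from the PR's tests):

```python
# 2,500,000 rows with a 1,000,000-row cap -> three parts
data = list(range(2_500_000))
chunks = [data[i : i + 1_000_000] for i in range(0, len(data), 1_000_000)]
assert [len(c) for c in chunks] == [1_000_000, 1_000_000, 500_000]
```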

def _needs_splitting(self, summary_data: List, detailed_data: List) -> bool:
if self.max_rows_per_sheet is None:
return False
return (
len(summary_data) > self.max_rows_per_sheet
or len(detailed_data) > self.max_rows_per_sheet
)

def get_export(
self, define_version, cdiscCt, standard, version, dictionary_versions, **kwargs
) -> Workbook:
wb = excel_open_workbook(self._template.read())
self,
define_version,
cdiscCt,
standard,
version,
dictionary_versions,
template_buffer,
**kwargs,
) -> List[Workbook]:
summary_data = self.get_summary_data()
detailed_data = self.get_detailed_data(excel=True)
rules_report_data = self.get_rules_report_data()
if not self._needs_splitting(summary_data, detailed_data):
# Single file - original behavior
wb = self._create_single_workbook(
template_buffer,
summary_data,
detailed_data,
rules_report_data,
define_version,
cdiscCt,
standard,
version,
dictionary_versions,
**kwargs,
)
return [wb]
else:
# Multiple files needed
return self._create_multiple_workbooks(
template_buffer,
summary_data,
detailed_data,
rules_report_data,
define_version,
cdiscCt,
standard,
version,
dictionary_versions,
**kwargs,
)

def _create_single_workbook(
self,
template_buffer,
summary_data,
detailed_data,
rules_report_data,
define_version,
cdiscCt,
standard,
version,
dictionary_versions,
**kwargs,
) -> Workbook:
wb = excel_open_workbook(template_buffer)
excel_update_worksheet(wb["Issue Summary"], summary_data, dict(wrap_text=True))
excel_update_worksheet(wb["Issue Details"], detailed_data, dict(wrap_text=True))
excel_update_worksheet(
wb["Rules Report"], rules_report_data, dict(wrap_text=True)
)
# write conformance data
wb["Conformance Details"]["B2"] = (
datetime.now().replace(microsecond=0).isoformat()
self._populate_metadata_sheets(
wb,
define_version,
cdiscCt,
standard,
version,
dictionary_versions,
**kwargs,
)
return wb

def _create_multiple_workbooks(
self,
template_buffer,
summary_data,
detailed_data,
rules_report_data,
define_version,
cdiscCt,
standard,
version,
dictionary_versions,
**kwargs,
) -> List[Workbook]:
"""
Create multiple workbooks when data exceeds the configured per-sheet row limit.
"""
workbooks = []
detailed_chunks = self._chunk_data(detailed_data, self.max_rows_per_sheet)
summary_chunks = self._chunk_data(summary_data, self.max_rows_per_sheet)
summary_needs_split = len(summary_chunks) > 1
num_files = max(len(detailed_chunks), len(summary_chunks))
for i in range(num_files):
wb = excel_open_workbook(template_buffer)
if summary_needs_split:
summary_chunk = summary_chunks[i] if i < len(summary_chunks) else []
else:
summary_chunk = summary_data
detailed_chunk = detailed_chunks[i] if i < len(detailed_chunks) else []
excel_update_worksheet(
wb["Issue Summary"], summary_chunk, dict(wrap_text=True)
)
excel_update_worksheet(
wb["Issue Details"], detailed_chunk, dict(wrap_text=True)
)
excel_update_worksheet(
wb["Rules Report"], rules_report_data, dict(wrap_text=True)
)
self._populate_metadata_sheets(
wb,
define_version,
cdiscCt,
standard,
version,
dictionary_versions,
file_part=i + 1,
total_parts=num_files,
**kwargs,
)
workbooks.append(wb)
return workbooks

def _populate_metadata_sheets(
self,
wb: Workbook,
define_version,
cdiscCt,
standard,
version,
dictionary_versions,
file_part: int = None,
total_parts: int = None,
**kwargs,
):
"""
Populate the conformance and dataset details sheets.
"""
timestamp = datetime.now().replace(microsecond=0).isoformat()
if file_part and total_parts:
timestamp += f" (Part {file_part} of {total_parts})"
wb["Conformance Details"]["B2"] = timestamp
wb["Conformance Details"]["B3"] = f"{round(self._elapsed_time, 2)} seconds"
wb["Conformance Details"]["B4"] = __version__

@@ -116,7 +276,6 @@ def get_export(
snomed_version = dictionary_versions.get(DictionaryTypes.SNOMED.value)
if snomed_version is not None:
wb["Conformance Details"]["B16"] = snomed_version
return wb

def write_report(self, **kwargs):
logger = logging.getLogger("validator")
@@ -137,22 +296,42 @@ def write_report(self, **kwargs):
controlled_terminology = get_define_ct(
self._args.dataset_paths, define_version
)
report_data = self.get_export(
template_buffer = self._template.read()
workbooks = self.get_export(
define_version,
controlled_terminology,
self._args.standard,
self._args.version.replace("-", "."),
dictionary_versions,
template_buffer,
substandard=(
self._args.substandard
if hasattr(self._args, "substandard")
else None
),
)
with open(self._output_name, "wb") as f:
f.write(excel_workbook_to_stream(report_data))
if len(workbooks) == 1:
# Single file - use original filename
with open(self._output_name, "wb") as f:
f.write(excel_workbook_to_stream(workbooks[0]))
logger.debug(f"Report written to: {self._output_name}")
else:
# Multiple files - add part numbers
base_name = Path(self._output_name).stem
extension = Path(self._output_name).suffix
parent_dir = Path(self._output_name).parent

for i, wb in enumerate(workbooks, 1):
filename = parent_dir / f"{base_name}_part{i}{extension}"
with open(filename, "wb") as f:
f.write(excel_workbook_to_stream(wb))
logger.debug(f"Report part {i} written to: {filename}")
logger.warning(
f"Data exceeded Excel row limit. Created {len(workbooks)} report files. "
f"Total rows split across files."
)
except Exception as e:
logger.error(e)
logger.error(f"Error writing report: {e}")
raise e
finally:
self._template.close()
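For reference, the part-numbering scheme above yields names like these (a sketch; the output name is hypothetical):

```python
from pathlib import Path

output_name = Path("validation_report.xlsx")  # hypothetical -o value
for i in range(1, 4):  # pretend three workbooks were produced
    print(output_name.parent / f"{output_name.stem}_part{i}{output_name.suffix}")
# validation_report_part1.xlsx
# validation_report_part2.xlsx
# validation_report_part3.xlsx
```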