Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cdisc_rules_engine/enums/execution_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,10 @@ class ExecutionStatus(BaseEnum):
SUCCESS = "success"
SKIPPED = "skipped"
EXECUTION_ERROR = "execution_error"
ISSUE_REPORTED = "issue_reported"
UNKNOWN_STATUS = "unknown_status"


class ExecutionError(BaseEnum):
AN_UNKNOWN_EXCEPTION_HAS_OCCURRED = "An unknown exception has occurred"
COLUMN_NOT_FOUND_IN_DATA = "Column not found in data"
142 changes: 91 additions & 51 deletions cdisc_rules_engine/services/reporting/base_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from openpyxl import Workbook

from cdisc_rules_engine.enums.execution_status import ExecutionStatus
from cdisc_rules_engine.enums.execution_status import ExecutionStatus, ExecutionError
from cdisc_rules_engine.models.rule_validation_result import RuleValidationResult
from cdisc_rules_engine.models.validation_args import Validation_args

Expand Down Expand Up @@ -47,24 +47,24 @@ def get_summary_data(self) -> List[List]:
"""
summary_data = []
for validation_result in self._results:
if validation_result.execution_status == "success":
for result in validation_result.results or []:
dataset = result.get("domain")
if (
result.get("errors")
and result.get("executionStatus") == "success"
):
summary_item = {
"dataset": dataset,
"core_id": validation_result.id,
"message": result.get("message"),
"issues": len(result.get("errors")),
}

if self._item_type == "list":
summary_data.extend([[*summary_item.values()]])
elif self._item_type == "dict":
summary_data.extend([summary_item])
for result in validation_result.results or []:
dataset = result.get("domain")
if (
result.get("errors")
and result.get("executionStatus")
== ExecutionStatus.ISSUE_REPORTED.value
):
summary_item = {
"dataset": dataset,
"core_id": validation_result.id,
"message": result.get("message"),
"issues": len(result.get("errors")),
}

if self._item_type == "list":
summary_data.extend([[*summary_item.values()]])
elif self._item_type == "dict":
summary_data.extend([summary_item])

return sorted(
summary_data,
Expand All @@ -86,6 +86,70 @@ def get_detailed_data(self) -> List[List]:
else (x["core_id"], x["dataset"]),
)

def _issue_details(self, validation_result: RuleValidationResult, result: dict):
errors = []
variables = result.get("variables", [])
for error in [
error
for error in result.get("errors")
if error.get("error")
not in [
ExecutionError.AN_UNKNOWN_EXCEPTION_HAS_OCCURRED.value,
ExecutionError.COLUMN_NOT_FOUND_IN_DATA.value,
]
]:
error_item = {
"core_id": validation_result.id,
"message": result.get("message"),
"executability": validation_result.executability,
"dataset": result.get("domain") or "",
"USUBJID": error.get("USUBJID", ""),
"row": error.get("row", ""),
"SEQ": error.get("SEQ", ""),
}

if self._item_type == "list":
error_item["variables"] = ", ".join(variables)
error_item["values"] = ", ".join(
[
str(error.get("value", {}).get(variable))
for variable in variables
]
)
errors = errors + [[*error_item.values()]]
elif self._item_type == "dict":
error_item["variables"] = variables
error_item["values"] = [
str(error.get("value", {}).get(variable)) for variable in variables
]
errors = errors + [error_item]
return errors

def _error_details(self, validation_result: RuleValidationResult, result: dict):
errors = []
for error in [
error
for error in result.get("errors")
if error.get("error")
== ExecutionError.AN_UNKNOWN_EXCEPTION_HAS_OCCURRED.value
]:
error_item = {
"core_id": validation_result.id,
"message": (f"{result.get('message')} - {error.get('error')}"),
"executability": validation_result.executability,
"dataset": result.get("domain") or "",
"USUBJID": "",
"row": "",
"SEQ": "",
"variables": "",
"values": error.get("message"),
}
if self._item_type == "list":
errors = errors + [[*error_item.values()]]
elif self._item_type == "dict":
errors = errors + [error_item]
return errors

def _generate_error_details(
self, validation_result: RuleValidationResult
) -> List[List]:
Expand All @@ -107,35 +171,11 @@ def _generate_error_details(
"""
errors = []
for result in validation_result.results or []:
if result.get("errors", []) and result.get("executionStatus") == "success":
variables = result.get("variables", [])
for error in result.get("errors"):
error_item = {
"core_id": validation_result.id,
"message": result.get("message"),
"executability": validation_result.executability,
"dataset": result.get("domain"),
"USUBJID": error.get("USUBJID", ""),
"row": error.get("row", ""),
"SEQ": error.get("SEQ", ""),
}

if self._item_type == "list":
error_item["variables"] = ", ".join(variables)
error_item["values"] = ", ".join(
[
str(error.get("value", {}).get(variable))
for variable in variables
]
)
errors = errors + [[*error_item.values()]]
elif self._item_type == "dict":
error_item["variables"] = variables
error_item["values"] = [
str(error.get("value", {}).get(variable))
for variable in variables
]
errors = errors + [error_item]
errors = (
errors
+ self._issue_details(validation_result, result)
+ self._error_details(validation_result, result)
)
return errors

def get_rules_report_data(self) -> List[List]:
Expand All @@ -162,9 +202,9 @@ def get_rules_report_data(self) -> List[List]:
"fda_rule_id": validation_result.fda_rule_id,
"pmda_rule_id": validation_result.pmda_rule_id,
"message": validation_result.message,
"status": ExecutionStatus.SUCCESS.value.upper()
if validation_result.execution_status == ExecutionStatus.SUCCESS.value
else ExecutionStatus.SKIPPED.value.upper(),
"status": ExecutionStatus(
validation_result.execution_status
).value.upper(),
}
if self._item_type == "list":
rules_report.append([*rules_item.values()])
Expand Down
80 changes: 65 additions & 15 deletions cdisc_rules_engine/utilities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
SUPPLEMENTARY_DOMAINS,
)
from cdisc_rules_engine.constants.classes import SPECIAL_PURPOSE
from cdisc_rules_engine.enums.execution_status import ExecutionStatus
from cdisc_rules_engine.enums.execution_status import ExecutionError, ExecutionStatus
from cdisc_rules_engine.interfaces import ConditionInterface
from cdisc_rules_engine.models.base_validation_entity import BaseValidationEntity

Expand Down Expand Up @@ -44,25 +44,75 @@ def mark_domain_as_validated(domain: str, validated_domains: Set[str]):

def get_execution_status(results):
"""
If all results have skipped status, return skipped.
Else return success
If any result has an execution error, return execution error
Else if any result has an issue reported, return issue reported
Else if any result is successful, return issue successful
Else, result should have all skips, return issue skipped
"""
if len(results) == 0:
return ExecutionStatus.SUCCESS.value
if isinstance(results[0], BaseValidationEntity):
successful_results = [
entity for entity in results if entity.status == ExecutionStatus.SUCCESS
]
else:
successful_results = [
result
for result in results
if result.get("executionStatus") == ExecutionStatus.SUCCESS.value
]
if successful_results:
status = (
{
ExecutionStatus.SUCCESS: [],
ExecutionStatus.EXECUTION_ERROR: [],
ExecutionStatus.ISSUE_REPORTED: results,
ExecutionStatus.SKIPPED: [],
}
if isinstance(results[0], BaseValidationEntity)
else {
ExecutionStatus.SUCCESS: [
result
for result in results
if result.get("executionStatus") == ExecutionStatus.SUCCESS.value
],
ExecutionStatus.EXECUTION_ERROR: [
result
for result in results
if result.get("executionStatus")
== ExecutionStatus.EXECUTION_ERROR.value
and [
error
for error in result.get("errors", [])
if error.get("error")
== ExecutionError.AN_UNKNOWN_EXCEPTION_HAS_OCCURRED.value
]
],
ExecutionStatus.ISSUE_REPORTED: [
result
for result in results
if result.get("executionStatus") == ExecutionStatus.ISSUE_REPORTED.value
],
ExecutionStatus.SKIPPED: [
result
for result in results
if result.get("executionStatus") == ExecutionStatus.SKIPPED.value
or [
error
for error in result.get("errors", [])
if error.get("error")
== ExecutionError.COLUMN_NOT_FOUND_IN_DATA.value
]
],
}
)
print("break")
if len(results) != (
len(status[ExecutionStatus.SUCCESS])
+ len(status[ExecutionStatus.EXECUTION_ERROR])
+ len(status[ExecutionStatus.ISSUE_REPORTED])
+ len(status[ExecutionStatus.SKIPPED])
):
return ExecutionStatus.UNKNOWN_STATUS.value
elif status[ExecutionStatus.EXECUTION_ERROR]:
return ExecutionStatus.EXECUTION_ERROR.value
elif status[ExecutionStatus.ISSUE_REPORTED]:
return ExecutionStatus.ISSUE_REPORTED.value
elif status[ExecutionStatus.SUCCESS]:
return ExecutionStatus.SUCCESS.value
else:
elif status[ExecutionStatus.SKIPPED]:
return ExecutionStatus.SKIPPED.value
else:
return ExecutionStatus.UNKNOWN_STATUS.value


def get_standard_codelist_cache_key(standard: str, version: str) -> str:
Expand Down
Binary file modified resources/templates/report-template.xlsx
Binary file not shown.
Loading