diff --git a/.github/workflows/benchmarks.yaml b/.github/workflows/benchmarks.yaml index 67661dc..6b682af 100644 --- a/.github/workflows/benchmarks.yaml +++ b/.github/workflows/benchmarks.yaml @@ -33,10 +33,6 @@ jobs: --log-cli-level=INFO \ --random-subset=1 \ --html report/report.html --self-contained-html \ - --track-metrics-json=report/metrics.json \ - --track-metrics-parquet-s3-bucket="apex-benchmarks" \ - --track-metrics-parquet-s3-key="metrics/v1/metrics.parquet" \ - --track-metrics-parquet-partitioning="YYYYMM" \ --basetemp=tmp_path_root \ --upload-assets-s3-bucket="apex-benchmarks" | tee pytest_output.txt env: @@ -49,16 +45,12 @@ jobs: APEX_ALGORITHMS_S3_ENDPOINT_URL: "https://s3.waw3-1.cloudferro.com" APEX_ALGORITHMS_S3_DEFAULT_REGION: "waw3-1" - - name: Issue Generation - if: ${{ failure() }} + - name: Handle Benchmark Results + if: ${{ always() }} env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: | - if grep -q "FAILED" qa/benchmarks/pytest_output.txt; then - python qa/tools/apex_algorithm_qa_tools/test_failure_handler.py - else - echo "No test failures detected" - fi + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_REPO: ${{ github.repository }} + run: python qa/tools/apex_algorithm_qa_tools/issue_handler.py - name: List local reports if: ${{ !cancelled() }} diff --git a/qa/benchmarks/tests/test_benchmarks.py b/qa/benchmarks/tests/test_benchmarks.py index ba8cb61..f7466bc 100644 --- a/qa/benchmarks/tests/test_benchmarks.py +++ b/qa/benchmarks/tests/test_benchmarks.py @@ -24,6 +24,7 @@ # Use scenario id as parameterization id to give nicer test names. 
pytest.param(uc, id=uc.id) for uc in get_benchmark_scenarios() + if uc.id == "max_ndvi_composite" #TODO remove it, after verifying fix ], ) def test_run_benchmark( diff --git a/qa/tools/apex_algorithm_qa_tools/issue_handler.py b/qa/tools/apex_algorithm_qa_tools/issue_handler.py new file mode 100644 index 0000000..f151c3d --- /dev/null +++ b/qa/tools/apex_algorithm_qa_tools/issue_handler.py @@ -0,0 +1,289 @@ + +import os +import re +import json +import logging +import argparse +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Iterator, List, Optional + +import requests +from apex_algorithm_qa_tools.scenarios import get_benchmark_scenarios, get_project_root + +# ----------------------------------------------------------------------------- +# Configuration dataclass +# ----------------------------------------------------------------------------- +@dataclass +class GitHubConfig: + repo: str + token: str + label: str = "benchmark-failure" + page_size: int = 100 + +# ----------------------------------------------------------------------------- +# Issue and Scenario Data Models +# ----------------------------------------------------------------------------- +@dataclass +class Scenario: + id: str + description: str + backend: str + process_graph: str + contacts: List[Dict[str, Any]] + scenario_link: str + +@dataclass +class TestRecord: + scenario_id: str + status: str # 'PASSED' or 'FAILED' + logs: Optional[str] = None + +# ----------------------------------------------------------------------------- +# Issue Manager +# ----------------------------------------------------------------------------- +class IssueManager: + def __init__(self, config: GitHubConfig, workflow_run_url: str): + self.config = config + self.headers = { + "Authorization": f"Bearer {config.token}", + "Accept": "application/vnd.github.v3+json" + } + self.base_issues_url = 
f"https://api.github.com/repos/{config.repo}/issues" + self.workflow_run_url = workflow_run_url + self.logger = logging.getLogger(self.__class__.__name__) + + def get_existing_issues(self) -> Dict[str, Any]: + """ + Retrieve all open issues with the given label, handling pagination. + """ + page = 1 + existing = {} + self.logger.info("Fetching existing issues with label '%s'", self.config.label) + while True: + params = {"state": "open", "labels": self.config.label, "per_page": self.config.page_size, "page": page} + resp = requests.get(self.base_issues_url, headers=self.headers, params=params) + resp.raise_for_status() + issues = resp.json() + if not issues: + break + for issue in issues: + title = issue.get("title") + if title: + existing[title] = issue + page += 1 + self.logger.info("Fetched %d open issues", len(existing)) + return existing + + def build_issue_body(self, scenario: Scenario, logs: str) -> str: + + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + contact_entries = [] + self.logger.info("Building issue body for scenario '%s'", scenario.id) + if scenario.contacts: + primary = scenario.contacts[0] + name = primary.get("name", "") + org = primary.get("organization", "") + instr = primary.get("contactInstructions", "") + links = primary.get("links", []) + if links: + link_strs = [f"[{l.get('title','link')}]({l.get('href','#')})" for l in links] + instr += " (" + ", ".join(link_strs) + ")" + contact_entries.append(f"| {name} | {org} | {instr} |") + + contact_table = ( + "\n**Point of Contact**:\n\n" + "| Name | Organization | Contact |\n" + "|------|--------------|---------|\n" + + "\n".join(contact_entries) + if contact_entries else "" + ) + + body = ( + f"## Scenario Failure: {scenario.id}\n\n" + f"**Scenario ID**: {scenario.id}\n" + f"**Backend**: {scenario.backend}\n" + f"**Timestamp**: {timestamp}\n\n" + f"**Links**:\n" + f"- Workflow Run: {self.workflow_run_url}\n" + f"- Scenario Definition: {scenario.scenario_link}\n" +
f"- Artifacts: {self.workflow_run_url}#artifacts\n\n" + "---\n" + f"### Contact Information{contact_table}\n" + "---\n\n" + "### Process Graph\n" + "```json\n" + f"{scenario.process_graph}\n" + "```\n\n" + "---\n" + "### Error Logs\n" + "```plaintext\n" + f"{logs}\n" + "```\n" + ) + return body + + def build_comment_body(self, scenario: Scenario, success: bool) -> str: + status = "Success" if success else "Failure" + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + self.logger.info("Building comment body for scenario '%s' with status '%s'", scenario.id, status) + return ( + f"## Status: {status}\n" + f"## Benchmark: {scenario.id}\n" + f"**Scenario ID**: {scenario.id}\n" + f"**Backend**: {scenario.backend}\n" + f"**Timestamp**: {timestamp}\n\n" + f"**Links**:\n" + f"- Workflow Run: {self.workflow_run_url}\n" + f"- Scenario Definition: {scenario.scenario_link}\n" + f"- Artifacts: {self.workflow_run_url}#artifacts\n" + ) + + def create_issue(self, scenario: Scenario, logs: str) -> None: + self.logger.info("Creating issue for scenario '%s'", scenario.id) + title = f"Scenario Failure: {scenario.id}" + body = self.build_issue_body(scenario, logs) + payload = {"title": title, "body": body, "labels": [self.config.label]} + resp = requests.post(self.base_issues_url, headers=self.headers, json=payload) + resp.raise_for_status() + url = resp.json().get("html_url") + self.logger.info("Created issue %s", url) + + def comment_issue(self, issue_number: int, body: str) -> None: + self.logger.info("Commenting on issue #%d", issue_number) + url = f"{self.base_issues_url}/{issue_number}/comments" + resp = requests.post(url, headers=self.headers, json={"body": body}) + resp.raise_for_status() + self.logger.info("Commented on issue #%d", issue_number) + +# ----------------------------------------------------------------------------- +# Scenario Processor +# ----------------------------------------------------------------------------- +class ScenarioProcessor: 
+ def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + + def get_scenario_details(self, scenario_id: str) -> Optional[Scenario]: + self.logger.info("Fetching details for benchmark scenario '%s'", scenario_id) + + for sc in get_benchmark_scenarios(): + if sc.id == scenario_id: + details = Scenario( + id=sc.id, + description=sc.description, + backend=sc.backend, + process_graph=json.dumps(sc.process_graph, indent=2), + contacts=self.load_contacts(scenario_id), + scenario_link=self.build_scenario_link(scenario_id), + ) + return details + self.logger.info("Scenario '%s' not found", scenario_id) + return None + + def load_contacts(self, scenario_id: str) -> List[Any]: + self.logger.info("Loading contacts for scenario '%s'", scenario_id) + root = get_project_root() / "algorithm_catalog" + for provider in root.iterdir(): + rec = provider / scenario_id / "records" / f"{scenario_id}.json" + if rec.exists(): + try: + data = json.loads(rec.read_text()) + return data.get("properties", {}).get("contacts", []) + except json.JSONDecodeError as e: + self.logger.info("Invalid JSON in %s: %s", rec, e) + self.logger.info("No contacts for '%s'", scenario_id) + return [] + + def build_scenario_link(self, scenario_id: str) -> str: + self.logger.info("Building scenario link for '%s'", scenario_id) + sha = os.getenv("GITHUB_SHA", "main") + base = f"https://github.com/{os.getenv('GITHUB_REPO')}/blob/{sha}" + root = get_project_root() / "algorithm_catalog" + for provider in root.iterdir(): + path = provider / scenario_id / "benchmark_scenarios" / f"{scenario_id}.json" + if path.exists(): + rel = path.relative_to(get_project_root()) + return f"{base}/{rel.as_posix()}" + self.logger.info("No scenario definition for '%s'", scenario_id) + return "" + + def parse_results(self) -> List[TestRecord]: + self.logger.info("Parsing test results from pytest output") + text_path = Path("qa/benchmarks/pytest_output.txt") + if not text_path.exists(): + self.logger.info("Log file 
not found: %s", text_path) + return [] + content = text_path.read_text() + records: List[TestRecord] = [] + # capture individual test outcomes + for match in re.finditer(r"(test_run_benchmark\[(.*?)\]).*?\s(PASSED|FAILED)", content): + test_name = match.group(1) + scenario_id = match.group(2) + status = match.group(3) + logs = None + self.logger.info("Found test '%s' with status '%s'", test_name, status) + if status == 'FAILED': + self.logger.info("Capturing logs for failed test '%s'", test_name) + # grab failure block + fail_block = re.search( + rf"=+ FAILURES =+.*?{re.escape(test_name)}.*?\n(.*?)(?=\n=+|\Z)", + content, re.DOTALL + ) + if fail_block: + logs = fail_block.group(1).strip() + records.append(TestRecord(scenario_id=scenario_id, status=status, logs=logs)) + self.logger.info("Parsed %d test records", len(records)) + return records + +# ----------------------------------------------------------------------------- +# Unified Handler +# ----------------------------------------------------------------------------- +def main(): + + repo = os.getenv("GITHUB_REPO", "ESA-APEx/apex_algorithms") + token = os.getenv("GITHUB_TOKEN") + if not token: + raise EnvironmentError("GITHUB_TOKEN not set.") + + logging.basicConfig(level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s") + logger = logging.getLogger() + + run_id = os.getenv("GITHUB_RUN_ID", "0") + workflow_url = f"https://github.com/{repo}/actions/runs/{run_id}" + + config = GitHubConfig(repo=repo, token=token) + manager = IssueManager(config, workflow_url) + processor = ScenarioProcessor() + + # parse all test results + results = processor.parse_results() + open_issues = manager.get_existing_issues() + + # handle each result + for rec in results: + title = f"Scenario Failure: {rec.scenario_id}" #TODO loosely coupled with create_issue; tighten to avoid bugs + scen = processor.get_scenario_details(rec.scenario_id) + if not scen: + logger.info("No scenario details found for '%s'", 
rec.scenario_id) + continue + if rec.status == 'FAILED': + logger.info("Processing failure for scenario '%s'", rec.scenario_id) + if title in open_issues: + logger.info("Updating existing issue for '%s'", rec.scenario_id) + num = open_issues[title]['number'] + manager.comment_issue(num, manager.build_comment_body(scen, success=False)) + else: + logger.info("Creating new issue for '%s'", rec.scenario_id) + manager.create_issue(scen, rec.logs or "No logs captured.") + else: # PASSED + logger.info("Processing success for scenario '%s'", rec.scenario_id) + if title in open_issues: + logger.info("Updating existing issue for '%s'", rec.scenario_id) + num = open_issues[title]['number'] + manager.comment_issue(num, manager.build_comment_body(scen, success=True)) + +if __name__ == "__main__": + main() + diff --git a/qa/tools/apex_algorithm_qa_tools/test_failure_handler.py b/qa/tools/apex_algorithm_qa_tools/test_failure_handler.py deleted file mode 100644 index 444b0c5..0000000 --- a/qa/tools/apex_algorithm_qa_tools/test_failure_handler.py +++ /dev/null @@ -1,257 +0,0 @@ - -import os -import re -import json -import logging -from datetime import datetime -from pathlib import Path -from typing import Dict, List, Optional, Any - -import requests -from apex_algorithm_qa_tools.scenarios import get_benchmark_scenarios, get_project_root - -GITHUB_REPO = "ESA-APEx/apex_algorithms" -GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") -if not GITHUB_TOKEN: - raise EnvironmentError("GITHUB_TOKEN environment variable is not set.") - -ISSUE_LABEL = "benchmark-failure" -GITHUB_REPOSITORY = os.getenv("GITHUB_REPOSITORY", "unknown/repo") -GITHUB_RUN_ID = os.getenv("GITHUB_RUN_ID", "0") -WORKFLOW_BASE_URL = f"https://github.com/{GITHUB_REPOSITORY}/actions/runs/{GITHUB_RUN_ID}" -GITHUB_SHA = os.getenv("GITHUB_SHA", "main") - -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) - -class GitHubIssueManager: - """ - Handles 
interactions with the GitHub API, including building the issue body - and creating new issues. - """ - def __init__(self): - self.repo = GITHUB_REPO - self.token = GITHUB_TOKEN - self.issue_label = ISSUE_LABEL - self.workflow_base_url = WORKFLOW_BASE_URL - - def get_existing_issues(self) -> Dict[str, Any]: - """ - Retrieve a mapping of existing open issue titles to issue details. - """ - url = f"https://api.github.com/repos/{self.repo}/issues?state=open&labels={self.issue_label}" - headers = {"Authorization": f"Bearer {self.token}"} - try: - response = requests.get(url, headers=headers) - response.raise_for_status() - issues = response.json() - return {issue["title"]: issue for issue in issues if issue.get("state") == "open"} - except requests.RequestException as e: - logger.error("Failed to fetch issues: %s. Response: %s", e, getattr(response, "text", "No response")) - return {} - - def build_issue_body(self, scenario: Dict[str, Any], logs: str, failure_count: int) -> str: - """ - Build the GitHub issue body based on scenario details, logs, and contacts. 
- """ - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - contacts = scenario.get("contacts", []) - scenario_link = scenario.get("scenario_link", "") - - contact_table = "" - if contacts: - try: - contact_table += "\n\n**Point of Contact**:\n\n" - contact_table += "| Name | Organization | Contact |\n" - contact_table += "|------|--------------|---------|\n" - primary_contact = contacts[0] - contact_info = primary_contact.get('contactInstructions', '') - if primary_contact.get('links'): - links = [f"[{link.get('title', 'link')}]({link.get('href', '#')})" - for link in primary_contact.get('links', [])] - contact_info += " (" + ", ".join(links) + ")" - contact_table += ( - f"| {primary_contact.get('name', '')} " - f"| {primary_contact.get('organization', '')} " - f"| {contact_info} |\n" - ) - except Exception as e: - logger.error("Error constructing contact table for scenario '%s': %s", scenario['id'], e) - - body = ( - f"## Benchmark Failure: {scenario['id']}\n\n" - f"**Scenario ID**: {scenario['id']}\n" - f"**Backend System**: {scenario['backend']}\n" - f"**Failure Count**: {failure_count}\n" - f"**Timestamp**: {timestamp}\n\n" - f"**Links**:\n" - f"- Workflow Run: {self.workflow_base_url}\n" - f"- Scenario Definition: {scenario_link}\n" - f"- Artifacts: {self.workflow_base_url}#artifacts\n\n" - "---\n" - f"### Contact Information\n{contact_table}\n" - "---\n\n" - "### Process Graph\n" - "```json\n" - f"{scenario['process_graph']}\n" - "```\n\n" - "---\n" - "### Error Logs\n" - "```plaintext\n" - f"{logs}\n" - "```\n" - ) - return body - - def create_issue(self, scenario: Dict[str, Any], logs: str) -> None: - """ - Create a new GitHub issue for the given scenario failure. 
- """ - issue_body = self.build_issue_body(scenario, logs, failure_count=1) - url = f"https://api.github.com/repos/{self.repo}/issues" - headers = { - "Authorization": f"Bearer {self.token}", - "Accept": "application/vnd.github.v3+json" - } - data = { - "title": f"Scenario Failure: {scenario['id']}", - "body": issue_body, - "labels": [self.issue_label] - } - try: - response = requests.post(url, json=data, headers=headers) - response.raise_for_status() - issue_url = response.json().get("html_url", "URL not available") - logger.info("Created new issue: %s", issue_url) - except requests.RequestException as e: - logger.error("Failed to create issue for scenario '%s': %s", scenario['id'], e) - -class ScenarioProcessor: - """ - Processes scenario details, including retrieving scenario data, - contacts, and parsing the log file for failed tests. - """ - def get_scenario_details(self, scenario_id: str) -> Optional[Dict[str, Any]]: - """ - Retrieve scenario details by ID. - """ - try: - for scenario in get_benchmark_scenarios(): - if scenario.id == scenario_id: - details = { - "id": scenario.id, - "description": scenario.description, - "backend": scenario.backend, - "process_graph": json.dumps(scenario.process_graph, indent=2) - } - # Add contacts and scenario link using helper methods. - details["contacts"] = self.get_scenario_contacts(scenario_id) - details["scenario_link"] = self.get_scenario_link(scenario_id) - return details - logger.warning("Scenario '%s' not found", scenario_id) - return None - except Exception as e: - logger.error("Error loading scenarios: %s", e) - return None - - def get_scenario_contacts(self, scenario_id: str) -> List[Any]: - """ - Retrieve contact information for a scenario from the algorithm catalog. 
- """ - algorithm_catalog = get_project_root() / "algorithm_catalog" - for provider_dir in algorithm_catalog.iterdir(): - if not provider_dir.is_dir(): - continue - algorithm_dir = provider_dir / scenario_id - if not algorithm_dir.exists(): - continue - records_path = algorithm_dir / "records" / f"{scenario_id}.json" - if records_path.exists(): - try: - with records_path.open() as f: - record = json.load(f) - return record.get("properties", {}).get("contacts", []) - except Exception as e: - logger.error("Error loading contacts from %s: %s", records_path, e) - return [] - logger.warning("No contacts found for scenario '%s'", scenario_id) - return [] - - def get_scenario_link(self, scenario_id: str) -> str: - """ - Generate a URL to the scenario definition file at the specific commit. - """ - base_url = f"https://github.com/{GITHUB_REPO}/blob/{os.getenv('GITHUB_SHA', 'main')}" - algorithm_catalog = get_project_root() / "algorithm_catalog" - for provider_dir in algorithm_catalog.iterdir(): - if not provider_dir.is_dir(): - continue - algorithm_dir = provider_dir / scenario_id - if not algorithm_dir.exists(): - continue - scenario_path = algorithm_dir / "benchmark_scenarios" / f"{scenario_id}.json" - if scenario_path.exists(): - relative_path = scenario_path.relative_to(get_project_root()) - return f"{base_url}/{relative_path.as_posix()}" - logger.warning("No benchmark found for scenario '%s'", scenario_id) - return "" - - def parse_failed_tests(self) -> List[Dict[str, str]]: - """ - Parse the pytest output file to extract failed tests and logs. 
- """ - log_file = Path("qa/benchmarks/pytest_output.txt") - if not log_file.exists(): - logger.error("Pytest output file not found at %s", log_file) - return [] - try: - content = log_file.read_text() - failures = [] - pattern = ( - r"=+ FAILURES =+\n.*?_* (test_run_benchmark\[(.*?)\])" - r"(?:.*?)\n(.*?)(?=\n=+|\Z)" - ) - matches = re.finditer(pattern, content, re.DOTALL) - for match in matches: - test_name = match.group(1) - scenario_id = match.group(2) - logs = match.group(3).strip() - failures.append({ - "test_name": test_name, - "scenario_id": scenario_id, - "logs": logs - }) - logger.info("Found %d failed scenario(s)", len(failures)) - return failures - except Exception as e: - logger.error("Error parsing log file: %s", e) - return [] - - -def main() -> None: - """ - Main flow: parse failed tests, check for existing issues, and create new issues as needed. - """ - github_manager = GitHubIssueManager() - scenario_processor = ScenarioProcessor() - - existing_issues = github_manager.get_existing_issues() - failed_tests = scenario_processor.parse_failed_tests() - - for failure in failed_tests: - scenario_id = failure["scenario_id"] - logs = failure["logs"] - - scenario = scenario_processor.get_scenario_details(scenario_id) - if not scenario: - logger.warning("Skipping scenario '%s' - details not found", scenario_id) - continue - - issue_title = f"Scenario Failure: {scenario_id}" - if issue_title not in existing_issues: - github_manager.create_issue(scenario, logs) - else: - logger.info("Issue already exists for scenario '%s'. Skipping.", scenario_id) - -if __name__ == "__main__": - main() \ No newline at end of file