# NOTE(review): reconstructed from the '+' side of a whitespace-mangled git
# diff of .github/actions/test-and-report/action.yml (hunk @@ -101,19 +101,69 @@,
# inside `runs.steps`). Fixes applied:
#   1. the augment step now uses the $MC_PATH exported by install-mc instead of
#      re-deriving the path (single source of truth),
#   2. the port-forward readiness check polls the tunnel instead of a fixed
#      `sleep 2`, which raced on slow clusters,
#   3. log augmentation is best-effort enrichment and gets continue-on-error
#      so it can never add step-failure noise on top of the real test failure.
        run: |
          echo "=== Current disk usage ==="
          df -h
          NAMESPACE="${{ inputs.default_namespace }}"
          ./.github/resources/scripts/collect-logs.sh --ns "$NAMESPACE" --output /tmp/tmp_pod_log.txt

      - name: Install Junit2Html plugin
        id: install-junit2html
        if: (!cancelled())
        shell: bash
        run: |
          # Fail fast with a clear message when the test step produced no junit.xml;
          # every downstream reporting step depends on it.
          JUNIT_XML="${{ inputs.test_directory }}/reports/junit.xml"
          if [[ ! -f "$JUNIT_XML" ]]; then
            echo "ERROR: junit.xml not found at $JUNIT_XML"
            exit 1
          fi
          pip install junit2html

      - name: Install MinIO Client for log collection
        id: install-mc
        if: ${{ steps.run-tests.outcome != 'success' }}
        shell: bash
        run: |
          MC_PATH="$HOME/.local/bin/minio-mc"
          if [ -f "$MC_PATH" ]; then
            echo "MinIO client already installed"
          else
            echo "Installing MinIO client..."
            curl -sLO https://dl.min.io/client/mc/release/linux-amd64/mc
            chmod +x mc
            mkdir -p "$HOME/.local/bin"
            mv mc "$MC_PATH"
          fi
          # Export for later steps (augment-junit-xml) so the path is defined once.
          echo "MC_PATH=$MC_PATH" >> "$GITHUB_ENV"

      - name: Port-forward MinIO service for workflow log access
        id: port-forward-minio
        if: ${{ steps.run-tests.outcome != 'success' }}
        shell: bash
        run: |
          NAMESPACE="${{ inputs.default_namespace }}"
          kubectl port-forward -n "$NAMESPACE" svc/minio-service 9000:9000 >/tmp/minio-port-forward.log 2>&1 &
          PF_PID=$!
          # FIX(review): poll for readiness instead of a fixed `sleep 2`; the
          # tunnel may not be up yet on a slow cluster. Bail out early if the
          # port-forward process dies.
          for _ in $(seq 1 15); do
            if ! kill -0 "$PF_PID" 2>/dev/null; then
              echo "ERROR: minio port-forward exited early"
              cat /tmp/minio-port-forward.log || true
              exit 1
            fi
            if (exec 3<>/dev/tcp/127.0.0.1/9000) 2>/dev/null; then
              exec 3>&- 3<&-
              break
            fi
            sleep 1
          done

      - name: Augment junit.xml with workflow logs
        id: augment-junit-xml
        if: ${{ steps.run-tests.outcome != 'success' }}
        shell: bash
        run: |
          # FIX(review): use the MC_PATH exported by install-mc rather than
          # hard-coding $HOME/.local/bin/minio-mc a second time.
          python3 "$GITHUB_ACTION_PATH/augment-junit-xml-with-workflow-logs.py" \
            --test-directory "${{ inputs.test_directory }}" \
            --namespace "${{ inputs.default_namespace }}" \
            --mc-path "$MC_PATH"
        # Best-effort enrichment: a failure here (e.g. no mapping entries)
        # must not mask the real test-failure signal.
        continue-on-error: true

      - name: Generate HTML report
        id: generate-html-report
        if: (!cancelled()) && steps.install-junit2html.outcome == 'success'
        shell: bash
        run: |
          junit2html ${{ inputs.test_directory }}/reports/junit.xml ${{ inputs.test_directory }}/reports/test-report.html

      # ("Configure report name" step `name_gen` is unchanged context in this hunk.)

      - name: Upload HTML Report
        id: upload
        uses: actions/upload-artifact@v4
        if: (!cancelled()) && steps.generate-html-report.outcome == 'success'
        with:
          name: ${{ steps.name_gen.outputs.REPORT_NAME }}
          path: ${{ inputs.test_directory }}/reports/test-report.html
          retention-days: 7

      - name: Publish Test Summary With HTML Report
        id: publish
        uses: ./.github/actions/junit-summary
        # FIX(review): `always() && !cancelled()` is contradictory noise —
        # `!cancelled()` alone already expresses "run even after earlier failures".
        if: (!cancelled())
        with:
          xml_files: '${{ inputs.test_directory }}/reports'
          custom_data: '{\"HTML Report\": \"${{ steps.upload.outputs.artifact-url }}\"}'

      # (The legacy fallback "Publish Test Summary" step `summary` is deleted
      # by this hunk, as is the trailing "Mark Workflow failure if test
      # reporting failed" step.)

      - name: Mark Workflow failure if test step failed
        if: steps.run-tests.outcome != 'success' && !cancelled()
        shell: bash
        run: exit 1
steps.summary.outcome == 'failure' || steps.upload.outcome != 'success') && !cancelled() - shell: bash - run: exit 1 diff --git a/.github/actions/test-and-report/augment-junit-xml-with-workflow-logs.py b/.github/actions/test-and-report/augment-junit-xml-with-workflow-logs.py new file mode 100644 index 00000000000..2e0e853103f --- /dev/null +++ b/.github/actions/test-and-report/augment-junit-xml-with-workflow-logs.py @@ -0,0 +1,323 @@ +#!/usr/bin/env python3 + +import argparse +import base64 +import os +import re +import subprocess +import sys +import tempfile +from xml.etree import ElementTree +from typing import Dict, List, NoReturn, Optional, Tuple + + +REPORTS_DIRNAME = "reports" +JUNIT_XML_FILENAME = "junit.xml" +MAPPING_FILENAME = "test-workflow-mapping.txt" + +MINIO_BUCKET = "mlpipeline" +LOGS_PREFIX_TEMPLATE = "private-artifacts/{namespace}" + +MAX_BYTES_PER_WORKFLOW = 200_000 +MAX_BYTES_PER_STEP = 80_000 + + +def _die(message: str, exit_code: int) -> NoReturn: + print(f"ERROR: {message}", file=sys.stderr) + raise SystemExit(exit_code) + + +def _run(cmd: list[str], *, check: bool = True, capture: bool = True, text: bool = True) -> subprocess.CompletedProcess: + return subprocess.run( + cmd, + check=check, + stdout=subprocess.PIPE if capture else None, + stderr=subprocess.PIPE if capture else None, + text=text, + ) + + +def _get_artifact_repo(namespace: str) -> str: + cp = _run( + [ + "kubectl", + "get", + "configmap", + "workflow-controller-configmap", + "-n", + namespace, + "-o", + "jsonpath={.data.artifactRepository}", + ] + ) + return cp.stdout or "" + + +def _get_minio_creds(namespace: str) -> tuple[str, str]: + access = _run( + [ + "kubectl", + "get", + "secret", + "mlpipeline-minio-artifact", + "-n", + namespace, + "-o", + "jsonpath={.data.accesskey}", + ] + ).stdout.strip() + secret = _run( + [ + "kubectl", + "get", + "secret", + "mlpipeline-minio-artifact", + "-n", + namespace, + "-o", + "jsonpath={.data.secretkey}", + ] + ).stdout.strip() + 
access_key = base64.b64decode(access.encode("utf-8")).decode("utf-8").strip() + secret_key = base64.b64decode(secret.encode("utf-8")).decode("utf-8").strip() + return access_key, secret_key + + +def _mc_alias_set(mc: str, endpoint: str, access_key: str, secret_key: str) -> None: + _run([mc, "alias", "set", "kfp-minio", endpoint, access_key, secret_key, "--api", "S3v4"]) + + +def _mc_find_logs(mc: str, bucket: str, prefix: str) -> list[str]: + cp = _run([mc, "find", f"kfp-minio/{bucket}/{prefix}", "--name", "*.log"], check=False) + paths = [] + if cp.returncode == 0 and cp.stdout: + for line in cp.stdout.splitlines(): + line = line.strip() + if line: + paths.append(line) + if paths: + return paths + cp = _run([mc, "ls", "--recursive", f"kfp-minio/{bucket}/{prefix}"], check=False) + if cp.returncode != 0 or not cp.stdout: + return [] + for line in cp.stdout.splitlines(): + parts = line.split() + if not parts: + continue + p = parts[-1] + if p.endswith(".log"): + paths.append(p) + return paths + + +def _tail_bytes(s: str, max_bytes: int) -> str: + b = s.encode("utf-8", errors="replace") + if len(b) <= max_bytes: + return s + tail = b[-max_bytes:] + return f"[truncated: showing last {max_bytes} bytes of {len(b)}]\n" + tail.decode("utf-8", errors="replace") + + +def _mc_cat(mc: str, path: str) -> str: + cp = _run([mc, "cat", path], check=False) + if cp.returncode != 0: + err = (cp.stderr or "").strip() + return f"[mc cat failed for {path}] {err}\n" + return cp.stdout or "" + + +def _parse_test_to_workflows_line(line: str) -> Optional[Tuple[str, List[str]]]: + stripped = line.strip() + if not stripped: + return None + parts = stripped.split("|", 1) + if len(parts) != 2: + return None + test_name, workflows_csv = parts[0].strip(), parts[1].strip() + if not test_name or not workflows_csv: + return None + workflows = [w.strip() for w in workflows_csv.split(",") if w.strip()] + if not workflows: + return None + return test_name, workflows + + +def 
map_test_to_workflows(mapping_file: str) -> dict[str, list[str]]: + test_to_workflows_map: Dict[str, List[str]] = {} + with open(mapping_file, "r", encoding="utf-8", errors="replace") as f: + for line in f: + parsed = _parse_test_to_workflows_line(line) + if not parsed: + continue + test_name, workflows = parsed + test_to_workflows_map.setdefault(test_name, []).extend(workflows) + return test_to_workflows_map + + +def _is_failed_testcase(tc: ElementTree.Element) -> bool: + return tc.find("failure") is not None or tc.find("error") is not None + + +def _match_test_name(testcase_name: str, test_to_workflows_map: Dict[str, List[str]]) -> Optional[str]: + if testcase_name in test_to_workflows_map: + return testcase_name + norm = re.sub(r"\s+", " ", testcase_name).strip() + if norm in test_to_workflows_map: + return norm + for k in test_to_workflows_map.keys(): + if k in testcase_name or testcase_name in k: + return k + return None + + +def _append_system_out(tc: ElementTree.Element, text_to_append: str) -> None: + so = tc.find("system-out") + if so is None: + so = ElementTree.SubElement(tc, "system-out") + so.text = "" + if so.text is None: + so.text = "" + if so.text and not so.text.endswith("\n"): + so.text += "\n" + so.text += text_to_append + + +def _require_mc_path(mc_path: str) -> str: + if not mc_path: + _die("MinIO client path is required.", 2) + mc = str(os.fspath(mc_path)) + if not os.path.isfile(mc): + _die(f"MinIO client not found at: {mc}", 2) + if not os.access(mc, os.X_OK): + _die(f"MinIO client is not executable: {mc}", 2) + return mc + + +def _build_workflow_logs_text( + *, + mc: str, + bucket: str, + logs_prefix: str, + workflows: List[str], + max_bytes_per_workflow: int, + max_bytes_per_step: int, +) -> str: + out_lines: List[str] = ["===== Argo Workflows archived logs (tailed) ====="] + for wf in workflows: + out_lines.append(f"--- Workflow: {wf} ---") + wf_prefix = f"{logs_prefix}/{wf}/" + log_paths = _mc_find_logs(mc, bucket, wf_prefix) + if not 
log_paths: + out_lines.append(f"[no *.log files found under s3://{bucket}/{wf_prefix}]") + continue + + bytes_budget = max_bytes_per_workflow + for p in log_paths: + step_name = os.path.basename(os.path.dirname(p)) + out_lines.append(f"[step: {step_name}]") + content = _mc_cat(mc, p) + content = _tail_bytes(content, min(max_bytes_per_step, bytes_budget)) + out_lines.append(content.rstrip("\n")) + out_lines.append("") + bytes_budget -= len(content.encode("utf-8", errors="replace")) + if bytes_budget <= 0: + out_lines.append(f"[truncated: workflow {wf} exceeded max bytes budget]") + break + + out_lines.append("===== End Argo Workflows logs =====") + return "\n".join(out_lines) + "\n" + + +def _augment_junit_xml( + *, + junit_xml_path: str, + test_to_workflows_map: Dict[str, List[str]], + mc: str, + bucket: str, + logs_prefix: str, + max_bytes_per_workflow: int, + max_bytes_per_step: int, +) -> int: + tree = ElementTree.parse(junit_xml_path) + root = tree.getroot() + testcases = root.findall(".//testcase") + + modified = 0 + for tc in testcases: + name = tc.get("name") or "" + if not name or not _is_failed_testcase(tc): + continue + + key = _match_test_name(name, test_to_workflows_map) + if not key: + continue + + workflows = test_to_workflows_map.get(key, []) + if not workflows: + continue + + out = _build_workflow_logs_text( + mc=mc, + bucket=bucket, + logs_prefix=logs_prefix, + workflows=workflows, + max_bytes_per_workflow=max_bytes_per_workflow, + max_bytes_per_step=max_bytes_per_step, + ) + _append_system_out(tc, out) + modified += 1 + + if modified == 0: + _die("No failing testcases matched mapping entries; junit.xml was not augmented.", 1) + + with tempfile.NamedTemporaryFile("wb", delete=False, dir=os.path.dirname(junit_xml_path) or None) as tmp: + tmp_path = tmp.name + tree.write(tmp, encoding="utf-8", xml_declaration=True) + os.replace(tmp_path, junit_xml_path) + return modified + + +def main() -> int: + parser = argparse.ArgumentParser() + 
parser.add_argument("--test-directory", required=True) + parser.add_argument("--namespace", required=True) + parser.add_argument("--mc-path", required=True) + args = parser.parse_args() + + junit_xml = os.path.join(args.test_directory, REPORTS_DIRNAME, JUNIT_XML_FILENAME) + mapping_file = os.path.join(args.test_directory, REPORTS_DIRNAME, MAPPING_FILENAME) + mc = _require_mc_path(args.mc_path) + + if not (os.path.isfile(mapping_file) and os.path.isfile(junit_xml)): + _die( + f"mapping file or junit.xml not found (mapping={mapping_file}, junit={junit_xml})", + 2, + ) + + test_to_workflows_map = map_test_to_workflows(mapping_file) + if not test_to_workflows_map: + _die("No mappings found; nothing to do.", 1) + + artifact_repo = _get_artifact_repo(args.namespace) + if "archiveLogs: true" not in artifact_repo: + _die("Argo Workflows log archiving is not enabled (archiveLogs: true not found).", 1) + + access_key, secret_key = _get_minio_creds(args.namespace) + _mc_alias_set(mc, "http://localhost:9000", access_key, secret_key) + modified = _augment_junit_xml( + junit_xml_path=junit_xml, + test_to_workflows_map=test_to_workflows_map, + mc=mc, + bucket=MINIO_BUCKET, + logs_prefix=LOGS_PREFIX_TEMPLATE.format(namespace=args.namespace), + max_bytes_per_workflow=MAX_BYTES_PER_WORKFLOW, + max_bytes_per_step=MAX_BYTES_PER_STEP, + ) + + print(f"Updated junit.xml: appended workflow logs to {modified} failing testcase(s).") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/backend/test/end2end/pipeline_e2e_test.go b/backend/test/end2end/pipeline_e2e_test.go index ed0a0536ca5..1fbd1a6dab1 100644 --- a/backend/test/end2end/pipeline_e2e_test.go +++ b/backend/test/end2end/pipeline_e2e_test.go @@ -16,6 +16,7 @@ package end2end import ( "fmt" + "os" "path/filepath" "strconv" "strings" @@ -94,10 +95,21 @@ var _ = Describe("Upload and Verify Pipeline Run >", Label(FullRegression), func }) AfterEach(func() { - - // Delete pipelines created during the test 
logger.Log("################### Global Cleanup after each test #####################") + // Capture workflow mapping for failed tests before cleanup deletes the workflows + if CurrentSpecReport().Failed() && len(testContext.PipelineRun.CreatedRunIds) > 0 { + currentDir, err := os.Getwd() + if err == nil { + testutil.WriteTestWorkflowMapping( + GinkgoT().Name(), + testContext.PipelineRun.CreatedRunIds, + testutil.GetNamespace(), + filepath.Join(currentDir, testReportDirectory, "test-workflow-mapping.txt"), + ) + } + } + logger.Log("Deleting %d run(s)", len(testContext.PipelineRun.CreatedRunIds)) for _, runID := range testContext.PipelineRun.CreatedRunIds { runID := runID diff --git a/backend/test/testutil/test_utils.go b/backend/test/testutil/test_utils.go index f08f2318fdc..3dba6c97e3e 100644 --- a/backend/test/testutil/test_utils.go +++ b/backend/test/testutil/test_utils.go @@ -22,6 +22,7 @@ import ( "math/rand" "net/http" "os" + "os/exec" "path/filepath" "regexp" "strings" @@ -88,6 +89,65 @@ func WriteLogFile(specReport types.SpecReport, testName, logDirectory string) { } } +// GetWorkflowNameByRunID retrieves the Argo Workflow name for a given pipeline run ID +// by querying the Kubernetes API using the pipeline/runid label. +func GetWorkflowNameByRunID(namespace string, runID string) string { + cmd := exec.Command("kubectl", "get", "workflows", "-n", namespace, + "-l", fmt.Sprintf("pipeline/runid=%s", runID), + "-o", "jsonpath={.items[0].metadata.name}") + output, err := cmd.Output() + if err != nil { + logger.Log("Failed to get workflow for run ID %s: %v", runID, err) + return "" + } + workflowName := strings.TrimSpace(string(output)) + if workflowName == "" { + logger.Log("No workflow found for run ID %s", runID) + } + return workflowName +} + +// WriteTestWorkflowMapping appends a test-to-workflow mapping entry for failed tests. +// The mapping file is used to correlate failed tests with their associated workflow logs. 
+// Format: TEST_NAME|WORKFLOW_NAME1,WORKFLOW_NAME2,... +func WriteTestWorkflowMapping(testName string, runIDs []string, namespace string, mappingFilePath string) { + if len(runIDs) == 0 { + return + } + + var workflowNames []string + for _, runID := range runIDs { + wfName := GetWorkflowNameByRunID(namespace, runID) + if wfName != "" { + workflowNames = append(workflowNames, wfName) + } + } + + if len(workflowNames) == 0 { + logger.Log("No workflows found for run IDs %v, skipping mapping", runIDs) + return + } + + mappingDir := filepath.Dir(mappingFilePath) + if err := os.MkdirAll(mappingDir, 0755); err != nil { + ginkgo.Fail(fmt.Sprintf("Failed to create mapping directory %q: %v", mappingDir, err)) + } + file, err := os.OpenFile(mappingFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + ginkgo.Fail(fmt.Sprintf("Failed to open mapping file %q: %v", mappingFilePath, err)) + } + defer func() { + if closeErr := file.Close(); closeErr != nil { + ginkgo.Fail(fmt.Sprintf("Failed to close mapping file %q: %v", mappingFilePath, closeErr)) + } + }() + entry := fmt.Sprintf("%s|%s\n", testName, strings.Join(workflowNames, ",")) + if _, err := file.WriteString(entry); err != nil { + ginkgo.Fail(fmt.Sprintf("Failed to write to mapping file %q: %v", mappingFilePath, err)) + } + logger.Log("Wrote test-workflow mapping: %s -> %v", testName, workflowNames) +} + // GetNamespace - Get Namespace based on the deployment mode func GetNamespace() string { if *config.KubeflowMode || *config.MultiUserMode { diff --git a/backend/test/v2/api/integration_suite_test.go b/backend/test/v2/api/integration_suite_test.go index 8e7345ef52c..aa891992b66 100644 --- a/backend/test/v2/api/integration_suite_test.go +++ b/backend/test/v2/api/integration_suite_test.go @@ -193,6 +193,15 @@ var _ = ReportAfterEach(func(specReport types.SpecReport) { currentDir, err := os.Getwd() Expect(err).NotTo(HaveOccurred(), "Failed to get current directory") testutil.WriteLogFile(specReport, 
GinkgoT().Name(), filepath.Join(currentDir, testLogsDirectory)) + + if len(testContext.PipelineRun.CreatedRunIds) > 0 { + testutil.WriteTestWorkflowMapping( + GinkgoT().Name(), + testContext.PipelineRun.CreatedRunIds, + testutil.GetNamespace(), + filepath.Join(currentDir, testReportDirectory, "test-workflow-mapping.txt"), + ) + } } else { log.Printf("Test passed") }