Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion .github/actions/test-and-report/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,52 @@ runs:
fi
./.github/resources/scripts/collect-logs.sh --ns $NAMESPACE --output /tmp/tmp_pod_log.txt

- name: Install Junit2Html plugin and generate report
- name: Install Junit2Html plugin
if: (!cancelled()) && steps.collect-logs.outcome != 'failure'
shell: bash
run: |
pip install junit2html
continue-on-error: true

- name: Install MinIO Client for log collection
id: install-mc
if: ${{ steps.run-tests.outcome != 'success' }}
shell: bash
run: |
MC_PATH="$HOME/.local/bin/minio-mc"
if [ -f "$MC_PATH" ]; then
echo "MinIO client already installed"
else
echo "Installing MinIO client..."
curl -sLO https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
mkdir -p "$HOME/.local/bin"
mv mc "$MC_PATH"
fi
echo "MC_PATH=$MC_PATH" >> "$GITHUB_ENV"
continue-on-error: true

- name: Augment junit.xml with workflow logs
id: augment-junit-xml
if: ${{ steps.run-tests.outcome != 'success' && steps.install-mc.outcome == 'success' }}
shell: bash
run: |
MAPPING_FILE="${{ inputs.test_directory }}/reports/test-workflow-mapping.txt"
JUNIT_XML="${{ inputs.test_directory }}/reports/junit.xml"
if [[ -f "$MAPPING_FILE" && -f "$JUNIT_XML" ]]; then
python3 ./.github/resources/scripts/augment-junit-xml-with-workflow-logs.py \
--junit-xml "$JUNIT_XML" \
--mapping-file "$MAPPING_FILE" \
--namespace "${{ inputs.default_namespace }}"
else
echo "Skipping: mapping file or junit.xml not found"
fi
continue-on-error: true

- name: Generate HTML report
if: (!cancelled()) && steps.collect-logs.outcome != 'failure'
shell: bash
run: |
junit2html ${{ inputs.test_directory }}/reports/junit.xml ${{ inputs.test_directory }}/reports/test-report.html
continue-on-error: true

Expand Down
323 changes: 323 additions & 0 deletions .github/resources/scripts/augment-junit-xml-with-workflow-logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
#!/usr/bin/env python3

import argparse
import base64
import os
import re
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional


def _run(cmd: list[str], *, check: bool = True, capture: bool = True, text: bool = True) -> subprocess.CompletedProcess:
return subprocess.run(
cmd,
check=check,
stdout=subprocess.PIPE if capture else None,
stderr=subprocess.PIPE if capture else None,
text=text,
)


def _get_artifact_repo(namespace: str) -> str:
    """Return the raw artifactRepository config text from the Argo
    workflow-controller ConfigMap in *namespace* ("" when absent)."""
    cmd = [
        "kubectl", "get", "configmap", "workflow-controller-configmap",
        "-n", namespace,
        "-o", "jsonpath={.data.artifactRepository}",
    ]
    result = _run(cmd)
    return result.stdout or ""


def _get_minio_creds(namespace: str) -> tuple[str, str]:
    """Fetch MinIO credentials from the mlpipeline-minio-artifact secret.

    Kubernetes stores secret values base64-encoded; they are decoded
    in-process with the stdlib ``base64`` module rather than shelling out
    to the external ``base64`` binary (fewer subprocesses, no dependency
    on the binary being present).

    Args:
        namespace: namespace containing the mlpipeline-minio-artifact secret.

    Returns:
        (access_key, secret_key) as plain, stripped strings.
    """

    def _secret_field(field: str) -> str:
        # kubectl prints the still-base64-encoded value of the requested field.
        encoded = _run(
            [
                "kubectl",
                "get",
                "secret",
                "mlpipeline-minio-artifact",
                "-n",
                namespace,
                "-o",
                f"jsonpath={{.data.{field}}}",
            ]
        ).stdout.strip()
        return base64.b64decode(encoded).decode("utf-8").strip()

    return _secret_field("accesskey"), _secret_field("secretkey")


class PortForward:
    """Context manager around ``kubectl port-forward`` to svc/minio-service.

    On enter it launches the port-forward subprocess and waits (up to ~10s)
    for kubectl to print "Forwarding from"; on exit it stops the process.
    """

    def __init__(self, namespace: str, local_port: int = 9000, remote_port: int = 9000):
        self._namespace = namespace
        self._local_port = local_port
        self._remote_port = remote_port
        # Populated by __enter__; None until the forward is started.
        self._proc: Optional[subprocess.Popen] = None

    def __enter__(self) -> "PortForward":
        """Start the port-forward and block until kubectl reports readiness.

        Raises:
            RuntimeError: if the process exits or readiness is not observed
                within the ~10s deadline.
        """
        self._proc = subprocess.Popen(
            [
                "kubectl",
                "port-forward",
                "-n",
                self._namespace,
                "svc/minio-service",
                f"{self._local_port}:{self._remote_port}",
            ],
            # Merge stderr into stdout so a single readline loop sees both.
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        # Wait briefly for port-forward to come up.
        deadline = time.time() + 10
        out = ""
        while time.time() < deadline:
            # Process died early: stop polling and fall through to the error.
            if self._proc.poll() is not None:
                break
            try:
                if self._proc.stdout is not None:
                    # NOTE(review): readline() blocks until a full line arrives,
                    # so the deadline is only checked between lines — if kubectl
                    # produces no output at all this can wait past 10s. Confirm
                    # acceptable for CI use.
                    line = self._proc.stdout.readline()
                    if line:
                        out += line
                        # kubectl prints "Forwarding from ..." once ready.
                        if "Forwarding from" in line:
                            return self
            except Exception:
                pass
            time.sleep(0.1)
        raise RuntimeError(f"kubectl port-forward did not start successfully. Output:\n{out}")

    def __exit__(self, exc_type, exc, tb):
        """Stop the port-forward: polite SIGINT first, then kill as fallback."""
        if self._proc and self._proc.poll() is None:
            try:
                self._proc.send_signal(signal.SIGINT)
                self._proc.wait(timeout=3)
            except Exception:
                try:
                    self._proc.kill()
                except Exception:
                    # Best-effort cleanup; nothing more we can do here.
                    pass


def _mc_alias_set(mc: str, endpoint: str, access_key: str, secret_key: str) -> None:
    """Register the ``kfp-minio`` alias with the MinIO client binary at *mc*."""
    cmd = [mc, "alias", "set", "kfp-minio", endpoint, access_key, secret_key, "--api", "S3v4"]
    _run(cmd)


def _mc_find_logs(mc: str, bucket: str, prefix: str) -> list[str]:
    """List ``*.log`` object paths under ``kfp-minio/<bucket>/<prefix>``.

    Prefers ``mc find`` (best signal); when that yields nothing, falls back
    to parsing ``mc ls --recursive`` output. Returns [] when neither
    command produces usable paths.
    """
    target = f"kfp-minio/{bucket}/{prefix}"

    found = _run([mc, "find", target, "--name", "*.log"], check=False)
    if found.returncode == 0 and found.stdout:
        hits = [ln.strip() for ln in found.stdout.splitlines() if ln.strip()]
        if hits:
            return hits

    listing = _run([mc, "ls", "--recursive", target], check=False)
    if listing.returncode != 0 or not listing.stdout:
        return []
    # mc ls --recursive prints "<date> <time> <size> <path>"; keep .log paths.
    return [
        fields[-1]
        for fields in (ln.split() for ln in listing.stdout.splitlines())
        if fields and fields[-1].endswith(".log")
    ]


def _tail_bytes(s: str, max_bytes: int) -> str:
b = s.encode("utf-8", errors="replace")
if len(b) <= max_bytes:
return s
tail = b[-max_bytes:]
return (
f"[truncated: showing last {max_bytes} bytes of {len(b)}]\n"
+ tail.decode("utf-8", errors="replace")
)


def _mc_cat(mc: str, path: str) -> str:
    """Return the contents of an object via ``mc cat``, or an inline error marker.

    This is best-effort by design: a non-zero exit — or the mc binary being
    missing/unrunnable, which raises OSError from subprocess despite
    check=False — degrades to a bracketed error string so one bad object
    does not abort the whole augmentation run.
    """
    try:
        cp = _run([mc, "cat", path], check=False)
    except OSError as e:
        # e.g. FileNotFoundError if the mc binary vanished after detection.
        return f"[mc cat failed for {path}] {e}\n"
    if cp.returncode != 0:
        err = (cp.stderr or "").strip()
        return f"[mc cat failed for {path}] {err}\n"
    return cp.stdout or ""


def _read_mapping(mapping_file: str) -> dict[str, list[str]]:
mapping: Dict[str, List[str]] = {}
with open(mapping_file, "r", encoding="utf-8", errors="replace") as f:
for line in f:
line = line.strip()
if not line or "|" not in line:
continue
test_name, workflows = line.split("|", 1)
wf_list = [w.strip() for w in workflows.split(",") if w.strip()]
if wf_list:
mapping.setdefault(test_name, []).extend(wf_list)
return mapping


def _is_failed_testcase(tc: ET.Element) -> bool:
return tc.find("failure") is not None or tc.find("error") is not None


def _match_test_name(testcase_name: str, mapping: Dict[str, List[str]]) -> Optional[str]:
if testcase_name in mapping:
return testcase_name
# Fallback: normalize whitespace and try substring match.
norm = re.sub(r"\s+", " ", testcase_name).strip()
if norm in mapping:
return norm
for k in mapping.keys():
if k in testcase_name or testcase_name in k:
return k
return None


def _append_system_out(tc: ET.Element, text_to_append: str) -> None:
so = tc.find("system-out")
if so is None:
so = ET.SubElement(tc, "system-out")
so.text = ""
if so.text is None:
so.text = ""
if so.text and not so.text.endswith("\n"):
so.text += "\n"
so.text += text_to_append


def _collect_workflow_logs(
    mc: str,
    bucket: str,
    logs_prefix: str,
    workflows: List[str],
    max_bytes_per_workflow: int,
    max_bytes_per_step: int,
) -> str:
    """Build the human-readable, size-bounded log blob for one testcase's workflows."""
    out_lines: List[str] = ["===== Argo Workflows archived logs (tailed) ====="]
    for wf in workflows:
        out_lines.append(f"--- Workflow: {wf} ---")
        wf_prefix = f"{logs_prefix}/{wf}/"
        log_paths = _mc_find_logs(mc, bucket, wf_prefix)
        if not log_paths:
            out_lines.append(f"[no *.log files found under s3://{bucket}/{wf_prefix}]")
            continue

        # Per-workflow byte budget; each step is also capped individually.
        bytes_budget = max_bytes_per_workflow
        for p in log_paths:
            # Archived log paths end in .../<step-name>/<file>.log
            step_name = os.path.basename(os.path.dirname(p))
            out_lines.append(f"[step: {step_name}]")
            content = _tail_bytes(_mc_cat(mc, p), min(max_bytes_per_step, bytes_budget))
            out_lines.append(content.rstrip("\n"))
            out_lines.append("")
            bytes_budget -= len(content.encode("utf-8", errors="replace"))
            if bytes_budget <= 0:
                out_lines.append(f"[truncated: workflow {wf} exceeded max bytes budget]")
                break

    out_lines.append("===== End Argo Workflows logs =====")
    return "\n".join(out_lines) + "\n"


def main() -> int:
    """Append archived Argo workflow logs to failing testcases in a junit.xml.

    Reads the test-name -> workflow-names mapping file, pulls each failing
    testcase's workflow logs out of MinIO (via a kubectl port-forward and
    the mc client), and appends them to the testcase's <system-out>.

    Returns (process exit code):
        0 - success, or nothing to do;
        1 - environment problem (log archiving disabled, mc not found);
        2 - bad inputs (junit.xml or mapping file missing).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--junit-xml", required=True)
    parser.add_argument("--mapping-file", required=True)
    parser.add_argument("--namespace", default="kubeflow")
    parser.add_argument("--bucket", default="mlpipeline")
    parser.add_argument("--logs-prefix", default=None)
    parser.add_argument("--max-bytes-per-workflow", type=int, default=200_000)
    parser.add_argument("--max-bytes-per-step", type=int, default=80_000)
    args = parser.parse_args()

    junit_xml = args.junit_xml
    mapping_file = args.mapping_file
    namespace = args.namespace
    logs_prefix = args.logs_prefix or f"private-artifacts/{namespace}"

    if not os.path.isfile(junit_xml):
        print(f"ERROR: junit.xml not found at {junit_xml}", file=sys.stderr)
        return 2
    if not os.path.isfile(mapping_file):
        print(f"ERROR: mapping file not found at {mapping_file}", file=sys.stderr)
        return 2

    mapping = _read_mapping(mapping_file)
    if not mapping:
        print("No mappings found; nothing to do.")
        return 0

    # Logs only exist in MinIO when Argo's archiveLogs is switched on.
    artifact_repo = _get_artifact_repo(namespace)
    if "archiveLogs: true" not in artifact_repo:
        print("ERROR: Argo Workflows log archiving is not enabled (archiveLogs: true not found).", file=sys.stderr)
        return 1

    # Prefer the explicit MC_PATH from the CI workflow; fall back to PATH.
    mc = os.environ.get("MC_PATH") or shutil.which("mc") or shutil.which("minio-mc")
    if not mc:
        print("ERROR: MinIO client not found. Set MC_PATH or ensure mc is on PATH.", file=sys.stderr)
        return 1

    access_key, secret_key = _get_minio_creds(namespace)

    modified = 0
    with PortForward(namespace):
        _mc_alias_set(mc, "http://localhost:9000", access_key, secret_key)

        tree = ET.parse(junit_xml)
        root = tree.getroot()

        # JUnit can be rooted at <testsuite> or <testsuites>; ".//" finds both.
        for tc in root.findall(".//testcase"):
            name = tc.get("name") or ""
            if not name or not _is_failed_testcase(tc):
                continue
            key = _match_test_name(name, mapping)
            workflows = mapping.get(key, []) if key else []
            if not workflows:
                continue

            blob = _collect_workflow_logs(
                mc,
                args.bucket,
                logs_prefix,
                workflows,
                args.max_bytes_per_workflow,
                args.max_bytes_per_step,
            )
            _append_system_out(tc, blob)
            modified += 1

    if modified == 0:
        print("No failing testcases matched mapping entries; junit.xml unchanged.")
        return 0

    # Write atomically: dump to a sibling temp file, then replace. Remove the
    # temp file if the write itself fails so failed runs don't leave litter.
    with tempfile.NamedTemporaryFile("wb", delete=False, dir=os.path.dirname(junit_xml) or None) as tmp:
        tmp_path = tmp.name
        try:
            tree.write(tmp, encoding="utf-8", xml_declaration=True)
        except Exception:
            tmp.close()
            os.unlink(tmp_path)
            raise
    os.replace(tmp_path, junit_xml)
    print(f"Updated junit.xml: appended workflow logs to {modified} failing testcase(s).")
    return 0


if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())


Loading
Loading