diff --git a/report/tests/__init__.py b/report/tests/__init__.py
new file mode 100644
index 000000000..0a2669d7a
--- /dev/null
+++ b/report/tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/report/tests/aggregate_coverage_diff_test.py b/report/tests/aggregate_coverage_diff_test.py
new file mode 100644
index 000000000..0b01312ad
--- /dev/null
+++ b/report/tests/aggregate_coverage_diff_test.py
@@ -0,0 +1,126 @@
+import io
+import json
+import sys
+import pytest
+
+import report.aggregate_coverage_diff as aggregate_coverage_diff
+
+# --- Tests for compute_coverage_diff ---
+
+def test_compute_coverage_diff_basic(monkeypatch):
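+ # Stub out evaluator, textcov and the GCS client so compute_coverage_diff runs
+ # offline: the two blobs contribute 3 + 5 covered lines, minus 2 existing, over 4 total lines.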
+ class ExistingTextcov:
+ def __init__(self):
+ self.covered_lines = 2
+ monkeypatch.setattr(
+ aggregate_coverage_diff.evaluator,
+ 'load_existing_textcov',
+ lambda project: ExistingTextcov()
+ )
+ monkeypatch.setattr(
+ aggregate_coverage_diff.evaluator,
+ 'load_existing_coverage_summary',
+ lambda project: {'data': [{'totals': {'lines': {'count': 4}}}]}
+ )
+
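+ # Minimal stand-in for textcov.Textcov that only tracks a covered-line count.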
+ class DummyTextcov:
+ def __init__(self):
+ self.covered_lines = 0
+ def merge(self, other):
+ self.covered_lines += other.covered_lines
+ def subtract_covered_lines(self, existing):
+ self.covered_lines -= existing.covered_lines
+ @classmethod
+ def from_file(cls, f):
+ inst = cls()
+ inst.covered_lines = int(f.read())
+ return inst
+ monkeypatch.setattr(
+ aggregate_coverage_diff.textcov,
+ 'Textcov',
+ DummyTextcov
+ )
+
+ # Fake storage client with two blobs
+ class FakeBlob:
+ def __init__(self, name, content):
+ self.name = name
+ self._content = content
+ def open(self, *args, **kwargs):
+ return io.StringIO(self._content)
+ class FakeClient:
+ def bucket(self, name):
+ return name # bucket identifier passed through
+ def list_blobs(self, bucket, prefix, delimiter):
+ # Return two blobs with covered lines 3 and 5
+ return [FakeBlob('a', '3'), FakeBlob('b', '5')]
+ monkeypatch.setattr(
+ aggregate_coverage_diff.storage,
+ 'Client',
+ FakeClient
+ )
+
+ ratio = aggregate_coverage_diff.compute_coverage_diff('proj', ['gs://bucket/foo'])
+ assert ratio == pytest.approx(6/4)
+
+
+def test_compute_coverage_diff_no_totals(monkeypatch):
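+ # With an empty coverage summary (no totals), the computed diff ratio should be 0.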
+ class ExistingTextcov:
+ def __init__(self):
+ self.covered_lines = 0
+ monkeypatch.setattr(
+ aggregate_coverage_diff.evaluator,
+ 'load_existing_textcov',
+ lambda project: ExistingTextcov()
+ )
+ monkeypatch.setattr(
+ aggregate_coverage_diff.evaluator,
+ 'load_existing_coverage_summary',
+ lambda project: {}
+ )
+ class DummyTextcovEmpty:
+ def __init__(self):
+ self.covered_lines = 0
+ def merge(self, other):
+ pass
+ def subtract_covered_lines(self, existing):
+ pass
+ @classmethod
+ def from_file(cls, f):
+ return cls()
+ monkeypatch.setattr(
+ aggregate_coverage_diff.textcov,
+ 'Textcov',
+ DummyTextcovEmpty
+ )
+ class FakeClientEmpty:
+ def bucket(self, name):
+ return name
+ def list_blobs(self, bucket, prefix, delimiter):
+ return []
+ monkeypatch.setattr(
+ aggregate_coverage_diff.storage,
+ 'Client',
+ FakeClientEmpty
+ )
+ ratio = aggregate_coverage_diff.compute_coverage_diff('proj', ['gs://bucket/foo'])
+ assert ratio == 0
+
+# --- Tests for main() ---
+
+def test_main_prints_expected(monkeypatch, capsys):
+ monkeypatch.setattr(
+ aggregate_coverage_diff,
+ 'compute_coverage_diff',
+ lambda project, links: 0.5
+ )
+ input_data = {'benchmarks': [
+ {'benchmark': 'x-proj', 'max_line_coverage_diff_report': 'link1'},
+ {'benchmark': 'y-proj2'}
+ ]}
+ monkeypatch.setattr(
+ sys, 'stdin',
+ io.StringIO(json.dumps(input_data))
+ )
+ aggregate_coverage_diff.main()
+ out = capsys.readouterr().out.strip()
+ assert out == "{'proj': 0.5}"
diff --git a/report/tests/common_test.py b/report/tests/common_test.py
new file mode 100644
index 000000000..71c5b1ce3
--- /dev/null
+++ b/report/tests/common_test.py
@@ -0,0 +1,392 @@
+import json
+import run_one_experiment
+from report.common import (
+ AccumulatedResult,
+ Sample,
+ LogPart,
+ _parse_log_parts,
+ FileSystem,
+ Target,
+ Triage,
+ Benchmark,
+ Results,
+ MAX_RUN_LOGS_LEN,
+)
+
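+# Minimal stand-in for an evaluation result carrying only the reproducer_path that Sample reads.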
+class DummyResult:
+ def __init__(self, reproducer_path=None):
+ self.reproducer_path = reproducer_path
+
+
+def test_accumulated_result_properties():
+ ar = AccumulatedResult(
+ compiles=2,
+ crashes=1,
+ crash_cases=3,
+ total_runs=2,
+ total_coverage=50.0,
+ total_line_coverage_diff=10.0,
+ )
+ # average_coverage = total_coverage / total_runs = 25.0
+ assert ar.average_coverage == 25.0
+ # average_line_coverage_diff = total_line_coverage_diff / total_runs = 5.0
+ assert ar.average_line_coverage_diff == 5.0
+ # build_rate = compiles / total_runs = 1.0
+ assert ar.build_rate == 1.0
+
+
+def test_parse_log_parts_no_markers():
+ log = "plain log without markers"
+ parts = _parse_log_parts(log)
+ assert len(parts) == 1
+ assert parts[0].content == log
+ assert not parts[0].chat_prompt
+ assert not parts[0].chat_response
+
+
+def test_parse_log_parts_with_markers():
+ # NOTE: the delimiters below are assumed to match the <CHAT PROMPT:ROUND xx> /
+ # <CHAT RESPONSE:ROUND xx> markers that report.common._parse_log_parts recognises
+ # in run logs.
+ log = (
+ "start"
+ "<CHAT PROMPT:ROUND 01>prompt1</CHAT PROMPT:ROUND 01>"
+ "middle"
+ "<CHAT RESPONSE:ROUND 01>response1</CHAT RESPONSE:ROUND 01>"
+ "end"
+ )
+ parts = _parse_log_parts(log)
+ # Should produce 5 parts: 'start', prompt, 'middle', response, 'end'
+ assert len(parts) == 5
+ assert parts[0].content == "start"
+ assert parts[0].chat_prompt is False and parts[0].chat_response is False
+
+ prompt_part = parts[1]
+ assert prompt_part.chat_prompt
+ assert not prompt_part.chat_response
+ assert prompt_part.content == "prompt1"
+
+ assert parts[2].content == "middle"
+
+ response_part = parts[3]
+ assert not response_part.chat_prompt
+ assert response_part.chat_response
+ assert response_part.content == "response1"
+
+ assert parts[4].content == "end"
+
+
+def test_sample_properties_with_result():
+ # Create dummy result with reproducer_path
+ path = "/tmp/reproducer"
+ dummy = DummyResult(reproducer_path=path)
+ sample = Sample(id="01", status="Done", result=dummy)
+
+ assert sample.stacktrace == f"{path}/stacktrace"
+ assert sample.target_binary == f"{path}/target_binary"
+ assert sample.reproducer == f"{path}/artifacts"
+ # run_log strips the trailing 'reproducer' from the path and appends 'run.log'
+ assert sample.run_log == "/tmp/run.log"
+
+
+def test_sample_properties_without_result():
+ sample = Sample(id="02", status="Running", result=None)
+ assert sample.stacktrace == ""
+ assert sample.target_binary == ""
+ assert sample.reproducer == ""
+ assert sample.run_log == ""
+
+
+def test_target_and_triage_dataclasses():
+ target = Target(code="code snippet", fixer_prompt="fixer", build_script_code="script")
+ assert target.code == "code snippet"
+ assert target.fixer_prompt == "fixer"
+ assert target.build_script_code == "script"
+
+ triage = Triage(result="result text", triager_prompt="prompt text")
+ assert triage.result == "result text"
+ assert triage.triager_prompt == "prompt text"
+
+
+def test_filesystem_local_operations(tmp_path):
+ dir_path = tmp_path / "subdir"
+ dir_path.mkdir()
+ file_path = dir_path / "test.txt"
+ content = "hello world"
+ file_path.write_text(content)
+
+ # Test FileSystem on file
+ fs_file = FileSystem(str(file_path))
+ assert fs_file.exists()
+ assert fs_file.isfile()
+ assert not fs_file.isdir()
+ assert fs_file.getsize() == len(content)
+ with fs_file.open() as f:
+ assert f.read() == content
+
+ # Test FileSystem on directory
+ fs_dir = FileSystem(str(dir_path))
+ assert fs_dir.exists()
+ assert fs_dir.isdir()
+ assert not fs_dir.isfile()
+ listing = fs_dir.listdir()
+ assert "test.txt" in listing
+
+ # Test makedirs: create new nested dir
+ new_dir = tmp_path / "a" / "b" / "c"
+ fs_new = FileSystem(str(new_dir))
+ fs_new.makedirs()
+ assert new_dir.exists()
+
+ # Test opening with write mode
+ new_file = new_dir / "new.txt"
+ fs_new_file = FileSystem(str(new_file))
+ with fs_new_file.open("w") as f:
+ f.write("data")
+ assert fs_new_file.getsize() == 4
+
+# Tests for Results class methods
+def test_results_list_benchmark_ids(tmp_path):
+ base = tmp_path / "results"
+ base.mkdir()
+ valid = base / "output-proj1-func1"
+ valid.mkdir()
+ (valid / "status").mkdir()
+ invalid = base / "lost+found"
+ invalid.mkdir()
+
+ res = Results(results_dir=str(base), benchmark_set='all')
+ ids = res.list_benchmark_ids()
+ assert ids == ["output-proj1-func1"]
+
+
+def test_results_match_benchmark(monkeypatch):
+ # Dummy aggregated result
+ dummy_aggr = run_one_experiment.AggregatedResult()
+ monkeypatch.setattr(run_one_experiment, 'aggregate_results', lambda filtered, t: dummy_aggr)
+
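+ # Each DummyE mimics an experiment result; 'finished' drives the reported benchmark status.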
+ class DummyE:
+ def __init__(self, finished): self.finished = finished
+ results = [DummyE(True), DummyE(False), DummyE(True)]
+ targets = ["t1", "t2", "t3"]
+ r = Results()
+ bm = r.match_benchmark("output-proj-f-func", results, targets)
+ assert isinstance(bm, Benchmark)
+ assert bm.id == "output-proj-f-func"
+ assert bm.status.startswith("Running") or bm.status == "Done"
+ assert bm.result is dummy_aggr
+
+
+def test_get_final_target_code(tmp_path):
+ rdir = tmp_path / "results"
+ bdir = rdir / "bench1" / "fixed_targets"
+ bdir.mkdir(parents=True)
+ sample_file = bdir / "s1.code"
+ sample_file.write_text("abc123")
+
+ res = Results(results_dir=str(rdir))
+ code = res.get_final_target_code("bench1", "s1")
+ assert json.loads(code) == "abc123"
+
+
+def test_get_logs_and_parse(tmp_path):
+ rdir = tmp_path / "results"
+ logdir = rdir / "bench" / "status" / "s"
+ logdir.mkdir(parents=True)
+ txt = logdir / "log.txt"
+ content = "p"
+ txt.write_text(content)
+
+ res = Results(results_dir=str(rdir))
+ parts = res.get_logs("bench", "s")
+ assert all(isinstance(p, LogPart) for p in parts)
+ assert parts[0].content == 'p'
+
+
+def test_get_run_logs_simple(tmp_path):
+ # Create run logs
+ rdir = tmp_path / "results"
+ rundir = rdir / "bench" / "logs" / "run"
+ rundir.mkdir(parents=True)
+ f = rundir / "01.log"
+ text = "short log"
+ f.write_text(text)
+
+ res = Results(results_dir=str(rdir))
+ log = res.get_run_logs("bench", "01")
+ assert log == text
+
+
+def test_get_run_logs_truncated(tmp_path, monkeypatch):
+ rdir = tmp_path / "results"
+ rundir = rdir / "bench" / "logs" / "run"
+ rundir.mkdir(parents=True)
+ fname = "01.log"
+ fpath = rundir / fname
+ big = 'A' * (MAX_RUN_LOGS_LEN + 10)
+ fpath.write_text(big)
+
+ res = Results(results_dir=str(rdir))
+ log = res.get_run_logs("bench", "01")
+ assert '...truncated...' in log
+ half = MAX_RUN_LOGS_LEN // 2
+ assert log.startswith('A' * half)
+ assert log.endswith('A' * half)
+
+
+def test_get_triage_empty_and_with_data(tmp_path):
+ rdir = tmp_path / "results"
+ # empty
+ res = Results(results_dir=str(rdir))
+ tri = res.get_triage("b", "s")
+ assert tri.result == '' and tri.triager_prompt == ''
+
+ # with data
+ tri_dir = rdir / "b" / "fixed_targets" / "s-triage"
+ tri_dir.mkdir(parents=True)
+ pfile = tri_dir / "prompt.txt"
+ pfile.write_text(json.dumps([{"content": "hello"}]))
+ rfile = tri_dir / "out.txt"
+ rfile.write_text("res")
+ tri2 = res.get_triage("b", "s")
+ assert "hello" in tri2.triager_prompt
+ assert tri2.result == "res"
+
+def test_get_targets_fixed_and_agent(tmp_path):
+ # Setup fixed_targets
+ rdir = tmp_path / "results"
+ bench = rdir / "bench"
+ fixed = bench / "fixed_targets"
+ fixed.mkdir(parents=True)
+ # sample file
+ sample_file = fixed / "01.txt"
+ sample_file.write_text("code1")
+ dir_f = fixed / "01-F00"
+ dir_f.mkdir()
+ p = dir_f / "prompt.txt"
+ p.write_text(json.dumps([{"content":"fix prompt"}]))
+ r = dir_f / "fix.rawoutput"
+ r.write_text("fixed code")
+ res = Results(results_dir=str(rdir))
+ targets = res.get_targets("bench", "01")
+ assert len(targets) == 2
+ # First: code from sample_file
+ assert targets[0].code == "code1"
+ # Second: Target from fixed dir
+ assert targets[1].code == "fixed code"
+ assert "fix prompt" in targets[1].fixer_prompt
+
+# Tests for get_samples
+
+def test_get_samples_mapping():
+ all_targets = ["t1", "t2", "t3"]
+ results_list = [object(), None, object()]
+ res = Results()
+ samples = res.get_samples(results_list, all_targets)
+ assert len(samples) == 3
+ assert samples[0].status == "Done"
+ assert samples[1].status == "Running"
+ assert samples[2].status == "Done"
+ assert isinstance(samples[0], Sample)
+
+# Tests for get_prompt
+
+def test_get_prompt_raw_and_structured(tmp_path):
+ rdir = tmp_path / "results"
+ bench = rdir / "bench"
+ bench.mkdir(parents=True)
+ # raw text prompt
+ pt = bench / "prompt1.txt"
+ pt.write_text("hello raw")
+ res = Results(results_dir=str(rdir))
+ assert "hello raw" in res.get_prompt("bench")
+ # structured prompt
+ pt.write_text(json.dumps([{"content":"line1"},{"content":"line2"}]))
+ assert "line1" in res.get_prompt("bench")
+ assert "line2" in res.get_prompt("bench")
+
+# Tests for get_results
+
+def test_get_results_and_targets(tmp_path, monkeypatch):
+ # Prepare raw_targets and result.json
+ rdir = tmp_path / "results"
+ bench = rdir / "bench"
+ raw = bench / "raw_targets"
+ raw.mkdir(parents=True)
+ f1 = raw / "00.py"
+ f1.write_text("dummy")
+ status = bench / "status"
+ s0 = status / "00"
+ s0.mkdir(parents=True)
+ res_file = s0 / "result.json"
+ res_file.write_text("{}")
+ # Monkeypatch evaluator.Result to accept no args
+ class DummyE:
+ def __init__(self):
+ pass
+ import report.common as rc
+ monkeypatch.setattr(rc.evaluator, "Result", DummyE)
+ results, targets = Results(results_dir=str(rdir)).get_results("bench")
+ assert isinstance(results[0], DummyE)
+ # The target path should match f1
+ assert targets == [str(f1)]
+
+# Tests for get_macro_insights
+
+def test_get_macro_insights():
+ # Create dummy benchmarks
+ ag1 = run_one_experiment.AggregatedResult()
+ ag1.build_success_rate = 1.0
+ ag1.found_bug = 1
+ ag1.max_coverage = 10
+ ag1.max_line_coverage_diff = 2
+ ag2 = run_one_experiment.AggregatedResult()
+ ag2.build_success_rate = 0.0
+ ag2.found_bug = 0
+ ag2.max_coverage = 20
+ ag2.max_line_coverage_diff = 3
+ b1 = Benchmark("id1", "Done", ag1)
+ b2 = Benchmark("id2", "Done", ag2)
+ acc = Results().get_macro_insights([b1, b2])
+
+ assert acc.compiles == 1
+ assert acc.crashes == 1
+ assert acc.total_runs == 2
+ assert acc.average_coverage == 15
+ assert acc.average_line_coverage_diff == 2.5
+
+# Tests for get_coverage_language_gains and get_project_summary
+
+def test_get_coverage_language_gains_and_project_summary(tmp_path):
+ # Deploy report.json with project_summary
+ rdir = tmp_path / "results"
+ rdir.mkdir(parents=True)
+ summary = {"project_summary":{
+ "p1":{
+ "coverage_diff":5,
+ "coverage_relative_gain":0.1,
+ "coverage_ofg_total_new_covered_lines":2,
+ "coverage_existing_total_covered_lines":3,
+ "coverage_existing_total_lines":10,
+ "coverage_ofg_total_covered_lines":7
+ }
+ }}
+ j = rdir / "report.json"
+ j.write_text(json.dumps(summary))
+ # Prepare benchmarks list
+ class DummyAg:
+ build_success_count = 1
+
+ b = Benchmark(id="id-p1-f", status="Done", result=DummyAg(), signature="", project="p1", function="", language="")
+
+ gains = Results(results_dir=str(rdir)).get_coverage_language_gains()
+ assert "project_summary" in gains
+ assert "p1" in gains["project_summary"]
+ assert gains["project_summary"]["p1"]["coverage_diff"] == 5
+ # get_project_summary maps summary into Project objects
+ ps = Results(results_dir=str(rdir)).get_project_summary([b])
+ assert len(ps) == 1
+ proj = ps[0]
+ assert proj.name == "p1"
+ assert proj.coverage_gain == 5
+ assert proj.coverage_relative_gain == 0.1
+ assert proj.coverage_ofg_total_new_covered_lines == 2
+ assert proj.coverage_existing_total_covered_lines == 3
+ assert proj.coverage_existing_total_lines == 10
+ assert proj.coverage_ofg_total_covered_lines == 7
diff --git a/report/tests/compare_results_test.py b/report/tests/compare_results_test.py
new file mode 100644
index 000000000..39260afe1
--- /dev/null
+++ b/report/tests/compare_results_test.py
@@ -0,0 +1,69 @@
+import pandas as pd
+
+from report.compare_results import extract_basename_from_filename, merge_tables
+
+
+def test_extract_basename_from_filename():
+
+ assert extract_basename_from_filename('path/to/file.csv') == 'file'
+
+ assert extract_basename_from_filename('another.ext1.ext2.txt') == 'another.ext1.ext2'
+
+ assert extract_basename_from_filename('no_ext') == 'no_ext'
+
+
+def test_merge_tables(tmp_path):
+ # Create first CSV file (basename 'a')
+ df1 = pd.DataFrame({
+ 'Benchmark': ['bench1', 'bench2'],
+ 'Status': ['OK', 'FAIL'],
+ 'Build rate': [10, 5],
+ 'Crash rate': [0.1, 0.2],
+ 'Coverage': [80, 85],
+ 'Line coverage diff': [5, 10],
+ })
+ file1 = tmp_path / 'a.csv'
+ df1.to_csv(file1, index=False)
+
+ # Create second CSV file (basename 'b')
+ df2 = pd.DataFrame({
+ 'Benchmark': ['bench1', 'bench3'],
+ 'Status': ['OK2', 'FAIL2'],
+ 'Build rate': [12, 0],
+ 'Crash rate': [0.1, 0.3],
+ 'Coverage': [82, 90],
+ 'Line coverage diff': [6, 15],
+ })
+ file2 = tmp_path / 'b.csv'
+ df2.to_csv(file2, index=False)
+
+ merged = merge_tables(str(file1), str(file2))
+
+ # Expected column order
+ expected_cols = [
+ 'Benchmark', 'Status_a', 'Status_b',
+ 'Build rate_a', 'Build rate_b',
+ 'Crash rate_a', 'Crash rate_b',
+ 'Coverage_a', 'Coverage_b',
+ 'Line coverage diff_a', 'Line coverage diff_b'
+ ]
+ assert merged.columns.tolist() == expected_cols
+
+
+ assert merged['Benchmark'].tolist() == ['bench1', 'bench2', 'bench3']
+
+
+ row2 = merged[merged['Benchmark'] == 'bench2'].iloc[0]
+ assert row2['Status_b'] == '-'
+ assert row2['Build rate_b'] == '-'
+
+ row3 = merged[merged['Benchmark'] == 'bench3'].iloc[0]
+ assert row3['Status_a'] == '-'
+ assert row3['Build rate_a'] == '-'
+
+
+ row1 = merged[merged['Benchmark'] == 'bench1'].iloc[0]
+ assert row1['Build rate_a'] in ('10', '10.0')
+ assert row1['Build rate_b'] in ('12', '12.0')
diff --git a/report/tests/docker_run_test.py b/report/tests/docker_run_test.py
new file mode 100644
index 000000000..56a158e3f
--- /dev/null
+++ b/report/tests/docker_run_test.py
@@ -0,0 +1,264 @@
+import os
+import io
+import datetime
+import builtins
+import subprocess
+import logging
+
+
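+# Keep a reference to the real open() so tests that patch builtins.open can
+# still read ordinary files.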
+_ORIGINAL_OPEN = builtins.open
+
+import report.docker_run as dr
+
+# --- Tests for _parse_args ---
+
+def test_parse_args_defaults():
+ args = dr._parse_args([])
+ assert args.benchmark_set == dr.BENCHMARK_SET
+ assert args.frequency_label == dr.FREQUENCY_LABEL
+ assert args.run_timeout == dr.RUN_TIMEOUT
+ assert args.sub_dir == dr.SUB_DIR
+ assert args.model == dr.MODEL
+ assert args.delay == dr.DELAY
+ assert args.local_introspector is False
+ assert args.num_samples == dr.NUM_SAMPLES
+ assert args.llm_fix_limit == dr.LLM_FIX_LIMIT
+ assert args.vary_temperature is True
+ assert args.agent is False
+ assert args.max_round == dr.MAX_ROUND
+ assert args.redirect_outs is False
+
+ assert args.additional_args == []
+
+
+def test_parse_args_with_custom_and_additional():
+ cmd = [
+ '-b', 'custom_set',
+ '--frequency-label', 'weekly',
+ '--run-timeout', '123',
+ '-sd', 'subdir',
+ '-m', 'custom_model',
+ '-d', '5',
+ '-i', 'true',
+ '-ns', '20',
+ '-nf', '3',
+ '-vt', 'false',
+ '-ag', 'true',
+ '-mr', '50',
+ '-rd', 'true',
+ '--', 'extra1', 'extra2'
+ ]
+ args = dr._parse_args(cmd)
+ # Check overridden values
+ assert args.benchmark_set == 'custom_set'
+ assert args.frequency_label == 'weekly'
+ assert args.run_timeout == 123
+ assert args.sub_dir == 'subdir'
+ assert args.model == 'custom_model'
+ assert args.delay == 5
+ assert args.local_introspector is True
+ assert args.num_samples == 20
+ assert args.llm_fix_limit == 3
+ assert args.vary_temperature is False
+ assert args.agent is True
+ assert args.max_round == 50
+ assert args.redirect_outs is True
+
+ assert args.additional_args == ['extra1', 'extra2']
+
+
+# --- Tests for _run_command ---
+
+def test_run_command_returncode(monkeypatch):
+ class DummyProc:
+ def __init__(self):
+ self.returncode = 99
+ monkeypatch.setattr(subprocess, 'run', lambda *args, **kwargs: DummyProc())
+ rc = dr._run_command(['any', 'cmd'], shell=True)
+ assert rc == 99
+
+
+# --- Tests for _authorize_gcloud ---
+
+def test_authorize_gcloud_no_creds(monkeypatch, caplog):
+ caplog.set_level(logging.INFO)
+
+ monkeypatch.delenv('GOOGLE_APPLICATION_CREDENTIALS', raising=False)
+
+ monkeypatch.setattr(dr, '_run_command', lambda *args, **kwargs: (_ for _ in ()).throw(Exception("Should not be called")))
+
+ dr._authorize_gcloud()
+ # Should log that credentials not set
+ assert any("GOOGLE APPLICATION CREDENTIALS is not set." in rec.message for rec in caplog.records)
+
+
+def test_authorize_gcloud_with_creds(monkeypatch, caplog):
+ caplog.set_level(logging.INFO)
+ # Set fake credentials
+ monkeypatch.setenv('GOOGLE_APPLICATION_CREDENTIALS', '/path/to/creds.json')
+ commands = []
+ def fake_run(cmd, shell=False):
+ commands.append((cmd, shell))
+ return 0
+ monkeypatch.setattr(dr, '_run_command', fake_run)
+
+ dr._authorize_gcloud()
+ # Should log that credentials are set
+ assert any("GOOGLE APPLICATION CREDENTIALS set" in rec.message for rec in caplog.records)
+ # Check that _run_command was called with gcloud auth activate-service-account
+ assert any('gcloud' in cmd and 'activate-service-account' in cmd for cmd, _ in commands)
+
+
+# --- Tests for _log_common_args ---
+
+def test_log_common_args(caplog):
+ caplog.set_level(logging.INFO)
+ args = type('Args', (), {})()
+ args.benchmark_set = 'set1'
+ args.frequency_label = 'label1'
+ args.run_timeout = 200
+ args.sub_dir = 'sub1'
+ args.model = 'model1'
+ args.delay = 42
+
+ dr._log_common_args(args)
+ msgs = [rec.message for rec in caplog.records]
+ assert any('Benchmark set is set1.' in m for m in msgs)
+ assert any('Frequency label is label1.' in m for m in msgs)
+ assert any('Run timeout is 200.' in m for m in msgs)
+ assert any('Sub-directory is sub1.' in m for m in msgs)
+ assert any('LLM is model1.' in m for m in msgs)
+ assert any('DELAY is 42.' in m for m in msgs)
+
+
+def test_run_on_data_from_scratch_flow(monkeypatch, tmp_path):
+
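+ # Force os.path.isdir to return True so main() follows the data-from-scratch path;
+ # gcloud auth, arg logging and subprocess calls are stubbed out below.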
+ monkeypatch.setattr(os.path, 'isdir', lambda path: True)
+
+
+ monkeypatch.setattr(dr, '_authorize_gcloud', lambda: None)
+ monkeypatch.setattr(dr, '_log_common_args', lambda args: None)
+
+ monkeypatch.setattr(os.path, 'exists', lambda path: False)
+
+ # Stub subprocess.check_call for starter script
+ starter_calls = []
+ monkeypatch.setattr(subprocess, 'check_call', lambda cmd, shell: starter_calls.append((cmd, shell)) or 0)
+
+ RealDateTime = datetime.datetime
+ class FakeDateTime(RealDateTime):
+ @classmethod
+ def now(cls):
+ return RealDateTime(2025, 4, 22)
+ monkeypatch.setattr(dr.datetime, 'datetime', FakeDateTime)
+
+ # Stub os.listdir for projects
+ def fake_listdir(path):
+ return ['proj1', 'file.txt']
+ monkeypatch.setattr(os, 'listdir', fake_listdir)
+
+ # Stub subprocess.Popen for upload_report.sh
+ class FakeProc:
+ def __init__(self, cmd):
+ self.cmd = cmd
+ def wait(self):
+ self.waited = True
+ p_calls = []
+ monkeypatch.setattr(subprocess, 'Popen', lambda cmd: p_calls.append(cmd) or FakeProc(cmd))
+
+ # Stub subprocess.run for run_all_experiments
+ def fake_run(cmd, stdout=None, stderr=None, env=None):
+ class P:
+ returncode = 7
+ return P()
+ monkeypatch.setattr(subprocess, 'run', fake_run)
+
+ # Stub git check_output
+ monkeypatch.setattr(subprocess, 'check_output',
+ lambda cmd: b'hash' if 'rev-parse' in cmd else b'2025-04-22')
+
+ # Capture writes to /experiment_ended
+ written = {}
+ def fake_open(path, mode='r', **kwargs):
+ assert path == '/experiment_ended'
+ written['opened'] = True
+ return io.StringIO()
+ monkeypatch.setattr(builtins, 'open', fake_open)
+
+ # Execute
+ ret = dr.main([])
+
+ assert ret is None
+
+ assert starter_calls
+
+ assert p_calls
+
+ assert written.get('opened', False)
+
+
+def test_run_standard_flow(monkeypatch, tmp_path):
+
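+ # os.path.isdir returns False here, so main() follows the standard experiment path.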
+ monkeypatch.setattr(os.path, 'isdir', lambda path: False)
+
+ # Stub authorization and logging
+ monkeypatch.setattr(dr, '_authorize_gcloud', lambda: None)
+ monkeypatch.setattr(dr, '_log_common_args', lambda args: None)
+
+ # Stub python path resolution to True to test /venv/bin/python3
+ monkeypatch.setattr(os.path, 'exists', lambda path: True)
+
+ # Stub subprocess.Popen for upload_report.sh
+ p_calls = []
+ class FakePopen:
+ def __init__(self, cmd):
+ self.cmd = cmd
+ def wait(self):
+ self.waited = True
+ monkeypatch.setattr(subprocess, 'Popen', lambda cmd: p_calls.append(cmd) or FakePopen(cmd))
+
+ # Stub subprocess.run for experiment and trends
+ run_calls = []
+ def fake_run(cmd, stdout=None, stderr=None, shell=False, env=None, check=False):
+ run_calls.append(cmd)
+ class P:
+ returncode = 3
+ return P()
+ monkeypatch.setattr(subprocess, 'run', fake_run)
+
+ # Stub git check_output
+ def fake_check_output(cmd):
+ if 'rev-parse' in cmd:
+ return b'abc123'
+ if '--format=%cs' in cmd:
+ return b'2025-04-22'
+ if 'branch' in cmd:
+ return b'main'
+ return b''
+ monkeypatch.setattr(subprocess, 'check_output', fake_check_output)
+
+
+ written = {}
+ def fake_open(path, mode='r', **kwargs):
+ if path == '/experiment_ended':
+ written['opened'] = True
+ return io.StringIO()
+ return _ORIGINAL_OPEN(path, mode, **kwargs)
+ monkeypatch.setattr(builtins, 'open', fake_open)
+
+ # Execute
+ ret = dr.main([])
+
+ assert ret is None
+
+ assert p_calls
+
+ assert written.get('opened', False)
+
+ assert any('run_all_experiments.py' in arg for c in run_calls for arg in c)
+
+ assert any('-m' in c or '--model' in c for c in run_calls)
diff --git a/report/tests/trends_report/update_index_test.py b/report/tests/trends_report/update_index_test.py
new file mode 100644
index 000000000..b690460da
--- /dev/null
+++ b/report/tests/trends_report/update_index_test.py
@@ -0,0 +1,94 @@
+import json
+import pytest
+from report.trends_report.update_index import trends_report_index
+
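+# Lightweight stand-ins for the google-cloud-storage client, bucket and blob objects.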
+class DummyBlob:
+ def __init__(self, name, data=None, throws=False):
+ self.name = name
+ self._data = data
+ self._throws = throws
+ self.uploaded_data = None
+ self.upload_content_type = None
+
+ def download_as_text(self):
+ if self._throws:
+ raise Exception("download error")
+ return self._data
+
+ def upload_from_string(self, data, content_type):
+ self.uploaded_data = data
+ self.upload_content_type = content_type
+
+class DummyBucket:
+ def __init__(self, blobs):
+ self._blobs = blobs
+ # Create an upload blob for index.json
+ self._upload_blob = DummyBlob('trend-reports/index.json')
+
+ def list_blobs(self, prefix=None):
+ # Return iterable of listing blobs
+ return self._blobs
+
+ def blob(self, name):
+ # Return the upload target for index.json
+ assert name == 'trend-reports/index.json'
+ return self._upload_blob
+
+class DummyClient:
+ def __init__(self, bucket):
+ self._bucket = bucket
+
+ def bucket(self, name):
+ assert name == 'oss-fuzz-gcb-experiment-run-logs'
+ return self._bucket
+
+@pytest.fixture(autouse=True)
+def patch_storage(monkeypatch):
+ # DummyClient instead of real storage.Client
+ dummy_bucket = DummyBucket([])
+ dummy_client = DummyClient(dummy_bucket)
+ monkeypatch.setattr('report.trends_report.update_index.storage',
+ type('m', (), {'Client': lambda self=None: dummy_client}))
+ return dummy_bucket
+
+
+def test_no_op_on_shallow_event(patch_storage, capsys):
+ # Event path depth < 3 should not trigger GCS
+ event = {'attributes': {'objectId': 'a/b'}}
+ res = trends_report_index(event, None)
+ captured = capsys.readouterr()
+ assert res == ''
+ assert captured.out == '' and captured.err == ''
+ assert patch_storage._upload_blob.uploaded_data is None
+
+
+def test_trends_report_index_success(patch_storage, capsys):
+ # Prepare blobs: shallow skip, valid, invalid
+ valid_report = {'name': 'r1', 'url': 'u1', 'date': 'd1', 'benchmark_set': 'bs', 'llm_model': 'm1', 'tags': ['t']}
+ shallow_blob = DummyBlob('trend-reports/index.json', data=json.dumps(valid_report))
+ good_blob = DummyBlob('trend-reports/scheduled/2025-04-22-weekly.json', data=json.dumps(valid_report))
+ bad_blob = DummyBlob('trend-reports/scheduled/bad.json', data='notjson', throws=True)
+ patch_storage._blobs[:] = [shallow_blob, good_blob, bad_blob]
+
+ event = {'attributes': {'objectId': 'trend-reports/scheduled/2025-04-22-weekly.json'}}
+ res = trends_report_index(event, None)
+ out, err = capsys.readouterr()
+
+ # Should read only good_blob (skip shallow, handle bad without raising)
+ assert 'Reading trend-reports/scheduled/2025-04-22-weekly.json' in out
+ assert 'Issue when reading trend-reports/scheduled/bad.json' in err
+ # Verify upload
+ upload_blob = patch_storage._upload_blob
+ assert upload_blob.uploaded_data is not None
+ index = json.loads(upload_blob.uploaded_data)
+ # Index should have 'r1'
+ assert 'r1' in index
+ entry = index['r1']
+ assert entry['url'] == 'u1'
+ assert entry['directory'] == 'scheduled'
+ assert entry['date'] == 'd1'
+ assert entry['benchmark_set'] == 'bs'
+ assert entry['llm_model'] == 'm1'
+ assert entry['tags'] == ['t']
+ assert res == ''
diff --git a/report/tests/trends_report/update_web_test.py b/report/tests/trends_report/update_web_test.py
new file mode 100644
index 000000000..f4f73e911
--- /dev/null
+++ b/report/tests/trends_report/update_web_test.py
@@ -0,0 +1,82 @@
+import io
+import zipfile
+import pytest
+import os
+from report.trends_report.update_web import trends_report_web
+
+# Dummy response for urllib.request.urlopen
+class DummyResponse:
+ def __init__(self, data):
+ self._data = data
+ def read(self):
+ return self._data
+ def __enter__(self):
+ return self
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+# Dummy GCS blobs and bucket
+class DummyBlob:
+ def __init__(self, name):
+ self.name = name
+ self.uploaded_files = []
+
+ def upload_from_filename(self, filename):
+ self.uploaded_files.append(filename)
+
+class DummyBucket:
+ def __init__(self):
+ self.blobs = {}
+
+ def blob(self, name):
+ blob = DummyBlob(name)
+ self.blobs[name] = blob
+ return blob
+
+class DummyClient:
+ def __init__(self, bucket):
+ self._bucket = bucket
+
+ def bucket(self, name):
+ assert name == 'oss-fuzz-gcb-experiment-run-logs'
+ return self._bucket
+
+@pytest.fixture(autouse=True)
+def patch_env(monkeypatch, tmp_path):
+ # Create in-memory zip archive
+ zip_mem = io.BytesIO()
+ with zipfile.ZipFile(zip_mem, mode='w') as zf:
+ zf.writestr('oss-fuzz-gen-trends-report/report/trends_report_web/index.html', '')
+ zf.writestr('oss-fuzz-gen-trends-report/report/trends_report_web/static/style.css', 'body {}')
+ zf.writestr('oss-fuzz-gen-trends-report/README.md', 'readme content')
+ zip_bytes = zip_mem.getvalue()
+
+ monkeypatch.setattr('report.trends_report.update_web.urllib.request.urlopen',
+ lambda url: DummyResponse(zip_bytes))
+ # Monkeypatch storage client
+ dummy_bucket = DummyBucket()
+ dummy_client = DummyClient(dummy_bucket)
+ monkeypatch.setattr('report.trends_report.update_web.storage',
+ type('S', (), {'Client': lambda self=None: dummy_client}))
+ return dummy_bucket
+
+
+def test_trends_report_web_uploads_only_relevant_files(patch_env, capsys):
+ # Run the function
+ trends_report_web(None, None)
+ out, err = capsys.readouterr()
+ # Check print statements for uploads
+ assert 'uploading oss-fuzz-gen-trends-report/report/trends_report_web/index.html to trend-reports/index.html' in out
+ assert 'uploading oss-fuzz-gen-trends-report/report/trends_report_web/static/style.css to trend-reports/static/style.css' in out
+ # Verify that only relevant files were uploaded
+ bucket = patch_env
+ assert set(bucket.blobs.keys()) == {
+ 'trend-reports/index.html',
+ 'trend-reports/static/style.css'
+ }
+ # Each blob should be uploaded exactly once, from a file matching the blob's basename
+ for blob_name, blob in bucket.blobs.items():
+ assert len(blob.uploaded_files) == 1
+ uploaded_path = blob.uploaded_files[0]
+ assert uploaded_path.endswith(os.path.basename(blob_name))
diff --git a/report/tests/trends_report/upload_summary_test.py b/report/tests/trends_report/upload_summary_test.py
new file mode 100644
index 000000000..543f1f6d2
--- /dev/null
+++ b/report/tests/trends_report/upload_summary_test.py
@@ -0,0 +1,158 @@
+import json
+import sys
+import pytest
+
+from report.trends_report import upload_summary
+from dataclasses import dataclass
+
+# Dummy classes for testing generate_summary
+@dataclass
+class DummyResult:
+ build_success_rate: float
+ crash_rate: float
+ found_bug: bool
+ max_coverage: float
+ max_line_coverage_diff: float
+
+@dataclass
+class DummyBenchmark:
+ id: str
+ project: str
+ function: str
+ signature: str
+ result: DummyResult
+
+@dataclass
+class DummyMacroInsights:
+ total_build_success_rate: float
+ total_crash_rate: float
+
+@dataclass
+class DummyProjectSummary:
+ project: str
+ num_benchmarks: int
+
+class DummyResultsUtil:
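+ # Mimics report.common.Results with canned benchmarks so generate_summary needs no results dir.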
+ def __init__(self, results_dir=None, benchmark_set=None):
+ pass
+
+ def list_benchmark_ids(self):
+ return ['bm1', 'bm2']
+
+ def get_results(self, benchmark_id):
+ return {}, {}
+
+ def match_benchmark(self, benchmark_id, results, targets):
+ # produce a DummyBenchmark with different values per id
+ if benchmark_id == 'bm1':
+ res = DummyResult(1.0, 0.1, True, 75.5, 5.0)
+ return DummyBenchmark('bm1', 'proj1', 'func1', 'sig1', res)
+ else:
+ res = DummyResult(0.9, 0.2, False, 80.0, 3.5)
+ return DummyBenchmark('bm2', 'proj2', 'func2', 'sig2', res)
+
+ def get_macro_insights(self, benchmarks):
+ assert len(benchmarks) == 2
+ # return dummy insights
+ return DummyMacroInsights(total_build_success_rate=1.9, total_crash_rate=0.3)
+
+ def get_project_summary(self, benchmarks):
+ # return list of DummyProjectSummary
+ return [DummyProjectSummary('proj1', 1), DummyProjectSummary('proj2', 1)]
+
+
+def test_generate_summary():
+ # Use the dummy results util to generate summary
+ dummy_util = DummyResultsUtil()
+ summary = upload_summary.generate_summary(dummy_util)
+
+ assert isinstance(summary.benchmarks, list)
+ assert len(summary.benchmarks) == 2
+ assert summary.benchmarks[0] == {
+ 'id': 'bm1',
+ 'project': 'proj1',
+ 'function': 'func1',
+ 'signature': 'sig1',
+ 'build_success_rate': 1.0,
+ 'crash_rate': 0.1,
+ 'found_bug': True,
+ 'max_coverage': 75.5,
+ 'max_line_coverage_diff': 5.0,
+ }
+ assert summary.benchmarks[1]['id'] == 'bm2'
+
+ # Verify accumulated_results
+ assert summary.accumulated_results == {
+ 'total_build_success_rate': 1.9,
+ 'total_crash_rate': 0.3,
+ }
+
+ # Verify projects
+ assert summary.projects == [
+ {'project': 'proj1', 'num_benchmarks': 1},
+ {'project': 'proj2', 'num_benchmarks': 1},
+ ]
+
+
+def test_main_writes_summary(tmp_path, monkeypatch):
+ output_file = tmp_path / 'summary.json'
+
+ class DummyFileSystem:
+ def __init__(self, path):
+ # Ensure path matches expected
+ assert path == str(output_file)
+ self._path = path
+
+ def open(self, mode, encoding):
+ return open(self._path, mode, encoding=encoding)
+
+ monkeypatch.setattr(upload_summary, 'FileSystem', DummyFileSystem)
+
+ monkeypatch.setattr(upload_summary, 'Results', DummyResultsUtil)
+
+ args = [
+ 'upload_summary.py',
+ '--results-dir', 'dummy_results',
+ '--output-path', str(output_file),
+ '--date', '2025-04-21',
+ '--name', 'test_report',
+ '--url', 'http://example.com',
+ '--benchmark-set', 'bset',
+ '--run-timeout', '10',
+ '--num-samples', '5',
+ '--llm-fix-limit', '2',
+ '--model', 'test_model',
+ '--commit-hash', 'abc123',
+ '--commit-date', '2025-04-20',
+ '--git-branch', 'main',
+ '--tags', 'tagA', 'tagB'
+ ]
+ monkeypatch.setattr(sys, 'argv', args)
+
+ upload_summary.main()
+
+ assert output_file.exists()
+ data = json.loads(output_file.read_text(encoding='utf-8'))
+
+ expected_keys = {
+ 'name', 'date', 'benchmark_set', 'llm_model', 'url',
+ 'run_parameters', 'build_info', 'tags',
+ 'benchmarks', 'accumulated_results', 'projects'
+ }
+ assert expected_keys <= set(data.keys())
+
+ assert data['name'] == 'test_report'
+ assert data['date'] == '2025-04-21'
+ assert data['llm_model'] == 'test_model'
+ assert data['url'] == 'http://example.com'
+ assert data['benchmark_set'] == 'bset'
+
+ assert data['tags'] == ['test_model', 'bset', 'tagA', 'tagB']
+
+ assert data['run_parameters'] == {'run_timeout': 10, 'num_samples': 5, 'llm_fix_limit': 2}
+
+ assert data['build_info'] == {
+ 'branch': 'main',
+ 'commit_hash': 'abc123',
+ 'commit_date': '2025-04-20'
+ }
diff --git a/report/tests/web_test.py b/report/tests/web_test.py
new file mode 100644
index 000000000..22a0428fb
--- /dev/null
+++ b/report/tests/web_test.py
@@ -0,0 +1,175 @@
+import sys
+import json
+import pytest
+
+from report.web import (
+ JinjaEnv,
+ GenerateReport,
+ generate_report,
+ launch_webserver,
+ _parse_arguments,
+ LOCAL_HOST,
+)
+
+# -- JinjaEnv filter tests --
+
+def test_urlencode_filter():
+ je = JinjaEnv()
+ assert je._urlencode_filter("hello world!") == "hello%20world%21"
+
+
+def test_percent():
+ je = JinjaEnv()
+ assert je._percent(0.123456) == "12.35" # rounded two decimals
+
+
+def test_cov_report_link_empty():
+ je = JinjaEnv()
+ assert je._cov_report_link("") == "#"
+
+
+def test_cov_report_link_local_without_gcb():
+ je = JinjaEnv()
+ link = "/some/local/path"
+ res = je._cov_report_link(link)
+ assert res == "/some/local/pathreport.html"
+
+
+def test_cov_report_link_cloud_paths():
+ je = JinjaEnv()
+ cloud_link = "gs://oss-fuzz-gcb-experiment-run-logs/foo/bar"
+ expected = "https://llm-exp.oss-fuzz.com/foo/bar/report/linux/index.html"
+ assert je._cov_report_link(cloud_link) == expected
+
+ cloud_link2 = "gs://oss-fuzz-gcb-experiment-run-logs/foo/bar.txt"
+ expected2 = "https://llm-exp.oss-fuzz.com/foo/bar.txt/report/linux/index.html"
+ assert je._cov_report_link(cloud_link2) == expected2
+
+
+def test_remove_trailing_empty_lines():
+ je = JinjaEnv()
+ code = "line1\nline2\n \n \n"
+ assert je._remove_trailing_empty_lines(code) == "line1\nline2"
+ assert je._remove_trailing_empty_lines("") == ""
+
+
+def test_splitlines():
+ je = JinjaEnv()
+ text = "a\nb\r\nc"
+ assert je._splitlines(text) == ["a", "b", "c"]
+ assert je._splitlines("") == []
+
+# -- GenerateReport.read_timings test --
+
+def test_read_timings(tmp_path):
+ data = {'a': 1, 'b': 2}
+ results_dir = tmp_path / "results"
+ results_dir.mkdir()
+ with open(results_dir / 'report.json', 'w') as f:
+ json.dump(data, f)
+
+ fake_jinja = JinjaEnv()
+ gr = GenerateReport(results=None,
+ jinja_env=fake_jinja,
+ results_dir=str(results_dir),
+ output_dir=str(tmp_path / 'out'))
+ timings = gr.read_timings()
+ assert timings == data
+
+# -- Argument parsing tests --
+
+def test_parse_arguments_structure(monkeypatch):
+ monkeypatch.setattr(sys, 'argv', ['__main__.py', '-r', 'resdir'])
+ ns = _parse_arguments()
+ for attr in ['results_dir', 'output_dir', 'benchmark_set', 'model', 'serve', 'port']:
+ assert hasattr(ns, attr)
+
+# -- I/O-heavy methods tests --
+
+def test_copy_and_set_coverage_report(tmp_path):
+
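+ # Build a fake per-sample code-coverage-reports tree and check that the 'linux'
+ # report directory is copied into the output dir and the sample's
+ # coverage_report_path is rewritten to point at it.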
+ class DummyResult:
+ def __init__(self):
+ self.coverage_report_path = ''
+ class DummyBenchmark:
+ def __init__(self, id):
+ self.id = id
+ class DummySample:
+ def __init__(self, id):
+ self.id = id
+ self.result = DummyResult()
+
+ # Create directories: results/benchmark1/code-coverage-reports/sample1/{linux, extra, style.css}
+ results_dir = tmp_path / 'results'
+ coverage_root = results_dir / 'benchmark1' / 'code-coverage-reports'
+ sample_dir = coverage_root / 'sample1'
+ (sample_dir / 'linux').mkdir(parents=True)
+ (sample_dir / 'extra').mkdir()
+
+ (sample_dir / 'style.css').write_text('')
+
+ out_dir = tmp_path / 'out'
+ gr = GenerateReport(results=None,
+ jinja_env=None,
+ results_dir=str(results_dir),
+ output_dir=str(out_dir))
+ benchmark = DummyBenchmark('benchmark1')
+ sample = DummySample('sample1')
+ gr._copy_and_set_coverage_report(benchmark, sample)
+
+
+ dest = out_dir / 'sample' / 'benchmark1' / 'coverage' / 'sample1' / 'linux'
+ assert dest.exists()
+
+ assert sample.result.coverage_report_path == '/sample/benchmark1/coverage/sample1/linux/'
+
+
+def test_generate_report_invokes_generate(monkeypatch):
+ calls = {}
+
+ monkeypatch.setattr('report.web.Results', lambda results_dir, benchmark_set: None)
+
+ original_init = GenerateReport.__init__
+ def fake_init(self, results, jinja_env, results_dir, output_dir):
+ original_init(self, results=None, jinja_env=jinja_env, results_dir=results_dir, output_dir=output_dir)
+ monkeypatch.setattr(GenerateReport, '__init__', fake_init)
+
+ def fake_generate(self):
+ calls['generated'] = True
+ monkeypatch.setattr(GenerateReport, 'generate', fake_generate)
+
+ from argparse import Namespace
+ args = Namespace(results_dir='rdir', output_dir='odir', benchmark_set='', model='', serve=False, port=0)
+ generate_report(args)
+ assert calls.get('generated', False)
+
+
+def test_launch_webserver(monkeypatch):
+
+ instances = []
+ port = 12345
+
+ class DummyServer:
+ def __init__(self, addr, handler):
+ # Assert that correct host and port are used
+ assert addr[0] == LOCAL_HOST
+ assert addr[1] == port
+ instances.append(self)
+ def serve_forever(self):
+ self.serve_called = True
+ raise SystemExit
+
+ monkeypatch.setattr('report.web.ThreadingHTTPServer', DummyServer)
+ from argparse import Namespace
+ args = Namespace(port=port, output_dir='unused')
+ with pytest.raises(SystemExit):
+ launch_webserver(args)
+
+ assert instances and getattr(instances[0], 'serve_called', False)
+