diff --git a/experiment/coverage.py b/experiment/coverage.py new file mode 100644 index 0000000000..8319ead603 --- /dev/null +++ b/experiment/coverage.py @@ -0,0 +1,24 @@ +"""Standard coverage calculation functions to ensure consistency.""" + +from experiment import textcov + + +def calculate_coverage(cov: textcov.Textcov, linked_lines: int) -> float: + """Calculates coverage according to formula: Cov(f) / Linked(f).""" + if not linked_lines: + return 0.0 + return cov.covered_lines / linked_lines + + +def calculate_coverage_improvement(new_cov: textcov.Textcov, + existing_cov: textcov.Textcov, + union_linked_lines: int) -> float: + """Calculates coverage improvement: [Cov(f1) - Cov(f0)] / [Linked(f1 ∪ f0)].""" + if not union_linked_lines: + return 0.0 + + # Make a copy to avoid modifying the original + diff_cov = new_cov.copy() + diff_cov.subtract_covered_lines(existing_cov) + + return diff_cov.covered_lines / union_linked_lines diff --git a/experiment/evaluator.py b/experiment/evaluator.py index 67e07c4a6b..2febdf3924 100644 --- a/experiment/evaluator.py +++ b/experiment/evaluator.py @@ -24,7 +24,9 @@ from google.cloud import storage -from experiment import builder_runner, oss_fuzz_checkout, textcov +from experiment import builder_runner +from experiment import coverage as coverage_utils +from experiment import oss_fuzz_checkout, textcov from experiment.benchmark import Benchmark from experiment.builder_runner import BuildResult, RunResult from experiment.fuzz_target_error import SemanticCheckResult @@ -461,11 +463,10 @@ def check_target(self, ai_binary, target_path: str) -> Result: coverage_percent = 0.0 existing_textcov = self.load_existing_textcov() - if run_result.coverage: - run_result.coverage.subtract_covered_lines(existing_textcov) - if total_lines and run_result.coverage: - coverage_diff = run_result.coverage.covered_lines / total_lines + union_linked_lines = max(run_result.coverage.total_lines, total_lines) + coverage_diff = 
coverage_utils.calculate_coverage_improvement( + run_result.coverage, existing_textcov, union_linked_lines) else: dual_logger.log( f'Warning: total_lines == 0 in {generated_oss_fuzz_project}.') diff --git a/experiment/textcov.py b/experiment/textcov.py index 0fc2f531ab..d82fad753b 100644 --- a/experiment/textcov.py +++ b/experiment/textcov.py @@ -15,6 +15,7 @@ from __future__ import annotations +import copy import dataclasses import json import logging @@ -573,3 +574,7 @@ class name specification use single upper case letter for next_arg += '[]' * array_count args.append(next_arg) return args + + def copy(self) -> 'Textcov': + """Create a deep copy of this Textcov object.""" + return copy.deepcopy(self) diff --git a/report/aggregate_coverage_diff.py b/report/aggregate_coverage_diff.py index acafc4324f..ca55ed1f71 100644 --- a/report/aggregate_coverage_diff.py +++ b/report/aggregate_coverage_diff.py @@ -27,6 +27,7 @@ from google.cloud import storage +from experiment import coverage as coverage_utils from experiment import evaluator, textcov @@ -53,14 +54,17 @@ def compute_coverage_diff(project: str, coverage_links: list[str]): # TODO: skip other functions defined the target. 
new_textcov.merge(textcov.Textcov.from_file(f)) - new_textcov.subtract_covered_lines(existing_textcov) + # union of linked lines try: - total_lines = coverage_summary['data'][0]['totals']['lines']['count'] + existing_lines = coverage_summary['data'][0]['totals']['lines']['count'] except KeyError: - total_lines = 1 + existing_lines = 0 - return new_textcov.covered_lines / total_lines - #print(f'{project}:', new_textcov.covered_lines / total_lines) + union_linked_lines = max(new_textcov.total_lines, existing_lines) + + return coverage_utils.calculate_coverage_improvement(new_textcov, + existing_textcov, + union_linked_lines) def main(): diff --git a/run_all_experiments.py b/run_all_experiments.py index 3d03bf6414..8b3eb9f649 100755 --- a/run_all_experiments.py +++ b/run_all_experiments.py @@ -31,6 +31,7 @@ import run_one_experiment from data_prep import introspector from experiment import benchmark as benchmarklib +from experiment import coverage as coverage_utils from experiment import evaluator, oss_fuzz_checkout, textcov from experiment.workdir import WorkDirs from llm_toolkit import models, prompt_builder @@ -502,23 +503,17 @@ def _process_total_coverage_gain() -> dict[str, dict[str, Any]]: total_existing_lines = sum(lines) total_cov_covered_lines_before_subtraction = total_cov.covered_lines - total_cov.subtract_covered_lines(existing_textcov) - try: - cov_relative_gain = (total_cov.covered_lines / - existing_textcov.covered_lines) - except ZeroDivisionError: - cov_relative_gain = 0.0 total_lines = max(total_cov.total_lines, total_existing_lines) + union_linked_lines = max(total_cov.total_lines, total_existing_lines) if total_lines: coverage_gain[project] = { 'language': oss_fuzz_checkout.get_project_language(project), 'coverage_diff': - total_cov.covered_lines / total_lines, - 'coverage_relative_gain': - cov_relative_gain, + coverage_utils.calculate_coverage_improvement( + total_cov, existing_textcov, union_linked_lines), 'coverage_ofg_total_covered_lines': 
total_cov_covered_lines_before_subtraction, 'coverage_ofg_total_new_covered_lines': diff --git a/stage/execution_stage.py b/stage/execution_stage.py index 98013881d8..1e4d8dc805 100644 --- a/stage/execution_stage.py +++ b/stage/execution_stage.py @@ -17,6 +17,7 @@ import os from experiment import builder_runner as builder_runner_lib +from experiment import coverage as coverage_utils from experiment import evaluator as evaluator_lib from experiment.evaluator import Evaluator from results import BuildResult, Result, RunResult @@ -112,16 +113,20 @@ def execute(self, result_history: list[Result]) -> Result: coverage_percent = 0.0 existing_textcov = evaluator.load_existing_textcov() - run_result.coverage.subtract_covered_lines(existing_textcov) + # Calculate linked lines union using both textcov objects to determine + # the total number of lines that could be covered + union_linked_lines = max(total_lines, existing_textcov.total_lines) - if total_lines: - coverage_diff = run_result.coverage.covered_lines / total_lines + if union_linked_lines: + coverage_diff = coverage_utils.calculate_coverage_improvement( + run_result.coverage, existing_textcov, union_linked_lines) self.logger.info('coverage diff == %s in %s.', coverage_diff, generated_oss_fuzz_project) else: self.logger.warning('total_lines == 0 in %s', generated_oss_fuzz_project) coverage_diff = 0.0 + runresult = RunResult( benchmark=benchmark, trial=last_result.trial,