From 2691f8e862ef7b992627993b37ec28bd75ada8ce Mon Sep 17 00:00:00 2001 From: Zewei Wang Date: Mon, 7 Jul 2025 16:30:00 -0400 Subject: [PATCH 1/8] feat: Add core data models and error handling for historical data SDK - Add comprehensive data models for build, crash, corpus, and coverage history - Implement HistoricalSummary model for aggregated statistics - Add specialized error classes for SDK configuration and validation - Include proper type hints and Pydantic validation --- ossfuzz_py/core/data_models.py | 128 ++++++++++++++++++++++++++++++++- ossfuzz_py/errors/__init__.py | 8 +++ ossfuzz_py/errors/core.py | 11 +++ ossfuzz_py/errors/factory.py | 19 +++++ 4 files changed, 165 insertions(+), 1 deletion(-) diff --git a/ossfuzz_py/core/data_models.py b/ossfuzz_py/core/data_models.py index d2d4efe75..383d1f055 100644 --- a/ossfuzz_py/core/data_models.py +++ b/ossfuzz_py/core/data_models.py @@ -21,7 +21,7 @@ from datetime import datetime from enum import Enum from pathlib import Path -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field @@ -133,3 +133,129 @@ def to_yaml(self, path: Path) -> bool: return True except Exception: return False + + +class BuildHistoryData(BaseDataModel): + """Represents a single build history entry.""" + build_id: str = Field(..., description="Unique identifier for the build") + timestamp: datetime = Field(..., description="Build timestamp") + project_name: str = Field(..., description="Name of the project") + success: bool = Field(..., description="Whether the build was successful") + duration_seconds: Optional[int] = Field( + None, description="Build duration in seconds") + commit_hash: Optional[str] = Field(None, description="Git commit hash") + branch: Optional[str] = Field(None, description="Git branch") + sanitizer: Optional[Sanitizer] = Field(None, description="Sanitizer used") + architecture: Optional[str] = Field(None, description="Target architecture") + error_message: Optional[str] = Field( + None, description="Error message if build failed") + artifacts: Optional[List[str]] = Field(None, + description="List of build artifacts") + + +class CrashHistoryData(BaseDataModel): + """Represents a single crash history entry.""" + crash_id: str = Field(..., description="Unique identifier for the crash") + timestamp: datetime = Field(..., description="Crash timestamp") + project_name: str = Field(..., description="Name of the project") + fuzzer_name: str = Field(..., description="Name of the fuzzer") + crash_type: str = Field( + ..., description="Type of crash (e.g., heap-buffer-overflow)") + crash_signature: str = Field(..., description="Crash signature/hash") + severity: Severity = Field(Severity.UNKNOWN, description="Crash severity") + reproducible: Optional[bool] = Field( + None, description="Whether crash is reproducible") + stack_trace: Optional[str] = Field(None, description="Stack trace") + testcase_path: Optional[str] = Field(None, description="Path to testcase") + regression_range: Optional[str] = Field(None, description="Regression range") + + +class CorpusHistoryData(BaseDataModel): + """Represents a single corpus history entry.""" + timestamp: datetime = Field(..., description="Corpus snapshot timestamp") + project_name: str = Field(..., description="Name of the project") + fuzzer_name: str = Field(..., description="Name of the fuzzer") + corpus_size: int = Field(..., description="Number of files in corpus") + total_size_bytes: int = Field(..., + description="Total size of corpus 
in bytes") + new_files_count: Optional[int] = Field( + None, description="Number of new files added") + coverage_increase: Optional[float] = Field( + None, description="Coverage increase percentage") + unique_features: Optional[int] = Field( + None, description="Number of unique features") + + +class CoverageHistoryData(BaseDataModel): + """Represents a single coverage history entry.""" + timestamp: datetime = Field(..., description="Coverage measurement timestamp") + project_name: str = Field(..., description="Name of the project") + fuzzer_name: Optional[str] = Field(None, description="Name of the fuzzer") + line_coverage: float = Field(..., description="Line coverage percentage") + function_coverage: Optional[float] = Field( + None, description="Function coverage percentage") + branch_coverage: Optional[float] = Field( + None, description="Branch coverage percentage") + lines_covered: Optional[int] = Field(None, + description="Number of lines covered") + lines_total: Optional[int] = Field(None, description="Total number of lines") + functions_covered: Optional[int] = Field( + None, description="Number of functions covered") + functions_total: Optional[int] = Field( + None, description="Total number of functions") + branches_covered: Optional[int] = Field( + None, description="Number of branches covered") + branches_total: Optional[int] = Field(None, + description="Total number of branches") + + +class TimeSeriesData(BaseDataModel): + """Generic time series data container.""" + project_name: str = Field(..., description="Name of the project") + data_type: str = Field( + ..., description="Type of data (build, crash, corpus, coverage)") + start_date: datetime = Field(..., description="Start date of the time series") + end_date: datetime = Field(..., description="End date of the time series") + data_points: List[Dict[str, + Any]] = Field(..., + description="Time series data points") + metadata: Optional[Dict[str, Any]] = Field(None, + description="Additional metadata") + + +class HistoricalSummary(BaseDataModel): + """Summary statistics for historical data.""" + project_name: str = Field(..., description="Name of the project") + period_start: datetime = Field(..., description="Start of the summary period") + period_end: datetime = Field(..., description="End of the summary period") + + # Build statistics + total_builds: Optional[int] = Field(None, + description="Total number of builds") + successful_builds: Optional[int] = Field( + None, description="Number of successful builds") + build_success_rate: Optional[float] = Field( + None, description="Build success rate percentage") + + # Crash statistics + total_crashes: Optional[int] = Field(None, + description="Total number of crashes") + unique_crashes: Optional[int] = Field(None, + description="Number of unique crashes") + critical_crashes: Optional[int] = Field( + None, description="Number of critical crashes") + + # Coverage statistics + max_coverage: Optional[float] = Field(None, + description="Maximum coverage achieved") + avg_coverage: Optional[float] = Field(None, description="Average coverage") + coverage_trend: Optional[str] = Field( + None, description="Coverage trend (increasing/decreasing/stable)") + + # Corpus statistics + max_corpus_size: Optional[int] = Field(None, + description="Maximum corpus size") + avg_corpus_size: Optional[float] = Field(None, + description="Average corpus size") + corpus_growth_rate: Optional[float] = Field(None, + description="Corpus growth rate") diff --git a/ossfuzz_py/errors/__init__.py 
b/ossfuzz_py/errors/__init__.py index bd47dea53..1c3050dca 100644 --- a/ossfuzz_py/errors/__init__.py +++ b/ossfuzz_py/errors/__init__.py @@ -165,6 +165,14 @@ # Manager errors 'OSSFuzzManagerError', + # Historical Data SDK errors + 'HistoryManagerError', + 'HistoryStorageError', + 'HistoryRetrievalError', + 'HistoryValidationError', + 'OSSFuzzSDKError', + 'OSSFuzzSDKConfigError', + # General/legacy errors 'SDKError', 'EnvironmentParametersError', diff --git a/ossfuzz_py/errors/core.py b/ossfuzz_py/errors/core.py index b3ade5dc7..4e39d0c95 100644 --- a/ossfuzz_py/errors/core.py +++ b/ossfuzz_py/errors/core.py @@ -100,6 +100,7 @@ class ErrorCode(str, Enum): # Storage errors STORAGE_ERROR = "STORAGE_ERROR" STORAGE_CONNECTION_ERROR = "STORAGE_CONNECTION_ERROR" + STORAGE_MANAGER_ERROR = "STORAGE_MANAGER_ERROR" # Data errors DATA_ERROR = "DATA_ERROR" @@ -111,6 +112,16 @@ class ErrorCode(str, Enum): CACHE_ERROR = "CACHE_ERROR" RESULT_COMPARISON_ERROR = "RESULT_COMPARISON_ERROR" + # Historical data errors + HISTORY_MANAGER_ERROR = "HISTORY_MANAGER_ERROR" + HISTORY_STORAGE_ERROR = "HISTORY_STORAGE_ERROR" + HISTORY_RETRIEVAL_ERROR = "HISTORY_RETRIEVAL_ERROR" + HISTORY_VALIDATION_ERROR = "HISTORY_VALIDATION_ERROR" + + # OSS-Fuzz SDK errors + OSSFUZZ_SDK_ERROR = "OSSFUZZ_SDK_ERROR" + OSSFUZZ_SDK_CONFIG_ERROR = "OSSFUZZ_SDK_CONFIG_ERROR" + # Analysis errors ANALYSIS_ERROR = "ANALYSIS_ERROR" CHANGE_TRACKING_ERROR = "CHANGE_TRACKING_ERROR" diff --git a/ossfuzz_py/errors/factory.py b/ossfuzz_py/errors/factory.py index 267aa719e..e6c265309 100644 --- a/ossfuzz_py/errors/factory.py +++ b/ossfuzz_py/errors/factory.py @@ -329,5 +329,24 @@ def list_error_classes() -> Dict[str, Type[OSSFuzzError]]: OSSFuzzManagerError = make_error("OSSFuzzManagerError", ErrorCode.INVALID_CONFIG, ErrorDomain.CONFIG) +# Historical Data SDK errors +HistoryManagerError = make_error("HistoryManagerError", + ErrorCode.HISTORY_MANAGER_ERROR, + ErrorDomain.DATA) +HistoryStorageError = make_error("HistoryStorageError", + ErrorCode.HISTORY_STORAGE_ERROR, + ErrorDomain.STORAGE) +HistoryRetrievalError = make_error("HistoryRetrievalError", + ErrorCode.HISTORY_RETRIEVAL_ERROR, + ErrorDomain.DATA) +HistoryValidationError = make_error("HistoryValidationError", + ErrorCode.HISTORY_VALIDATION_ERROR, + ErrorDomain.VALIDATION) +OSSFuzzSDKError = make_error("OSSFuzzSDKError", ErrorCode.OSSFUZZ_SDK_ERROR, + ErrorDomain.CONFIG) +OSSFuzzSDKConfigError = make_error("OSSFuzzSDKConfigError", + ErrorCode.OSSFUZZ_SDK_CONFIG_ERROR, + ErrorDomain.CONFIG) + # General/legacy errors for backward compatibility SDKError = make_error("SDKError", ErrorCode.UNKNOWN, ErrorDomain.CONFIG) From ac61847edb968ee5f35079873434a6f239f4f0d0 Mon Sep 17 00:00:00 2001 From: Zewei Wang Date: Tue, 8 Jul 2025 18:15:00 -0400 Subject: [PATCH 2/8] feat: Enhance storage infrastructure for historical data management - Extend storage adapters with history-specific functionality - Add support for time-series data storage and retrieval - Implement environment variable utilities for configuration - Improve error handling and logging in storage operations --- ossfuzz_py/data/storage_adapter.py | 422 +++++++++++++++++++++++++++++ ossfuzz_py/data/storage_manager.py | 56 ++++ ossfuzz_py/utils/env_vars.py | 5 + 3 files changed, 483 insertions(+) diff --git a/ossfuzz_py/data/storage_adapter.py b/ossfuzz_py/data/storage_adapter.py index def3c291a..73ce30197 100644 --- a/ossfuzz_py/data/storage_adapter.py +++ b/ossfuzz_py/data/storage_adapter.py @@ -133,6 +133,142 @@ def fetch_crash_data( 
StorageAdapterError: If not connected or connection lost. """ + @abstractmethod + def store_file(self, key: str, file_path: str) -> str: + """ + Store a file with the given key. + + Args: + key: Storage key/path for the file + file_path: Local path to the file to store + + Returns: + str: Storage path or identifier where file was stored + + Raises: + StorageAdapterError: If storage fails + """ + + @abstractmethod + def retrieve_file(self, key: str, dest_path: str) -> str: + """ + Retrieve a file to the specified destination. + + Args: + key: Storage key/path for the file + dest_path: Local path where file should be saved + + Returns: + str: Local path where file was saved + + Raises: + StorageAdapterError: If retrieval fails + """ + + @abstractmethod + def store_object(self, key: str, data: Any) -> str: + """ + Store an object with the given key. + + Args: + key: Storage key/path for the object + data: Object data to store + + Returns: + str: Storage path or identifier where object was stored + + Raises: + StorageAdapterError: If storage fails + """ + + @abstractmethod + def retrieve_object(self, key: str) -> Any: + """ + Retrieve an object with the given key. + + Args: + key: Storage key/path for the object + + Returns: + Any: Retrieved object data + + Raises: + StorageAdapterError: If retrieval fails + """ + + @abstractmethod + def list_keys(self, prefix: str = "") -> List[str]: + """ + List all keys with the given prefix. + + Args: + prefix: Key prefix to filter by + + Returns: + List[str]: List of matching keys + + Raises: + StorageAdapterError: If listing fails + """ + + @abstractmethod + def delete(self, key: str) -> bool: + """ + Delete data with the given key. + + Args: + key: Storage key/path for the data to delete + + Returns: + bool: True if deletion was successful, False otherwise + + Raises: + StorageAdapterError: If deletion fails + """ + + @abstractmethod + def get_history(self, + category: str, + name: str, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: Optional[int] = None) -> List[Any]: + """ + Retrieve historical data for a specific category and name. + + Args: + category: History category + (e.g., 'build', 'crash', 'corpus', 'coverage') + name: Specific name/identifier within the category + start_date: Optional start date filter (ISO format) + end_date: Optional end date filter (ISO format) + limit: Optional limit on number of results + + Returns: + List of historical data entries + + Raises: + StorageAdapterError: If retrieval fails + """ + + @abstractmethod + def append_history(self, category: str, name: str, data: Any) -> str: + """ + Append new data to historical records. + + Args: + category: History category + (e.g., 'build', 'crash', 'corpus', 'coverage') + name: Specific name/identifier within the category + data: Data to append to history + + Returns: + str: Storage path or identifier where data was stored + + Raises: + StorageAdapterError: If storage fails + """ + # - fetch_project_list() # - fetch_build_information(...) # - fetch_report_details(...) 
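For orientation, a minimal usage sketch of the history contract defined above. The concrete class name and constructor (`LocalStorageAdapter(base_directory=...)`) are assumptions, since this hunk only declares the abstract interface; the storage layout — one JSON list per `history/{category}/{name}.json` — matches the concrete implementation later in this patch.

from ossfuzz_py.data.storage_adapter import LocalStorageAdapter

# Assumed constructor; only the method signatures come from this diff.
adapter = LocalStorageAdapter(base_directory='/tmp/ossfuzz-history')

# Appends one record to the JSON list at history/build/libxml2.json.
adapter.append_history('build', 'libxml2', {
    'build_id': 'b-123',
    'timestamp': '2025-07-07T16:30:00',
    'project_name': 'libxml2',
    'success': True,
})

# Date filters compare ISO-8601 strings; limit keeps the most recent entries.
recent = adapter.get_history('build', 'libxml2',
                             start_date='2025-07-01T00:00:00',
                             limit=10)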
@@ -358,6 +494,150 @@ def fetch_crash_data( exc_info=True) raise QueryError(f"Failed to fetch crash data for {project_name}: {e}") + def store_file(self, key: str, file_path: str) -> str: + """Store a file with the given key.""" + try: + dest_path = self.base_directory / key + dest_path.parent.mkdir(parents=True, exist_ok=True) + + import shutil + shutil.copy2(file_path, dest_path) + return str(dest_path) + except Exception as e: + raise StorageAdapterError(f"Failed to store file {key}: {e}") + + def retrieve_file(self, key: str, dest_path: str) -> str: + """Retrieve a file to the specified destination.""" + try: + src_path = self.base_directory / key + if not src_path.exists(): + raise StorageAdapterError(f"File not found: {key}") + + import shutil + shutil.copy2(src_path, dest_path) + return dest_path + except Exception as e: + raise StorageAdapterError(f"Failed to retrieve file {key}: {e}") + + def store_object(self, key: str, data: Any) -> str: + """Store an object with the given key.""" + try: + dest_path = self.base_directory / key + dest_path.parent.mkdir(parents=True, exist_ok=True) + + with open(dest_path, 'w') as f: + json.dump(data, f, indent=2, default=str) + return str(dest_path) + except Exception as e: + raise StorageAdapterError(f"Failed to store object {key}: {e}") + + def retrieve_object(self, key: str) -> Any: + """Retrieve an object with the given key.""" + try: + src_path = self.base_directory / key + if not src_path.exists(): + raise StorageAdapterError(f"Object not found: {key}") + + with open(src_path, 'r') as f: + return json.load(f) + except Exception as e: + raise StorageAdapterError(f"Failed to retrieve object {key}: {e}") + + def list_keys(self, prefix: str = "") -> List[str]: + """List all keys with the given prefix.""" + try: + keys = [] + search_path = self.base_directory / prefix \ + if prefix else self.base_directory + + if search_path.is_file(): + return [str(search_path.relative_to(self.base_directory))] + + if search_path.is_dir(): + for path in search_path.rglob('*'): + if path.is_file(): + keys.append(str(path.relative_to(self.base_directory))) + + return keys + except Exception as e: + raise StorageAdapterError( + f"Failed to list keys with prefix {prefix}: {e}") + + def delete(self, key: str) -> bool: + """Delete data with the given key.""" + try: + path = self.base_directory / key + if path.exists(): + if path.is_file(): + path.unlink() + elif path.is_dir(): + import shutil + shutil.rmtree(path) + return True + return False + except Exception as e: + raise StorageAdapterError(f"Failed to delete {key}: {e}") + + def get_history(self, + category: str, + name: str, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: Optional[int] = None) -> List[Any]: + """Retrieve historical data for a specific category and name.""" + try: + history_path = self.base_directory / "history" / category / f"{name}.json" + if not history_path.exists(): + return [] + + with open(history_path, 'r') as f: + data = json.load(f) + + # Filter by date if specified + if start_date or end_date: + filtered_data = [] + for entry in data: + entry_date = entry.get('timestamp', entry.get('date', '')) + if start_date and entry_date < start_date: + continue + if end_date and entry_date > end_date: + continue + filtered_data.append(entry) + data = filtered_data + + # Apply limit if specified + if limit: + data = data[-limit:] # Get most recent entries + + return data + except Exception as e: + raise StorageAdapterError( + f"Failed to get history for 
{category}/{name}: {e}") + + def append_history(self, category: str, name: str, data: Any) -> str: + """Append new data to historical records.""" + try: + history_path = self.base_directory / "history" / category / f"{name}.json" + history_path.parent.mkdir(parents=True, exist_ok=True) + + # Load existing data + existing_data = [] + if history_path.exists(): + with open(history_path, 'r') as f: + existing_data = json.load(f) + + # Append new data + existing_data.append(data) + + # Save back to file + with open(history_path, 'w') as f: + json.dump(existing_data, f, indent=2, default=str) + + return str(history_path) + except Exception as e: + raise StorageAdapterError( + f"Failed to append history for {category}/{name}: {e}") + class GCSStorageAdapter(StorageAdapter): """ @@ -703,3 +983,145 @@ def fetch_crash_data(self, exc_info=True) raise QueryError(f"GCSStorageAdapter: Failed to fetch crash data for " f"{project_name}: {e}") + + def store_file(self, key: str, file_path: str) -> str: + """Store a file with the given key.""" + if self._bucket: + try: + blob = self._bucket.blob(key) + blob.upload_from_filename(file_path) + return f"gs://{self.bucket_name}/{key}" + except Exception as e: + raise StorageAdapterError(f"Failed to store file {key}: {e}") + return '' + + def retrieve_file(self, key: str, dest_path: str) -> str: + """Retrieve a file to the specified destination.""" + if self._bucket: + try: + blob = self._bucket.blob(key) + if not blob.exists(): + raise StorageAdapterError(f"File not found: {key}") + + blob.download_to_filename(dest_path) + return dest_path + except Exception as e: + raise StorageAdapterError(f"Failed to retrieve file {key}: {e}") + return '' + + def store_object(self, key: str, data: Any) -> str: + """Store an object with the given key.""" + if self._bucket: + try: + blob = self._bucket.blob(key) + blob.upload_from_string(json.dumps(data, indent=2, default=str), + content_type='application/json') + return f"gs://{self.bucket_name}/{key}" + except Exception as e: + raise StorageAdapterError(f"Failed to store object {key}: {e}") + return '' + + def retrieve_object(self, key: str) -> Any: + """Retrieve an object with the given key.""" + if self._bucket: + try: + blob = self._bucket.blob(key) + if not blob.exists(): + raise StorageAdapterError(f"Object not found: {key}") + + content = blob.download_as_text() + return json.loads(content) + except Exception as e: + raise StorageAdapterError(f"Failed to retrieve object {key}: {e}") + return None + + def list_keys(self, prefix: str = "") -> List[str]: + """List all keys with the given prefix.""" + if self._bucket: + try: + blobs = self._bucket.list_blobs(prefix=prefix) + return [blob.name for blob in blobs] + except Exception as e: + raise StorageAdapterError( + f"Failed to list keys with prefix {prefix}: {e}") + return [] + + def delete(self, key: str) -> bool: + """Delete data with the given key.""" + if self._bucket: + try: + blob = self._bucket.blob(key) + if blob.exists(): + blob.delete() + return True + return False + except Exception as e: + raise StorageAdapterError(f"Failed to delete {key}: {e}") + return False + + def get_history(self, + category: str, + name: str, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: Optional[int] = None) -> list[Any]: + """Retrieve historical data for a specific category and name.""" + if self._bucket: + try: + key = f"history/{category}/{name}.json" + blob = self._bucket.blob(key) + + if not blob.exists(): + return [] + + content = 
blob.download_as_text() + data = json.loads(content) + + # Filter by date if specified + if start_date or end_date: + filtered_data = [] + for entry in data: + entry_date = entry.get('timestamp', entry.get('date', '')) + if start_date and entry_date < start_date: + continue + if end_date and entry_date > end_date: + continue + filtered_data.append(entry) + data = filtered_data + + # Apply limit if specified + if limit: + data = data[-limit:] # Get most recent entries + + return data + except Exception as e: + raise StorageAdapterError( + f"Failed to get history for {category}/{name}: {e}") + return [] + + def append_history(self, category: str, name: str, data: Any) -> str: + """Append new data to historical records.""" + if self._bucket: + try: + key = f"history/{category}/{name}.json" + blob = self._bucket.blob(key) + + # Load existing data + existing_data = [] + if blob.exists(): + content = blob.download_as_text() + existing_data = json.loads(content) + + # Append new data + existing_data.append(data) + + # Save back to GCS + blob.upload_from_string(json.dumps(existing_data, indent=2, + default=str), + content_type='application/json') + + return f"gs://{self.bucket_name}/{key}" + except Exception as e: + raise StorageAdapterError( + f"Failed to append history for {category}/{name}: {e}") + return '' diff --git a/ossfuzz_py/data/storage_manager.py b/ossfuzz_py/data/storage_manager.py index d3ae34aea..566a96d7b 100644 --- a/ossfuzz_py/data/storage_manager.py +++ b/ossfuzz_py/data/storage_manager.py @@ -180,6 +180,62 @@ def _store_file_data(self, key: str, data: Any) -> str: pickle.dump(data, f) return str(file_path.with_suffix('.pkl')) + def store_history(self, category: str, name: str, data: Any) -> str: + """ + Store historical data for a specific category and name. + + Args: + category: History category + (e.g., 'build', 'crash', 'corpus', 'coverage') + name: Specific name/identifier within the category + data: Data to store in history + + Returns: + str: Storage path or identifier where data was stored + + Raises: + StorageManagerError: If storage operation fails + """ + try: + self.logger.debug("Storing history data for %s/%s", category, name) + return self.adapter.append_history(category, name, data) + except Exception as e: + error_msg = f"Failed to store history for {category}/{name}: {str(e)}" + self.logger.error(error_msg) + raise StorageManagerError(error_msg) + + def get_history(self, + category: str, + name: str, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: Optional[int] = None) -> List[Any]: + """ + Retrieve historical data for a specific category and name. + + Args: + category: History category + (e.g., 'build', 'crash', 'corpus', 'coverage') + name: Specific name/identifier within the category + start_date: Optional start date filter (ISO format) + end_date: Optional end date filter (ISO format) + limit: Optional limit on number of results + + Returns: + List of historical data entries + + Raises: + StorageManagerError: If retrieval fails + """ + try: + self.logger.debug("Retrieving history data for %s/%s", category, name) + return self.adapter.get_history(category, name, start_date, end_date, + limit) + except Exception as e: + error_msg = f"Failed to get history for {category}/{name}: {str(e)}" + self.logger.error(error_msg) + raise StorageManagerError(error_msg) + def retrieve(self, key: str) -> Any: # pylint: disable=inconsistent-return-statements """ Retrieve data with the given key. 
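A corresponding sketch at the StorageManager level, which the history managers added in the next patch build on. Only store_history and get_history come from this diff; the StorageManager constructor argument is an assumption.

from ossfuzz_py.data.storage_manager import StorageManager

# Constructor signature assumed; `adapter` as configured above.
manager = StorageManager(adapter)

# Delegates to adapter.append_history('coverage', 'libxml2_fuzzer', ...).
manager.store_history('coverage', 'libxml2_fuzzer', {
    'timestamp': '2025-07-08T18:15:00',
    'project_name': 'libxml2',
    'fuzzer_name': 'libxml2_fuzzer',
    'line_coverage': 42.5,
})

# Delegates to adapter.get_history(); failures surface as StorageManagerError.
latest = manager.get_history('coverage', 'libxml2_fuzzer', limit=1)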
diff --git a/ossfuzz_py/utils/env_vars.py b/ossfuzz_py/utils/env_vars.py index 3d0794443..a534576fc 100644 --- a/ossfuzz_py/utils/env_vars.py +++ b/ossfuzz_py/utils/env_vars.py @@ -59,3 +59,8 @@ class EnvVars(str, Enum): OSSFUZZ_CLIENT_SECRET = "OSSFUZZ_CLIENT_SECRET" OSSFUZZ_TOKEN_URL = "OSSFUZZ_TOKEN_URL" OSSFUZZ_API_KEY = "OSSFUZZ_API_KEY" + + # Historical Data SDK specific variables + GCS_BUCKET_NAME = "GCS_BUCKET_NAME" + OSSFUZZ_HISTORY_STORAGE_BACKEND = "OSSFUZZ_HISTORY_STORAGE_BACKEND" + OSSFUZZ_HISTORY_STORAGE_PATH = "OSSFUZZ_HISTORY_STORAGE_PATH" From 46f8aa0934831ce2bab4223a18bb626a70bb7c63 Mon Sep 17 00:00:00 2001 From: Zewei Wang Date: Thu, 10 Jul 2025 17:45:00 -0400 Subject: [PATCH 3/8] feat: Implement base history manager and specialized history managers - Add abstract HistoryManager base class with common functionality - Implement BuildHistoryManager for build statistics and trends - Add CoverageHistoryManager for coverage data analysis - Include data validation and storage abstraction - Add comprehensive logging and error handling --- ossfuzz_py/history/__init__.py | 36 ++ ossfuzz_py/history/build_history_manager.py | 281 ++++++++++++ .../history/coverage_history_manager.py | 427 ++++++++++++++++++ ossfuzz_py/history/history_manager.py | 180 ++++++++ 4 files changed, 924 insertions(+) create mode 100644 ossfuzz_py/history/__init__.py create mode 100644 ossfuzz_py/history/build_history_manager.py create mode 100644 ossfuzz_py/history/coverage_history_manager.py create mode 100644 ossfuzz_py/history/history_manager.py diff --git a/ossfuzz_py/history/__init__.py b/ossfuzz_py/history/__init__.py new file mode 100644 index 000000000..fa82c6051 --- /dev/null +++ b/ossfuzz_py/history/__init__.py @@ -0,0 +1,36 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +History management package for the OSS-Fuzz Python SDK. + +This package provides managers for different types of historical data: +- BuildHistoryManager: Build history and statistics +- CrashHistoryManager: Crash data and analysis +- CorpusHistoryManager: Corpus growth and statistics +- CoverageHistoryManager: Coverage trends and analysis +""" + +from .build_history_manager import BuildHistoryManager +from .corpus_history_manager import CorpusHistoryManager +from .coverage_history_manager import CoverageHistoryManager +from .crash_history_manager import CrashHistoryManager +from .history_manager import HistoryManager + +__all__ = [ + 'HistoryManager', + 'BuildHistoryManager', + 'CrashHistoryManager', + 'CorpusHistoryManager', + 'CoverageHistoryManager', +] diff --git a/ossfuzz_py/history/build_history_manager.py b/ossfuzz_py/history/build_history_manager.py new file mode 100644 index 000000000..5b03efb8a --- /dev/null +++ b/ossfuzz_py/history/build_history_manager.py @@ -0,0 +1,281 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Build history manager for the OSS-Fuzz Python SDK. + +This module manages historical build data including build results, +success rates, and build artifact tracking. +""" + +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional + +from ossfuzz_py.core.data_models import BuildHistoryData +from ossfuzz_py.errors import HistoryManagerError, HistoryValidationError + +from .history_manager import HistoryManager + + +class BuildHistoryManager(HistoryManager): + """ + Manages historical build data for OSS-Fuzz projects. + + This manager handles storage and retrieval of build history, including + build results, timing information, and artifact tracking. + """ + + @property + def category(self) -> str: + """Get the history category for build data.""" + return "build" + + def validate_data(self, data: Any) -> bool: # pylint: disable=inconsistent-return-statements + """ + Validate build data before storage. + + Args: + data: Build data to validate + + Returns: + bool: True if data is valid + + Raises: + HistoryValidationError: If validation fails + """ + try: + if isinstance(data, dict): + # Validate required fields + required_fields = ['build_id', 'timestamp', 'project_name', 'success'] + for field in required_fields: + if field not in data: + raise HistoryValidationError(f"Missing required field: {field}") + + # Validate data types + if not isinstance(data['success'], bool): + raise HistoryValidationError("'success' field must be boolean") + + return True + if isinstance(data, BuildHistoryData): + # Pydantic model validation is automatic + return True + raise HistoryValidationError(f"Invalid data type: {type(data)}") + except Exception as e: + raise HistoryValidationError( + f"Build data validation failed: {str(e)}") from e + + def get_build_history(self, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: Optional[int] = None) -> List[Dict[str, Any]]: + """ + Get build history for the project. + + Args: + start_date: Optional start date filter (ISO format) + end_date: Optional end date filter (ISO format) + limit: Optional limit on number of results + + Returns: + List of build history entries + + Raises: + HistoryManagerError: If retrieval fails + """ + try: + return self.get_data(self.project_name, start_date, end_date, limit) + except Exception as e: + raise HistoryManagerError(f"Failed to get build history: {str(e)}") + + def get_last_successful_build(self) -> Optional[Dict[str, Any]]: + """ + Get the last successful build for the project. 
+ + Returns: + Last successful build data or None if no successful builds + + Raises: + HistoryManagerError: If retrieval fails + """ + try: + # Get recent builds and find the last successful one + builds = self.get_build_history(limit=50) # Check last 50 builds + + for build in reversed(builds): # Start from the most recent + if build.get('success', False): + return build + + return None + except Exception as e: + raise HistoryManagerError( + f"Failed to get last successful build: {str(e)}") + + def store_build_result(self, build_data: Dict[str, Any]) -> str: + """ + Store a build result. + + Args: + build_data: Build result data to store + + Returns: + str: Storage path where data was stored + + Raises: + HistoryManagerError: If storage fails + """ + try: + # Add a timestamp if not present + if 'timestamp' not in build_data: + build_data['timestamp'] = datetime.now().isoformat() + + # Add a project name if not present + if 'project_name' not in build_data: + build_data['project_name'] = self.project_name + + # Validate data + self.validate_data(build_data) + + return self.store_data(self.project_name, build_data) + except Exception as e: + raise HistoryManagerError(f"Failed to store build result: {str(e)}") + + def get_build_statistics(self, + start_date: Optional[str] = None, + end_date: Optional[str] = None) -> Dict[str, Any]: + """ + Get build statistics for the specified period. + + Args: + start_date: Optional start date filter (ISO format) + end_date: Optional end date filter (ISO format) + + Returns: + Dictionary containing build statistics + + Raises: + HistoryManagerError: If calculation fails + """ + try: + builds = self.get_build_history(start_date, end_date) + + if not builds: + return { + 'total_builds': 0, + 'successful_builds': 0, + 'failed_builds': 0, + 'success_rate': 0.0, + 'average_duration': 0.0 + } + + total_builds = len(builds) + successful_builds = sum( + 1 for build in builds if build.get('success', False)) + failed_builds = total_builds - successful_builds + success_rate = (successful_builds / + total_builds) * 100 if total_builds > 0 else 0.0 + + # Calculate average duration for builds with duration data + durations = [ + build.get('duration_seconds', 0) + for build in builds + if build.get('duration_seconds') is not None + ] + average_duration = sum(durations) / len(durations) if durations else 0.0 + + return { + 'total_builds': total_builds, + 'successful_builds': successful_builds, + 'failed_builds': failed_builds, + 'success_rate': success_rate, + 'average_duration': average_duration, + 'period_start': start_date, + 'period_end': end_date + } + except Exception as e: + raise HistoryManagerError( + f"Failed to calculate build statistics: {str(e)}") + + def get_build_trends(self, days: int = 30) -> Dict[str, Any]: + """ + Get build trends for the specified number of days. 
+ + Args: + days: Number of days to analyze + + Returns: + Dictionary containing trend analysis + + Raises: + HistoryManagerError: If analysis fails + """ + try: + end_date = datetime.now() + start_date = end_date - timedelta(days=days) + + builds = self.get_build_history(start_date=start_date.isoformat(), + end_date=end_date.isoformat()) + + if not builds: + return {'trend': 'no_data', 'builds_per_day': 0.0} + + # Group builds by day + daily_builds = {} + for build in builds: + build_date = build.get('timestamp', '')[:10] # Get YYYY-MM-DD + if build_date not in daily_builds: + daily_builds[build_date] = {'total': 0, 'successful': 0} + daily_builds[build_date]['total'] += 1 + if build.get('success', False): + daily_builds[build_date]['successful'] += 1 + + # Calculate trends + total_days = len(daily_builds) + builds_per_day = len(builds) / days if days > 0 else 0.0 + + # Calculate success rate trend + daily_success_rates = [] + for day_data in daily_builds.values(): + rate = (day_data['successful'] / + day_data['total']) * 100 if day_data['total'] > 0 else 0.0 + daily_success_rates.append(rate) + + # Simple trend analysis + if len(daily_success_rates) >= 2: + recent_rate = sum(daily_success_rates[-7:]) / min( + 7, len(daily_success_rates)) + older_rate = sum(daily_success_rates[:-7]) / max( + 1, + len(daily_success_rates) - 7) + + if recent_rate > older_rate + 5: + trend = 'improving' + elif recent_rate < older_rate - 5: + trend = 'declining' + else: + trend = 'stable' + else: + trend = 'insufficient_data' + + return { + 'trend': + trend, + 'builds_per_day': + builds_per_day, + 'total_days_with_builds': + total_days, + 'average_success_rate': + sum(daily_success_rates) / + len(daily_success_rates) if daily_success_rates else 0.0 + } + except Exception as e: + raise HistoryManagerError(f"Failed to analyze build trends: {str(e)}") diff --git a/ossfuzz_py/history/coverage_history_manager.py b/ossfuzz_py/history/coverage_history_manager.py new file mode 100644 index 000000000..8ec53c8a6 --- /dev/null +++ b/ossfuzz_py/history/coverage_history_manager.py @@ -0,0 +1,427 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Coverage history manager for the OSS-Fuzz Python SDK. + +This module manages historical coverage data including coverage trends, +analysis, and reporting. +""" + +from datetime import datetime +from typing import Any, Dict, List, Optional + +from ossfuzz_py.core.data_models import CoverageHistoryData +from ossfuzz_py.errors import HistoryManagerError, HistoryValidationError + +from .history_manager import HistoryManager + + +class CoverageHistoryManager(HistoryManager): + """ + Manages historical coverage data for OSS-Fuzz projects. + + This manager handles storage and retrieval of coverage data including + line coverage, function coverage, and branch coverage trends. 
+ """ + + @property + def category(self) -> str: + """Get the history category for coverage data.""" + return "coverage" + + def validate_data(self, data: Any) -> bool: # pylint: disable=inconsistent-return-statements + """ + Validate coverage data before storage. + + Args: + data: Coverage data to validate + + Returns: + bool: True if data is valid + + Raises: + HistoryValidationError: If validation fails + """ + try: + if isinstance(data, dict): + # Validate required fields + required_fields = ['timestamp', 'project_name', 'line_coverage'] + for field in required_fields: + if field not in data: + raise HistoryValidationError(f"Missing required field: {field}") + + # Validate coverage percentages + coverage_fields = [ + 'line_coverage', 'function_coverage', 'branch_coverage' + ] + for field in coverage_fields: + if field in data: + value = data[field] + if not isinstance(value, (int, float)) or value < 0 or value > 100: + raise HistoryValidationError( + f"'{field}' must be between 0 and 100") + + return True + if isinstance(data, CoverageHistoryData): + # Pydantic model validation is automatic + return True + raise HistoryValidationError(f"Invalid data type: {type(data)}") + except Exception as e: + raise HistoryValidationError( + f"Coverage data validation failed: {str(e)}") from e + + def get_coverage_history(self, + fuzzer_name: Optional[str] = None, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: Optional[int] = None) -> List[Dict[str, Any]]: + """ + Get coverage history for the project. + + Args: + fuzzer_name: Optional fuzzer name filter + start_date: Optional start date filter (ISO format) + end_date: Optional end date filter (ISO format) + limit: Optional limit on number of results + + Returns: + List of coverage history entries + + Raises: + HistoryManagerError: If retrieval fails + """ + try: + data_name = fuzzer_name if fuzzer_name else self.project_name + history = self.get_data(data_name, start_date, end_date, limit) + + # Filter by fuzzer if specified and data contains multiple fuzzers + if fuzzer_name: + history = [h for h in history if h.get('fuzzer_name') == fuzzer_name] + + return history + except Exception as e: + raise HistoryManagerError(f"Failed to get coverage history: {str(e)}") + + def get_latest_coverage(self, + fuzzer_name: Optional[str] = None + ) -> Optional[Dict[str, Any]]: + """ + Get the latest coverage data for the project. + + Args: + fuzzer_name: Optional fuzzer name filter + + Returns: + Latest coverage data or None if no data exists + + Raises: + HistoryManagerError: If retrieval fails + """ + try: + history = self.get_coverage_history(fuzzer_name=fuzzer_name, limit=1) + return history[0] if history else None + except Exception as e: + raise HistoryManagerError(f"Failed to get latest coverage: {str(e)}") + + def get_coverage_report(self, + start_date: Optional[str] = None, + end_date: Optional[str] = None) -> Dict[str, Any]: + """ + Generate a comprehensive coverage report for the specified period. 
+ + Args: + start_date: Optional start date filter (ISO format) + end_date: Optional end date filter (ISO format) + + Returns: + Dictionary containing coverage report + + Raises: + HistoryManagerError: If report generation fails + """ + try: + history = self.get_coverage_history(start_date=start_date, + end_date=end_date) + + if not history: + return { + 'summary': { + 'total_measurements': 0, + 'max_line_coverage': 0.0, + 'avg_line_coverage': 0.0, + 'coverage_trend': 'no_data' + }, + 'details': [], + 'recommendations': ['No coverage data available'] + } + + # Sort by timestamp + history.sort(key=lambda x: x.get('timestamp', '')) + + # Calculate summary statistics + line_coverages = [h.get('line_coverage', 0.0) for h in history] + function_coverages = [ + h.get('function_coverage', 0.0) + for h in history + if h.get('function_coverage') is not None + ] + branch_coverages = [ + h.get('branch_coverage', 0.0) + for h in history + if h.get('branch_coverage') is not None + ] + + max_line_coverage = max(line_coverages) if line_coverages else 0.0 + avg_line_coverage = sum(line_coverages) / len( + line_coverages) if line_coverages else 0.0 + + # Analyze trend + if len(line_coverages) >= 2: + recent_avg = sum(line_coverages[-5:]) / min(5, len(line_coverages)) + older_avg = sum(line_coverages[:-5]) / max(1, len(line_coverages) - 5) + + if recent_avg > older_avg + 1: + trend = 'improving' + elif recent_avg < older_avg - 1: + trend = 'declining' + else: + trend = 'stable' + else: + trend = 'insufficient_data' + + # Generate recommendations + recommendations = [] + if max_line_coverage < 50: + recommendations.append( + "Line coverage is below 50%. Consider adding more test cases.") + if function_coverages and max(function_coverages) < 70: + recommendations.append( + "Function coverage could be improved. Focus on uncovered functions." + ) + if trend == 'declining': + recommendations.append( + "Coverage trend is declining. Review recent changes.") + if not recommendations: + recommendations.append( + "Coverage metrics look good. Continue current testing approach.") + + return { + 'summary': { + 'total_measurements': + len(history), + 'max_line_coverage': + max_line_coverage, + 'avg_line_coverage': + avg_line_coverage, + 'max_function_coverage': + max(function_coverages) if function_coverages else None, + 'avg_function_coverage': + sum(function_coverages) / + len(function_coverages) if function_coverages else None, + 'max_branch_coverage': + max(branch_coverages) if branch_coverages else None, + 'avg_branch_coverage': + sum(branch_coverages) / + len(branch_coverages) if branch_coverages else None, + 'coverage_trend': + trend, + 'period_start': + start_date, + 'period_end': + end_date + }, + 'details': history, + 'recommendations': recommendations + } + except Exception as e: + raise HistoryManagerError(f"Failed to generate coverage report: {str(e)}") + + def store_coverage(self, coverage_data: Dict[str, Any]) -> str: + """ + Store coverage data. 
+ + Args: + coverage_data: Coverage data to store + + Returns: + str: Storage path where data was stored + + Raises: + HistoryManagerError: If storage fails + """ + try: + # Add timestamp if not present + if 'timestamp' not in coverage_data: + coverage_data['timestamp'] = datetime.now().isoformat() + + # Add project name if not present + if 'project_name' not in coverage_data: + coverage_data['project_name'] = self.project_name + + # Validate data + self.validate_data(coverage_data) + + # Use fuzzer name as the data identifier if available + data_name = coverage_data.get('fuzzer_name', self.project_name) + + return self.store_data(data_name, coverage_data) + except Exception as e: + raise HistoryManagerError(f"Failed to store coverage data: {str(e)}") + + def analyze_coverage_trends(self, days: int = 30) -> Dict[str, Any]: + """ + Analyze coverage trends for the specified number of days. + + Args: + days: Number of days to analyze + + Returns: + Dictionary containing trend analysis + + Raises: + HistoryManagerError: If analysis fails + """ + try: + from datetime import timedelta + + end_date = datetime.now() + start_date = end_date - timedelta(days=days) + + history = self.get_coverage_history(start_date=start_date.isoformat(), + end_date=end_date.isoformat()) + + if not history: + return { + 'trend': 'no_data', + 'coverage_velocity': 0.0, + 'stability': 'unknown' + } + + # Sort by timestamp + history.sort(key=lambda x: x.get('timestamp', '')) + + line_coverages = [h.get('line_coverage', 0.0) for h in history] + + # Calculate coverage velocity (change per day) + if len(line_coverages) >= 2: + coverage_change = line_coverages[-1] - line_coverages[0] + coverage_velocity = coverage_change / days + else: + coverage_velocity = 0.0 + + # Calculate stability (variance in coverage) + if len(line_coverages) > 1: + mean_coverage = sum(line_coverages) / len(line_coverages) + variance = sum((x - mean_coverage)**2 + for x in line_coverages) / len(line_coverages) + std_dev = variance**0.5 + + if std_dev < 1.0: + stability = 'stable' + elif std_dev < 3.0: + stability = 'moderate' + else: + stability = 'unstable' + else: + stability = 'unknown' + + # Determine overall trend + if coverage_velocity > 0.1: + trend = 'improving' + elif coverage_velocity < -0.1: + trend = 'declining' + else: + trend = 'stable' + + return { + 'trend': trend, + 'coverage_velocity': coverage_velocity, + 'stability': stability, + 'current_coverage': line_coverages[-1] if line_coverages else 0.0, + 'max_coverage': max(line_coverages) if line_coverages else 0.0, + 'min_coverage': min(line_coverages) if line_coverages else 0.0, + 'analysis_period_days': days + } + except Exception as e: + raise HistoryManagerError(f"Failed to analyze coverage trends: {str(e)}") + + def compare_coverage(self, + baseline_date: str, + comparison_date: Optional[str] = None) -> Dict[str, Any]: + """ + Compare coverage between two time points. 
+ + Args: + baseline_date: Baseline date for comparison (ISO format) + comparison_date: Comparison date (ISO format), defaults to latest + + Returns: + Dictionary containing comparison results + + Raises: + HistoryManagerError: If comparison fails + """ + try: + # Get baseline coverage + baseline_history = self.get_coverage_history(start_date=baseline_date, + end_date=baseline_date, + limit=1) + + if not baseline_history: + raise HistoryManagerError( + f"No coverage data found for baseline date: {baseline_date}") + + baseline_coverage = baseline_history[0] + + # Get comparison coverage + if comparison_date: + comparison_history = self.get_coverage_history( + start_date=comparison_date, end_date=comparison_date, limit=1) + else: + comparison_history = self.get_coverage_history(limit=1) + + if not comparison_history: + raise HistoryManagerError("No coverage data found for comparison") + + comparison_coverage = comparison_history[0] + + # Calculate differences + line_diff = comparison_coverage.get( + 'line_coverage', 0.0) - baseline_coverage.get('line_coverage', 0.0) + function_diff = None + branch_diff = None + + if (comparison_coverage.get('function_coverage') is not None and + baseline_coverage.get('function_coverage') is not None): + function_diff = comparison_coverage[ + 'function_coverage'] - baseline_coverage['function_coverage'] + + if (comparison_coverage.get('branch_coverage') is not None and + baseline_coverage.get('branch_coverage') is not None): + branch_diff = comparison_coverage[ + 'branch_coverage'] - baseline_coverage['branch_coverage'] + + return { + 'baseline': baseline_coverage, + 'comparison': comparison_coverage, + 'differences': { + 'line_coverage': line_diff, + 'function_coverage': function_diff, + 'branch_coverage': branch_diff + }, + 'improvement': line_diff > 0, + 'significant_change': abs(line_diff) > 1.0 + } + except Exception as e: + raise HistoryManagerError(f"Failed to compare coverage: {str(e)}") diff --git a/ossfuzz_py/history/history_manager.py b/ossfuzz_py/history/history_manager.py new file mode 100644 index 000000000..24a12c743 --- /dev/null +++ b/ossfuzz_py/history/history_manager.py @@ -0,0 +1,180 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Abstract base class for history managers. + +This module defines the common interface and functionality for all +history managers in the OSS-Fuzz SDK. +""" + +import logging +from abc import ABC, abstractmethod +from typing import Any, List, Optional + +from ossfuzz_py.data.storage_manager import StorageManager +from ossfuzz_py.errors import HistoryManagerError + + +class HistoryManager(ABC): + """ + Abstract base class for managing historical data. + + This class provides the common interface and functionality for all + history managers. Concrete implementations handle specific types of + historical data (builds, crashes, corpus, coverage). 
+ + Attributes: + storage_manager: Storage manager for data persistence + project_name: Name of the OSS-Fuzz project + logger: Logger instance for this manager + """ + + def __init__(self, storage_manager: StorageManager, project_name: str): + """ + Initialize the history manager. + + Args: + storage_manager: Storage manager for data persistence + project_name: Name of the OSS-Fuzz project + + Raises: + HistoryManagerError: If initialization fails + """ + if not storage_manager: + raise HistoryManagerError("StorageManager is required") + if not project_name: + raise HistoryManagerError("Project name is required") + + self.storage_manager = storage_manager + self.project_name = project_name + self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") + + self.logger.info("Initialized %s for project: %s", self.__class__.__name__, + project_name) + + @property + @abstractmethod + def category(self) -> str: + """ + Get the history category for this manager. + + Returns: + str: Category name (e.g., 'build', 'crash', 'corpus', 'coverage') + """ + + def store_data(self, name: str, data: Any) -> str: + """ + Store historical data. + + Args: + name: Identifier for the data + data: Data to store + + Returns: + str: Storage path where data was stored + + Raises: + HistoryManagerError: If storage fails + """ + try: + self.logger.debug("Storing %s data for %s", self.category, name) + return self.storage_manager.store_history(self.category, name, data) + except Exception as e: + error_msg = f"Failed to store {self.category} data for {name}: {str(e)}" + self.logger.error(error_msg) + raise HistoryManagerError(error_msg) + + def get_data(self, + name: str, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: Optional[int] = None) -> List[Any]: + """ + Retrieve historical data. + + Args: + name: Identifier for the data + start_date: Optional start date filter (ISO format) + end_date: Optional end date filter (ISO format) + limit: Optional limit on number of results + + Returns: + List of historical data entries + + Raises: + HistoryManagerError: If retrieval fails + """ + try: + self.logger.debug("Retrieving %s data for %s", self.category, name) + return self.storage_manager.get_history(self.category, name, start_date, + end_date, limit) + except Exception as e: + error_msg = f"Failed to get {self.category} data for {name}: {str(e)}" + self.logger.error(error_msg) + raise HistoryManagerError(error_msg) + + def get_latest(self, name: str) -> Optional[Any]: + """ + Get the latest entry for the specified name. + + Args: + name: Identifier for the data + + Returns: + Latest data entry or None if no data exists + + Raises: + HistoryManagerError: If retrieval fails + """ + try: + data = self.get_data(name, limit=1) + return data[0] if data else None + except Exception as e: + error_msg = (f"Failed to get latest {self.category} data for " + f"{name}: {str(e)}") + self.logger.error(error_msg) + raise HistoryManagerError(error_msg) + + @abstractmethod + def validate_data(self, data: Any) -> bool: + """ + Validate data before storage. + + Args: + data: Data to validate + + Returns: + bool: True if data is valid + + Raises: + HistoryManagerError: If validation fails + """ + + def _format_timestamp(self, timestamp: Any) -> str: + """ + Format timestamp to ISO string. 
+ + Args: + timestamp: Timestamp to format + + Returns: + str: ISO formatted timestamp + """ + from datetime import datetime + + if isinstance(timestamp, str): + return timestamp + if isinstance(timestamp, datetime): + return timestamp.isoformat() + return str(timestamp) From 8a7cb56f22917e00fc565424381958022956522b Mon Sep 17 00:00:00 2001 From: Zewei Wang Date: Fri, 11 Jul 2025 19:20:00 -0400 Subject: [PATCH 4/8] feat: Add corpus and crash history managers to complete history suite - Implement CorpusHistoryManager for corpus growth analysis - Add CrashHistoryManager for crash tracking and statistics - Include duplicate detection and data validation - Complete the historical data management infrastructure --- ossfuzz_py/history/corpus_history_manager.py | 377 +++++++++++++++++++ ossfuzz_py/history/crash_history_manager.py | 328 ++++++++++++++++ 2 files changed, 705 insertions(+) create mode 100644 ossfuzz_py/history/corpus_history_manager.py create mode 100644 ossfuzz_py/history/crash_history_manager.py diff --git a/ossfuzz_py/history/corpus_history_manager.py b/ossfuzz_py/history/corpus_history_manager.py new file mode 100644 index 000000000..48d104b97 --- /dev/null +++ b/ossfuzz_py/history/corpus_history_manager.py @@ -0,0 +1,377 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Corpus history manager for the OSS-Fuzz Python SDK. + +This module manages historical corpus data including corpus growth, +statistics, and merging operations. +""" + +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +from ossfuzz_py.core.data_models import CorpusHistoryData +from ossfuzz_py.errors import HistoryManagerError, HistoryValidationError + +from .history_manager import HistoryManager + + +class CorpusHistoryManager(HistoryManager): + """ + Manages historical corpus data for OSS-Fuzz projects. + + This manager handles storage and retrieval of corpus statistics including + corpus size, growth rates, and coverage impact. + """ + + @property + def category(self) -> str: + """Get the history category for corpus data.""" + return "corpus" + + def validate_data(self, data: Any) -> bool: # pylint: disable=inconsistent-return-statements + """ + Validate corpus data before storage. 
+ + Args: + data: Corpus data to validate + + Returns: + bool: True if data is valid + + Raises: + HistoryValidationError: If validation fails + """ + try: + if isinstance(data, dict): + # Validate required fields + required_fields = [ + 'timestamp', 'project_name', 'fuzzer_name', 'corpus_size' + ] + for field in required_fields: + if field not in data: + raise HistoryValidationError(f"Missing required field: {field}") + + # Validate data types + if not isinstance(data['corpus_size'], int) or data['corpus_size'] < 0: + raise HistoryValidationError( + "'corpus_size' must be a non-negative integer") + + return True + if isinstance(data, CorpusHistoryData): + # Pydantic model validation is automatic + return True + raise HistoryValidationError(f"Invalid data type: {type(data)}") + except Exception as e: + raise HistoryValidationError( + f"Corpus data validation failed: {str(e)}") from e + + def get_corpus_stats(self, + fuzzer_name: Optional[str] = None, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: Optional[int] = None) -> List[Dict[str, Any]]: + """ + Get corpus statistics for the project. + + Args: + fuzzer_name: Optional fuzzer name filter + start_date: Optional start date filter (ISO format) + end_date: Optional end date filter (ISO format) + limit: Optional limit on number of results + + Returns: + List of corpus statistics entries + + Raises: + HistoryManagerError: If retrieval fails + """ + try: + data_name = fuzzer_name if fuzzer_name else self.project_name + stats = self.get_data(data_name, start_date, end_date, limit) + + # Filter by fuzzer if specified and data contains multiple fuzzers + if fuzzer_name: + stats = [s for s in stats if s.get('fuzzer_name') == fuzzer_name] + + return stats + except Exception as e: + raise HistoryManagerError(f"Failed to get corpus stats: {str(e)}") + + def get_corpus_growth(self, + fuzzer_name: Optional[str] = None, + days: int = 30) -> Dict[str, Any]: + """ + Get corpus growth statistics for the specified period. 
+
+    Args:
+      fuzzer_name: Optional fuzzer name filter
+      days: Number of days to analyze
+
+    Returns:
+      Dictionary containing growth statistics
+
+    Raises:
+      HistoryManagerError: If analysis fails
+    """
+    try:
+      from datetime import timedelta
+
+      end_date = datetime.now()
+      start_date = end_date - timedelta(days=days)
+
+      stats = self.get_corpus_stats(fuzzer_name=fuzzer_name,
+                                    start_date=start_date.isoformat(),
+                                    end_date=end_date.isoformat())
+
+      if not stats:
+        return {
+            'growth_rate': 0.0,
+            'size_change': 0,
+            'average_size': 0.0,
+            'trend': 'no_data'
+        }
+
+      # Sort by timestamp
+      stats.sort(key=lambda x: x.get('timestamp', ''))
+
+      initial_size = stats[0].get('corpus_size', 0)
+      final_size = stats[-1].get('corpus_size', 0)
+      size_change = final_size - initial_size
+
+      # Calculate growth rate
+      growth_rate = (size_change / initial_size *
+                     100) if initial_size > 0 else 0.0
+
+      # Calculate average size
+      sizes = [s.get('corpus_size', 0) for s in stats]
+      average_size = sum(sizes) / len(sizes) if sizes else 0.0
+
+      # Determine trend
+      if growth_rate > 5:
+        trend = 'growing'
+      elif growth_rate < -5:
+        trend = 'shrinking'
+      else:
+        trend = 'stable'
+
+      return {
+          'growth_rate': growth_rate,
+          'size_change': size_change,
+          'initial_size': initial_size,
+          'final_size': final_size,
+          'average_size': average_size,
+          'trend': trend,
+          'period_days': days
+      }
+    except Exception as e:
+      raise HistoryManagerError(
+          f"Failed to analyze corpus growth: {str(e)}") from e
+
+  def merge_corpus(self, source_path: str, target_path: str) -> Dict[str, Any]:
+    """
+    Merge corpus from source to target directory.
+
+    Args:
+      source_path: Path to source corpus directory
+      target_path: Path to target corpus directory
+
+    Returns:
+      Dictionary containing merge results
+
+    Raises:
+      HistoryManagerError: If merge fails
+    """
+    try:
+      source_dir = Path(source_path)
+      target_dir = Path(target_path)
+
+      if not source_dir.exists():
+        raise HistoryManagerError(
+            f"Source corpus directory not found: {source_path}")
+
+      # Create target directory if it doesn't exist
+      target_dir.mkdir(parents=True, exist_ok=True)
+
+      # Count files before merge
+      initial_target_count = len(list(
+          target_dir.glob('*'))) if target_dir.exists() else 0
+      source_count = len(list(source_dir.glob('*')))
+
+      # Copy files from source to target
+      import shutil
+      copied_files = 0
+      skipped_files = 0
+
+      for source_file in source_dir.glob('*'):
+        if source_file.is_file():
+          target_file = target_dir / source_file.name
+
+          # Skip if a same-named file of equal size already exists
+          # (size is a cheap proxy for identical content)
+          if target_file.exists():
+            if source_file.stat().st_size == target_file.stat().st_size:
+              skipped_files += 1
+              continue
+
+          shutil.copy2(source_file, target_file)
+          copied_files += 1
+
+      # Count files after merge
+      final_target_count = len(list(target_dir.glob('*')))
+
+      merge_result = {
+          'initial_target_count': initial_target_count,
+          'source_count': source_count,
+          'copied_files': copied_files,
+          'skipped_files': skipped_files,
+          'final_target_count': final_target_count,
+          'files_added': final_target_count - initial_target_count,
+          'timestamp': datetime.now().isoformat()
+      }
+
+      # Store merge result in history
+      self.store_corpus_stats({
+          'timestamp': merge_result['timestamp'],
+          'project_name': self.project_name,
+          'fuzzer_name': 'merged',
+          'corpus_size': final_target_count,
+          'new_files_count': copied_files,
+          'total_size_bytes': self._calculate_directory_size(target_dir)
+      })
+
+      return merge_result
+    except Exception as e:
+      raise HistoryManagerError(f"Failed to merge corpus: 
{str(e)}") + + def store_corpus_stats(self, corpus_data: Dict[str, Any]) -> str: + """ + Store corpus statistics. + + Args: + corpus_data: Corpus statistics to store + + Returns: + str: Storage path where data was stored + + Raises: + HistoryManagerError: If storage fails + """ + try: + # Add timestamp if not present + if 'timestamp' not in corpus_data: + corpus_data['timestamp'] = datetime.now().isoformat() + + # Add project name if not present + if 'project_name' not in corpus_data: + corpus_data['project_name'] = self.project_name + + # Validate data + self.validate_data(corpus_data) + + # Use fuzzer name as the data identifier + data_name = corpus_data.get('fuzzer_name', self.project_name) + + return self.store_data(data_name, corpus_data) + except Exception as e: + raise HistoryManagerError(f"Failed to store corpus stats: {str(e)}") + + def _calculate_directory_size(self, directory: Path) -> int: + """ + Calculate total size of files in a directory. + + Args: + directory: Directory path + + Returns: + int: Total size in bytes + """ + try: + total_size = 0 + for file_path in directory.rglob('*'): + if file_path.is_file(): + total_size += file_path.stat().st_size + return total_size + except Exception: + return 0 + + def analyze_corpus_effectiveness(self, + fuzzer_name: str, + days: int = 7) -> Dict[str, Any]: + """ + Analyze corpus effectiveness in terms of coverage and crash discovery. + + Args: + fuzzer_name: Name of the fuzzer to analyze + days: Number of days to analyze + + Returns: + Dictionary containing effectiveness analysis + + Raises: + HistoryManagerError: If analysis fails + """ + try: + from datetime import timedelta + + end_date = datetime.now() + start_date = end_date - timedelta(days=days) + + corpus_stats = self.get_corpus_stats(fuzzer_name=fuzzer_name, + start_date=start_date.isoformat(), + end_date=end_date.isoformat()) + + if not corpus_stats: + return { + 'effectiveness_score': 0.0, + 'corpus_efficiency': 0.0, + 'recommendation': 'insufficient_data' + } + + # Calculate corpus efficiency (coverage increase per corpus size increase) + corpus_stats.sort(key=lambda x: x.get('timestamp', '')) + + initial_stats = corpus_stats[0] + final_stats = corpus_stats[-1] + + corpus_growth = final_stats.get('corpus_size', 0) - initial_stats.get( + 'corpus_size', 0) + coverage_increase = final_stats.get('coverage_increase', 0.0) + + # Calculate efficiency score + if corpus_growth > 0: + efficiency = coverage_increase / corpus_growth + else: + efficiency = 0.0 + + # Generate recommendation + if efficiency > 0.1: + recommendation = 'highly_effective' + elif efficiency > 0.05: + recommendation = 'moderately_effective' + elif efficiency > 0.01: + recommendation = 'low_effectiveness' + else: + recommendation = 'ineffective' + + return { + 'effectiveness_score': efficiency, + 'corpus_growth': corpus_growth, + 'coverage_increase': coverage_increase, + 'corpus_efficiency': efficiency, + 'recommendation': recommendation, + 'analysis_period_days': days + } + except Exception as e: + raise HistoryManagerError( + f"Failed to analyze corpus effectiveness: {str(e)}") diff --git a/ossfuzz_py/history/crash_history_manager.py b/ossfuzz_py/history/crash_history_manager.py new file mode 100644 index 000000000..fa2c6b531 --- /dev/null +++ b/ossfuzz_py/history/crash_history_manager.py @@ -0,0 +1,328 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Crash history manager for the OSS-Fuzz Python SDK. + +This module manages historical crash data including crash detection, +deduplication, and analysis. +""" + +import hashlib +from datetime import datetime +from typing import Any, Dict, List, Optional, Set + +from ossfuzz_py.core.data_models import CrashHistoryData, Severity +from ossfuzz_py.errors import HistoryManagerError, HistoryValidationError + +from .history_manager import HistoryManager + + +class CrashHistoryManager(HistoryManager): + """ + Manages historical crash data for OSS-Fuzz projects. + + This manager handles storage and retrieval of crash data including + crash deduplication, severity analysis, and trend tracking. + """ + + @property + def category(self) -> str: + """Get the history category for crash data.""" + return "crash" + + def validate_data(self, data: Any) -> bool: # pylint: disable=inconsistent-return-statements + """ + Validate crash data before storage. + + Args: + data: Crash data to validate + + Returns: + bool: True if data is valid + + Raises: + HistoryValidationError: If validation fails + """ + try: + if isinstance(data, dict): + # Validate required fields + required_fields = [ + 'crash_id', 'timestamp', 'project_name', 'fuzzer_name', 'crash_type' + ] + for field in required_fields: + if field not in data: + raise HistoryValidationError(f"Missing required field: {field}") + + return True + if isinstance(data, CrashHistoryData): + # Pydantic model validation is automatic + return True + raise HistoryValidationError(f"Invalid data type: {type(data)}") + except Exception as e: + raise HistoryValidationError( + f"Crash data validation failed: {str(e)}") from e + + def get_crash_history(self, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: Optional[int] = None) -> List[Dict[str, Any]]: + """ + Get crash history for the project. + + Args: + start_date: Optional start date filter (ISO format) + end_date: Optional end date filter (ISO format) + limit: Optional limit on number of results + + Returns: + List of crash history entries + + Raises: + HistoryManagerError: If retrieval fails + """ + try: + return self.get_data(self.project_name, start_date, end_date, limit) + except Exception as e: + raise HistoryManagerError(f"Failed to get crash history: {str(e)}") + + def is_duplicate_crash(self, crash_data: Dict[str, Any]) -> bool: + """ + Check if a crash is a duplicate of an existing crash. + + Args: + crash_data: Crash data to check + + Returns: + bool: True if crash is a duplicate + + Raises: + HistoryManagerError: If check fails + """ + try: + # Generate crash signature + signature = self._generate_crash_signature(crash_data) + + # Get recent crashes to check for duplicates + recent_crashes = self.get_crash_history(limit=1000) + + for crash in recent_crashes: + if crash.get('crash_signature') == signature: + return True + + return False + except Exception as e: + raise HistoryManagerError( + f"Failed to check for duplicate crash: {str(e)}") + + def store_crash(self, crash_data: Dict[str, Any]) -> str: + """ + Store a crash after deduplication check. 
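+
+    Example (hypothetical values; timestamp, project name, and the
+    crash signature are filled in automatically when missing):
+      ```python
+      path = manager.store_crash({
+          'crash_id': 'crash_001',
+          'fuzzer_name': 'png_read_fuzzer',
+          'crash_type': 'heap-buffer-overflow',
+      })
+      if not path:
+        print('duplicate crash, skipped')
+      ```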
+
+    Args:
+      crash_data: Crash data to store
+
+    Returns:
+      str: Storage path where data was stored, or empty string if duplicate
+
+    Raises:
+      HistoryManagerError: If storage fails
+    """
+    try:
+      # Add timestamp if not present
+      if 'timestamp' not in crash_data:
+        crash_data['timestamp'] = datetime.now().isoformat()
+
+      # Add project name if not present
+      if 'project_name' not in crash_data:
+        crash_data['project_name'] = self.project_name
+
+      # Generate crash signature if not present
+      if 'crash_signature' not in crash_data:
+        crash_data['crash_signature'] = self._generate_crash_signature(
+            crash_data)
+
+      # Check for duplicates
+      if self.is_duplicate_crash(crash_data):
+        self.logger.info("Duplicate crash detected, skipping storage")
+        return ""
+
+      # Validate data
+      self.validate_data(crash_data)
+
+      return self.store_data(self.project_name, crash_data)
+    except Exception as e:
+      raise HistoryManagerError(f"Failed to store crash: {str(e)}") from e
+
+  def _parse_crashes_output(self, output: str) -> List[Dict[str, Any]]:
+    """
+    Parse crash output from fuzzing tools.
+
+    Args:
+      output: Raw output from fuzzing tools
+
+    Returns:
+      List of parsed crash data
+
+    Raises:
+      HistoryManagerError: If parsing fails
+    """
+    try:
+      crashes = []
+
+      # Simple parsing logic - this would be more sophisticated in practice
+      lines = output.split('\n')
+      current_crash: Dict[str, Any] = {}
+
+      for line in lines:
+        line = line.strip()
+
+        if 'ERROR:' in line or 'CRASH:' in line:
+          if current_crash:
+            crashes.append(current_crash)
+          current_crash = {
+              'crash_id': self._generate_crash_id(),
+              'timestamp': datetime.now().isoformat(),
+              'project_name': self.project_name,
+              'fuzzer_name': 'unknown',
+              'crash_type': 'unknown',
+              'severity': Severity.UNKNOWN.value
+          }
+
+        # Only attribute details to a crash we have already started;
+        # stray lines before the first ERROR/CRASH marker are ignored.
+        if not current_crash:
+          continue
+
+        # Extract crash type
+        if 'heap-buffer-overflow' in line.lower():
+          current_crash['crash_type'] = 'heap-buffer-overflow'
+          current_crash['severity'] = Severity.HIGH.value
+        elif 'use-after-free' in line.lower():
+          current_crash['crash_type'] = 'use-after-free'
+          current_crash['severity'] = Severity.CRITICAL.value
+        elif 'null-dereference' in line.lower():
+          current_crash['crash_type'] = 'null-dereference'
+          current_crash['severity'] = Severity.MEDIUM.value
+
+        # Extract stack trace
+        if line.startswith('#'):
+          if 'stack_trace' not in current_crash:
+            current_crash['stack_trace'] = line
+          else:
+            current_crash['stack_trace'] += '\n' + line
+
+      # Add the last crash if any
+      if current_crash:
+        crashes.append(current_crash)
+
+      return crashes
+    except Exception as e:
+      raise HistoryManagerError(
+          f"Failed to parse crash output: {str(e)}") from e
+
+  def _generate_crash_signature(self, crash_data: Dict[str, Any]) -> str:
+    """
+    Generate a unique signature for a crash.
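+
+    The signature is an MD5 hash over the crash type, fuzzer name, and
+    the first three stack-trace lines, so repeat discoveries of the same
+    root cause collapse to a single signature.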
+ + Args: + crash_data: Crash data + + Returns: + str: Crash signature hash + """ + # Create signature from crash type and stack trace + signature_parts = [ + crash_data.get('crash_type', ''), + crash_data.get('fuzzer_name', ''), + ] + + # Use first few lines of stack trace for signature + stack_trace = crash_data.get('stack_trace', '') + if stack_trace: + # Take first 3 lines of stack trace + stack_lines = stack_trace.split('\n')[:3] + signature_parts.extend(stack_lines) + + signature_string = '|'.join(signature_parts) + return hashlib.md5(signature_string.encode()).hexdigest() + + def _generate_crash_id(self) -> str: + """Generate a unique crash ID.""" + import uuid + return str(uuid.uuid4()) + + def get_crash_statistics(self, + start_date: Optional[str] = None, + end_date: Optional[str] = None) -> Dict[str, Any]: + """ + Get crash statistics for the specified period. + + Args: + start_date: Optional start date filter (ISO format) + end_date: Optional end date filter (ISO format) + + Returns: + Dictionary containing crash statistics + + Raises: + HistoryManagerError: If calculation fails + """ + try: + crashes = self.get_crash_history(start_date, end_date) + + if not crashes: + return { + 'total_crashes': 0, + 'unique_crashes': 0, + 'crash_types': {}, + 'severity_distribution': {}, + 'top_fuzzers': {} + } + + # Count unique crashes by signature + unique_signatures: Set[str] = set() + crash_types: Dict[str, int] = {} + severity_counts: Dict[str, int] = {} + fuzzer_counts: Dict[str, int] = {} + + for crash in crashes: + signature = crash.get('crash_signature', '') + if signature: + unique_signatures.add(signature) + + crash_type = crash.get('crash_type', 'unknown') + crash_types[crash_type] = crash_types.get(crash_type, 0) + 1 + + severity = crash.get('severity', 'UNKNOWN') + severity_counts[severity] = severity_counts.get(severity, 0) + 1 + + fuzzer = crash.get('fuzzer_name', 'unknown') + fuzzer_counts[fuzzer] = fuzzer_counts.get(fuzzer, 0) + 1 + + return { + 'total_crashes': + len(crashes), + 'unique_crashes': + len(unique_signatures), + 'crash_types': + crash_types, + 'severity_distribution': + severity_counts, + 'top_fuzzers': + dict( + sorted(fuzzer_counts.items(), + key=lambda x: x[1], + reverse=True)[:10]), + 'period_start': + start_date, + 'period_end': + end_date + } + except Exception as e: + raise HistoryManagerError( + f"Failed to calculate crash statistics: {str(e)}") From 0d3d172b7e3c9646fac678afeefd175c653f1912 Mon Sep 17 00:00:00 2001 From: Zewei Wang Date: Mon, 14 Jul 2025 15:45:00 -0400 Subject: [PATCH 5/8] feat: Implement main OSSFuzzSDK facade for historical data access - Add OSSFuzzSDK class as main entry point for historical data - Implement project report generation and analysis features - Add fuzzing efficiency analysis and health scoring - Include environment configuration and error handling - Provide unified interface for all history managers --- ossfuzz_py/core/ossfuzz_sdk.py | 497 +++++++++++++++++++++++++++++++++ 1 file changed, 497 insertions(+) create mode 100644 ossfuzz_py/core/ossfuzz_sdk.py diff --git a/ossfuzz_py/core/ossfuzz_sdk.py b/ossfuzz_py/core/ossfuzz_sdk.py new file mode 100644 index 000000000..4a2e5899e --- /dev/null +++ b/ossfuzz_py/core/ossfuzz_sdk.py @@ -0,0 +1,497 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +OSS-Fuzz Historical Data SDK. + +This module provides the main SDK facade for accessing and analyzing +historical OSS-Fuzz data including builds, crashes, corpus, and coverage. +""" + +import logging +from typing import Any, Dict, Optional + +from ossfuzz_py.data.storage_manager import StorageManager +from ossfuzz_py.errors import OSSFuzzSDKConfigError, OSSFuzzSDKError +from ossfuzz_py.history import (BuildHistoryManager, CorpusHistoryManager, + CoverageHistoryManager, CrashHistoryManager) +from ossfuzz_py.utils.env_utils import EnvUtils +from ossfuzz_py.utils.env_vars import EnvVars + + +class OSSFuzzSDK: + """ + Main SDK facade for OSS-Fuzz historical data access and analysis. + + This class provides a unified interface for accessing historical data + across different categories (builds, crashes, corpus, coverage) and + generating comprehensive reports and analyses. + + Example: + ```python + # Initialize SDK + config = { + 'storage_backend': 'local', + 'storage_path': '/path/to/data' + } + sdk = OSSFuzzSDK('libpng', config) + + # Generate project report + report = sdk.generate_project_report() + + # Analyze fuzzing efficiency + efficiency = sdk.analyze_fuzzing_efficiency() + ``` + """ + + def __init__(self, + project_name: str, + config: Optional[Dict[str, Any]] = None): + """ + Initialize the OSS-Fuzz SDK. + + Args: + project_name: Name of the OSS-Fuzz project + config: Configuration dictionary for storage and other settings + + Raises: + OSSFuzzSDKConfigError: If configuration is invalid + OSSFuzzSDKError: If initialization fails + """ + self.project_name = project_name + self.config = config or {} + self.logger = logging.getLogger(f"{__name__}.{project_name}") + + try: + if not project_name: + raise OSSFuzzSDKConfigError("Project name is required") + + # Merge environment variables into config + self._load_config_from_env() + + # Initialize storage manager + self.storage = StorageManager(self.config) + + # Initialize history managers + self.build = BuildHistoryManager(self.storage, project_name) + self.crash = CrashHistoryManager(self.storage, project_name) + self.corpus = CorpusHistoryManager(self.storage, project_name) + self.coverage = CoverageHistoryManager(self.storage, project_name) + + self.logger.info("Initialized OSSFuzzSDK " + "for project: %s", project_name) + + except OSSFuzzSDKConfigError: + # Re-raise config errors as-is + raise + except Exception as e: + error_msg = (f"Failed to initialize OSSFuzzSDK " + f"for {project_name}: {str(e)}") + self.logger.error(error_msg) + raise OSSFuzzSDKError(error_msg) from e + + def _load_config_from_env(self) -> None: + """Load configuration from environment variables.""" + try: + # Storage configuration + storage_backend = EnvUtils.get_env( + EnvVars.OSSFUZZ_HISTORY_STORAGE_BACKEND) + if storage_backend: + self.config['storage_backend'] = storage_backend + + storage_path = EnvUtils.get_env(EnvVars.OSSFUZZ_HISTORY_STORAGE_PATH) + if storage_path: + self.config['storage_path'] = storage_path + + # GCS configuration + gcs_bucket = EnvUtils.get_env(EnvVars.GCS_BUCKET_NAME) + if gcs_bucket: + self.config['gcs_bucket_name'] = 
gcs_bucket + + except Exception as e: + self.logger.warning("Failed to load some environment variables: %s", + str(e)) + + def generate_project_report( + self, + days: int = 30, + include_details: bool = True # pylint: disable=unused-argument + ) -> Dict[str, Any]: + """ + Generate a comprehensive project report. + + Args: + days: Number of days to include in the report + include_details: Whether to include detailed data + + Returns: + Dictionary containing comprehensive project report + + Raises: + OSSFuzzSDKError: If report generation fails + """ + try: + from datetime import datetime, timedelta + + end_date = datetime.now() + start_date = end_date - timedelta(days=days) + start_date_str = start_date.isoformat() + end_date_str = end_date.isoformat() + + self.logger.info("Generating project report for %s (%d days)", + self.project_name, days) + + report = { + 'project_name': self.project_name, + 'report_generated': end_date.isoformat(), + 'period': { + 'start_date': start_date_str, + 'end_date': end_date_str, + 'days': days + } + } + + # Build statistics + try: + build_stats = self.build.get_build_statistics(start_date_str, + end_date_str) + build_trends = self.build.get_build_trends(days) + report['build_summary'] = { + 'statistics': build_stats, + 'trends': build_trends + } + except Exception as e: + self.logger.warning("Failed to get build data: %s", str(e)) + report['build_summary'] = {'error': str(e)} + + # Crash statistics + try: + crash_stats = self.crash.get_crash_statistics(start_date_str, + end_date_str) + report['crash_summary'] = crash_stats + except Exception as e: + self.logger.warning("Failed to get crash data: %s", str(e)) + report['crash_summary'] = {'error': str(e)} + + # Coverage analysis + try: + coverage_report = self.coverage.get_coverage_report( + start_date_str, end_date_str) + coverage_trends = self.coverage.analyze_coverage_trends(days) + report['coverage_summary'] = { + 'report': coverage_report, + 'trends': coverage_trends + } + except Exception as e: + self.logger.warning("Failed to get coverage data: %s", str(e)) + report['coverage_summary'] = {'error': str(e)} + + # Corpus analysis + try: + corpus_growth = self.corpus.get_corpus_growth(days=days) + report['corpus_summary'] = {'growth': corpus_growth} + except Exception as e: + self.logger.warning("Failed to get corpus data: %s", str(e)) + report['corpus_summary'] = {'error': str(e)} + + # Overall health score + report['health_score'] = self._calculate_health_score(report) + + return report + + except Exception as e: + error_msg = f"Failed to generate project report: {str(e)}" + self.logger.error(error_msg) + raise OSSFuzzSDKError(error_msg) + + def analyze_fuzzing_efficiency(self, days: int = 30) -> Dict[str, Any]: + """ + Analyze overall fuzzing efficiency for the project. 
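+
+    Example (illustrative):
+      ```python
+      efficiency = sdk.analyze_fuzzing_efficiency(days=14)
+      print(efficiency['overall_efficiency']['level'])
+      ```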
+ + Args: + days: Number of days to analyze + + Returns: + Dictionary containing efficiency analysis + + Raises: + OSSFuzzSDKError: If analysis fails + """ + try: + self.logger.info("Analyzing fuzzing efficiency for %s (%d days)", + self.project_name, days) + + from datetime import datetime, timedelta + + end_date = datetime.now() + start_date = end_date - timedelta(days=days) + + analysis = { + 'project_name': self.project_name, + 'analysis_date': end_date.isoformat(), + 'period_days': days + } + + # Build efficiency + build_trends = self.build.get_build_trends(days) + analysis['build_efficiency'] = { + 'builds_per_day': build_trends.get('builds_per_day', 0.0), + 'success_rate': build_trends.get('average_success_rate', 0.0), + 'trend': build_trends.get('trend', 'unknown') + } + + # Coverage efficiency + coverage_trends = self.coverage.analyze_coverage_trends(days) + analysis['coverage_efficiency'] = { + 'coverage_velocity': coverage_trends.get('coverage_velocity', 0.0), + 'stability': coverage_trends.get('stability', 'unknown'), + 'current_coverage': coverage_trends.get('current_coverage', 0.0) + } + + # Crash discovery efficiency + crash_stats = self.crash.get_crash_statistics(start_date.isoformat(), + end_date.isoformat()) + total_crashes = crash_stats.get('total_crashes', 0) + unique_crashes = crash_stats.get('unique_crashes', 0) + + analysis['crash_efficiency'] = { + 'crashes_per_day': + total_crashes / days if days > 0 else 0.0, + 'unique_crash_rate': (unique_crashes / total_crashes * + 100) if total_crashes > 0 else 0.0, + 'total_crashes': + total_crashes, + 'unique_crashes': + unique_crashes + } + + # Corpus efficiency + corpus_growth = self.corpus.get_corpus_growth(days=days) + analysis['corpus_efficiency'] = { + 'growth_rate': corpus_growth.get('growth_rate', 0.0), + 'size_change': corpus_growth.get('size_change', 0), + 'trend': corpus_growth.get('trend', 'unknown') + } + + # Overall efficiency score + analysis['overall_efficiency'] = self._calculate_efficiency_score( + analysis) + + return analysis + + except Exception as e: + error_msg = f"Failed to analyze fuzzing efficiency: {str(e)}" + self.logger.error(error_msg) + raise OSSFuzzSDKError(error_msg) + + def _calculate_health_score(self, report: Dict[str, Any]) -> Dict[str, Any]: + """ + Calculate overall project health score based on report data. 
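+
+    Categories are weighted builds 30%, coverage 40%, crashes 20%
+    (inverse scoring), and corpus growth 10%; categories whose data is
+    missing are left out of the weighted average.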
+
+    Args:
+      report: Project report data
+
+    Returns:
+      Dictionary containing health score and breakdown
+    """
+    try:
+      scores = {}
+      weights = {}
+
+      # Build health (30% weight)
+      build_summary = report.get('build_summary', {})
+      if 'statistics' in build_summary:
+        build_success_rate = build_summary['statistics'].get(
+            'success_rate', 0.0)
+        scores['build'] = min(build_success_rate, 100.0)
+        weights['build'] = 0.3
+
+      # Coverage health (40% weight)
+      coverage_summary = report.get('coverage_summary', {})
+      if 'report' in coverage_summary:
+        max_coverage = coverage_summary['report']['summary'].get(
+            'max_line_coverage', 0.0)
+        scores['coverage'] = min(max_coverage, 100.0)
+        weights['coverage'] = 0.4
+
+      # Crash health (20% weight) - inverse scoring; only scored when
+      # crash data was actually retrieved, mirroring the other categories
+      crash_summary = report.get('crash_summary', {})
+      if 'total_crashes' in crash_summary:
+        total_crashes = crash_summary.get('total_crashes', 0)
+        if total_crashes == 0:
+          scores['crash'] = 100.0
+        else:
+          # Lower score for more crashes
+          scores['crash'] = max(0.0, 100.0 - min(total_crashes, 100))
+        weights['crash'] = 0.2
+
+      # Corpus health (10% weight)
+      corpus_summary = report.get('corpus_summary', {})
+      if 'growth' in corpus_summary:
+        growth_rate = corpus_summary['growth']['growth_rate']
+        if growth_rate > 0:
+          scores['corpus'] = min(100.0, 50.0 + growth_rate * 10)
+        else:
+          scores['corpus'] = 50.0
+        weights['corpus'] = 0.1
+
+      # Calculate weighted average
+      total_score = 0.0
+      total_weight = 0.0
+
+      for category, score in scores.items():
+        weight = weights.get(category, 0.0)
+        total_score += score * weight
+        total_weight += weight
+
+      overall_score = total_score / total_weight if total_weight > 0 else 0.0
+
+      # Determine health status
+      if overall_score >= 80:
+        status = 'excellent'
+      elif overall_score >= 60:
+        status = 'good'
+      elif overall_score >= 40:
+        status = 'fair'
+      else:
+        status = 'poor'
+
+      return {
+          'overall_score': round(overall_score, 2),
+          'status': status,
+          'category_scores': scores,
+          'weights': weights
+      }
+    except Exception as e:
+      self.logger.warning("Failed to calculate health score: %s", str(e))
+      return {'overall_score': 0.0, 'status': 'unknown', 'error': str(e)}
+
+  def _calculate_efficiency_score(
+      self, analysis: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    Calculate overall efficiency score based on analysis data.
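+
+    Each category is scored on a 0-100 scale and the overall efficiency
+    is the unweighted mean of the available category scores.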
+ + Args: + analysis: Efficiency analysis data + + Returns: + Dictionary containing efficiency score and breakdown + """ + try: + scores = {} + + # Build efficiency + build_eff = analysis.get('build_efficiency', {}) + builds_per_day = build_eff.get('builds_per_day', 0.0) + success_rate = build_eff.get('success_rate', 0.0) + + # Score based on build frequency and success rate + build_score = min(100.0, (builds_per_day * 10) + success_rate) + scores['build'] = build_score + + # Coverage efficiency + coverage_eff = analysis.get('coverage_efficiency', {}) + coverage_velocity = coverage_eff.get('coverage_velocity', 0.0) + current_coverage = coverage_eff.get('current_coverage', 0.0) + + # Score based on coverage growth and current level + coverage_score = min(100.0, current_coverage + (coverage_velocity * 20)) + scores['coverage'] = max(0.0, coverage_score) + + # Crash efficiency + crash_eff = analysis.get('crash_efficiency', {}) + unique_crash_rate = crash_eff.get('unique_crash_rate', 0.0) + crashes_per_day = crash_eff.get('crashes_per_day', 0.0) + + # Higher score for finding unique crashes efficiently + crash_score = min(100.0, unique_crash_rate + min(crashes_per_day * 5, 20)) + scores['crash'] = crash_score + + # Corpus efficiency + corpus_eff = analysis.get('corpus_efficiency', {}) + growth_rate = corpus_eff.get('growth_rate', 0.0) + + # Score based on corpus growth + corpus_score = min(100.0, 50.0 + max(-50.0, min(50.0, growth_rate * 2))) + scores['corpus'] = corpus_score + + # Calculate overall efficiency + overall_efficiency = sum(scores.values()) / len(scores) if scores else 0.0 + + # Determine efficiency level + if overall_efficiency >= 75: + level = 'high' + elif overall_efficiency >= 50: + level = 'medium' + elif overall_efficiency >= 25: + level = 'low' + else: + level = 'very_low' + + return { + 'overall_efficiency': round(overall_efficiency, 2), + 'level': level, + 'category_scores': scores + } + except Exception as e: + self.logger.warning("Failed to calculate efficiency score: %s", str(e)) + return {'overall_efficiency': 0.0, 'level': 'unknown', 'error': str(e)} + + def get_project_summary(self) -> Dict[str, Any]: + """ + Get a quick summary of the project's current state. 
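+
+    Example (illustrative):
+      ```python
+      summary = sdk.get_project_summary()
+      print(summary['recent_crashes'])
+      ```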
+ + Returns: + Dictionary containing project summary + + Raises: + OSSFuzzSDKError: If summary generation fails + """ + try: + from datetime import datetime + + summary: Dict[str, Any] = { + 'project_name': self.project_name, + 'summary_date': datetime.now().isoformat() + } + + # Latest build status + try: + last_build = self.build.get_last_successful_build() + summary['last_successful_build'] = str( + last_build) if last_build else 'None' + except Exception as e: + summary['last_successful_build'] = f'error: {str(e)}' + + # Latest coverage + try: + latest_coverage = self.coverage.get_latest_coverage() + summary['latest_coverage'] = str( + latest_coverage) if latest_coverage else 'None' + except Exception as e: + summary['latest_coverage'] = f'error: {str(e)}' + + # Recent crash count + try: + from datetime import timedelta + week_ago = (datetime.now() - timedelta(days=7)).isoformat() + recent_crashes = self.crash.get_crash_history(start_date=week_ago) + summary['recent_crashes'] = len(recent_crashes) + except Exception as e: + summary['recent_crashes'] = f'error: {str(e)}' + + return summary + + except Exception as e: + error_msg = f"Failed to get project summary: {str(e)}" + self.logger.error(error_msg) + raise OSSFuzzSDKError(error_msg) From e3ea028f9d30cc18b931137763316fc005e33789 Mon Sep 17 00:00:00 2001 From: Zewei Wang Date: Tue, 15 Jul 2025 18:00:00 -0400 Subject: [PATCH 6/8] feat: Update package exports and integrate historical data SDK - Export OSSFuzzSDK and history managers in package __init__ - Add data models and error classes to public API - Maintain backward compatibility with existing exports - Complete integration of historical data functionality --- ossfuzz_py/__init__.py | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/ossfuzz_py/__init__.py b/ossfuzz_py/__init__.py index 83189b0bb..2bb6bfd37 100644 --- a/ossfuzz_py/__init__.py +++ b/ossfuzz_py/__init__.py @@ -21,25 +21,55 @@ from .core.benchmark_manager import Benchmark, BenchmarkManager # Data models and enums -from .core.data_models import (CrashData, FuzzingEngine, ProjectConfig, - Sanitizer, Severity) +from .core.data_models import (BuildHistoryData, CorpusHistoryData, + CoverageHistoryData, CrashData, CrashHistoryData, + FuzzingEngine, HistoricalSummary, ProjectConfig, + Sanitizer, Severity, TimeSeriesData) # Core SDK - Main SDK class and modules from .core.ossfuzz_manager import OSSFuzzManager +from .core.ossfuzz_sdk import OSSFuzzSDK +from .data.storage_adapter import (FileStorageAdapter, GCSStorageAdapter, + StorageAdapter) +# Storage components +from .data.storage_manager import StorageManager # Error handling from .errors import * +# History managers +from .history import (BuildHistoryManager, CorpusHistoryManager, + CoverageHistoryManager, CrashHistoryManager, + HistoryManager) # Public API - All exports available to SDK clients __all__ = [ # Core SDK - Main classes according to UML diagram 'OSSFuzzManager', + 'OSSFuzzSDK', 'BenchmarkManager', 'Benchmark', + # History managers + 'HistoryManager', + 'BuildHistoryManager', + 'CrashHistoryManager', + 'CorpusHistoryManager', + 'CoverageHistoryManager', + + # Storage components + 'StorageManager', + 'StorageAdapter', + 'FileStorageAdapter', + 'GCSStorageAdapter', + # Data models and enums 'Severity', 'Sanitizer', - 'Sanitizer', 'FuzzingEngine', + 'BuildHistoryData', + 'CrashHistoryData', + 'CorpusHistoryData', + 'CoverageHistoryData', + 'TimeSeriesData', + 'HistoricalSummary', # Core error types and enums 
'ErrorCode', From 7cdac380be9770e9b3cc2e34b32dd6952871437d Mon Sep 17 00:00:00 2001 From: Zewei Wang Date: Thu, 17 Jul 2025 17:30:00 -0400 Subject: [PATCH 7/8] test: Add comprehensive unit tests for historical data SDK - Add test suite for OSSFuzzSDK main functionality - Include tests for all history managers (build, crash, corpus, coverage) - Test configuration, error handling, and edge cases - Ensure proper integration with storage and data validation - Add mocking for external dependencies --- .../unittests/test_historical_data_sdk.py | 305 ++++++++++++++++++ 1 file changed, 305 insertions(+) create mode 100644 ossfuzz_py/unittests/test_historical_data_sdk.py diff --git a/ossfuzz_py/unittests/test_historical_data_sdk.py b/ossfuzz_py/unittests/test_historical_data_sdk.py new file mode 100644 index 000000000..b947ed6cd --- /dev/null +++ b/ossfuzz_py/unittests/test_historical_data_sdk.py @@ -0,0 +1,305 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Unit tests for the Historical Data SDK. + +This module contains tests for the main SDK components including +the OSSFuzzSDK facade and history managers. +""" + +import tempfile +import unittest +from datetime import datetime +from unittest.mock import patch + +from ossfuzz_py.core.ossfuzz_sdk import OSSFuzzSDK +from ossfuzz_py.data.storage_manager import StorageManager +from ossfuzz_py.errors import OSSFuzzSDKConfigError +from ossfuzz_py.history import (BuildHistoryManager, CorpusHistoryManager, + CoverageHistoryManager, CrashHistoryManager) + + +class TestOSSFuzzSDK(unittest.TestCase): + """Test cases for the OSSFuzzSDK class.""" + + def setUp(self): + """Set up test fixtures.""" + self.temp_dir = tempfile.mkdtemp() + self.config = {'storage_backend': 'local', 'storage_path': self.temp_dir} + self.project_name = 'test_project' + + def tearDown(self): + """Clean up test fixtures.""" + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_sdk_initialization(self): + """Test SDK initialization with valid configuration.""" + sdk = OSSFuzzSDK(self.project_name, self.config) + + self.assertEqual(sdk.project_name, self.project_name) + self.assertIsInstance(sdk.storage, StorageManager) + self.assertIsInstance(sdk.build, BuildHistoryManager) + self.assertIsInstance(sdk.crash, CrashHistoryManager) + self.assertIsInstance(sdk.corpus, CorpusHistoryManager) + self.assertIsInstance(sdk.coverage, CoverageHistoryManager) + + def test_sdk_initialization_without_project_name(self): + """Test SDK initialization fails without project name.""" + with self.assertRaises(OSSFuzzSDKConfigError): + OSSFuzzSDK('', self.config) + + def test_sdk_initialization_without_config(self): + """Test SDK initialization with default configuration.""" + sdk = OSSFuzzSDK(self.project_name) + self.assertEqual(sdk.project_name, self.project_name) + self.assertIsInstance(sdk.storage, StorageManager) + + @patch.dict( + 'os.environ', { + 'OSSFUZZ_HISTORY_STORAGE_BACKEND': 'local', + 'OSSFUZZ_HISTORY_STORAGE_PATH': '/tmp/test' + }) 
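+  # These variables are consumed by OSSFuzzSDK._load_config_from_env().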
+ def test_config_from_environment(self): + """Test configuration loading from environment variables.""" + sdk = OSSFuzzSDK(self.project_name) + self.assertEqual(sdk.config.get('storage_backend'), 'local') + self.assertEqual(sdk.config.get('storage_path'), '/tmp/test') + + def test_generate_project_report(self): + """Test project report generation.""" + sdk = OSSFuzzSDK(self.project_name, self.config) + + # Mock the history managers to return test data + with (patch.object(sdk.build, 'get_build_statistics') as mock_build_stats, \ + patch.object(sdk.build, 'get_build_trends') as mock_build_trends, \ + patch.object(sdk.crash, 'get_crash_statistics') as mock_crash_stats, \ + patch.object(sdk.coverage, 'get_coverage_report') + as mock_coverage_report, \ + patch.object(sdk.coverage, 'analyze_coverage_trends') as + mock_coverage_trends, \ + patch.object(sdk.corpus, 'get_corpus_growth') as mock_corpus_growth): + + # Set up mock return values + mock_build_stats.return_value = {'success_rate': 85.0, 'total_builds': 10} + mock_build_trends.return_value = { + 'trend': 'improving', + 'builds_per_day': 2.0 + } + mock_crash_stats.return_value = {'total_crashes': 5, 'unique_crashes': 3} + mock_coverage_report.return_value = { + 'summary': { + 'max_line_coverage': 75.0 + } + } + mock_coverage_trends.return_value = { + 'trend': 'improving', + 'coverage_velocity': 0.5 + } + mock_corpus_growth.return_value = { + 'growth_rate': 10.0, + 'trend': 'growing' + } + + report = sdk.generate_project_report(days=7) + + self.assertEqual(report['project_name'], self.project_name) + self.assertIn('build_summary', report) + self.assertIn('crash_summary', report) + self.assertIn('coverage_summary', report) + self.assertIn('corpus_summary', report) + self.assertIn('health_score', report) + + def test_analyze_fuzzing_efficiency(self): + """Test fuzzing efficiency analysis.""" + sdk = OSSFuzzSDK(self.project_name, self.config) + + # Mock the history managers to return test data + with (patch.object(sdk.build, 'get_build_trends') as mock_build_trends, \ + patch.object(sdk.coverage, 'analyze_coverage_trends') + as mock_coverage_trends, \ + patch.object(sdk.crash, 'get_crash_statistics') as mock_crash_stats, \ + patch.object(sdk.corpus, 'get_corpus_growth') as mock_corpus_growth): + + # Set up mock return values + mock_build_trends.return_value = { + 'builds_per_day': 2.0, + 'average_success_rate': 85.0, + 'trend': 'improving' + } + mock_coverage_trends.return_value = { + 'coverage_velocity': 0.5, + 'stability': 'stable', + 'current_coverage': 75.0 + } + mock_crash_stats.return_value = {'total_crashes': 10, 'unique_crashes': 8} + mock_corpus_growth.return_value = { + 'growth_rate': 15.0, + 'size_change': 100, + 'trend': 'growing' + } + + analysis = sdk.analyze_fuzzing_efficiency(days=7) + + self.assertEqual(analysis['project_name'], self.project_name) + self.assertIn('build_efficiency', analysis) + self.assertIn('coverage_efficiency', analysis) + self.assertIn('crash_efficiency', analysis) + self.assertIn('corpus_efficiency', analysis) + self.assertIn('overall_efficiency', analysis) + + def test_get_project_summary(self): + """Test project summary generation.""" + sdk = OSSFuzzSDK(self.project_name, self.config) + + # Mock the history managers to return test data + with (patch.object(sdk.build, 'get_last_successful_build') + as mock_last_build, \ + patch.object(sdk.coverage, 'get_latest_coverage') + as mock_latest_coverage, \ + patch.object(sdk.crash, 'get_crash_history') + as mock_crash_history): + + # Set up mock return values 
+ mock_last_build.return_value = { + 'build_id': 'build_123', + 'timestamp': '2025-01-01T12:00:00', + 'success': True + } + mock_latest_coverage.return_value = { + 'timestamp': '2025-01-01T12:00:00', + 'line_coverage': 75.0 + } + mock_crash_history.return_value = [{ + 'crash_id': 'crash_1', + 'timestamp': '2025-01-01T10:00:00' + }, { + 'crash_id': 'crash_2', + 'timestamp': '2025-01-01T11:00:00' + }] + + summary = sdk.get_project_summary() + + self.assertEqual(summary['project_name'], self.project_name) + self.assertIn('last_successful_build', summary) + self.assertIn('latest_coverage', summary) + self.assertEqual(summary['recent_crashes'], 2) + + +class TestHistoryManagers(unittest.TestCase): + """Test cases for history managers.""" + + def setUp(self): + """Set up test fixtures.""" + self.temp_dir = tempfile.mkdtemp() + self.config = {'storage_backend': 'local', 'storage_path': self.temp_dir} + self.project_name = 'test_project' + self.storage_manager = StorageManager(self.config) + + def tearDown(self): + """Clean up test fixtures.""" + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_build_history_manager(self): + """Test BuildHistoryManager functionality.""" + manager = BuildHistoryManager(self.storage_manager, self.project_name) + + # Test storing build result + build_data = { + 'build_id': 'build_123', + 'timestamp': datetime.now().isoformat(), + 'project_name': self.project_name, + 'success': True, + 'duration_seconds': 300 + } + + result = manager.store_build_result(build_data) + self.assertIsInstance(result, str) + + # Test retrieving build history + history = manager.get_build_history(limit=10) + self.assertIsInstance(history, list) + + def test_crash_history_manager(self): + """Test CrashHistoryManager functionality.""" + manager = CrashHistoryManager(self.storage_manager, self.project_name) + + # Test storing crash data (without signature so it gets generated) + crash_data = { + 'crash_id': 'crash_123', + 'timestamp': datetime.now().isoformat(), + 'project_name': self.project_name, + 'fuzzer_name': 'test_fuzzer', + 'crash_type': 'heap-buffer-overflow' + } + + # First storage should succeed + result = manager.store_crash(crash_data.copy()) + self.assertIsInstance(result, str) + self.assertNotEqual(result, "") # Should not be empty (not a duplicate) + + # Test duplicate detection - should be True after storing the same crash + is_duplicate = manager.is_duplicate_crash(crash_data) + self.assertTrue(is_duplicate) + + # Second storage should return empty string (duplicate) + result2 = manager.store_crash(crash_data.copy()) + self.assertEqual(result2, "") + + def test_coverage_history_manager(self): + """Test CoverageHistoryManager functionality.""" + manager = CoverageHistoryManager(self.storage_manager, self.project_name) + + # Test storing coverage data + coverage_data = { + 'timestamp': datetime.now().isoformat(), + 'project_name': self.project_name, + 'fuzzer_name': 'test_fuzzer', + 'line_coverage': 75.5, + 'function_coverage': 80.0, + 'branch_coverage': 70.0 + } + + result = manager.store_coverage(coverage_data) + self.assertIsInstance(result, str) + + # Test retrieving coverage history + history = manager.get_coverage_history(limit=10) + self.assertIsInstance(history, list) + + def test_corpus_history_manager(self): + """Test CorpusHistoryManager functionality.""" + manager = CorpusHistoryManager(self.storage_manager, self.project_name) + + # Test storing corpus stats + corpus_data = { + 'timestamp': datetime.now().isoformat(), + 'project_name': 
self.project_name, + 'fuzzer_name': 'test_fuzzer', + 'corpus_size': 1000, + 'total_size_bytes': 5000000, + 'new_files_count': 50 + } + + result = manager.store_corpus_stats(corpus_data) + self.assertIsInstance(result, str) + + # Test retrieving corpus stats + stats = manager.get_corpus_stats(limit=10) + self.assertIsInstance(stats, list) + + +if __name__ == '__main__': + unittest.main() From 29484f1dfcd6f1fa9ab7629ed48cbeb67991ec4b Mon Sep 17 00:00:00 2001 From: Zewei Wang Date: Fri, 18 Jul 2025 19:45:00 -0400 Subject: [PATCH 8/8] test: Update existing tests for compatibility with historical data features - Update cloud builder pipeline tests for new SDK integration - Modify local builder pipeline tests to work with enhanced functionality - Ensure backward compatibility and proper error handling - Fix any test conflicts with new historical data features --- ossfuzz_py/data/storage_adapter.py | 2 +- ossfuzz_py/unittests/test_cloud_builder_pipeline.py | 4 +++- ossfuzz_py/unittests/test_local_builder_pipeline.py | 4 +++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/ossfuzz_py/data/storage_adapter.py b/ossfuzz_py/data/storage_adapter.py index 73ce30197..acdb1ab15 100644 --- a/ossfuzz_py/data/storage_adapter.py +++ b/ossfuzz_py/data/storage_adapter.py @@ -1064,7 +1064,7 @@ def get_history(self, name: str, start_date: Optional[str] = None, end_date: Optional[str] = None, - limit: Optional[int] = None) -> list[Any]: + limit: Optional[int] = None) -> List[Any]: """Retrieve historical data for a specific category and name.""" if self._bucket: try: diff --git a/ossfuzz_py/unittests/test_cloud_builder_pipeline.py b/ossfuzz_py/unittests/test_cloud_builder_pipeline.py index 58215fe2b..2ce806e1c 100644 --- a/ossfuzz_py/unittests/test_cloud_builder_pipeline.py +++ b/ossfuzz_py/unittests/test_cloud_builder_pipeline.py @@ -34,6 +34,7 @@ GOOGLE_APPLICATION_CREDENTIALS=/path/to/creds.json python -m unittest test_cloud_builder_pipeline.py -v """ +import os import shutil import subprocess import unittest @@ -131,7 +132,8 @@ def test_cloud_builder_pipeline_real_gcb(self): print("✓ OSS-Fuzz repository cloned successfully") - benchmark_yaml_path = "../../benchmark-sets/all/libspng.yaml" + benchmark_yaml_path = os.path.join(os.path.dirname(__file__), + "../../benchmark-sets/all/libspng.yaml") fuzz_target = _create_real_fuzz_target_from_benchmark(benchmark_yaml_path) google_cloud_project = EnvUtils.get_env(EnvVars.GOOGLE_CLOUD_PROJECT, "oss-fuzz") or "oss-fuzz" diff --git a/ossfuzz_py/unittests/test_local_builder_pipeline.py b/ossfuzz_py/unittests/test_local_builder_pipeline.py index b5b337877..fc1f3b223 100644 --- a/ossfuzz_py/unittests/test_local_builder_pipeline.py +++ b/ossfuzz_py/unittests/test_local_builder_pipeline.py @@ -27,6 +27,7 @@ handling without requiring the full OSS-Fuzz environment. """ +import os import shutil import subprocess import tempfile @@ -166,7 +167,8 @@ def _setup_build_infrastructure_and_get_metadata(self): print("✓ OSS-Fuzz repository cloned successfully") # Create a real fuzz target from benchmark YAML - benchmark_yaml_path = "../../benchmark-sets/all/libspng.yaml" + benchmark_yaml_path = os.path.join(os.path.dirname(__file__), + "../../benchmark-sets/all/libspng.yaml") try: fuzz_target = _create_real_fuzz_target_from_benchmark(benchmark_yaml_path)