diff --git a/docs/examples.md b/docs/examples.md index b38acd2..0fcc186 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -701,6 +701,238 @@ async def custom_executor_configuration(): await cpu_executor.shutdown() ``` +## Enhanced Fuzzing Examples + +### Hypothesis Extensions + +The MCP fuzzer now supports Hypothesis extensions for enhanced data generation and fuzzing capabilities. + +#### JSON Schema-Based Fuzzing + +```python +import asyncio +from mcp_fuzzer.fuzz_engine.strategy import ProtocolStrategies + +async def json_schema_fuzzing(): + # Define a JSON schema for MCP protocol messages + mcp_schema = { + "type": "object", + "properties": { + "jsonrpc": {"type": "string", "enum": ["2.0"]}, + "id": {"type": ["string", "integer", "null"]}, + "method": {"type": "string"}, + "params": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "arguments": {"type": "object"} + } + } + }, + "required": ["jsonrpc", "method"] + } + + # Fuzz using Hypothesis extensions + results = await ProtocolStrategies.fuzz_with_hypothesis_extensions( + schema=mcp_schema, + runs=10, + use_realistic_data=True + ) + + print(f"Generated {len(results)} fuzzing examples") + for result in results[:3]: # Show first 3 + print(f"Success: {result['success']}") + print(f"Data: {result['fuzz_data']}") + +asyncio.run(json_schema_fuzzing()) +``` + +#### Realistic Data Generation + +```python +import asyncio +from mcp_fuzzer.fuzz_engine.strategy.hypothesis_extensions import hypothesis_extensions + +async def realistic_data_generation(): + # Generate realistic emails + emails = await hypothesis_extensions.generate_realistic_user_data("email", count=5) + print(f"Generated emails: {emails}") + + # Generate realistic names + names = await hypothesis_extensions.generate_realistic_user_data("name", count=5) + print(f"Generated names: {names}") + + # Generate realistic URLs + urls = await hypothesis_extensions.generate_realistic_user_data("url", count=5) + print(f"Generated URLs: {urls}") + +asyncio.run(realistic_data_generation()) +``` + +### Alternative Fuzzing Libraries + +#### Mutation-Based Fuzzing + +```python +import asyncio +from mcp_fuzzer.fuzz_engine.strategy import ProtocolStrategies + +async def mutation_based_fuzzing(): + # Define base inputs to mutate + base_inputs = [ + {"jsonrpc": "2.0", "method": "test", "id": 1}, + {"jsonrpc": "2.0", "method": "initialize", "params": {}}, + "simple string input" + ] + + # Define target function to fuzz + def target_function(data): + if isinstance(data, dict): + if "method" in data and len(str(data.get("method", ""))) > 50: + raise ValueError("Method name too long") + return True + + # Fuzz using custom mutation strategies + results = await ProtocolStrategies.fuzz_with_alternative_libraries( + base_inputs=base_inputs, + target_function=target_function, + strategy="mutation", + num_mutations_per_input=5 + ) + + successful = len([r for r in results if r.get("success")]) + print(f"Mutation fuzzing: {successful}/{len(results)} successful") + +asyncio.run(mutation_based_fuzzing()) +``` + +#### Atheris Integration (if available) + +```python +import asyncio +from mcp_fuzzer.fuzz_engine.strategy import ProtocolStrategies + +async def atheris_fuzzing(): + # Define target function + def target_function(data): + if isinstance(data, dict) and "malicious" in str(data): + raise Exception("Malicious input detected") + return True + + # Prepare initial inputs + initial_inputs = [ + b'{"jsonrpc": "2.0", "method": "test"}', + b'{"jsonrpc": "2.0", "method": "malicious"}', + b"simple string" + ] + + # Fuzz using Atheris + results = await ProtocolStrategies.fuzz_with_alternative_libraries( + base_inputs=initial_inputs, + target_function=target_function, + strategy="atheris", + max_iterations=1000, + timeout_seconds=10 + ) + + print(f"Atheris fuzzing completed with {len(results)} results") + +asyncio.run(atheris_fuzzing()) +``` + +### Combined Fuzzing Strategies + +#### Multi-Strategy Fuzzing Campaign + +```python +import asyncio +from mcp_fuzzer.fuzz_engine.strategy import ProtocolStrategies + +async def combined_fuzzing_campaign(): + # Define comprehensive schema + schema = { + "type": "object", + "properties": { + "jsonrpc": {"type": "string", "enum": ["2.0"]}, + "method": {"type": "string"}, + "params": { + "type": "object", + "properties": { + "name": {"type": "string", "format": "email"}, + "url": {"type": "string", "format": "uri"} + } + } + } + } + + # Strategy 1: Hypothesis extensions + print("Running Hypothesis extensions fuzzing...") + hypo_results = await ProtocolStrategies.fuzz_with_hypothesis_extensions( + schema=schema, runs=10, use_realistic_data=True + ) + + # Strategy 2: Custom mutations + base_data = [{"jsonrpc": "2.0", "method": "test"}] + def simple_target(data): + if isinstance(data, dict) and len(str(data.get("method", ""))) > 20: + raise ValueError("Method too long") + return True + + print("Running mutation-based fuzzing...") + mutation_results = await ProtocolStrategies.fuzz_with_alternative_libraries( + base_inputs=base_data, + target_function=simple_target, + strategy="mutation", + num_mutations_per_input=8 + ) + + # Analyze results + hypo_success = len([r for r in hypo_results if r.get("success")]) + mutation_success = len([r for r in mutation_results if r.get("success")]) + + print(" +Fuzzing Campaign Results:") + print(f"Hypothesis Extensions: {hypo_success}/{len(hypo_results)} successful") + print(f"Custom Mutations: {mutation_success}/{len(mutation_results)} successful") + print(f"Total: {hypo_success + mutation_success} successful examples generated") + +asyncio.run(combined_fuzzing_campaign()) +``` + +#### Enhanced Demo Script + +Run the comprehensive demo script to see all enhanced fuzzing features: + +```bash +# Run the enhanced fuzzing demo +python examples/enhanced_fuzzing_demo.py +``` + +This demo showcases: +- Hypothesis extensions for JSON schema-based fuzzing +- Realistic data generation with faker +- Alternative fuzzing libraries (Atheris, PythonFuzz) +- Custom mutation-based fuzzing +- Combined multi-strategy fuzzing campaigns + +### Command Line Usage + +#### Hypothesis Extensions + +```bash +# Use enhanced fuzzing with Hypothesis extensions (via API) +# The enhanced features are available through the Python API +# See the examples above for usage patterns +``` + +#### Alternative Libraries + +```bash +# Use alternative fuzzing libraries (via API) +# The alternative libraries are integrated through the strategy manager +# See the Python examples above for detailed usage +``` + ## Enhanced Reporting Examples ### Comprehensive Safety Reporting diff --git a/examples/enhanced_fuzzing_demo.py b/examples/enhanced_fuzzing_demo.py new file mode 100644 index 0000000..09546b7 --- /dev/null +++ b/examples/enhanced_fuzzing_demo.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +""" +Enhanced Fuzzing Demo + +This script demonstrates the enhanced fuzzing capabilities added to the MCP fuzzer, +including Hypothesis extensions and alternative fuzzing libraries. +""" + +import asyncio +import json +import logging +from typing import Any, Dict, List + +from mcp_fuzzer.fuzz_engine.strategy import ProtocolStrategies +from mcp_fuzzer.fuzz_engine.strategy.hypothesis_extensions import hypothesis_extensions +from mcp_fuzzer.fuzz_engine.strategy.alternative_fuzzers import alternative_fuzzers + + +async def demo_hypothesis_extensions(): + """Demonstrate Hypothesis extensions for enhanced fuzzing.""" + print("šŸ”¬ Demonstrating Hypothesis Extensions") + print("=" * 50) + + # Example JSON schema for MCP protocol messages + mcp_schema = { + "type": "object", + "properties": { + "jsonrpc": {"type": "string", "enum": ["2.0"]}, + "id": {"type": ["string", "integer", "null"]}, + "method": {"type": "string"}, + "params": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "arguments": {"type": "object"} + } + } + }, + "required": ["jsonrpc", "method"] + } + + print("šŸ“‹ Generating data from JSON schema...") + examples = await hypothesis_extensions.generate_from_json_schema(mcp_schema, max_examples=5) + + for i, example in enumerate(examples, 1): + print(f"Example {i}: {json.dumps(example, indent=2)}") + + print("\nšŸ“§ Generating realistic user data...") + emails = await hypothesis_extensions.generate_realistic_user_data("email", count=3) + names = await hypothesis_extensions.generate_realistic_user_data("name", count=3) + + print(f"Generated emails: {emails}") + print(f"Generated names: {names}") + + print("\nšŸŽÆ Enhanced fuzzing with realistic data...") + results = await ProtocolStrategies.fuzz_with_hypothesis_extensions( + mcp_schema, runs=3, use_realistic_data=True + ) + + for result in results: + print(f"Fuzz result: {result['success']} - {result.get('fuzz_data', 'N/A')}") + + +async def demo_alternative_fuzzers(): + """Demonstrate alternative fuzzing libraries.""" + print("\nšŸ”„ Demonstrating Alternative Fuzzing Libraries") + print("=" * 50) + + # Sample target function to fuzz + def target_function(data): + """Sample function that processes JSON data.""" + if isinstance(data, dict): + if "method" in data and len(str(data.get("method", ""))) > 100: + raise ValueError("Method name too long") + if "id" in data and data["id"] == "malicious": + raise Exception("Malicious ID detected") + return True + + # Base inputs for fuzzing + base_inputs = [ + {"jsonrpc": "2.0", "method": "test", "id": 1}, + {"jsonrpc": "2.0", "method": "initialize", "params": {}}, + "[1, 2, 3]", + "simple string" + ] + + print("🧬 Custom mutation-based fuzzing...") + mutation_results = await alternative_fuzzers.mutation_based_fuzz( + base_inputs, target_function, num_mutations_per_input=3 + ) + + successful = len([r for r in mutation_results if r.get("success")]) + failed = len([r for r in mutation_results if not r.get("success")]) + + print(f"Mutation fuzzing: {successful} successful, {failed} failed") + + for result in mutation_results[:3]: # Show first 3 results + status = "āœ…" if result.get("success") else "āŒ" + print(f"{status} {result.get('strategy', 'unknown')}: {result.get('fuzz_data', 'N/A')}") + + print("\n⚔ Alternative library fuzzing...") + try: + alt_results = await ProtocolStrategies.fuzz_with_alternative_libraries( + base_inputs, target_function, strategy="mutation", num_mutations_per_input=2 + ) + + alt_successful = len([r for r in alt_results if r.get("success")]) + alt_failed = len([r for r in alt_results if not r.get("success")]) + + print(f"Alternative fuzzing: {alt_successful} successful, {alt_failed} failed") + + except Exception as e: + print(f"Alternative fuzzing demo failed: {e}") + + +async def demo_combined_strategies(): + """Demonstrate combining multiple fuzzing strategies.""" + print("\nšŸŽ­ Demonstrating Combined Fuzzing Strategies") + print("=" * 50) + + # Create a comprehensive fuzzing schema + comprehensive_schema = { + "type": "object", + "properties": { + "jsonrpc": {"type": "string", "enum": ["2.0"]}, + "method": {"type": "string"}, + "params": { + "type": "object", + "properties": { + "name": {"type": "string", "format": "email"}, + "url": {"type": "string", "format": "uri"}, + "timestamp": {"type": "string", "format": "date-time"} + } + } + } + } + + print("šŸš€ Running comprehensive fuzzing campaign...") + + # Strategy 1: Hypothesis extensions + print("\n1ļøāƒ£ Hypothesis Extensions:") + hypo_results = await ProtocolStrategies.fuzz_with_hypothesis_extensions( + comprehensive_schema, runs=5, use_realistic_data=True + ) + + hypo_success = len([r for r in hypo_results if r.get("success")]) + print(f" Results: {hypo_success}/{len(hypo_results)} successful") + + # Strategy 2: Custom mutations + print("\n2ļøāƒ£ Custom Mutation Fuzzing:") + base_data = [{"jsonrpc": "2.0", "method": "test"}] + + def simple_target(data): + if isinstance(data, dict) and "method" in data: + if len(str(data["method"])) > 50: + raise ValueError("Method too long") + return True + + mutation_results = await alternative_fuzzers.mutation_based_fuzz( + base_data, simple_target, num_mutations_per_input=5 + ) + + mutation_success = len([r for r in mutation_results if r.get("success")]) + print(f" Results: {mutation_success}/{len(mutation_results)} successful") + + print("\nšŸ“Š Fuzzing Campaign Summary:") + print(f" Hypothesis Extensions: {hypo_success}/{len(hypo_results)} successful") + print(f" Custom Mutations: {mutation_success}/{len(mutation_results)} successful") + print(f" Total: {hypo_success + mutation_success}/{len(hypo_results) + len(mutation_results)} successful") + + +async def main(): + """Main demo function.""" + logging.basicConfig(level=logging.INFO) + + print("šŸŽÆ MCP Fuzzer - Enhanced Fuzzing Capabilities Demo") + print("=" * 60) + print("This demo showcases the new fuzzing features added to the MCP fuzzer:") + print("• Hypothesis extensions for better data generation") + print("• Alternative fuzzing libraries (Atheris, PythonFuzz)") + print("• Custom mutation-based fuzzing") + print("• Combined fuzzing strategies") + print() + + try: + await demo_hypothesis_extensions() + await demo_alternative_fuzzers() + await demo_combined_strategies() + + print("\nšŸŽ‰ Demo completed successfully!") + print("\nTo use these features in your fuzzing campaigns:") + print("1. Import the enhanced modules:") + print(" from mcp_fuzzer.fuzz_engine.strategy import ProtocolStrategies") + print("2. Use the new methods:") + print(" await ProtocolStrategies.fuzz_with_hypothesis_extensions(schema)") + print(" await ProtocolStrategies.fuzz_with_alternative_libraries(inputs, func)") + print("3. Check the documentation for more advanced usage patterns") + + except Exception as e: + print(f"\nāŒ Demo failed with error: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/mcp_fuzzer/fuzz_engine/strategy/alternative_fuzzers.py b/mcp_fuzzer/fuzz_engine/strategy/alternative_fuzzers.py new file mode 100644 index 0000000..ca33cb4 --- /dev/null +++ b/mcp_fuzzer/fuzz_engine/strategy/alternative_fuzzers.py @@ -0,0 +1,451 @@ +#!/usr/bin/env python3 +""" +Alternative Fuzzing Libraries Integration + +This module integrates various Python fuzzing libraries for enhanced fuzzing +capabilities including mutation-based, coverage-guided, and other testing strategies. +""" + +import asyncio +import json +import logging +import random +import string +from typing import Any, Dict, List, Callable, Union + +try: + import atheris + ATHERIS_AVAILABLE = True +except ImportError: + ATHERIS_AVAILABLE = False + atheris = None + +try: + from pythonfuzz.main import PythonFuzz + PYTHONFUZZ_AVAILABLE = True +except ImportError: + PYTHONFUZZ_AVAILABLE = False + PythonFuzz = None + +from ...types import FuzzDataResult + + +class AlternativeFuzzers: + """Integration of alternative fuzzing libraries for enhanced testing.""" + + def __init__(self): + self._logger = logging.getLogger(__name__) + + async def atheris_mutation_fuzz( + self, + target_function: Callable, + initial_inputs: List[bytes], + max_iterations: int = 1000, + timeout_seconds: int = 30 + ) -> List[FuzzDataResult]: + """ + Use Atheris for mutation-based fuzzing. + + Args: + target_function: Function to fuzz + initial_inputs: Initial input corpus + max_iterations: Maximum fuzzing iterations + timeout_seconds: Timeout for fuzzing session + + Returns: + List of fuzzing results + """ + if not ATHERIS_AVAILABLE: + self._logger.warning( + "Atheris not available, skipping mutation-based fuzzing" + ) + return [] + + results = [] + + def test_function(input_bytes: bytes) -> None: + """Test function wrapper for Atheris.""" + try: + # Convert bytes to string for JSON parsing + input_str = input_bytes.decode('utf-8', errors='ignore') + + # Try to parse as JSON + try: + data = json.loads(input_str) + except json.JSONDecodeError: + # If not JSON, treat as raw string + data = input_str + + # Call target function + target_function(data) + + # If we reach here, input was processed successfully + results.append({ + "protocol_type": "AtherisMutation", + "fuzz_data": data, + "success": True, + "strategy": "mutation_based", + "library": "atheris" + }) + + except Exception as e: + # Input caused an exception - this might be interesting + results.append({ + "protocol_type": "AtherisMutation", + "fuzz_data": input_bytes, + "success": False, + "exception": str(e), + "strategy": "mutation_based", + "library": "atheris" + }) + + try: + # Set up Atheris fuzzing + atheris.Setup(initial_inputs, test_function) + + # Run fuzzing with timeout + loop = asyncio.get_running_loop() + + def run_fuzzing(): + try: + atheris.Fuzz(max_iterations=max_iterations) + except Exception as e: + self._logger.debug(f"Atheris fuzzing completed with: {e}") + + await asyncio.wait_for( + loop.run_in_executor(None, run_fuzzing), + timeout=timeout_seconds + ) + + except asyncio.TimeoutError: + self._logger.info("Atheris fuzzing timed out") + except Exception as e: + self._logger.error(f"Atheris fuzzing failed: {e}") + + return results + + async def pythonfuzz_coverage_fuzz( + self, + target_function: Callable, + initial_corpus: List[bytes] = None, + max_runs: int = 100 + ) -> List[FuzzDataResult]: + """ + Use PythonFuzz for coverage-guided fuzzing. + + Args: + target_function: Function to fuzz + initial_corpus: Initial input corpus + max_runs: Maximum number of fuzzing runs + + Returns: + List of fuzzing results + """ + if not PYTHONFUZZ_AVAILABLE: + self._logger.warning( + "PythonFuzz not available, skipping coverage-guided fuzzing" + ) + return [] + + if initial_corpus is None: + initial_corpus = [b"{}", b'{"test": "data"}', b"[1, 2, 3]"] + + results = [] + + def fuzz_target(input_data: bytes) -> None: + """Fuzz target wrapper for PythonFuzz.""" + try: + input_str = input_data.decode('utf-8', errors='ignore') + + try: + data = json.loads(input_str) + except json.JSONDecodeError: + data = input_str + + target_function(data) + + results.append({ + "protocol_type": "PythonFuzzCoverage", + "fuzz_data": data, + "success": True, + "strategy": "coverage_guided", + "library": "pythonfuzz" + }) + + except Exception as e: + results.append({ + "protocol_type": "PythonFuzzCoverage", + "fuzz_data": input_data, + "success": False, + "exception": str(e), + "strategy": "coverage_guided", + "library": "pythonfuzz" + }) + + try: + # Create PythonFuzz instance + fuzzer = PythonFuzz(target_function=fuzz_target, corpus=initial_corpus) + + # Run fuzzing + loop = asyncio.get_running_loop() + + def run_pythonfuzz(): + for _ in range(max_runs): + try: + fuzzer.fuzz() + except Exception as e: + self._logger.debug(f"PythonFuzz iteration failed: {e}") + break + + await loop.run_in_executor(None, run_pythonfuzz) + + except Exception as e: + self._logger.error(f"PythonFuzz fuzzing failed: {e}") + + return results + + def generate_random_mutations( + self, + base_input: Union[str, Dict, List], + num_mutations: int = 10 + ) -> List[Any]: + """ + Generate random mutations of input data for fuzzing. + + Args: + base_input: Base input to mutate + num_mutations: Number of mutations to generate + + Returns: + List of mutated inputs + """ + mutations = [] + + for _ in range(num_mutations): + if isinstance(base_input, str): + mutation = self._mutate_string(base_input) + elif isinstance(base_input, dict): + mutation = self._mutate_dict(base_input.copy()) + elif isinstance(base_input, list): + mutation = self._mutate_list(base_input.copy()) + else: + # Convert to string and mutate + mutation = self._mutate_string(str(base_input)) + + mutations.append(mutation) + + return mutations + + def _mutate_string(self, s: str) -> str: + """Apply random mutations to a string.""" + mutations = [ + self._insert_random_chars, + self._delete_random_chars, + self._replace_random_chars, + self._duplicate_substring, + self._swap_chars, + ] + + mutation_func = random.choice(mutations) + return mutation_func(s) + + def _mutate_dict(self, d: Dict) -> Dict: + """Apply random mutations to a dictionary.""" + mutations = [ + self._add_random_key, + self._remove_random_key, + self._mutate_dict_value, + self._swap_dict_keys, + ] + + mutation_func = random.choice(mutations) + return mutation_func(d) + + def _mutate_list(self, lst: List) -> List: + """Apply random mutations to a list.""" + mutations = [ + self._add_list_element, + self._remove_list_element, + self._mutate_list_element, + self._shuffle_list, + ] + + mutation_func = random.choice(mutations) + return mutation_func(lst) + + # String mutation helpers + def _insert_random_chars(self, s: str) -> str: + if not s: + return "".join(random.choices(string.printable, k=5)) + + pos = random.randint(0, len(s)) + chars = "".join(random.choices(string.printable, k=random.randint(1, 5))) + return s[:pos] + chars + s[pos:] + + def _delete_random_chars(self, s: str) -> str: + if len(s) <= 1: + return s + + start = random.randint(0, len(s) - 1) + end = random.randint(start + 1, min(start + 5, len(s))) + return s[:start] + s[end:] + + def _replace_random_chars(self, s: str) -> str: + if not s: + return s + + pos = random.randint(0, len(s) - 1) + replacement = random.choice(string.printable) + return s[:pos] + replacement + s[pos + 1:] + + def _duplicate_substring(self, s: str) -> str: + if len(s) <= 1: + return s + + start = random.randint(0, len(s) - 1) + end = random.randint(start + 1, len(s)) + substring = s[start:end] + pos = random.randint(0, len(s)) + return s[:pos] + substring + s[pos:] + + def _swap_chars(self, s: str) -> str: + if len(s) <= 1: + return s + + i = random.randint(0, len(s) - 1) + j = random.randint(0, len(s) - 1) + chars = list(s) + chars[i], chars[j] = chars[j], chars[i] + return "".join(chars) + + # Dict mutation helpers + def _add_random_key(self, d: Dict) -> Dict: + key = f"key_{random.randint(0, 100)}" + value = random.choice([ + random.randint(0, 100), "string", [1, 2, 3], {"nested": "value"} + ]) + d[key] = value + return d + + def _remove_random_key(self, d: Dict) -> Dict: + if not d: + return d + + key = random.choice(list(d.keys())) + del d[key] + return d + + def _mutate_dict_value(self, d: Dict) -> Dict: + if not d: + return d + + key = random.choice(list(d.keys())) + if isinstance(d[key], str): + d[key] = self._mutate_string(d[key]) + elif isinstance(d[key], int): + d[key] = d[key] + random.randint(-10, 10) + elif isinstance(d[key], list): + d[key] = self._mutate_list(d[key]) + elif isinstance(d[key], dict): + d[key] = self._mutate_dict(d[key]) + return d + + def _swap_dict_keys(self, d: Dict) -> Dict: + if len(d) <= 1: + return d + + keys = list(d.keys()) + i, j = random.sample(range(len(keys)), 2) + keys[i], keys[j] = keys[j], keys[i] + return {k: d[k] for k in keys} + + # List mutation helpers + def _add_list_element(self, lst: List) -> List: + element = random.choice([ + random.randint(0, 100), "string", [1, 2], {"key": "value"} + ]) + pos = random.randint(0, len(lst)) + lst.insert(pos, element) + return lst + + def _remove_list_element(self, lst: List) -> List: + if not lst: + return lst + + pos = random.randint(0, len(lst) - 1) + del lst[pos] + return lst + + def _mutate_list_element(self, lst: List) -> List: + if not lst: + return lst + + pos = random.randint(0, len(lst) - 1) + if isinstance(lst[pos], str): + lst[pos] = self._mutate_string(lst[pos]) + elif isinstance(lst[pos], int): + lst[pos] = lst[pos] + random.randint(-10, 10) + elif isinstance(lst[pos], list): + lst[pos] = self._mutate_list(lst[pos]) + elif isinstance(lst[pos], dict): + lst[pos] = self._mutate_dict(lst[pos]) + return lst + + def _shuffle_list(self, lst: List) -> List: + random.shuffle(lst) + return lst + + async def mutation_based_fuzz( + self, + base_inputs: List[Union[str, Dict, List]], + target_function: Callable, + num_mutations_per_input: int = 10 + ) -> List[FuzzDataResult]: + """ + Perform mutation-based fuzzing using custom mutation strategies. + + Args: + base_inputs: Base inputs to mutate + target_function: Function to test with mutated inputs + num_mutations_per_input: Number of mutations per base input + + Returns: + List of fuzzing results + """ + results = [] + + for base_input in base_inputs: + mutations = self.generate_random_mutations( + base_input, num_mutations_per_input + ) + + for mutation in mutations: + try: + # Test the mutated input + target_function(mutation) + + results.append({ + "protocol_type": "CustomMutation", + "fuzz_data": mutation, + "success": True, + "strategy": "mutation_based", + "library": "custom", + "base_input": base_input + }) + + except Exception as e: + results.append({ + "protocol_type": "CustomMutation", + "fuzz_data": mutation, + "success": False, + "exception": str(e), + "strategy": "mutation_based", + "library": "custom", + "base_input": base_input + }) + + return results + + +# Global instance for easy access +alternative_fuzzers = AlternativeFuzzers() \ No newline at end of file diff --git a/mcp_fuzzer/fuzz_engine/strategy/hypothesis_extensions.py b/mcp_fuzzer/fuzz_engine/strategy/hypothesis_extensions.py new file mode 100644 index 0000000..43fff69 --- /dev/null +++ b/mcp_fuzzer/fuzz_engine/strategy/hypothesis_extensions.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python3 +""" +Hypothesis Extensions for Enhanced Fuzzing + +This module provides enhanced fuzzing strategies using Hypothesis extensions +to generate more realistic and comprehensive test data for MCP server fuzzing. +""" + +import asyncio +import logging +from typing import Any, Dict, List, Optional + +from hypothesis import strategies as st +from hypothesis_jsonschema import from_schema + +try: + from faker import Faker + FAKER_AVAILABLE = True +except ImportError: + FAKER_AVAILABLE = False + Faker = None + +from ...types import FuzzDataResult + + +class HypothesisExtensions: + """Enhanced fuzzing strategies using Hypothesis extensions.""" + + def __init__(self): + self._logger = logging.getLogger(__name__) + self._faker = Faker() if FAKER_AVAILABLE else None + + # Mapping of data types to Faker methods + self.faker_mappings = { + "email": "email", + "name": "name", + "address": "address", + "phone": "phone_number", + "company": "company", + "url": "url", + "text": "text", + "sentence": "sentence", + "word": "word", + "uuid": "uuid4", + "date": "date", + "datetime": "date_time", + } + + async def generate_from_json_schema( + self, + schema: Dict[str, Any], + max_examples: int = 10 + ) -> List[Dict[str, Any]]: + """ + Generate test data from JSON schema using hypothesis-jsonschema. + + Args: + schema: JSON schema to generate data from + max_examples: Maximum number of examples to generate + + Returns: + List of generated data examples + """ + try: + strategy = from_schema(schema) + examples = [] + + for _ in range(max_examples): + try: + # Run in thread pool to avoid asyncio issues + loop = asyncio.get_running_loop() + example = await loop.run_in_executor(None, strategy.example) + examples.append(example) + except Exception as e: + self._logger.debug(f"Failed to generate example from schema: {e}") + continue + + return examples + + except Exception as e: + self._logger.error(f"Failed to create strategy from schema: {e}") + return [] + + async def generate_realistic_user_data( + self, + data_type: str, + count: int = 10 + ) -> List[Any]: + """ + Generate realistic user data using hypothesis-faker. + + Args: + data_type: Type of data to generate (email, name, address, etc.) + count: Number of examples to generate + + Returns: + List of generated realistic data + """ + if not FAKER_AVAILABLE: + self._logger.warning( + "hypothesis-faker not available, falling back to basic generation" + ) + return [f"fake_{data_type}_{i}" for i in range(count)] + + try: + # Map common data types to faker providers + faker_mappings = { + "email": "email", + "name": "name", + "address": "address", + "phone": "phone_number", + "company": "company", + "url": "url", + "text": "text", + "sentence": "sentence", + "word": "word", + "uuid": "uuid4", + "date": "date", + "datetime": "date_time", + } + + if data_type not in faker_mappings: + return [f"fake_{data_type}_{i}" for i in range(count)] + + faker_method = faker_mappings[data_type] + examples = [] + + for _ in range(count): + try: + # Use Faker directly + example = getattr(self._faker, faker_method)() + examples.append(example) + except Exception as e: + self._logger.debug(f"Failed to generate fake {data_type}: {e}") + continue + + return examples + + except Exception as e: + self._logger.error(f"Failed to generate realistic data: {e}") + return [f"fallback_{data_type}_{i}" for i in range(count)] + + def create_enhanced_protocol_strategy( + self, + base_schema: Dict[str, Any], + realistic_fields: Optional[Dict[str, Any]] = None + ) -> st.SearchStrategy: + """ + Create an enhanced strategy that combines JSON schema with realistic data. + + Args: + base_schema: Base JSON schema + realistic_fields: Fields to enhance with realistic data + + Returns: + Enhanced Hypothesis strategy + """ + try: + base_strategy = from_schema(base_schema) + + if realistic_fields and FAKER_AVAILABLE: + # Enhance specific fields with realistic data + enhancements = {} + for field, data_type in realistic_fields.items(): + # Use Faker directly for realistic data + if data_type in self.faker_mappings: + faker_method = self.faker_mappings[data_type] + enhancements[field] = st.just( + getattr(self._faker, faker_method)() + ) + else: + enhancements[field] = st.just(f"fake_{data_type}") + + return base_strategy.flatmap( + lambda base_data: st.builds( + lambda **kwargs: {**base_data, **kwargs}, + **enhancements + ) + ) + + return base_strategy + + except Exception as e: + self._logger.error(f"Failed to create enhanced strategy: {e}") + return st.just({}) + + async def generate_batch_from_schemas( + self, + schemas: List[Dict[str, Any]], + batch_size: int = 5 + ) -> List[Dict[str, Any]]: + """ + Generate a batch of data from multiple schemas. + + Args: + schemas: List of JSON schemas + batch_size: Number of items per schema + + Returns: + List of generated data from all schemas + """ + all_examples = [] + + for schema in schemas: + examples = await self.generate_from_json_schema(schema, batch_size) + all_examples.extend(examples) + + return all_examples + + def create_mcp_message_strategy( + self, + message_type: str = "request" + ) -> st.SearchStrategy: + """ + Create a strategy for generating MCP protocol messages. + + Args: + message_type: Type of MCP message (request, response, notification) + + Returns: + Strategy for generating MCP messages + """ + base_schema = { + "type": "object", + "properties": { + "jsonrpc": {"type": "string", "enum": ["2.0"]}, + "id": {"type": ["string", "integer", "null"]}, + "method": {"type": "string"}, + "params": {"type": "object"}, + }, + "required": ["jsonrpc", "method"] + } + + if message_type == "response": + base_schema["properties"]["result"] = {"type": "object"} + base_schema["properties"]["error"] = {"type": "object"} + elif message_type == "notification": + # Notifications don't have id + if "id" in base_schema["properties"]: + del base_schema["properties"]["id"] + if "id" in base_schema.get("required", []): + base_schema["required"].remove("id") + + return from_schema(base_schema) + + async def fuzz_with_extensions( + self, + schema: Dict[str, Any], + runs: int = 10, + use_realistic_data: bool = True + ) -> List[FuzzDataResult]: + """ + Perform fuzzing using Hypothesis extensions. + + Args: + schema: JSON schema to fuzz + runs: Number of fuzzing runs + use_realistic_data: Whether to use realistic data generation + + Returns: + List of fuzzing results + """ + results = [] + + try: + if use_realistic_data and FAKER_AVAILABLE: + # Use enhanced strategy with realistic data + strategy = self.create_enhanced_protocol_strategy( + schema, + realistic_fields={ + "name": "name", + "email": "email", + "url": "url", + "description": "text" + } + ) + else: + # Use basic JSON schema strategy + strategy = from_schema(schema) + + for run in range(runs): + try: + loop = asyncio.get_running_loop() + example = await loop.run_in_executor(None, strategy.example) + + result = { + "protocol_type": "EnhancedFuzz", + "run": run + 1, + "fuzz_data": example, + "success": True, + "strategy": "hypothesis_extensions", + "extensions_used": ["jsonschema"] + ( + ["faker"] if FAKER_AVAILABLE else [] + ) + } + + results.append(result) + + except Exception as e: + self._logger.debug( + f"Failed to generate example in run {run + 1}: {e}" + ) + results.append({ + "protocol_type": "EnhancedFuzz", + "run": run + 1, + "fuzz_data": None, + "success": False, + "exception": str(e), + "strategy": "hypothesis_extensions" + }) + + except Exception as e: + self._logger.error(f"Failed to create fuzzing strategy: {e}") + + return results + + +# Global instance for easy access +hypothesis_extensions = HypothesisExtensions() \ No newline at end of file diff --git a/mcp_fuzzer/fuzz_engine/strategy/strategy_manager.py b/mcp_fuzzer/fuzz_engine/strategy/strategy_manager.py index c20d646..4bd084d 100644 --- a/mcp_fuzzer/fuzz_engine/strategy/strategy_manager.py +++ b/mcp_fuzzer/fuzz_engine/strategy/strategy_manager.py @@ -27,6 +27,8 @@ fuzz_initialize_request_aggressive, get_protocol_fuzzer_method as get_aggressive_fuzzer_method, ) +from .hypothesis_extensions import hypothesis_extensions +from .alternative_fuzzers import alternative_fuzzers class ProtocolStrategies: @@ -230,6 +232,64 @@ def generate_out_of_order_batch( return batch + @staticmethod + async def fuzz_with_hypothesis_extensions( + schema: Dict[str, Any], + runs: int = 10, + use_realistic_data: bool = True + ) -> List[Dict[str, Any]]: + """ + Fuzz using Hypothesis extensions for enhanced data generation. + + Args: + schema: JSON schema to generate data from + runs: Number of fuzzing runs + use_realistic_data: Whether to use realistic data generation + + Returns: + List of fuzzing results + """ + return await hypothesis_extensions.fuzz_with_extensions( + schema, runs, use_realistic_data + ) + + @staticmethod + async def fuzz_with_alternative_libraries( + base_inputs: List[Any], + target_function: Callable, + strategy: str = "mutation", + **kwargs + ) -> List[Dict[str, Any]]: + """ + Fuzz using alternative fuzzing libraries. + + Args: + base_inputs: Base inputs to fuzz + target_function: Function to test + strategy: Fuzzing strategy to use + **kwargs: Additional arguments for the fuzzing strategy + + Returns: + List of fuzzing results + """ + if strategy == "atheris": + # Convert inputs to bytes for Atheris + initial_inputs = [str(inp).encode() for inp in base_inputs] + return await alternative_fuzzers.atheris_mutation_fuzz( + target_function, initial_inputs, **kwargs + ) + elif strategy == "pythonfuzz": + initial_corpus = [str(inp).encode() for inp in base_inputs] + return await alternative_fuzzers.pythonfuzz_coverage_fuzz( + target_function, initial_corpus, **kwargs + ) + elif strategy == "mutation": + return await alternative_fuzzers.mutation_based_fuzz( + base_inputs, target_function, **kwargs + ) + else: + raise ValueError(f"Unknown fuzzing strategy: {strategy}") + class ToolStrategies: """Unified tool strategies with two-phase approach.""" diff --git a/pyproject.toml b/pyproject.toml index 1ac456d..a75e69a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,8 @@ classifiers = [ dependencies = [ "httpx", "hypothesis", + "hypothesis-jsonschema", + "faker", "rich", "pyyaml>=6.0", ] diff --git a/tests/unit/fuzz_engine/strategy/test_alternative_fuzzers.py b/tests/unit/fuzz_engine/strategy/test_alternative_fuzzers.py new file mode 100644 index 0000000..257d5b9 --- /dev/null +++ b/tests/unit/fuzz_engine/strategy/test_alternative_fuzzers.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +""" +Tests for Alternative Fuzzing Libraries + +This module contains tests for the alternative fuzzing libraries integration. +""" + +import asyncio +import pytest +from unittest.mock import patch, MagicMock + +from mcp_fuzzer.fuzz_engine.strategy.alternative_fuzzers import alternative_fuzzers + + +class TestAlternativeFuzzers: + """Test cases for alternative fuzzing libraries.""" + + def test_generate_random_mutations_string(self): + """Test random string mutations.""" + base_string = "test_input" + mutations = alternative_fuzzers.generate_random_mutations(base_string, num_mutations=5) + + assert len(mutations) == 5 + for mutation in mutations: + assert isinstance(mutation, str) + + def test_generate_random_mutations_dict(self): + """Test random dictionary mutations.""" + base_dict = {"key": "value", "number": 42} + mutations = alternative_fuzzers.generate_random_mutations(base_dict, num_mutations=3) + + assert len(mutations) == 3 + for mutation in mutations: + assert isinstance(mutation, dict) + + def test_generate_random_mutations_list(self): + """Test random list mutations.""" + base_list = [1, 2, "three"] + mutations = alternative_fuzzers.generate_random_mutations(base_list, num_mutations=4) + + assert len(mutations) == 4 + for mutation in mutations: + assert isinstance(mutation, list) + + @pytest.mark.asyncio + async def test_mutation_based_fuzz_success(self): + """Test successful mutation-based fuzzing.""" + base_inputs = [{"test": "data"}] + + def target_function(data): + return True # Always succeeds + + results = await alternative_fuzzers.mutation_based_fuzz( + base_inputs, target_function, num_mutations_per_input=3 + ) + + assert len(results) >= 3 # At least the original + mutations + for result in results: + assert "protocol_type" in result + assert "fuzz_data" in result + assert result["success"] is True + assert "strategy" in result + + @pytest.mark.asyncio + async def test_mutation_based_fuzz_with_failures(self): + """Test mutation-based fuzzing with some failures.""" + base_inputs = [{"method": "test"}] + + def target_function(data): + if isinstance(data, dict) and data.get("method") == "malicious": + raise ValueError("Malicious method detected") + return True + + results = await alternative_fuzzers.mutation_based_fuzz( + base_inputs, target_function, num_mutations_per_input=5 + ) + + assert len(results) >= 1 + # Should have some successful and potentially some failed results + successful = [r for r in results if r.get("success")] + assert len(successful) > 0 + + @pytest.mark.asyncio + async def test_atheris_fuzz_unavailable(self): + """Test Atheris fuzzing when library is not available.""" + # Since Atheris is not available in this environment, test the fallback + def target_function(data): + return True + + results = await alternative_fuzzers.atheris_mutation_fuzz( + target_function, [b"test"], max_iterations=10 + ) + + assert results == [] + + @pytest.mark.asyncio + async def test_pythonfuzz_coverage_fuzz_unavailable(self): + """Test PythonFuzz when library is not available.""" + # Since PythonFuzz is not available in this environment, test the fallback + def target_function(data): + return True + + results = await alternative_fuzzers.pythonfuzz_coverage_fuzz( + target_function, [b"test"], max_runs=5 + ) + + assert results == [] + + def test_string_mutation_helpers(self): + """Test individual string mutation helper functions.""" + test_string = "hello" + + # Test insert + mutated = alternative_fuzzers._insert_random_chars(test_string) + assert isinstance(mutated, str) + assert len(mutated) >= len(test_string) + + # Test delete + if len(test_string) > 1: + mutated = alternative_fuzzers._delete_random_chars(test_string) + assert isinstance(mutated, str) + assert len(mutated) <= len(test_string) + + # Test replace + mutated = alternative_fuzzers._replace_random_chars(test_string) + assert isinstance(mutated, str) + assert len(mutated) == len(test_string) + + # Test duplicate + mutated = alternative_fuzzers._duplicate_substring(test_string) + assert isinstance(mutated, str) + assert len(mutated) >= len(test_string) + + # Test swap + if len(test_string) > 1: + mutated = alternative_fuzzers._swap_chars(test_string) + assert isinstance(mutated, str) + assert len(mutated) == len(test_string) + + def test_dict_mutation_helpers(self): + """Test dictionary mutation helper functions.""" + test_dict = {"a": 1, "b": "test"} + + # Test add key + mutated = alternative_fuzzers._add_random_key(test_dict.copy()) + assert isinstance(mutated, dict) + assert len(mutated) >= len(test_dict) + + # Test remove key + if test_dict: + mutated = alternative_fuzzers._remove_random_key(test_dict.copy()) + assert isinstance(mutated, dict) + assert len(mutated) <= len(test_dict) + + # Test mutate value + mutated = alternative_fuzzers._mutate_dict_value(test_dict.copy()) + assert isinstance(mutated, dict) + assert len(mutated) == len(test_dict) + + def test_list_mutation_helpers(self): + """Test list mutation helper functions.""" + test_list = [1, 2, 3] + + # Test add element + mutated = alternative_fuzzers._add_list_element(test_list.copy()) + assert isinstance(mutated, list) + assert len(mutated) >= len(test_list) + + # Test remove element + if test_list: + mutated = alternative_fuzzers._remove_list_element(test_list.copy()) + assert isinstance(mutated, list) + assert len(mutated) <= len(test_list) + + # Test mutate element + mutated = alternative_fuzzers._mutate_list_element(test_list.copy()) + assert isinstance(mutated, list) + assert len(mutated) == len(test_list) + + # Test shuffle + mutated = alternative_fuzzers._shuffle_list(test_list.copy()) + assert isinstance(mutated, list) + assert len(mutated) == len(test_list) + assert set(mutated) == set(test_list) # Same elements, possibly reordered + + +class TestAlternativeFuzzersIntegration: + """Integration tests for alternative fuzzing libraries.""" + + @pytest.mark.asyncio + async def test_comprehensive_mutation_fuzzing(self): + """Test comprehensive mutation-based fuzzing workflow.""" + base_inputs = [ + {"jsonrpc": "2.0", "method": "test"}, + {"jsonrpc": "2.0", "method": "initialize", "params": {}}, + "string_input", + [1, 2, 3] + ] + + def comprehensive_target(data): + """Target function that checks various conditions.""" + if isinstance(data, dict): + if data.get("method") == "forbidden": + raise ValueError("Forbidden method") + if "params" in data and len(str(data["params"])) > 1000: + raise ValueError("Params too large") + elif isinstance(data, str): + if len(data) > 1000: + raise ValueError("String too long") + elif isinstance(data, list): + if len(data) > 100: + raise ValueError("List too long") + + return True + + results = await alternative_fuzzers.mutation_based_fuzz( + base_inputs, comprehensive_target, num_mutations_per_input=10 + ) + + assert len(results) >= len(base_inputs) # At least original inputs + + # Check that we have both successful and potentially failed results + successful_results = [r for r in results if r.get("success")] + assert len(successful_results) > 0 + + # Verify result structure + for result in results: + assert "protocol_type" in result + assert "fuzz_data" in result + assert "success" in result + assert "strategy" in result + assert result["strategy"] == "mutation_based" + assert result["library"] == "custom" + + @pytest.mark.asyncio + async def test_empty_base_inputs(self): + """Test fuzzing with empty base inputs.""" + def target_function(data): + return True + + results = await alternative_fuzzers.mutation_based_fuzz( + [], target_function, num_mutations_per_input=5 + ) + + assert results == [] + + @pytest.mark.asyncio + async def test_target_function_exceptions(self): + """Test handling of exceptions in target function.""" + base_inputs = [{"test": "data"}] + + def failing_target(data): + raise Exception("Test exception") + + results = await alternative_fuzzers.mutation_based_fuzz( + base_inputs, failing_target, num_mutations_per_input=3 + ) + + assert len(results) >= 1 + for result in results: + assert result["success"] is False + assert "exception" in result \ No newline at end of file diff --git a/tests/unit/fuzz_engine/strategy/test_hypothesis_extensions.py b/tests/unit/fuzz_engine/strategy/test_hypothesis_extensions.py new file mode 100644 index 0000000..8cc466c --- /dev/null +++ b/tests/unit/fuzz_engine/strategy/test_hypothesis_extensions.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +""" +Tests for Hypothesis Extensions + +This module contains tests for the enhanced fuzzing capabilities +using Hypothesis extensions. +""" + +import asyncio +import json +import pytest +from unittest.mock import patch, MagicMock + +from mcp_fuzzer.fuzz_engine.strategy.hypothesis_extensions import hypothesis_extensions + + +class TestHypothesisExtensions: + """Test cases for Hypothesis extensions functionality.""" + + @pytest.mark.asyncio + async def test_generate_from_json_schema_basic(self): + """Test basic JSON schema data generation.""" + schema = { + "type": "object", + "properties": { + "name": {"type": "string"}, + "value": {"type": "integer"} + } + } + + results = await hypothesis_extensions.generate_from_json_schema(schema, max_examples=3) + + assert len(results) <= 3 + for result in results: + assert isinstance(result, dict) + if "name" in result: + assert isinstance(result["name"], str) + if "value" in result: + assert isinstance(result["value"], int) + + @pytest.mark.asyncio + async def test_generate_from_json_schema_empty(self): + """Test generation with empty schema.""" + schema = {} + results = await hypothesis_extensions.generate_from_json_schema(schema, max_examples=2) + + # Should handle empty schema gracefully + assert isinstance(results, list) + + @pytest.mark.asyncio + async def test_generate_realistic_user_data_without_faker(self): + """Test realistic data generation fallback when faker is not available.""" + with patch('mcp_fuzzer.fuzz_engine.strategy.hypothesis_extensions.FAKER_AVAILABLE', False): + results = await hypothesis_extensions.generate_realistic_user_data("email", count=3) + + assert len(results) == 3 + for result in results: + assert isinstance(result, str) + assert "email" in result + + @pytest.mark.asyncio + async def test_generate_realistic_user_data_unknown_type(self): + """Test realistic data generation with unknown data type.""" + results = await hypothesis_extensions.generate_realistic_user_data("unknown_type", count=2) + + assert len(results) == 2 + for result in results: + assert isinstance(result, str) + assert "unknown_type" in result + + @pytest.mark.asyncio + async def test_create_enhanced_protocol_strategy(self): + """Test creation of enhanced protocol strategy.""" + base_schema = { + "type": "object", + "properties": {"method": {"type": "string"}} + } + + strategy = hypothesis_extensions.create_enhanced_protocol_strategy(base_schema) + + # Should return a strategy object + assert strategy is not None + + @pytest.mark.asyncio + async def test_fuzz_with_extensions_basic(self): + """Test basic fuzzing with extensions.""" + schema = { + "type": "object", + "properties": {"test": {"type": "string"}} + } + + results = await hypothesis_extensions.fuzz_with_extensions( + schema, runs=2, use_realistic_data=False + ) + + assert len(results) == 2 + for result in results: + assert "protocol_type" in result + assert "run" in result + assert "fuzz_data" in result + assert "success" in result + assert "strategy" in result + + @pytest.mark.asyncio + async def test_fuzz_with_extensions_with_realistic_data(self): + """Test fuzzing with realistic data generation.""" + schema = { + "type": "object", + "properties": {"email": {"type": "string"}} + } + + results = await hypothesis_extensions.fuzz_with_extensions( + schema, runs=2, use_realistic_data=True + ) + + assert len(results) == 2 + # Should complete without errors even if faker is not available + + @pytest.mark.asyncio + async def test_fuzz_with_extensions_invalid_schema(self): + """Test fuzzing with invalid schema.""" + invalid_schema = {"invalid": "schema"} + + results = await hypothesis_extensions.fuzz_with_extensions( + invalid_schema, runs=1, use_realistic_data=False + ) + + # Should handle invalid schema gracefully + assert isinstance(results, list) + + @pytest.mark.asyncio + async def test_generate_batch_from_schemas(self): + """Test batch generation from multiple schemas.""" + schemas = [ + {"type": "object", "properties": {"a": {"type": "string"}}}, + {"type": "object", "properties": {"b": {"type": "integer"}}} + ] + + results = await hypothesis_extensions.generate_batch_from_schemas(schemas, batch_size=2) + + assert len(results) <= 4 # 2 schemas * 2 examples each + for result in results: + assert isinstance(result, dict) + + @pytest.mark.asyncio + async def test_create_mcp_message_strategy(self): + """Test MCP message strategy creation.""" + strategy = hypothesis_extensions.create_mcp_message_strategy("request") + + assert strategy is not None + + strategy_response = hypothesis_extensions.create_mcp_message_strategy("response") + assert strategy_response is not None + + strategy_notification = hypothesis_extensions.create_mcp_message_strategy("notification") + assert strategy_notification is not None + + +class TestHypothesisExtensionsIntegration: + """Integration tests for Hypothesis extensions.""" + + @pytest.mark.asyncio + async def test_full_fuzzing_workflow(self): + """Test complete fuzzing workflow with extensions.""" + # Define a realistic MCP schema + mcp_schema = { + "type": "object", + "properties": { + "jsonrpc": {"type": "string", "enum": ["2.0"]}, + "id": {"type": ["integer", "string", "null"]}, + "method": {"type": "string"}, + "params": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "value": {"type": "integer"} + } + } + }, + "required": ["jsonrpc", "method"] + } + + # Generate data from schema + schema_data = await hypothesis_extensions.generate_from_json_schema( + mcp_schema, max_examples=5 + ) + + # Fuzz with extensions + fuzz_results = await hypothesis_extensions.fuzz_with_extensions( + mcp_schema, runs=3, use_realistic_data=False + ) + + # Verify results + assert len(fuzz_results) == 3 + for result in fuzz_results: + assert result["strategy"] == "hypothesis_extensions" + assert "extensions_used" in result + + @pytest.mark.asyncio + async def test_error_handling(self): + """Test error handling in extensions.""" + # Test with None schema + results = await hypothesis_extensions.fuzz_with_extensions( + None, runs=1, use_realistic_data=False + ) + + assert isinstance(results, list) + + # Test with malformed schema + malformed_schema = {"type": "invalid"} + results = await hypothesis_extensions.fuzz_with_extensions( + malformed_schema, runs=1, use_realistic_data=False + ) + + assert isinstance(results, list) \ No newline at end of file