diff --git a/epoch_security_audit.py b/epoch_security_audit.py new file mode 100644 index 00000000..67e6ffaf --- /dev/null +++ b/epoch_security_audit.py @@ -0,0 +1,62 @@ +# SPDX-License-Identifier: MIT + +import sqlite3 +import time +import hashlib +import json +from flask import Flask, render_template_string, jsonify, request +import logging +from datetime import datetime, timedelta +import threading +from concurrent.futures import ThreadPoolExecutor + +app = Flask(__name__) +logging.basicConfig(level=logging.INFO) + +DB_PATH = 'rustchain.db' +EPOCH_DURATION = 600 # 10 minutes + +class EpochSecurityAudit: + def __init__(self): + self.test_results = [] + self.vulnerabilities_found = [] + self.audit_id = int(time.time()) + + def log_result(self, test_name, passed, details, severity="LOW"): + result = { + 'test_name': test_name, + 'passed': passed, + 'details': details, + 'severity': severity, + 'timestamp': datetime.now().isoformat() + } + self.test_results.append(result) + if not passed: + self.vulnerabilities_found.append(result) + + def get_current_epoch(self): + return int(time.time()) // EPOCH_DURATION + + def create_test_miner(self, node_id, hardware_class="A"): + with sqlite3.connect(DB_PATH) as conn: + cursor = conn.cursor() + cursor.execute(''' + INSERT OR REPLACE INTO nodes (node_id, hardware_class, status, last_seen) + VALUES (?, ?, 'active', ?) + ''', (node_id, hardware_class, int(time.time()))) + conn.commit() + + def test_double_enrollment(self): + """Test if miners can enroll in the same epoch twice""" + test_node = f"test_double_{self.audit_id}" + current_epoch = self.get_current_epoch() + + try: + self.create_test_miner(test_node) + + with sqlite3.connect(DB_PATH) as conn: + cursor = conn.cursor() + + # First enrollment + cursor.execute(''' + INSERT INTO epoch_enrollments (epoch_id, node_id, enrollment_time, hardware_mu diff --git a/security_test_harness.py b/security_test_harness.py new file mode 100644 index 00000000..43caf415 --- /dev/null +++ b/security_test_harness.py @@ -0,0 +1,58 @@ +# SPDX-License-Identifier: MIT + +import sqlite3 +import time +import hashlib +import threading +import json +import logging +from datetime import datetime, timedelta +from contextlib import contextmanager +from typing import Dict, List, Optional, Tuple, Any +from concurrent.futures import ThreadPoolExecutor, as_completed + +DB_PATH = 'rustchain_v2.db' + +class SecurityTestHarness: + def __init__(self): + self.test_results = [] + self.logger = self._setup_logging() + self.attack_scenarios = { + 'double_enrollment': self.test_double_enrollment, + 'late_attestation': self.test_late_attestation_injection, + 'multiplier_manipulation': self.test_multiplier_manipulation, + 'settlement_race': self.test_settlement_race_condition, + 'epoch_boundary': self.test_epoch_boundary_attacks + } + + def _setup_logging(self): + logger = logging.getLogger('security_harness') + logger.setLevel(logging.INFO) + handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + + @contextmanager + def db_connection(self): + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + try: + yield conn + finally: + conn.close() + + def create_test_environment(self): + """Setup isolated test data""" + with self.db_connection() as conn: + # Create test miners + test_miners = [ + ('test_miner_1', 'test_wallet_1', 1.5), + ('test_miner_2', 'test_wallet_2', 2.0), + ('test_miner_3', 'test_wallet_3', 1.0) + ] + + for miner_id, wallet, multiplier in test_miners: + conn.execute(''' + INSERT OR REPLACE INTO miners (miner_id, wallet_address, hardware_multiplier, is_activ diff --git a/tests/test_epoch_security.py b/tests/test_epoch_security.py new file mode 100644 index 00000000..3ace9c10 --- /dev/null +++ b/tests/test_epoch_security.py @@ -0,0 +1,99 @@ +# SPDX-License-Identifier: MIT + +import unittest +import sqlite3 +import tempfile +import os +import time +import threading +from unittest.mock import patch, MagicMock +import sys + +# Import the modules we're testing +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from node.rustchain_v2_integrated_v2_2_1_rip200 import RustChain +from rewards_implementation_rip200 import EpochManager + +class TestEpochSecurity(unittest.TestCase): + + def setUp(self): + """Set up test environment with temporary database""" + self.temp_db = tempfile.NamedTemporaryFile(delete=False) + self.db_path = self.temp_db.name + self.temp_db.close() + + # Initialize blockchain and epoch manager + self.blockchain = RustChain(db_path=self.db_path) + self.epoch_manager = EpochManager(db_path=self.db_path) + + # Set up test miners + self.test_miners = [ + {'id': 'miner_001', 'hardware': 'GPU', 'multiplier': 2.0}, + {'id': 'miner_002', 'hardware': 'CPU', 'multiplier': 1.0}, + {'id': 'miner_003', 'hardware': 'ASIC', 'multiplier': 3.0} + ] + + self._setup_test_data() + + def tearDown(self): + """Clean up temporary database""" + try: + os.unlink(self.db_path) + except: + pass + + def _setup_test_data(self): + """Initialize test database with realistic data""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Create miners table if not exists + cursor.execute(''' + CREATE TABLE IF NOT EXISTS miners ( + id TEXT PRIMARY KEY, + hardware_type TEXT, + multiplier REAL, + registered_at INTEGER + ) + ''') + + # Create epoch_enrollments table + cursor.execute(''' + CREATE TABLE IF NOT EXISTS epoch_enrollments ( + epoch_id INTEGER, + miner_id TEXT, + enrollment_time INTEGER, + PRIMARY KEY (epoch_id, miner_id) + ) + ''') + + conn.commit() + + def test_double_enrollment_vulnerability(self): + """Test that miners cannot enroll in the same epoch twice""" + epoch_id = int(time.time() // 600) + miner_id = 'test_miner_double' + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # First enrollment should succeed + cursor.execute(''' + INSERT INTO epoch_enrollments (epoch_id, miner_id, enrollment_time) + VALUES (?, ?, ?) + ''', (epoch_id, miner_id, int(time.time()))) + + # Second enrollment should fail or be prevented + with self.assertRaises(sqlite3.IntegrityError): + cursor.execute(''' + INSERT INTO epoch_enrollments (epoch_id, miner_id, enrollment_time) + VALUES (?, ?, ?) + ''', (epoch_id, miner_id, int(time.time()))) + + def test_settlement_manipulation(self): + """Test epoch settlement against manipulation attempts""" + # This test would simulate various settlement manipulation scenarios + pass + +if __name__ == '__main__': + unittest.main()