Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions epoch_security_audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# SPDX-License-Identifier: MIT

import sqlite3
import time
import hashlib
import json
from flask import Flask, render_template_string, jsonify, request
import logging
from datetime import datetime, timedelta
import threading
from concurrent.futures import ThreadPoolExecutor

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

DB_PATH = 'rustchain.db'
EPOCH_DURATION = 600 # 10 minutes

class EpochSecurityAudit:
def __init__(self):
self.test_results = []
self.vulnerabilities_found = []
self.audit_id = int(time.time())

def log_result(self, test_name, passed, details, severity="LOW"):
result = {
'test_name': test_name,
'passed': passed,
'details': details,
'severity': severity,
'timestamp': datetime.now().isoformat()
}
self.test_results.append(result)
if not passed:
self.vulnerabilities_found.append(result)

def get_current_epoch(self):
return int(time.time()) // EPOCH_DURATION

def create_test_miner(self, node_id, hardware_class="A"):
with sqlite3.connect(DB_PATH) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO nodes (node_id, hardware_class, status, last_seen)
VALUES (?, ?, 'active', ?)
''', (node_id, hardware_class, int(time.time())))
conn.commit()

def test_double_enrollment(self):
"""Test if miners can enroll in the same epoch twice"""
test_node = f"test_double_{self.audit_id}"
current_epoch = self.get_current_epoch()

try:
self.create_test_miner(test_node)

with sqlite3.connect(DB_PATH) as conn:
cursor = conn.cursor()

# First enrollment
cursor.execute('''
INSERT INTO epoch_enrollments (epoch_id, node_id, enrollment_time, hardware_mu
58 changes: 58 additions & 0 deletions security_test_harness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# SPDX-License-Identifier: MIT

import sqlite3
import time
import hashlib
import threading
import json
import logging
from datetime import datetime, timedelta
from contextlib import contextmanager
from typing import Dict, List, Optional, Tuple, Any
from concurrent.futures import ThreadPoolExecutor, as_completed

DB_PATH = 'rustchain_v2.db'

class SecurityTestHarness:
def __init__(self):
self.test_results = []
self.logger = self._setup_logging()
self.attack_scenarios = {
'double_enrollment': self.test_double_enrollment,
'late_attestation': self.test_late_attestation_injection,
'multiplier_manipulation': self.test_multiplier_manipulation,
'settlement_race': self.test_settlement_race_condition,
'epoch_boundary': self.test_epoch_boundary_attacks
}

def _setup_logging(self):
logger = logging.getLogger('security_harness')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger

@contextmanager
def db_connection(self):
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()

def create_test_environment(self):
"""Setup isolated test data"""
with self.db_connection() as conn:
# Create test miners
test_miners = [
('test_miner_1', 'test_wallet_1', 1.5),
('test_miner_2', 'test_wallet_2', 2.0),
('test_miner_3', 'test_wallet_3', 1.0)
]

for miner_id, wallet, multiplier in test_miners:
conn.execute('''
INSERT OR REPLACE INTO miners (miner_id, wallet_address, hardware_multiplier, is_activ
99 changes: 99 additions & 0 deletions tests/test_epoch_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# SPDX-License-Identifier: MIT

import unittest
import sqlite3
import tempfile
import os
import time
import threading
from unittest.mock import patch, MagicMock
import sys

# Import the modules we're testing
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from node.rustchain_v2_integrated_v2_2_1_rip200 import RustChain
from rewards_implementation_rip200 import EpochManager

class TestEpochSecurity(unittest.TestCase):

def setUp(self):
"""Set up test environment with temporary database"""
self.temp_db = tempfile.NamedTemporaryFile(delete=False)
self.db_path = self.temp_db.name
self.temp_db.close()

# Initialize blockchain and epoch manager
self.blockchain = RustChain(db_path=self.db_path)
self.epoch_manager = EpochManager(db_path=self.db_path)

# Set up test miners
self.test_miners = [
{'id': 'miner_001', 'hardware': 'GPU', 'multiplier': 2.0},
{'id': 'miner_002', 'hardware': 'CPU', 'multiplier': 1.0},
{'id': 'miner_003', 'hardware': 'ASIC', 'multiplier': 3.0}
]

self._setup_test_data()

def tearDown(self):
"""Clean up temporary database"""
try:
os.unlink(self.db_path)
except:
pass

def _setup_test_data(self):
"""Initialize test database with realistic data"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()

# Create miners table if not exists
cursor.execute('''
CREATE TABLE IF NOT EXISTS miners (
id TEXT PRIMARY KEY,
hardware_type TEXT,
multiplier REAL,
registered_at INTEGER
)
''')

# Create epoch_enrollments table
cursor.execute('''
CREATE TABLE IF NOT EXISTS epoch_enrollments (
epoch_id INTEGER,
miner_id TEXT,
enrollment_time INTEGER,
PRIMARY KEY (epoch_id, miner_id)
)
''')

conn.commit()

def test_double_enrollment_vulnerability(self):
"""Test that miners cannot enroll in the same epoch twice"""
epoch_id = int(time.time() // 600)
miner_id = 'test_miner_double'

with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()

# First enrollment should succeed
cursor.execute('''
INSERT INTO epoch_enrollments (epoch_id, miner_id, enrollment_time)
VALUES (?, ?, ?)
''', (epoch_id, miner_id, int(time.time())))

# Second enrollment should fail or be prevented
with self.assertRaises(sqlite3.IntegrityError):
cursor.execute('''
INSERT INTO epoch_enrollments (epoch_id, miner_id, enrollment_time)
VALUES (?, ?, ?)
''', (epoch_id, miner_id, int(time.time())))

def test_settlement_manipulation(self):
"""Test epoch settlement against manipulation attempts"""
# This test would simulate various settlement manipulation scenarios
pass

if __name__ == '__main__':
unittest.main()
Loading