Skip to content

Commit 6ee5d04

Browse files
authored
Update belel_integrity_crawler.py
1 parent 274880a commit 6ee5d04

File tree

1 file changed

+138
-38
lines changed

1 file changed

+138
-38
lines changed

belel_integrity_crawler.py

Lines changed: 138 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,69 +3,169 @@
33
# Enforces the cryptographic immutability of core identity files
44

55
import os
6-
import hashlib
76
import json
87
import time
8+
import hashlib
9+
from typing import Dict, Tuple, Optional
10+
from filelock import FileLock
911
from canonical_utils import alert_violation, trigger_repair_protocol
1012

11-
# === CONFIGURATION (kept) ===
# Map of protected files -> expected digest. Values are either a hex digest
# (SHA-1 today for the two legacy entries) or the sentinel "LOCKED_AT_DEPLOY",
# which means "baseline the file's hash at first run" (see _resolve_expected_map).
WATCHED_FILES: Dict[str, str] = {
    "BELEL_AUTHORITY_PROOF.txt": "8e58b232d1ad6ca86bbdb30456a42bf69c3165e4",  # SHA-1 (40 hex)
    "identity_guard.py": "c7e4d2039a7d4ac79d7c890aaf865334110e6ac9",  # SHA-1 (40 hex)
    "belel_integrity_crawler.py": "LOCKED_AT_DEPLOY",
    "src/protocol/identity/identity_guard.json": "LOCKED_AT_DEPLOY",
}

# Default to SHA-256 while staying backward-compatible with existing SHA-1 values
HASH_ALGO = os.getenv("BELEL_HASH_ALGO", "sha256")
# Seconds between scans of the watch list (env-overridable).
CHECK_INTERVAL_SECONDS = int(os.getenv("BELEL_CRAWLER_INTERVAL_SECS", "300"))  # kept: 5 mins default
# Where detected violations are persisted as JSON.
CANONICAL_LOG = os.getenv("BELEL_CANONICAL_LOG", "violations.json")  # kept

# New: optional sources/outputs
# Optional extra/override watch map, merged over WATCHED_FILES when present.
EXTERNAL_EXPECTED_MAP = os.getenv("BELEL_EXPECTED_MAP")  # optional JSON file with {"path": "hash|LOCKED_AT_DEPLOY"...}
# Persisted first-seen hashes used to resolve LOCKED_AT_DEPLOY placeholders.
BASELINE_LOCK_FILE = os.getenv("BELEL_BASELINE_FILE", ".expected_hashes.lock.json")
2529

26-
def compute_hash(filepath, algo=HASH_ALGO):
30+
# === UTILITIES ===
31+
32+
def _stream_hash(path: str, algo: str) -> Optional[str]:
    """Return the hex digest of the file at *path* using hash algorithm *algo*.

    The file is read in 8 KiB chunks so arbitrarily large files never sit
    fully in memory.  Returns ``None`` when the file is missing/unreadable or
    *algo* is not a supported hashlib algorithm name — callers treat ``None``
    as "skip this check" rather than crashing the crawler loop.

    Fix: the previous version hashed with SHA-1 for *every* algo value other
    than the literal ``"sha256"``, so a typo or an env override such as
    ``"md5"`` silently produced a wrong-algorithm digest and a guaranteed
    spurious mismatch.  ``hashlib.new`` now honours any supported name and
    raises ``ValueError`` for unknown ones.
    """
    try:
        h = hashlib.new(algo)  # ValueError for unsupported algorithm names
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                h.update(chunk)
        return h.hexdigest()
    except (OSError, ValueError):
        # OSError: missing/unreadable file; ValueError: bad algo name.
        return None
3641

37-
def load_previous_violations():
38-
if not os.path.exists(CANONICAL_LOG):
39-
return {}
40-
with open(CANONICAL_LOG, 'r') as f:
41-
return json.load(f)
42+
def _detect_algo_from_expected(expected: str) -> Tuple[str, str]:
    """Infer which hash algorithm an expected-hash string encodes.

    Accepted forms:
      * ``"sha1:<hex>"`` / ``"sha256:<hex>"`` explicit prefixes
      * bare 40-hex digest  -> SHA-1
      * bare 64-hex digest  -> SHA-256

    Anything else falls back to ``("sha256", <lowercased input>)`` with the
    value left untransformed.
    """
    value = expected.lower()

    # Explicit "algo:" prefix wins over any length heuristic.
    for algo in ("sha1", "sha256"):
        marker = algo + ":"
        if value.startswith(marker):
            return algo, value[len(marker):]

    # Bare hex: decide by digest length; default to sha256 otherwise.
    length_to_algo = {40: "sha1", 64: "sha256"}
    return length_to_algo.get(len(value), "sha256"), value
63+
64+
def _load_json_safely(path: str, default):
    """Parse JSON from *path*, returning *default* on any failure.

    A missing file, unreadable file, or malformed JSON all yield *default*
    instead of raising, so callers never have to guard this read.
    """
    try:
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as handle:
                return json.load(handle)
    except Exception:
        pass
    return default
72+
73+
def _save_json_safely(path: str, obj) -> None:
    """Atomically persist *obj* as pretty-printed JSON at *path*.

    A sibling ``<path>.lock`` file serialises concurrent writers, and the
    document is staged in ``<path>.tmp`` then moved into place with
    ``os.replace`` so readers never observe a half-written file.
    """
    target_dir = os.path.dirname(path) or "."
    os.makedirs(target_dir, exist_ok=True)
    with FileLock(path + ".lock"):
        staging = path + ".tmp"
        with open(staging, "w", encoding="utf-8") as handle:
            json.dump(obj, handle, indent=4, ensure_ascii=False)
        os.replace(staging, path)
81+
82+
def _resolve_expected_map() -> Dict[str, str]:
    """Build the expected hash map by merging:
    1) Built-in WATCHED_FILES
    2) Optional EXTERNAL_EXPECTED_MAP file
    3) Resolve LOCKED_AT_DEPLOY placeholders using a baseline lock file

    Baseline semantics are trust-on-first-use: the first time a
    LOCKED_AT_DEPLOY file is seen, its current SHA-256 is recorded in
    BASELINE_LOCK_FILE and enforced forever after.

    Fix: the previous version re-wrote the baseline whenever the current
    hash differed from the stored one, which meant any tampering simply
    updated its own baseline and could never be detected.  An existing
    baseline entry is now immutable — a differing current hash is left for
    perform_integrity_check to flag.
    """
    expected = dict(WATCHED_FILES)

    # External override (optional)
    if EXTERNAL_EXPECTED_MAP and os.path.exists(EXTERNAL_EXPECTED_MAP):
        ext = _load_json_safely(EXTERNAL_EXPECTED_MAP, {})
        if isinstance(ext, dict):
            expected.update(ext)

    # Baseline resolution for LOCKED_AT_DEPLOY
    baseline = _load_json_safely(BASELINE_LOCK_FILE, {})
    changed = False

    for path, val in list(expected.items()):
        if val != "LOCKED_AT_DEPLOY":
            continue
        if path in baseline:
            # Enforce the hash recorded at first sight.  Never refresh it
            # from disk here — doing so would let a tampered file rewrite
            # its own baseline and defeat the integrity check.
            expected[path] = baseline[path]
            continue
        # First sighting: record the current SHA-256 as the baseline.
        cur = _stream_hash(path, "sha256")
        if cur:
            baseline[path] = cur
            changed = True
            expected[path] = cur
        # else: file missing/unreadable — leave the placeholder so the
        # integrity check skips it (kept behavior).

    if changed:
        _save_json_safely(BASELINE_LOCK_FILE, baseline)

    return expected
118+
119+
def load_previous_violations() -> Dict:
    """Return the persisted violation log, or {} when absent/corrupt."""
    previous = _load_json_safely(CANONICAL_LOG, {})
    return previous
121+
122+
def save_violation_log(violations: Dict) -> None:
    """Persist the violation log atomically to CANONICAL_LOG."""
    _save_json_safely(CANONICAL_LOG, violations)
124+
125+
# === CORE CHECK ===
46126

47127
def perform_integrity_check():
48128
print("🔍 Running Belel integrity scan...")
49129
violations = load_previous_violations()
50130
new_findings = {}
51131

52-
for file_path, expected_hash in WATCHED_FILES.items():
132+
expected_map = _resolve_expected_map()
133+
134+
for file_path, expected_hash in expected_map.items():
135+
# Skip unresolved placeholders (kept behavior)
53136
if expected_hash == "LOCKED_AT_DEPLOY":
54-
continue # Skip placeholder
55-
actual_hash = compute_hash(file_path)
56-
if not actual_hash:
57-
print(f"⚠️ File missing or unreadable: {file_path}")
58137
continue
59138

60-
if actual_hash != expected_hash:
139+
algo, want = _detect_algo_from_expected(expected_hash)
140+
141+
if not os.path.exists(file_path):
142+
print(f"⚠️ File missing or unreadable: {file_path}")
143+
# Kept behavior: log to console, do not alert/repair on missing
144+
continue
145+
146+
got = _stream_hash(file_path, algo)
147+
148+
if not got:
149+
print(f"⚠️ Unable to read file: {file_path}")
150+
continue
151+
152+
if got != want:
61153
print(f"🚨 Tampering detected in {file_path}")
62154
new_findings[file_path] = {
63155
"expected": expected_hash,
64-
"found": actual_hash,
65-
"timestamp": time.time()
156+
"found": got,
157+
"timestamp": time.time(),
158+
"algo": algo,
66159
}
67-
alert_violation(file_path, expected_hash, actual_hash)
68-
trigger_repair_protocol(file_path)
160+
# Kept: protocol hooks
161+
try:
162+
alert_violation(file_path, expected_hash, got)
163+
except Exception as e:
164+
print(f"⚠️ alert_violation failed: {e}")
165+
try:
166+
trigger_repair_protocol(file_path)
167+
except Exception as e:
168+
print(f"⚠️ trigger_repair_protocol failed: {e}")
69169

70170
if new_findings:
71171
violations.update(new_findings)
@@ -74,7 +174,7 @@ def perform_integrity_check():
74174
else:
75175
print("✅ No integrity violations found.")
76176

77-
# === MAIN LOOP ===
177+
# === MAIN LOOP (kept) ===
78178

79179
if __name__ == "__main__":
80180
print("🛡️ Belel Integrity Crawler active.")

0 commit comments

Comments
 (0)