-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_heartbeat.py
More file actions
141 lines (109 loc) · 4.41 KB
/
test_heartbeat.py
File metadata and controls
141 lines (109 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
"""Test script for Phase 3: Heartbeat and Survival Loop implementation."""
import asyncio
import sys
from pathlib import Path
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
from .core.protocol.client import A2AClient as GEPClient
from .core.loop.heartbeat import HeartbeatLoop
from .core.loop.survival import SurvivalTracker, SurvivalStatus
async def test_heartbeat_instantiation():
"""Test that HeartbeatLoop can be instantiated."""
print("Testing HeartbeatLoop instantiation...")
client = GEPClient(
base_url='http://localhost:8000',
node_id='test-node-123',
)
heartbeat = HeartbeatLoop(
client=client,
worker_enabled=False,
default_interval_ms=60000, # 1 minute for testing
)
assert heartbeat.is_running is False, "Heartbeat should not be running initially"
assert heartbeat.available_work == 0, "Available work should be 0 initially"
print("✓ HeartbeatLoop instantiation successful")
async def test_survival_tracker():
"""Test SurvivalTracker functionality."""
print("\nTesting SurvivalTracker...")
tracker = SurvivalTracker()
# Test initial state
status = tracker.get_status()
assert status['status'] == 'alive', "Initial status should be alive"
assert tracker.is_alive() is True, "Should be alive initially"
# Test tracking with good balance
response = {
'credit_balance': 100,
'status': 'active',
'timestamp': '2026-03-08T12:00:00Z',
}
result = tracker.track_status(response)
assert result == SurvivalStatus.ALIVE, "Should be alive with 100 credits"
assert tracker.credit_balance == 100, "Credit balance should be 100"
# Test at-risk status
response['credit_balance'] = 5
response['status'] = 'at_risk'
result = tracker.track_status(response)
assert result == SurvivalStatus.AT_RISK, "Should be at risk with low credits"
assert tracker.is_at_risk() is True, "Should be at risk"
# Test dormant status
response['credit_balance'] = 0
response['status'] = 'dormant'
# Set old activity to trigger dormant
tracker._last_activity = None
result = tracker.track_status(response)
assert result == SurvivalStatus.DORMANT, "Should be dormant with 0 credits"
assert tracker.is_dormant() is True, "Should be dormant"
print("✓ SurvivalTracker all tests passed")
async def test_heartbeat_properties():
"""Test HeartbeatLoop properties and methods."""
print("\nTesting HeartbeatLoop properties...")
client = GEPClient(
base_url='http://localhost:8000',
node_id='test-node-456',
)
heartbeat = HeartbeatLoop(client)
# Test properties
assert hasattr(heartbeat, 'available_work'), "Should have available_work property"
assert hasattr(heartbeat, 'last_heartbeat'), "Should have last_heartbeat property"
assert hasattr(heartbeat, 'is_running'), "Should have is_running property"
# Test send_once method (will fail to connect, but tests the code path)
try:
# Mock the client's send_heartbeat to avoid network call
async def mock_send_heartbeat(**kwargs):
return {
'next_heartbeat_ms': 60000,
'credit_balance': 100,
'available_work': 5,
'status': 'active',
}
heartbeat.client.send_heartbeat = mock_send_heartbeat
response = await heartbeat.send_once()
assert response['available_work'] == 5, "Should track available work"
assert heartbeat.available_work == 5, "Available work should be updated"
print("✓ HeartbeatLoop properties and send_once work correctly")
except Exception as e:
print(f"✗ HeartbeatLoop test failed: {e}")
raise
async def main():
"""Run all tests."""
print("=" * 60)
print("Phase 3: Heartbeat and Survival Loop Tests")
print("=" * 60)
try:
await test_heartbeat_instantiation()
await test_survival_tracker()
await test_heartbeat_properties()
print("\n" + "=" * 60)
print("✓ All tests passed successfully!")
print("=" * 60)
except AssertionError as e:
print(f"\n✗ Test failed: {e}")
sys.exit(1)
except Exception as e:
print(f"\n✗ Unexpected error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
asyncio.run(main())