-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdiscovery.py
More file actions
122 lines (100 loc) · 4.47 KB
/
discovery.py
File metadata and controls
122 lines (100 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
import socket
import threading
import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional
# Configuration for Discovery
# UDP port used both as the destination of HELLO broadcasts and as the port
# the listening socket binds to.
DISCOVERY_PORT = 54321  # UDP Port for broadcasting
# Pause between two consecutive HELLO broadcasts.
BROADCAST_INTERVAL = 2.0  # Seconds
# A peer whose last HELLO is older than this is dropped by the cleanup loop;
# the loop only runs every 2 seconds, so removal can lag the timeout slightly.
PEER_TIMEOUT = 6.0  # Seconds (if no hello received, consider dead)
@dataclass
class DiscoveredPeer:
    """A remote node learned from a HELLO broadcast."""

    # Identifier the peer advertised in its HELLO message.
    node_id: int
    # Source IP address of the datagram the HELLO arrived in.
    host: str
    # TCP port the peer advertised in its HELLO message.
    port: int
    # time.time() value recorded when the most recent HELLO was processed.
    last_seen: float
class DiscoveryModule:
    """Discover peers on the local network via periodic UDP HELLO broadcasts.

    Once :meth:`start` is called, three daemon threads do the work: one
    broadcasts this node's id and TCP port, one listens for HELLO messages
    from other nodes, and one evicts peers that have not been heard from
    within ``PEER_TIMEOUT`` seconds.  The current view lives in ``self.peers``
    and is guarded by ``self.lock``.

    Args:
        node_id: Identifier advertised in this node's HELLO messages.  HELLOs
            carrying the same id are ignored as our own.
        tcp_port: TCP port advertised to peers.
        on_peer_changed: Optional callback invoked with a snapshot
            ``{node_id: DiscoveredPeer}`` whenever a peer is added, changes
            its address, or times out.  It runs on a worker thread while the
            internal lock is held, so it should return quickly.

    Raises:
        OSError: If the sockets cannot be configured or the discovery port
            cannot be bound.
    """

    # Receive timeout (seconds) on the listening socket, so the listener loop
    # re-checks ``self.running`` regularly instead of depending on close() to
    # interrupt a blocked recvfrom().
    _LISTEN_POLL_TIMEOUT = 1.0

    def __init__(
        self,
        node_id,
        tcp_port,
        on_peer_changed: Optional[
            Callable[[Dict[int, "DiscoveredPeer"]], None]
        ] = None,
    ):
        self.node_id = node_id
        self.tcp_port = tcp_port
        self.on_peer_changed = on_peer_changed  # Callback when peers list changes
        self.running = False
        self.peers: Dict[int, "DiscoveredPeer"] = {}
        self.lock = threading.Lock()

        # One UDP socket for sending broadcasts, one for receiving them.
        self.broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.broadcast_sock.setsockopt(
                socket.SOL_SOCKET, socket.SO_BROADCAST, 1
            )
            self.listen_sock.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
            )
            # On macOS, SO_REUSEPORT is often needed to allow multiple
            # processes to bind to the same port.
            try:
                self.listen_sock.setsockopt(
                    socket.SOL_SOCKET, socket.SO_REUSEPORT, 1
                )
            except (AttributeError, OSError):
                # Option not defined (e.g. Windows) or rejected by the
                # platform; SO_REUSEADDR alone is used in that case.
                pass
            self.listen_sock.settimeout(self._LISTEN_POLL_TIMEOUT)
            self.listen_sock.bind(('', DISCOVERY_PORT))
        except BaseException:
            # Do not leak the descriptors when setup fails part-way.
            self.broadcast_sock.close()
            self.listen_sock.close()
            raise

    def start(self):
        """Start the broadcast, listen and cleanup threads.

        Calling this while already running is a no-op, so the worker threads
        are never duplicated.
        """
        if self.running:
            return
        self.running = True
        for loop in (self._broadcast_loop, self._listen_loop, self._cleanup_loop):
            threading.Thread(target=loop, daemon=True).start()
        print(f"[🔎Discovery] Started. Broadcasting on UDP {DISCOVERY_PORT}")

    def stop(self):
        """Signal the worker loops to exit and close both sockets."""
        self.running = False
        self.broadcast_sock.close()
        self.listen_sock.close()

    def _broadcast_loop(self):
        """Broadcast a HELLO every ``BROADCAST_INTERVAL`` seconds while running."""
        while self.running:
            # Built on every iteration so later changes to node_id / tcp_port
            # are picked up.
            payload = json.dumps({
                "type": "HELLO",
                "node_id": self.node_id,
                "tcp_port": self.tcp_port,
            }).encode('utf-8')
            try:
                # Broadcast to local network
                self.broadcast_sock.sendto(payload, ('<broadcast>', DISCOVERY_PORT))
            except OSError as exc:
                # Errors caused by stop() closing the socket are expected.
                if self.running:
                    print(f"[Discovery] Broadcast error: {exc}")
            time.sleep(BROADCAST_INTERVAL)

    @staticmethod
    def _parse_hello(data):
        """Decode one datagram and extract the HELLO fields.

        Args:
            data: Raw datagram bytes.

        Returns:
            ``(node_id, tcp_port)`` for a HELLO message, or ``None`` when the
            message is a JSON object of any other type.

        Raises:
            ValueError: If the payload is not valid UTF-8 JSON, is not a JSON
                object, or is a HELLO lacking ``node_id`` / ``tcp_port``.
        """
        msg = json.loads(data.decode('utf-8'))
        if not isinstance(msg, dict):
            raise ValueError("datagram is not a JSON object")
        if msg.get("type") != "HELLO":
            return None
        try:
            return msg["node_id"], msg["tcp_port"]
        except KeyError as exc:
            raise ValueError(f"HELLO message missing field {exc}") from None

    def _listen_loop(self):
        """Receive datagrams and record every HELLO sent by another node."""
        while self.running:
            try:
                data, addr = self.listen_sock.recvfrom(1024)
            except socket.timeout:
                # Nothing received; loop around to re-check self.running.
                continue
            except OSError as exc:
                # After stop() the socket is closed; stay quiet in that case.
                if self.running:
                    print(f"[Discovery] Listen error: {exc}")
                continue

            try:
                hello = self._parse_hello(data)
                if hello is None:
                    continue
                remote_id, remote_port = hello
                # Ignore self
                if remote_id == self.node_id:
                    continue
                # addr[0] is the IP address of the sender.
                self._update_peer(remote_id, addr[0], remote_port)
            except Exception as exc:
                # Thread boundary: one bad datagram must not stop the listener.
                if self.running:
                    print(f"[Discovery] Listen error: {exc}")

    def _notify_peers_changed(self):
        """Invoke ``on_peer_changed`` with a snapshot of the peer table.

        Must be called with ``self.lock`` held, which keeps notifications from
        the listener and cleanup threads in a consistent order.  The callback
        gets a copy so it never observes the dict while a worker mutates it,
        and a failing callback is reported instead of killing the worker.
        """
        if self.on_peer_changed is None:
            return
        try:
            self.on_peer_changed(dict(self.peers))
        except Exception as exc:
            print(f"[Discovery] on_peer_changed callback failed: {exc}")

    def _update_peer(self, node_id, host, port):
        """Record a HELLO from ``node_id`` and notify on a new or moved peer.

        Args:
            node_id: Identifier advertised by the peer.
            host: IP address the HELLO was received from.
            port: TCP port advertised by the peer.
        """
        with self.lock:
            previous = self.peers.get(node_id)
            self.peers[node_id] = DiscoveredPeer(node_id, host, port, time.time())
            if previous is None:
                print(f"[⭐️Discovery] Found new peer: Node {node_id} at {host}:{port}")
            elif (previous.host, previous.port) != (host, port):
                # Same node, different endpoint: consumers must learn the new
                # address, otherwise they keep using the stale one.
                print(f"[Discovery] Peer Node {node_id} moved to {host}:{port}")
            else:
                # Plain keep-alive; only last_seen was refreshed.
                return
            self._notify_peers_changed()

    def _cleanup_loop(self):
        """Every 2 seconds, drop peers silent for more than ``PEER_TIMEOUT``."""
        while self.running:
            time.sleep(2)
            if not self.running:
                # stop() was called during the sleep; skip the final pass.
                break
            with self.lock:
                now = time.time()
                expired = [
                    pid
                    for pid, peer in self.peers.items()
                    if now - peer.last_seen > PEER_TIMEOUT
                ]
                for pid in expired:
                    print(f"[Discovery] Peer Node {pid} timed out.")
                    del self.peers[pid]
                if expired:
                    self._notify_peers_changed()