-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmessaging.py
More file actions
73 lines (64 loc) · 2.46 KB
/
messaging.py
File metadata and controls
73 lines (64 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# In charge of starting server listening and providing sending interface
import socket
import threading
import time
from protocol import *
class MessagingModule:
def __init__(self, my_ip, my_port, node_id, logic_handler=None):
self.ip = my_ip
self.port = my_port
self.node_id = node_id
self.logic_handler = logic_handler
self.running = True
# Server
def start_listening(self):
# Start server listening thread
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow address reuse
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((self.ip, self.port))
server_socket.listen(5)
print(f"[Net] Node {self.node_id} listening on {self.ip}:{self.port}")
while self.running:
try:
conn, addr = server_socket.accept()
threading.Thread(target=self._handle_connection, args=(conn, addr), daemon=True).start()
except Exception as e:
print(f"[Net] Accept error: {e}")
def _handle_connection(self, conn, addr):
try:
# Process data through recv_json
message = recv_json(conn)
if message:
print(f"\n[~~~Recv~~~] From {addr}: {message}")
# Pass message to logic handler
if self.logic_handler:
self.logic_handler.handle_message(message)
except Exception as e:
print(f"[Net] Receive error: {e}")
finally:
conn.close()
# Client
def send_message(self, target_ip, target_port, msg_type, payload=None):
"""
Inferface for Bully Algorithm to send messages.
"""
if payload is None: payload = {}
full_msg = {
"type": msg_type,
"sender_id": self.node_id,
"host": self.ip,
"port": self.port,
"payload": payload
}
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((target_ip, target_port))
send_json(sock, full_msg)
print(f"[--Send--] To {target_ip}:{target_port}:{msg_type}: {full_msg}")
sock.close()
return True
except Exception as e:
print(f"[*Net*] Send error to {target_ip}:{target_port}: {e}")
return False