-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbully_algorithm.py
More file actions
162 lines (136 loc) · 5.86 KB
/
bully_algorithm.py
File metadata and controls
162 lines (136 loc) · 5.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import threading
import time
from typing import Dict, Optional
from dataclasses import dataclass
from protocol import *
@dataclass
class PeerInfo:
node_id: int
host: str
port: int
#
class BullyAlgorithm:
def __init__(self, my_id: int, messenger, peers_dict: Dict[int, PeerInfo] = None):
self.my_id = my_id
self.messenger = messenger # MessagingModule instance
self.peers = peers_dict if peers_dict is not None else {} # Dict[node_id, PeerInfo]
self.leader_id: Optional[int] = None
self.election_lock = threading.Lock()
# State flags
self.in_election = False
self.waiting_for_ack = False
self.election_timeout = 2.0 # seconds
def update_peers(self, new_peers: Dict[int, PeerInfo]):
"""
Update the peers list from Discovery Module.
"""
with self.election_lock:
self.peers = new_peers
# Check if Leader is still alive
if self.leader_id and self.leader_id != self.my_id:
if self.leader_id not in self.peers:
print(f"[Bully] Leader {self.leader_id} disappeared! Starting election.")
self.leader_id = None
# If we don't have a leader (newly joined or leader died), start election
if self.leader_id is None and not self.in_election:
print(f"[Bully] No leader known. Starting election.")
threading.Thread(target=self.start_election, daemon=True).start()
# 用來處理傳入的訊息
def handle_message(self, msg: dict):
"""
Callback interface for MessagingModule.
"""
msg_type = msg.get("type")
sender_id = msg.get("sender_id")
# payload = msg.get("payload", {})
if msg_type == MSG_LEADER_ELECTION:
self._handle_election_request(sender_id)
elif msg_type == MSG_ELECTION_ACK:
self._handle_election_ack(sender_id)
elif msg_type == MSG_LEADER_ANNOUNCE:
self._handle_coordinator_msg(sender_id)
elif msg_type == MSG_HEARTBEAT:
# Optional: Handle heartbeat
pass
def start_election(self):
"""
Initiates the election process.
"""
with self.election_lock:
self.in_election = True
self.waiting_for_ack = False
self.leader_id = None
print(f"[Bully] Node {self.my_id} starting election.")
# Find all peers with ID higher than mine
higher_peers = [p for pid, p in self.peers.items() if pid > self.my_id]
if not higher_peers:
# No higher peers, I am the leader
self._become_leader()
return
# Send Election message to all higher peers
self.waiting_for_ack = True
for p in higher_peers:
self.messenger.send_message(p.host, p.port, MSG_LEADER_ELECTION)
# Start a timer to wait for ACKs
threading.Thread(target=self._wait_for_ack_timeout, daemon=True).start()
def _wait_for_ack_timeout(self):
"""
Waits for election_timeout. If no ACK received, declare self as leader.
"""
time.sleep(self.election_timeout)
with self.election_lock:
if self.in_election and self.waiting_for_ack:
print(f"[Bully] Timeout waiting for ACK. I am taking over.")
self._become_leader()
def _handle_election_request(self, sender_id):
"""
Received ELECTION from a lower ID node.
1. Send ACK.
2. Start my own election (if not already).
"""
print(f"[Bully] Received ELECTION from {sender_id}. Sending ACK.")
# if sender_id in self.peers:
# target = self.peers[sender_id]
# self.messenger.send_message(target.host, target.port, MSG_ELECTION_ACK)
if self.my_id > sender_id:
print(f"[Bully] 😏 My ID {self.my_id} is higher than {sender_id}. Starting my own election.")
if sender_id in self.peers:
target = self.peers[sender_id]
self.messenger.send_message(target.host, target.port, MSG_ELECTION_ACK)
# If I am not already in an election, I should start one because I have a higher ID
if not self.in_election:
self.start_election()
else:
print(f"[Bully] 😢 I am bullied by {sender_id}")
def _handle_election_ack(self, sender_id):
"""
Received ACK. Someone higher is alive.
Stop waiting for timeout, wait for Coordinator message.
"""
print(f"[Bully] 🤨 Received ACK from {sender_id}. Stopping timeout, waiting for Coordinator.")
with self.election_lock:
self.waiting_for_ack = False
# We stay in self.in_election = True until we receive Coordinator message
def _handle_coordinator_msg(self, sender_id):
"""
Received COORDINATOR (Leader Announce).
Update leader and finish election.
"""
print(f"[Bully] 🥺 Received LEADER_ANNOUNCE from {sender_id}. New Leader is {sender_id}.")
with self.election_lock:
self.leader_id = sender_id
self.in_election = False
self.waiting_for_ack = False
def _become_leader(self):
"""
Declare self as leader and broadcast to all.
"""
with self.election_lock:
self.leader_id = self.my_id
self.in_election = False
self.waiting_for_ack = False
print(f"[Bully] 😎 I am the Leader ({self.my_id}). Announcing to peers...")
# Broadcast Coordinator message to ALL peers (lower and higher)
for pid, p in self.peers.items():
if pid != self.my_id:
self.messenger.send_message(p.host, p.port, MSG_LEADER_ANNOUNCE)