-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontrol_memory.py
More file actions
234 lines (179 loc) · 11.7 KB
/
control_memory.py
File metadata and controls
234 lines (179 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import time
from lift_utils import LIFT_SYSTEM, LIFT_CALL, FLOOR_REQUEST
class ControlMemoryFunctions:
    """Per-state control handlers for the lift state machine.

    Every handler has the signature ``(lift, request_list, motor_controller)``:

    * ``lift`` -- the mutable lift record whose fields the handler updates.
    * ``request_list`` -- the pending request queue; each entry is unpacked
      as a ``(request_type, floor, direction)`` tuple.
    * ``motor_controller`` -- object exposing ``open_door``, ``close_door``,
      ``move_up``, ``move_down`` and ``stop_motors``.

    TODO: ``state_control_function_map`` and ``state_index_map`` must be kept
    in sync by hand with the state-to-control mapping.
    """

    def __init__(self):
        # Control-memory address ("a<N>") -> handler for that state.
        self.state_control_function_map = {
            "a0": self.handle_waiting_state,                 # WAITING
            "a1": self.handle_idle_state,                    # IDLE
            "a2": self.handle_queue_updated_state,           # QUEUE_UPDATED
            "a3": self.handle_door_closed_state,             # DOOR_CLOSED
            "a4": self.handle_target_set_state,              # TARGET_SET
            "a5": self.handle_queue_popped_state,            # QUEUE_POPPED
            "a6": self.handle_door_opened_state,             # DOOR_OPENED
            "a7": self.handle_motors_moving_state,           # MOTORS_MOVING
            "a8": self.handle_updated_currfloor_state,       # UPDATED_CURRFLOOR
            "a9": self.handle_await_floor_requests_state,    # AWAIT_FLOOR_REQUESTS
            "a10": self.handle_overloaded_state,             # OVERLOADED
            "a11": self.handle_redundant_call_removed_state,  # REDUNDANT_CALL_REMOVED
            "a12": self.handle_motors_stopped_state,         # MOTORS_STOPPED
        }
        # State name -> numeric index written to lift.current_state_index.
        self.state_index_map = {
            "WAITING": 0,
            "IDLE": 1,
            "QUEUE_UPDATED": 2,
            "DOOR_CLOSED": 3,
            "TARGET_SET": 4,
            "QUEUE_POPPED": 5,
            "DOOR_OPENED": 6,
            "MOTORS_MOVING": 7,
            "UPDATED_CURRFLOOR": 8,
            "AWAIT_FLOOR_REQUESTS": 9,
            "OVERLOADED": 10,
            "REDUNDANT_CALL_REMOVED": 11,
            "MOTORS_STOPPED": 12,
        }
        # Direction flag -> label used in log output (True/False hash like 1/0).
        self.dir = {
            1: "UP",
            0: "DOWN",
        }

    def handle_waiting_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Track how long the lift has been waiting for a call.

        On entry from any other state the idle timer is (re)started; while the
        lift stays in WAITING the elapsed whole seconds are stored in
        ``lift.idle_time``.
        """
        print("\n@@@ Lift is in WAITING state...")
        waiting = self.state_index_map["WAITING"]
        lift.current_state_index = waiting
        if lift.previous_state_index != waiting:
            # Fresh entry: start timing, but do not update idle_time yet so a
            # long stay in another state cannot trigger an immediate idle.
            lift.idle_start_time = time.time()
        else:
            lift.idle_time = int(time.time() - lift.idle_start_time)
        print(f"lift idle time: {lift.idle_time} seconds ? {lift.MAX_IDLE_TIME} seconds")
        lift.previous_state_index = lift.current_state_index

    def handle_idle_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Send an idle lift home by queueing a floor request for HOME_FLOOR."""
        # Set the target first so the log line reports the real destination.
        lift.target_floor = lift.HOME_FLOOR
        print(f"\n@@@ Lift is idle at floor {lift.current_floor}, Homing to floor {lift.target_floor}...")
        # Direction is "up" when the target lies above the current floor
        # (same rule as handle_target_set_state).
        lift.current_direction = lift.target_floor > lift.current_floor
        request_list.append((FLOOR_REQUEST, lift.target_floor, lift.current_direction))
        lift.is_idle = True
        lift.idle_time = 0
        lift.current_state_index = self.state_index_map["IDLE"]
        lift.previous_state_index = lift.current_state_index

    def handle_queue_updated_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Mark the lift as busy after a request was added to the queue.

        The request itself is appended by the caller; this handler only
        validates that the queue is non-empty and resets the idle bookkeeping.

        Raises:
            ValueError: if ``request_list`` is empty.
        """
        print("\n@@@ Lift Not IDLE, Queue was updated, adding a new floor call request to the queue..., ")
        if not request_list:
            raise ValueError("Request list is empty, cannot add new request.")
        lift.is_idle = False
        lift.idle_time = 0
        lift.current_state_index = self.state_index_map["QUEUE_UPDATED"]
        lift.previous_state_index = lift.current_state_index

    def handle_door_closed_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Close the door and record the DOOR_CLOSED state."""
        print("\n@@@ Lift Closing Door...")
        lift.door_closed = True
        lift.current_state_index = self.state_index_map["DOOR_CLOSED"]
        # ----------- motor cmds ----------------
        motor_controller.close_door()
        # TODO: play a door-closing sound
        lift.previous_state_index = lift.current_state_index

    def handle_target_set_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Take the head of the queue as the new target floor.

        The travel direction is derived from the floor comparison rather than
        from the request's own direction field, since the request may carry
        no usable direction.

        Raises:
            ValueError: if ``request_list`` is empty.
        """
        # Guard before touching request_list[0]; otherwise an empty queue
        # would surface as IndexError from the log line.
        if not request_list:
            raise ValueError("Request list is empty, cannot set target floor.")
        _request_type, floor, direction = request_list[0]
        lift.target_floor = floor
        print(f"\n@@@ Lift Target floor set to {lift.target_floor}...by {request_list[0]}")
        lift.current_direction = floor > lift.current_floor
        print(f"direction comparisons {floor > lift.current_floor} = {direction}")
        lift.current_state_index = self.state_index_map["TARGET_SET"]
        lift.previous_state_index = lift.current_state_index

    def handle_queue_popped_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Remove the served request from the head of the queue.

        Raises:
            ValueError: if ``request_list`` is empty.
        """
        if not request_list:
            raise ValueError("Request list is empty, cannot pop request.")
        served = request_list.pop(0)
        request_type, _floor, _direction = served
        print(f"\n@@@ Lift Request Queue popped {served}, processing next request...Remaining requests: {len(request_list)}")
        lift.current_state_index = self.state_index_map["QUEUE_POPPED"]
        if not request_list and request_type == FLOOR_REQUEST:
            # Last request was a floor request: drop the required direction.
            lift.previous_required_direction = None
        else:
            # NOTE(review): previous_state_index is only updated on this
            # branch, unlike the other handlers -- confirm this is intended.
            lift.previous_state_index = lift.current_state_index

    def handle_door_opened_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Open the door and start the floor-request wait timer."""
        print("\n@@@ Lift Opening door...")
        lift.door_closed = False
        lift.current_state_index = self.state_index_map["DOOR_OPENED"]
        lift.floor_request_wait_start_time = time.time()
        # ----------- motor cmds ----------------
        motor_controller.open_door()
        # TODO: play a door-opening sound
        lift.previous_state_index = lift.current_state_index

    def handle_motors_moving_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Drive the motors towards the target floor."""
        print(f"\n@@@ Motors moving {self.dir[lift.current_direction]} towards target floor {lift.target_floor}...")
        lift.motors_stopped = False
        lift.current_state_index = self.state_index_map["MOTORS_MOVING"]
        # ----------- motor cmds ----------------
        if lift.current_floor < lift.target_floor:
            motor_controller.move_up()
        else:
            motor_controller.move_down()
        # TODO: play a moving sound
        lift.previous_state_index = lift.current_state_index

    def handle_updated_currfloor_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Step ``lift.current_floor`` one floor towards the target."""
        if lift.current_floor < lift.target_floor:
            lift.current_floor += 1
        elif lift.current_floor > lift.target_floor:
            lift.current_floor -= 1
        print(f"\n@@@ Updating current floor...to {lift.current_floor}")
        # NOTE(review): unlike the other handlers this one never assigns
        # current_state_index (UPDATED_CURRFLOOR), so the value copied below
        # is whatever the preceding handler set -- confirm this is intended.
        lift.previous_state_index = lift.current_state_index

    def handle_await_floor_requests_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Update how long the lift has been waiting for dial-pad requests."""
        print("\n@@@ Waiting for dialpad floor requests...")
        lift.floor_request_wait_time = int(time.time() - lift.floor_request_wait_start_time)
        lift.current_state_index = self.state_index_map["AWAIT_FLOOR_REQUESTS"]
        if not request_list:
            print("No new floor requests, remaining idle.")
        else:
            print(f"Floor requests pending: {len(request_list)}.")
        lift.previous_state_index = lift.current_state_index

    def handle_overloaded_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Record the OVERLOADED state and restart the floor-request wait timer.

        No motor command is issued here; the original author notes the motors
        are already stopped when this state is reached.
        """
        print(f"\n@@@ Lift is overloaded! motors: check: {lift.motors_stopped}")
        lift.current_state_index = self.state_index_map["OVERLOADED"]
        # Restart the wait timer on every overload check so the full wait
        # applies once the overload clears.
        lift.floor_request_wait_start_time = time.time()
        # TODO: play alarm / handle the overloaded situation
        print("Lift stopped due to overload.")
        lift.previous_state_index = lift.current_state_index

    def handle_redundant_call_removed_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Drop every queued request whose floor equals the current floor.

        ``request_list`` is modified in place so callers holding a reference
        to the same list observe the removal.
        """
        print("\n@@@ Redundant call removed...")
        if not request_list:
            # Reached the last floor and nothing is pending.
            print("No requests to remove.")
        else:
            # Build the survivors first: popping while iterating would skip
            # the entry that follows each removed one.
            remaining = [
                request for request in request_list
                if request[1] != lift.current_floor
            ]
            if len(remaining) != len(request_list):
                request_list[:] = remaining
                print(f"Removed redundant request. Remaining requests: {len(request_list)}.")
        lift.current_state_index = self.state_index_map["REDUNDANT_CALL_REMOVED"]
        lift.previous_state_index = lift.current_state_index

    def handle_motors_stopped_state(self, lift: "LIFT_SYSTEM", request_list, motor_controller) -> None:
        """Stop the motors and record the MOTORS_STOPPED state."""
        print("\n@@@ Motors stopped.")
        lift.motors_stopped = True
        lift.current_state_index = self.state_index_map["MOTORS_STOPPED"]
        # ----------- motor cmds ----------------
        motor_controller.stop_motors()
        print(f"Lift stopped at floor {lift.current_floor}.")
        lift.previous_state_index = lift.current_state_index
# Reference: CS426, Gourinath sir's handbook.
class ControlMemoryEntry:
    """One slot of the control memory.

    Attributes are documented inline below.
    """

    def __init__(self, control=None, imm_transition: bool = False):
        # Whether the entry is flagged as an immediate transition.
        self.imm_transition = imm_transition
        # Function reference that drives the motors for this state
        # (None when no control function is attached).
        self.control = control
class ControlMemory:
    """Control memory: one ControlMemoryEntry per state address a0..a12.

    The set of immediate-transition states below has to be edited by hand
    whenever the transition equations change.
    """

    def __init__(self):
        # Handlers that each control-memory entry points at.
        self.control_memory_functions = ControlMemoryFunctions()

        handlers = self.control_memory_functions.state_control_function_map
        # State indices whose entries are flagged as immediate transitions.
        immediate_states = (6, 7, 10, 12)

        # Entry i holds the handler registered under address "a<i>".
        entries = []
        for state_idx in range(13):
            handler = handlers[f"a{state_idx}"]
            entries.append(ControlMemoryEntry(handler, state_idx in immediate_states))
        self.control_memory_array = entries