Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.

Commit 34ab226

Browse files
committed
Added message queue to communications, moved checking to inside run() function, adjusted flight interface test to reflect new inputs
1 parent e08f5d3 commit 34ab226

File tree

5 files changed

+14
-65
lines changed

5 files changed

+14
-65
lines changed

modules/communications/communications.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ def create(
2525
cls,
2626
home_position: position_global.PositionGlobal,
2727
local_logger: logger.Logger,
28-
worker_id: worker_enum.WorkerEnum,
2928
) -> "tuple[True, Communications] | tuple[False, None]":
3029
"""
3130
Logs data and forwards it.
@@ -35,14 +34,13 @@ def create(
3534
Returns: Success, class object.
3635
"""
3736

38-
return True, Communications(cls.__create_key, home_position, local_logger, worker_id)
37+
return True, Communications(cls.__create_key, home_position, local_logger)
3938

4039
def __init__(
4140
self,
4241
class_private_create_key: object,
4342
home_position: position_global.PositionGlobal,
4443
local_logger: logger.Logger,
45-
worker_id: worker_enum.WorkerEnum,
4644
) -> None:
4745
"""
4846
Private constructor, use create() method.
@@ -51,7 +49,6 @@ def __init__(
5149

5250
self.__home_position = home_position
5351
self.__logger = local_logger
54-
self.__worker_id = worker_id
5552

5653
def run(
5754
self,
@@ -96,7 +93,7 @@ def run(
9693
for object in object_in_world_global:
9794

9895
result, message = message_encoding_decoding.encode_position_global(
99-
self.__worker_id, object
96+
worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, object
10097
)
10198
if not result:
10299
self.__logger.warning("Conversion from PositionGlobal to bytes failed", True)

modules/communications/communications_worker.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ def communications_worker(
2222
home_position_queue: queue_proxy_wrapper.QueueProxyWrapper,
2323
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
2424
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
25+
message_output_queue: queue_proxy_wrapper.QueueProxyWrapper,
2526
controller: worker_controller.WorkerController,
2627
) -> None:
2728
"""
@@ -33,7 +34,6 @@ def communications_worker(
3334
"""
3435

3536
worker_name = pathlib.Path(__file__).stem
36-
worker_id = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER
3737
process_id = os.getpid()
3838
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
3939
if not result:
@@ -54,7 +54,7 @@ def communications_worker(
5454

5555
local_logger.info(f"Home position received: {home_position}", True)
5656

57-
result, comm = communications.Communications.create(home_position, local_logger, worker_id)
57+
result, comm = communications.Communications.create(home_position, local_logger)
5858
if not result:
5959
local_logger.error("Worker failed to create class object", True)
6060
return
@@ -89,16 +89,18 @@ def communications_worker(
8989
continue
9090

9191
result, metadata = metadata_encoding_decoding.encode_metadata(
92-
worker_id, len(list_of_messages)
92+
worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, len(list_of_messages)
9393
)
9494
if not result:
9595
local_logger.error("Failed to encode metadata", True)
9696
continue
9797

9898
output_queue.queue.put(metadata)
99+
message_output_queue.queue.put(metadata)
99100

100101
for message in list_of_messages:
101102

102103
time.sleep(period)
103104

104105
output_queue.queue.put(message)
106+
message_output_queue.queue.put(message)

modules/flight_interface/flight_interface.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,10 @@ def run(self, message: bytes) -> "tuple[bool, odometry_and_time.OdometryAndTime
103103

104104
self.__logger.info(str(odometry_and_time_object), True)
105105

106-
result = self.controller.send_statustext_msg(message)
107-
if not result:
108-
self.__logger.error("Failed to send statustext message", True)
106+
if not isinstance(message, bytes):
107+
self.__logger.warning(f"Skipping unexpected input: {message}")
108+
else:
109+
result = self.controller.send_statustext_msg(message)
109110

110111
return True, odometry_and_time_object
111112

modules/flight_interface/flight_interface_worker.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ def flight_interface_worker(
1919
baud_rate: int,
2020
period: float,
2121
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
22+
coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
2223
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
2324
communications_output_queue: queue_proxy_wrapper.QueueProxyWrapper,
24-
coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
2525
controller: worker_controller.WorkerController,
2626
) -> None:
2727
"""
@@ -67,13 +67,8 @@ def flight_interface_worker(
6767
try:
6868
coordinate = coordinates_input_queue.queue.get_nowait()
6969
except queue.Empty:
70-
local_logger.warning("No more coordinates to process")
7170
coordinate = None
7271

73-
if not isinstance(coordinate, bytes):
74-
local_logger.warning(f"Skipping unexpected input: {coordinate}")
75-
continue
76-
7772
result, value = interface.run(coordinate)
7873
if not result:
7974
continue

tests/integration/test_flight_interface_worker.py

Lines changed: 2 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -11,35 +11,12 @@
1111
from modules import decision_command
1212
from utilities.workers import queue_proxy_wrapper
1313
from utilities.workers import worker_controller
14-
from modules.common.modules import position_global
15-
from modules.common.modules.data_encoding import message_encoding_decoding
16-
from modules.common.modules.data_encoding import worker_enum
14+
1715

1816
MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550"
1917
FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds
2018
FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate
2119
FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds
22-
WORK_COUNT = 4
23-
COMMUNICATIONS_WORKER_ID = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER
24-
25-
26-
def simulate_communications_worker(
27-
in_queue: queue_proxy_wrapper.QueueProxyWrapper,
28-
data_point: position_global.PositionGlobal,
29-
) -> None:
30-
"""
31-
Encode coordinates and place into queue.
32-
"""
33-
result, message = message_encoding_decoding.encode_position_global(
34-
COMMUNICATIONS_WORKER_ID, data_point
35-
)
36-
assert result
37-
assert message is not None
38-
39-
in_queue.queue.put(message)
40-
41-
return
42-
4320

4421
def apply_decision_test(
4522
in_queue: queue_proxy_wrapper.QueueProxyWrapper,
@@ -129,7 +106,6 @@ def main() -> int:
129106
home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
130107
in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
131108
communications_in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
132-
communications_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
133109

134110
worker = mp.Process(
135111
target=flight_interface_worker.flight_interface_worker,
@@ -139,9 +115,9 @@ def main() -> int:
139115
FLIGHT_INTERFACE_BAUD_RATE,
140116
FLIGHT_INTERFACE_WORKER_PERIOD,
141117
in_queue, # Added input_queue
118+
communications_in_queue,
142119
out_queue,
143120
home_position_out_queue,
144-
communications_in_queue,
145121
controller,
146122
),
147123
)
@@ -155,28 +131,6 @@ def main() -> int:
155131
home_position = home_position_out_queue.queue.get()
156132
assert home_position is not None
157133

158-
data_points = [
159-
position_global.PositionGlobal.create(43.471468, -80.544205, 335),
160-
position_global.PositionGlobal.create(43.6629, -79.3957, 105),
161-
position_global.PositionGlobal.create(43.2609, -79.9192, 100),
162-
position_global.PositionGlobal.create(43.7735, -79.5019, 170),
163-
]
164-
165-
# Simulate communications worker
166-
for i in range(0, WORK_COUNT):
167-
simulate_communications_worker(communications_in_queue, home_position, data_points[i])
168-
169-
# Test flight interface worker sending statustext messages
170-
for i in range(0, WORK_COUNT):
171-
try:
172-
input_data: bytes = communications_out_queue.queue.get_nowait()
173-
assert input_data is not None
174-
except queue.Empty:
175-
print("Output queue has no more messages to process, exiting")
176-
break
177-
178-
assert communications_out_queue.queue.empty()
179-
180134
# Run the apply_decision tests
181135
test_result = apply_decision_test(in_queue, out_queue)
182136
if not test_result:

0 commit comments

Comments
 (0)