Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,4 @@ cluster_estimation:

communications:
timeout: 30.0 # seconds
worker_period: 0.5 # seconds
18 changes: 15 additions & 3 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def main() -> int:
RANDOM_STATE = config["cluster_estimation"]["random_state"]

COMMUNICATIONS_TIMEOUT = config["communications"]["timeout"]
COMMUNICATIONS_WORKER_PERIOD = config["communications"]["worker_period"]

# pylint: enable=invalid-name
except KeyError as exception:
Expand Down Expand Up @@ -201,6 +202,10 @@ def main() -> int:
mp_manager,
QUEUE_MAX_SIZE,
)
communications_to_flight_interface_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)
communications_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
Expand Down Expand Up @@ -285,7 +290,10 @@ def main() -> int:
FLIGHT_INTERFACE_BAUD_RATE,
FLIGHT_INTERFACE_WORKER_PERIOD,
),
input_queues=[flight_interface_decision_queue],
input_queues=[
flight_interface_decision_queue,
communications_to_flight_interface_queue,
],
output_queues=[
flight_interface_to_data_merge_queue,
flight_interface_to_communications_queue,
Expand Down Expand Up @@ -362,12 +370,15 @@ def main() -> int:
result, communications_worker_properties = worker_manager.WorkerProperties.create(
count=1,
target=communications_worker.communications_worker,
work_arguments=(COMMUNICATIONS_TIMEOUT,),
work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD),
input_queues=[
flight_interface_to_communications_queue,
cluster_estimation_to_communications_queue,
],
output_queues=[communications_to_main_queue],
output_queues=[
communications_to_main_queue,
communications_to_flight_interface_queue,
],
controller=controller,
local_logger=main_logger,
)
Expand Down Expand Up @@ -505,6 +516,7 @@ def main() -> int:
cluster_estimation_to_communications_queue.fill_and_drain_queue()
communications_to_main_queue.fill_and_drain_queue()
flight_interface_decision_queue.fill_and_drain_queue()
communications_to_flight_interface_queue.fill_and_drain_queue()

for manager in worker_managers:
manager.join_workers()
Expand Down
2 changes: 1 addition & 1 deletion modules/common
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this change? Can you make this a PR in common or something?

23 changes: 20 additions & 3 deletions modules/communications/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from ..common.modules import position_global
from ..common.modules import position_local
from ..common.modules.logger import logger
from ..common.modules.data_encoding import message_encoding_decoding
from ..common.modules.data_encoding import worker_enum
from ..common.modules.mavlink import local_global_conversion


Expand All @@ -23,6 +25,7 @@ def create(
cls,
home_position: position_global.PositionGlobal,
local_logger: logger.Logger,
worker_id: worker_enum.WorkerEnum,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having it as an input, the communications worker/class should always use the communication worker id. Just use the enum when you need to as a constant instead of as an input

) -> "tuple[True, Communications] | tuple[False, None]":
"""
Logs data and forwards it.
Expand All @@ -32,13 +35,14 @@ def create(
Returns: Success, class object.
"""

return True, Communications(cls.__create_key, home_position, local_logger)
return True, Communications(cls.__create_key, home_position, local_logger, worker_id)

def __init__(
self,
class_private_create_key: object,
home_position: position_global.PositionGlobal,
local_logger: logger.Logger,
worker_id: worker_enum.WorkerEnum,
) -> None:
"""
Private constructor, use create() method.
Expand All @@ -47,11 +51,12 @@ def __init__(

self.__home_position = home_position
self.__logger = local_logger
self.__worker_id = worker_id

def run(
self,
objects_in_world: list[object_in_world.ObjectInWorld],
) -> tuple[True, list[object_in_world.ObjectInWorld]] | tuple[False, None]:
) -> tuple[True, list[bytes]] | tuple[False, None]:

objects_in_world_global = []
for object_in_world in objects_in_world:
Expand Down Expand Up @@ -87,4 +92,16 @@ def run(

self.__logger.info(f"{time.time()}: {objects_in_world_global}")

return True, objects_in_world
encoded_position_global_objects = []
for object in object_in_world_global:

result, message = message_encoding_decoding.encode_position_global(
self.__worker_id, object
)
if not result:
self.__logger.warning("Conversion from PositionGlobal to bytes failed", True)
return False, None

encoded_position_global_objects.append(message)

return True, encoded_position_global_objects
24 changes: 21 additions & 3 deletions modules/communications/communications_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
import os
import pathlib
import queue
import time

from modules import object_in_world
from . import communications
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from ..common.modules.data_encoding import metadata_encoding_decoding
from ..common.modules.data_encoding import worker_enum
from ..common.modules.logger import logger


def communications_worker(
timeout: float,
period: float,
home_position_queue: queue_proxy_wrapper.QueueProxyWrapper,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
Expand All @@ -29,6 +33,7 @@ def communications_worker(
"""

worker_name = pathlib.Path(__file__).stem
worker_id = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
Expand All @@ -49,7 +54,7 @@ def communications_worker(

local_logger.info(f"Home position received: {home_position}", True)

result, comm = communications.Communications.create(home_position, local_logger)
result, comm = communications.Communications.create(home_position, local_logger, worker_id)
if not result:
local_logger.error("Worker failed to create class object", True)
return
Expand Down Expand Up @@ -79,8 +84,21 @@ def communications_worker(
if is_invalid:
continue

result, value = comm.run(input_data)
result, list_of_messages = comm.run(input_data)
if not result:
continue

output_queue.queue.put(value)
result, metadata = metadata_encoding_decoding.encode_metadata(
worker_id, len(list_of_messages)
)
if not result:
local_logger.error("Failed to encode metadata", True)
continue

output_queue.queue.put(metadata)

for message in list_of_messages:

time.sleep(period)

output_queue.queue.put(message)
6 changes: 5 additions & 1 deletion modules/flight_interface/flight_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def get_home_position(self) -> position_global.PositionGlobal:
"""
return self.__home_position

def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":
def run(self, message: bytes) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message: bytes | None

"""
Returns a possible OdometryAndTime with current timestamp.
"""
Expand Down Expand Up @@ -103,6 +103,10 @@ def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":

self.__logger.info(str(odometry_and_time_object), True)

result = self.controller.send_statustext_msg(message)
if not result:
self.__logger.error("Failed to send statustext message", True)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that message may be None due to get_nowait, so you should not log error every time. Skip normally if its None

return True, odometry_and_time_object

def apply_decision(self, cmd: decision_command.DecisionCommand) -> bool:
Expand Down
14 changes: 13 additions & 1 deletion modules/flight_interface/flight_interface_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import os
import pathlib
import queue
import time

from utilities.workers import queue_proxy_wrapper
Expand All @@ -20,6 +21,7 @@ def flight_interface_worker(
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
communications_output_queue: queue_proxy_wrapper.QueueProxyWrapper,
coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be the second queue becasue that's what you defined in main_2024. It goes in order (input1, input2, output1, output2...)

controller: worker_controller.WorkerController,
) -> None:
"""
Expand Down Expand Up @@ -62,7 +64,17 @@ def flight_interface_worker(

time.sleep(period)

result, value = interface.run()
try:
coordinate = coordinates_input_queue.queue.get_nowait()
except queue.Empty:
local_logger.warning("No more coordinates to process")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to log this, as it will log it 8 times per second, and it's normal operation

coordinate = None

if not isinstance(coordinate, bytes):
local_logger.warning(f"Skipping unexpected input: {coordinate}")
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not put this check here, put it in the run() function. This is so that it still continues to put odometry in the output queues, and you don't skip that functionality of the flight interface worker (cuz now you'll just skip getting odometry every time you don't have a communications message)


result, value = interface.run(coordinate)
if not result:
continue

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_flight_interface_hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def main() -> int:
assert interface is not None

# Run
result, odometry_time = interface.run()
result, odometry_time = interface.run(None)

# Test
assert result
Expand Down
48 changes: 48 additions & 0 deletions tests/integration/test_flight_interface_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,34 @@
from modules import decision_command
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from modules.common.modules import position_global
from modules.common.modules.data_encoding import message_encoding_decoding
from modules.common.modules.data_encoding import worker_enum

MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550"
FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds
FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate
FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds
WORK_COUNT = 4
COMMUNICATIONS_WORKER_ID = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER


def simulate_communications_worker(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to add this test, as Jane is doing it as her task. Just make sure that the old test can still run (ie fix the inputs to the functions and worker)

in_queue: queue_proxy_wrapper.QueueProxyWrapper,
data_point: position_global.PositionGlobal,
) -> None:
"""
Encode coordinates and place into queue.
"""
result, message = message_encoding_decoding.encode_position_global(
COMMUNICATIONS_WORKER_ID, data_point
)
assert result
assert message is not None

in_queue.queue.put(message)

return


def apply_decision_test(
Expand Down Expand Up @@ -105,6 +128,8 @@ def main() -> int:
out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
communications_in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
communications_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)

worker = mp.Process(
target=flight_interface_worker.flight_interface_worker,
Expand All @@ -116,6 +141,7 @@ def main() -> int:
in_queue, # Added input_queue
out_queue,
home_position_out_queue,
communications_in_queue,
controller,
),
)
Expand All @@ -129,6 +155,28 @@ def main() -> int:
home_position = home_position_out_queue.queue.get()
assert home_position is not None

data_points = [
position_global.PositionGlobal.create(43.471468, -80.544205, 335),
position_global.PositionGlobal.create(43.6629, -79.3957, 105),
position_global.PositionGlobal.create(43.2609, -79.9192, 100),
position_global.PositionGlobal.create(43.7735, -79.5019, 170),
]

# Simulate communications worker
for i in range(0, WORK_COUNT):
simulate_communications_worker(communications_in_queue, home_position, data_points[i])

# Test flight interface worker sending statustext messages
for i in range(0, WORK_COUNT):
try:
input_data: bytes = communications_out_queue.queue.get_nowait()
assert input_data is not None
except queue.Empty:
print("Output queue has no more messages to process, exiting")
break

assert communications_out_queue.queue.empty()

# Run the apply_decision tests
test_result = apply_decision_test(in_queue, out_queue)
if not test_result:
Expand Down
Loading