diff --git a/config.yaml b/config.yaml index c6d4f59e..4dbf9c9c 100644 --- a/config.yaml +++ b/config.yaml @@ -77,3 +77,4 @@ cluster_estimation: communications: timeout: 30.0 # seconds + worker_period: 0.5 # seconds diff --git a/main_2024.py b/main_2024.py index 25f68a72..81383697 100644 --- a/main_2024.py +++ b/main_2024.py @@ -156,6 +156,7 @@ def main() -> int: RANDOM_STATE = config["cluster_estimation"]["random_state"] COMMUNICATIONS_TIMEOUT = config["communications"]["timeout"] + COMMUNICATIONS_WORKER_PERIOD = config["communications"]["worker_period"] # pylint: enable=invalid-name except KeyError as exception: @@ -201,6 +202,10 @@ def main() -> int: mp_manager, QUEUE_MAX_SIZE, ) + communications_to_flight_interface_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, + QUEUE_MAX_SIZE, + ) communications_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, @@ -285,7 +290,10 @@ def main() -> int: FLIGHT_INTERFACE_BAUD_RATE, FLIGHT_INTERFACE_WORKER_PERIOD, ), - input_queues=[flight_interface_decision_queue], + input_queues=[ + flight_interface_decision_queue, + communications_to_flight_interface_queue, + ], output_queues=[ flight_interface_to_data_merge_queue, flight_interface_to_communications_queue, @@ -362,12 +370,15 @@ def main() -> int: result, communications_worker_properties = worker_manager.WorkerProperties.create( count=1, target=communications_worker.communications_worker, - work_arguments=(COMMUNICATIONS_TIMEOUT,), + work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD), input_queues=[ flight_interface_to_communications_queue, cluster_estimation_to_communications_queue, ], - output_queues=[communications_to_main_queue], + output_queues=[ + communications_to_main_queue, + communications_to_flight_interface_queue, + ], controller=controller, local_logger=main_logger, ) @@ -505,6 +516,7 @@ def main() -> int: cluster_estimation_to_communications_queue.fill_and_drain_queue() communications_to_main_queue.fill_and_drain_queue() flight_interface_decision_queue.fill_and_drain_queue() + communications_to_flight_interface_queue.fill_and_drain_queue() for manager in worker_managers: manager.join_workers() diff --git a/modules/common b/modules/common index 9acf88b4..a710405b 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit 9acf88b42dfdb145e7eabb1b09a55df102ee00ad +Subproject commit a710405b4bc11bdad6bccfc50e0e495f21c8394f diff --git a/modules/communications/communications.py b/modules/communications/communications.py index b1127475..dcb29595 100644 --- a/modules/communications/communications.py +++ b/modules/communications/communications.py @@ -8,6 +8,9 @@ from ..common.modules import position_global from ..common.modules import position_local from ..common.modules.logger import logger +from ..common.modules.data_encoding import message_encoding_decoding +from ..common.modules.data_encoding import metadata_encoding_decoding +from ..common.modules.data_encoding import worker_enum from ..common.modules.mavlink import local_global_conversion @@ -51,7 +54,7 @@ def __init__( def run( self, objects_in_world: list[object_in_world.ObjectInWorld], - ) -> tuple[True, list[object_in_world.ObjectInWorld]] | tuple[False, None]: + ) -> tuple[True, bytes, list[bytes]] | tuple[False, None, None]: objects_in_world_global = [] for object_in_world in objects_in_world: @@ -69,7 +72,7 @@ def run( self.__logger.warning( f"Could not convert ObjectInWorld to PositionLocal:\nobject in world: {object_in_world}" ) - return False, None + return False, None, None result, object_in_world_global = ( local_global_conversion.position_global_from_position_local( @@ -81,10 +84,29 @@ def run( self.__logger.warning( f"position_global_from_position_local conversion failed:\nhome_position: {self.__home_position}\nobject_position_local: {object_position_local}" ) - return False, None + return False, None, None objects_in_world_global.append(object_in_world_global) self.__logger.info(f"{time.time()}: {objects_in_world_global}") - return True, objects_in_world + encoded_position_global_objects = [] + for object in object_in_world_global: + + result, message = message_encoding_decoding.encode_position_global( + worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, object + ) + if not result: + self.__logger.warning("Conversion from PositionGlobal to bytes failed", True) + return False, None, None + + encoded_position_global_objects.append(message) + + result, metadata = metadata_encoding_decoding.encode_metadata( + worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, len(encoded_position_global_objects) + ) + if not result: + self.__logger.error("Failed to encode metadata", True) + return False, None, None + + return True, metadata, encoded_position_global_objects diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index 3b25dc85..346d6e55 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -5,6 +5,7 @@ import os import pathlib import queue +import time from modules import object_in_world from . import communications @@ -15,9 +16,11 @@ def communications_worker( timeout: float, + period: float, home_position_queue: queue_proxy_wrapper.QueueProxyWrapper, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, + message_output_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -79,8 +82,16 @@ def communications_worker( if is_invalid: continue - result, value = comm.run(input_data) + result, metadata, list_of_messages = comm.run(input_data) if not result: continue - output_queue.queue.put(value) + output_queue.queue.put(metadata) + message_output_queue.queue.put(metadata) + + for message in list_of_messages: + + time.sleep(period) + + output_queue.queue.put(message) + message_output_queue.queue.put(message) diff --git a/modules/flight_interface/flight_interface.py b/modules/flight_interface/flight_interface.py index ef976278..581e7fd9 100644 --- a/modules/flight_interface/flight_interface.py +++ b/modules/flight_interface/flight_interface.py @@ -73,7 +73,7 @@ def get_home_position(self) -> position_global.PositionGlobal: """ return self.__home_position - def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": + def run(self, message: bytes | None) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": """ Returns a possible OdometryAndTime with current timestamp. """ @@ -103,6 +103,9 @@ def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": self.__logger.info(str(odometry_and_time_object), True) + if message: + self.controller.send_statustext_msg(message) + return True, odometry_and_time_object def apply_decision(self, cmd: decision_command.DecisionCommand) -> bool: diff --git a/modules/flight_interface/flight_interface_worker.py b/modules/flight_interface/flight_interface_worker.py index 41610a73..4bbda1a0 100644 --- a/modules/flight_interface/flight_interface_worker.py +++ b/modules/flight_interface/flight_interface_worker.py @@ -4,6 +4,7 @@ import os import pathlib +import queue import time from utilities.workers import queue_proxy_wrapper @@ -18,6 +19,7 @@ def flight_interface_worker( baud_rate: int, period: float, input_queue: queue_proxy_wrapper.QueueProxyWrapper, + coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, communications_output_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, @@ -62,7 +64,12 @@ def flight_interface_worker( time.sleep(period) - result, value = interface.run() + try: + coordinate = coordinates_input_queue.queue.get_nowait() + except queue.Empty: + coordinate = None + + result, value = interface.run(coordinate) if not result: continue diff --git a/tests/integration/test_flight_interface_hardware.py b/tests/integration/test_flight_interface_hardware.py index e0050441..654d5134 100644 --- a/tests/integration/test_flight_interface_hardware.py +++ b/tests/integration/test_flight_interface_hardware.py @@ -34,7 +34,7 @@ def main() -> int: assert interface is not None # Run - result, odometry_time = interface.run() + result, odometry_time = interface.run(None) # Test assert result diff --git a/tests/integration/test_flight_interface_worker.py b/tests/integration/test_flight_interface_worker.py index af52cc9d..97ea874a 100644 --- a/tests/integration/test_flight_interface_worker.py +++ b/tests/integration/test_flight_interface_worker.py @@ -12,6 +12,7 @@ from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller + MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550" FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate @@ -105,6 +106,7 @@ def main() -> int: out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + communications_in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) worker = mp.Process( target=flight_interface_worker.flight_interface_worker, @@ -114,6 +116,7 @@ def main() -> int: FLIGHT_INTERFACE_BAUD_RATE, FLIGHT_INTERFACE_WORKER_PERIOD, in_queue, # Added input_queue + communications_in_queue, out_queue, home_position_out_queue, controller,