diff --git a/main_2024.py b/main_2024.py index 25f68a72..532182d4 100644 --- a/main_2024.py +++ b/main_2024.py @@ -197,6 +197,10 @@ def main() -> int: mp_manager, QUEUE_MAX_SIZE, ) + communications_to_flight_interface_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, + QUEUE_MAX_SIZE, + ) cluster_estimation_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, @@ -285,7 +289,10 @@ def main() -> int: FLIGHT_INTERFACE_BAUD_RATE, FLIGHT_INTERFACE_WORKER_PERIOD, ), - input_queues=[flight_interface_decision_queue], + input_queues=[ + flight_interface_decision_queue, + communications_to_flight_interface_queue, + ], output_queues=[ flight_interface_to_data_merge_queue, flight_interface_to_communications_queue, @@ -367,7 +374,7 @@ def main() -> int: flight_interface_to_communications_queue, cluster_estimation_to_communications_queue, ], - output_queues=[communications_to_main_queue], + output_queues=[communications_to_main_queue, communications_to_flight_interface_queue], controller=controller, local_logger=main_logger, ) @@ -505,6 +512,7 @@ def main() -> int: cluster_estimation_to_communications_queue.fill_and_drain_queue() communications_to_main_queue.fill_and_drain_queue() flight_interface_decision_queue.fill_and_drain_queue() + communications_to_flight_interface_queue.fill_and_drain_queue() for manager in worker_managers: manager.join_workers() diff --git a/modules/common b/modules/common index c7ab98a7..9acf88b4 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit c7ab98a75be0f78c2c17084d30c7fce5708897ff +Subproject commit 9acf88b42dfdb145e7eabb1b09a55df102ee00ad diff --git a/modules/communications/communications.py b/modules/communications/communications.py index b1127475..acb71c56 100644 --- a/modules/communications/communications.py +++ b/modules/communications/communications.py @@ -8,6 +8,8 @@ from ..common.modules import position_global from ..common.modules import position_local from ..common.modules.logger import logger +from ..common.modules.data_encoding import message_encoding_decoding +from ..common.modules.data_encoding import worker_enum from ..common.modules.mavlink import local_global_conversion @@ -23,6 +25,7 @@ def create( cls, home_position: position_global.PositionGlobal, local_logger: logger.Logger, + worker_id: int, ) -> "tuple[True, Communications] | tuple[False, None]": """ Logs data and forwards it. @@ -32,13 +35,14 @@ def create( Returns: Success, class object. """ - return True, Communications(cls.__create_key, home_position, local_logger) + return True, Communications(cls.__create_key, home_position, local_logger, worker_id) def __init__( self, class_private_create_key: object, home_position: position_global.PositionGlobal, local_logger: logger.Logger, + worker_id: int, ) -> None: """ Private constructor, use create() method. @@ -47,11 +51,12 @@ def __init__( self.__home_position = home_position self.__logger = local_logger + self.__worker_id = worker_id def run( self, objects_in_world: list[object_in_world.ObjectInWorld], - ) -> tuple[True, list[object_in_world.ObjectInWorld]] | tuple[False, None]: + ) -> tuple[True, list[bytes]] | tuple[False, None]: objects_in_world_global = [] for object_in_world in objects_in_world: @@ -87,4 +92,16 @@ def run( self.__logger.info(f"{time.time()}: {objects_in_world_global}") - return True, objects_in_world + encoded_position_global_objects = [] + for object in objects_in_world_global: + + result, message = message_encoding_decoding.encode_position_global( + self.__worker_id, object + ) + if not result: + self.__logger.warning("Conversion from PositionGlobal to bytes failed", True) + return False, None + + encoded_position_global_objects.append(message) + + return True, encoded_position_global_objects diff --git a/modules/communications/communications_worker.py b/modules/communications/communications_worker.py index 3b25dc85..b0886b64 100644 --- a/modules/communications/communications_worker.py +++ b/modules/communications/communications_worker.py @@ -5,16 +5,20 @@ import os import pathlib import queue +import time from modules import object_in_world from . import communications from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller +from ..common.modules.data_encoding import metadata_encoding_decoding +from ..common.modules.data_encoding import worker_enum from ..common.modules.logger import logger def communications_worker( timeout: float, + period: float, home_position_queue: queue_proxy_wrapper.QueueProxyWrapper, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, @@ -29,6 +33,7 @@ def communications_worker( """ worker_name = pathlib.Path(__file__).stem + worker_id = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER process_id = os.getpid() result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True) if not result: @@ -49,7 +54,7 @@ def communications_worker( local_logger.info(f"Home position received: {home_position}", True) - result, comm = communications.Communications.create(home_position, local_logger) + result, comm = communications.Communications.create(home_position, local_logger, worker_id) if not result: local_logger.error("Worker failed to create class object", True) return @@ -79,8 +84,21 @@ def communications_worker( if is_invalid: continue - result, value = comm.run(input_data) + result, list_of_messages = comm.run(input_data) if not result: continue - output_queue.queue.put(value) + result, metadata = metadata_encoding_decoding.encode_metadata( + worker_id, len(list_of_messages) + ) + if not result: + local_logger.error("Failed to encode metadata", True) + continue + + output_queue.queue.put(metadata) + + for message in list_of_messages: + + time.sleep(period) + + output_queue.queue.put(message) diff --git a/modules/flight_interface/flight_interface.py b/modules/flight_interface/flight_interface.py index ef976278..52131364 100644 --- a/modules/flight_interface/flight_interface.py +++ b/modules/flight_interface/flight_interface.py @@ -73,7 +73,7 @@ def get_home_position(self) -> position_global.PositionGlobal: """ return self.__home_position - def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": + def run(self, message: bytes) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": """ Returns a possible OdometryAndTime with current timestamp. """ @@ -103,6 +103,10 @@ def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]": self.__logger.info(str(odometry_and_time_object), True) + result = self.controller.send_statustext_msg(message) + if not result: + self.__logger.error("Failed to send statustext message", True) + return True, odometry_and_time_object def apply_decision(self, cmd: decision_command.DecisionCommand) -> bool: diff --git a/modules/flight_interface/flight_interface_worker.py b/modules/flight_interface/flight_interface_worker.py index 41610a73..ef994ce1 100644 --- a/modules/flight_interface/flight_interface_worker.py +++ b/modules/flight_interface/flight_interface_worker.py @@ -20,6 +20,7 @@ def flight_interface_worker( input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, communications_output_queue: queue_proxy_wrapper.QueueProxyWrapper, + coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -27,7 +28,9 @@ def flight_interface_worker( address, timeout is initial setting. period is minimum period between loops. - output_queue is the data queue. + input_queue and output_queue are the data queues. + communications_output_queue is a one time queue that sends communications worker the home address. + coordinates_input_queue provides the flight interface worker with a list of GPS coordinates. controller is how the main process communicates to this worker process. """ # TODO: Error handling @@ -62,7 +65,13 @@ def flight_interface_worker( time.sleep(period) - result, value = interface.run() + coordinate = coordinates_input_queue.queue.get_nowait() + + if not isinstance(coordinate, bytes): + local_logger.warning(f"Skipping unexpected input: {coordinate}") + continue + + result, value = interface.run(coordinate) if not result: continue