Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ def main() -> int:
mp_manager,
QUEUE_MAX_SIZE,
)
communications_to_flight_interface_queue = queue_proxy_wrapper.QueueProxyWrapper(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this queue to communication's output queue

mp_manager,
QUEUE_MAX_SIZE,
)
cluster_estimation_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
Expand Down Expand Up @@ -285,7 +289,10 @@ def main() -> int:
FLIGHT_INTERFACE_BAUD_RATE,
FLIGHT_INTERFACE_WORKER_PERIOD,
),
input_queues=[flight_interface_decision_queue],
input_queues=[
flight_interface_decision_queue,
communications_to_flight_interface_queue,
],
output_queues=[
flight_interface_to_data_merge_queue,
flight_interface_to_communications_queue,
Expand Down Expand Up @@ -367,7 +374,7 @@ def main() -> int:
flight_interface_to_communications_queue,
cluster_estimation_to_communications_queue,
],
output_queues=[communications_to_main_queue],
output_queues=[communications_to_main_queue, communications_to_flight_interface_queue],
controller=controller,
local_logger=main_logger,
)
Expand Down Expand Up @@ -505,6 +512,7 @@ def main() -> int:
cluster_estimation_to_communications_queue.fill_and_drain_queue()
communications_to_main_queue.fill_and_drain_queue()
flight_interface_decision_queue.fill_and_drain_queue()
communications_to_flight_interface_queue.fill_and_drain_queue()

for manager in worker_managers:
manager.join_workers()
Expand Down
2 changes: 1 addition & 1 deletion modules/common
23 changes: 20 additions & 3 deletions modules/communications/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from ..common.modules import position_global
from ..common.modules import position_local
from ..common.modules.logger import logger
from ..common.modules.data_encoding import message_encoding_decoding
from ..common.modules.data_encoding import worker_enum
from ..common.modules.mavlink import local_global_conversion


Expand All @@ -23,6 +25,7 @@ def create(
cls,
home_position: position_global.PositionGlobal,
local_logger: logger.Logger,
worker_id: int,
) -> "tuple[True, Communications] | tuple[False, None]":
"""
Logs data and forwards it.
Expand All @@ -32,13 +35,14 @@ def create(
Returns: Success, class object.
"""

return True, Communications(cls.__create_key, home_position, local_logger)
return True, Communications(cls.__create_key, home_position, local_logger, worker_id)

def __init__(
self,
class_private_create_key: object,
home_position: position_global.PositionGlobal,
local_logger: logger.Logger,
worker_id: int,
) -> None:
"""
Private constructor, use create() method.
Expand All @@ -47,11 +51,12 @@ def __init__(

self.__home_position = home_position
self.__logger = local_logger
self.__worker_id = worker_id

def run(
self,
objects_in_world: list[object_in_world.ObjectInWorld],
) -> tuple[True, list[object_in_world.ObjectInWorld]] | tuple[False, None]:
) -> tuple[True, list[bytes]] | tuple[False, None]:

objects_in_world_global = []
for object_in_world in objects_in_world:
Expand Down Expand Up @@ -87,4 +92,16 @@ def run(

self.__logger.info(f"{time.time()}: {objects_in_world_global}")

return True, objects_in_world
encoded_position_global_objects = []
for object in objects_in_world_global:

result, message = message_encoding_decoding.encode_position_global(
self.__worker_id, object
)
if not result:
self.__logger.warning("Conversion from PositionGlobal to bytes failed", True)
return False, None

encoded_position_global_objects.append(message)

return True, encoded_position_global_objects
24 changes: 21 additions & 3 deletions modules/communications/communications_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
import os
import pathlib
import queue
import time

from modules import object_in_world
from . import communications
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from ..common.modules.data_encoding import metadata_encoding_decoding
from ..common.modules.data_encoding import worker_enum
from ..common.modules.logger import logger


def communications_worker(
timeout: float,
period: float,
home_position_queue: queue_proxy_wrapper.QueueProxyWrapper,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
Expand All @@ -29,6 +33,7 @@ def communications_worker(
"""

worker_name = pathlib.Path(__file__).stem
worker_id = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
Expand All @@ -49,7 +54,7 @@ def communications_worker(

local_logger.info(f"Home position received: {home_position}", True)

result, comm = communications.Communications.create(home_position, local_logger)
result, comm = communications.Communications.create(home_position, local_logger, worker_id)
if not result:
local_logger.error("Worker failed to create class object", True)
return
Expand Down Expand Up @@ -79,8 +84,21 @@ def communications_worker(
if is_invalid:
continue

result, value = comm.run(input_data)
result, list_of_messages = comm.run(input_data)
if not result:
continue

output_queue.queue.put(value)
result, metadata = metadata_encoding_decoding.encode_metadata(
worker_id, len(list_of_messages)
)
if not result:
local_logger.error("Failed to encode metadata", True)
continue

output_queue.queue.put(metadata)

for message in list_of_messages:

time.sleep(period)

output_queue.queue.put(message)
6 changes: 5 additions & 1 deletion modules/flight_interface/flight_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def get_home_position(self) -> position_global.PositionGlobal:
"""
return self.__home_position

def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":
def run(self, message: bytes) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":
"""
Returns a possible OdometryAndTime with current timestamp.
"""
Expand Down Expand Up @@ -103,6 +103,10 @@ def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":

self.__logger.info(str(odometry_and_time_object), True)

result = self.controller.send_statustext_msg(message)
if not result:
self.__logger.error("Failed to send statustext message", True)

return True, odometry_and_time_object

def apply_decision(self, cmd: decision_command.DecisionCommand) -> bool:
Expand Down
13 changes: 11 additions & 2 deletions modules/flight_interface/flight_interface_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ def flight_interface_worker(
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
communications_output_queue: queue_proxy_wrapper.QueueProxyWrapper,
coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Worker process.

address, timeout is initial setting.
period is minimum period between loops.
output_queue is the data queue.
input_queue and output_queue are the data queues.
communications_output_queue is a one time queue that sends communications worker the home address.
coordinates_input_queue provides the flight interface worker with a list of GPS coordinates.
controller is how the main process communicates to this worker process.
"""
# TODO: Error handling
Expand Down Expand Up @@ -62,7 +65,13 @@ def flight_interface_worker(

time.sleep(period)

result, value = interface.run()
coordinate = coordinates_input_queue.queue.get_nowait()

if not isinstance(coordinate, bytes):
local_logger.warning(f"Skipping unexpected input: {coordinate}")
continue

result, value = interface.run(coordinate)
if not result:
continue

Expand Down
Loading