Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ def main() -> int:
mp_manager,
QUEUE_MAX_SIZE,
)
communications_to_flight_interface_queue = queue_proxy_wrapper.QueueProxyWrapper(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this queue to communication's output queue

mp_manager,
QUEUE_MAX_SIZE,
)
cluster_estimation_to_communications_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
Expand Down Expand Up @@ -272,7 +276,10 @@ def main() -> int:
FLIGHT_INTERFACE_BAUD_RATE,
FLIGHT_INTERFACE_WORKER_PERIOD,
),
input_queues=[flight_interface_decision_queue],
input_queues=[
flight_interface_decision_queue,
communications_to_flight_interface_queue,
],
output_queues=[
flight_interface_to_data_merge_queue,
flight_interface_to_communications_queue,
Expand Down Expand Up @@ -492,6 +499,7 @@ def main() -> int:
cluster_estimation_to_communications_queue.fill_and_drain_queue()
communications_to_main_queue.fill_and_drain_queue()
flight_interface_decision_queue.fill_and_drain_queue()
communications_to_flight_interface_queue.fill_and_drain_queue()

for manager in worker_managers:
manager.join_workers()
Expand Down
22 changes: 19 additions & 3 deletions modules/communications/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ..common.modules import position_global
from ..common.modules import position_local
from ..common.modules.logger import logger
from ..common.modules.data_encoding import message_encoding_decoding
from ..common.modules.mavlink import local_global_conversion


Expand All @@ -23,6 +24,7 @@ def create(
cls,
home_position: position_global.PositionGlobal,
local_logger: logger.Logger,
worker_name: str,
) -> "tuple[True, Communications] | tuple[False, None]":
"""
Logs data and forwards it.
Expand All @@ -32,13 +34,14 @@ def create(
Returns: Success, class object.
"""

return True, Communications(cls.__create_key, home_position, local_logger)
return True, Communications(cls.__create_key, home_position, local_logger, worker_name)

def __init__(
self,
class_private_create_key: object,
home_position: position_global.PositionGlobal,
local_logger: logger.Logger,
worker_name: str,
) -> None:
"""
Private constructor, use create() method.
Expand All @@ -47,11 +50,12 @@ def __init__(

self.__home_position = home_position
self.__logger = local_logger
self.__worker_name = worker_name

def run(
self,
objects_in_world: list[object_in_world.ObjectInWorld],
) -> tuple[True, list[object_in_world.ObjectInWorld]] | tuple[False, None]:
) -> tuple[True, list[bytes]] | tuple[False, None]:

objects_in_world_global = []
for object_in_world in objects_in_world:
Expand Down Expand Up @@ -87,4 +91,16 @@ def run(

self.__logger.info(f"{time.time()}: {objects_in_world_global}")

return True, objects_in_world
encoded_position_global_objects = []
for object in objects_in_world_global:

result, message = message_encoding_decoding.encode_position_global(
f"{self.__worker_name}", object
)
if not result:
self.__logger.warning("Conversion from PositionGlobal to bytes failed", True)
return False, None

encoded_position_global_objects.append(message)

return True, encoded_position_global_objects
17 changes: 14 additions & 3 deletions modules/communications/communications_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from . import communications
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from ..common.modules.data_encoding import metadata_encoding_decoding
from ..common.modules.logger import logger


Expand Down Expand Up @@ -49,7 +50,7 @@ def communications_worker(

local_logger.info(f"Home position received: {home_position}", True)

result, comm = communications.Communications.create(home_position, local_logger)
result, comm = communications.Communications.create(home_position, local_logger, worker_name)
if not result:
local_logger.error("Worker failed to create class object", True)
return
Expand Down Expand Up @@ -79,8 +80,18 @@ def communications_worker(
if is_invalid:
continue

result, value = comm.run(input_data)
result, list_of_messages = comm.run(input_data)
if not result:
continue

output_queue.queue.put(value)
result, metadata = metadata_encoding_decoding.encode_metadata(
f"{worker_name}", len(list_of_messages)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a worker_name input, just hard code "communications_worker" here. It's my bad that the common's function doesn't take in the enum as an input, I forgot about that part. We'll change that later, or you can do that right now?

)
if not result:
local_logger.error("Failed to encode metadata", True)
continue

output_queue.queue.put(metadata)

for message in list_of_messages:
output_queue.queue.put(message)
7 changes: 6 additions & 1 deletion modules/flight_interface/flight_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def get_home_position(self) -> position_global.PositionGlobal:
"""
return self.__home_position

def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":
def run(self, message: bytes) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":
"""
Returns a possible OdometryAndTime with current timestamp.
"""
Expand Down Expand Up @@ -103,6 +103,11 @@ def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":

self.__logger.info(str(odometry_and_time_object), True)

result = self.controller.send_statustext_msg(message)
if not result:
self.__logger.error("Failed to send statustext message", True)
return False, None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need to fail here, just log error. This will make data merge think we didn't get a odometry object


return True, odometry_and_time_object

def apply_decision(self, cmd: decision_command.DecisionCommand) -> bool:
Expand Down
16 changes: 14 additions & 2 deletions modules/flight_interface/flight_interface_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ def flight_interface_worker(
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
communications_output_queue: queue_proxy_wrapper.QueueProxyWrapper,
coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Worker process.

address, timeout is initial setting.
period is minimum period between loops.
output_queue is the data queue.
input_queue and output_queue are the data queues.
communications_output_queue is a one time queue that sends communications worker the home address.
coordinates_input_queue provides the flight interface worker with a list of GPS coordinates.
controller is how the main process communicates to this worker process.
"""
# TODO: Error handling
Expand Down Expand Up @@ -62,7 +65,16 @@ def flight_interface_worker(

time.sleep(period)

result, value = interface.run()
coordinate = coordinates_input_queue.queue.get()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a get_no_wait(), and then if it's None, means queue is empty. If queue is empty, skip getting and just run with inputting None for coordinate. Then, flight interface will be able to run without stalling for communications worker.

if coordinate is None:
local_logger.info("Received type None, exiting")
break
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't exit, just get odometry only (see above) since we'll need to get many odometries before we can send 1 message


if not isinstance(coordinate, bytes):
local_logger.warning(f"Skipping unexpected input: {coordinate}")
continue

result, value = interface.run(coordinate)
if not result:
continue

Expand Down
Loading