Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,4 @@ cluster_estimation:

communications:
timeout: 30.0 # seconds
worker_period: 0.5 # seconds
18 changes: 15 additions & 3 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def main() -> int:
RANDOM_STATE = config["cluster_estimation"]["random_state"]

COMMUNICATIONS_TIMEOUT = config["communications"]["timeout"]
COMMUNICATIONS_WORKER_PERIOD = config["communications"]["worker_period"]

# pylint: enable=invalid-name
except KeyError as exception:
Expand Down Expand Up @@ -201,6 +202,10 @@ def main() -> int:
mp_manager,
QUEUE_MAX_SIZE,
)
communications_to_flight_interface_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)
communications_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
Expand Down Expand Up @@ -285,7 +290,10 @@ def main() -> int:
FLIGHT_INTERFACE_BAUD_RATE,
FLIGHT_INTERFACE_WORKER_PERIOD,
),
input_queues=[flight_interface_decision_queue],
input_queues=[
flight_interface_decision_queue,
communications_to_flight_interface_queue,
],
output_queues=[
flight_interface_to_data_merge_queue,
flight_interface_to_communications_queue,
Expand Down Expand Up @@ -362,12 +370,15 @@ def main() -> int:
result, communications_worker_properties = worker_manager.WorkerProperties.create(
count=1,
target=communications_worker.communications_worker,
work_arguments=(COMMUNICATIONS_TIMEOUT,),
work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD),
input_queues=[
flight_interface_to_communications_queue,
cluster_estimation_to_communications_queue,
],
output_queues=[communications_to_main_queue],
output_queues=[
communications_to_main_queue,
communications_to_flight_interface_queue,
],
controller=controller,
local_logger=main_logger,
)
Expand Down Expand Up @@ -505,6 +516,7 @@ def main() -> int:
cluster_estimation_to_communications_queue.fill_and_drain_queue()
communications_to_main_queue.fill_and_drain_queue()
flight_interface_decision_queue.fill_and_drain_queue()
communications_to_flight_interface_queue.fill_and_drain_queue()

for manager in worker_managers:
manager.join_workers()
Expand Down
2 changes: 1 addition & 1 deletion modules/common
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this change? Can you make this a PR in common or something?

30 changes: 26 additions & 4 deletions modules/communications/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from ..common.modules import position_global
from ..common.modules import position_local
from ..common.modules.logger import logger
from ..common.modules.data_encoding import message_encoding_decoding
from ..common.modules.data_encoding import metadata_encoding_decoding
from ..common.modules.data_encoding import worker_enum
from ..common.modules.mavlink import local_global_conversion


Expand Down Expand Up @@ -51,7 +54,7 @@ def __init__(
def run(
self,
objects_in_world: list[object_in_world.ObjectInWorld],
) -> tuple[True, list[object_in_world.ObjectInWorld]] | tuple[False, None]:
) -> tuple[True, list[bytes], bytes] | tuple[False, None, None]:

objects_in_world_global = []
for object_in_world in objects_in_world:
Expand All @@ -69,7 +72,7 @@ def run(
self.__logger.warning(
f"Could not convert ObjectInWorld to PositionLocal:\nobject in world: {object_in_world}"
)
return False, None
return False, None, None

result, object_in_world_global = (
local_global_conversion.position_global_from_position_local(
Expand All @@ -81,10 +84,29 @@ def run(
self.__logger.warning(
f"position_global_from_position_local conversion failed:\nhome_position: {self.__home_position}\nobject_position_local: {object_position_local}"
)
return False, None
return False, None, None

objects_in_world_global.append(object_in_world_global)

self.__logger.info(f"{time.time()}: {objects_in_world_global}")

return True, objects_in_world
encoded_position_global_objects = []
for object in object_in_world_global:

result, message = message_encoding_decoding.encode_position_global(
worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, object
)
if not result:
self.__logger.warning("Conversion from PositionGlobal to bytes failed", True)
return False, None, None

encoded_position_global_objects.append(message)

result, metadata = metadata_encoding_decoding.encode_metadata(
worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, len(encoded_position_global_objects)
)
if not result:
self.__logger.error("Failed to encode metadata", True)
return False, None, None

return True, encoded_position_global_objects, metadata
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just switch the order: metadata then encoded_position_global_objects. Make sure to change function definition and also communications worker.

15 changes: 13 additions & 2 deletions modules/communications/communications_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import pathlib
import queue
import time

from modules import object_in_world
from . import communications
Expand All @@ -15,9 +16,11 @@

def communications_worker(
timeout: float,
period: float,
home_position_queue: queue_proxy_wrapper.QueueProxyWrapper,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
message_output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Expand Down Expand Up @@ -79,8 +82,16 @@ def communications_worker(
if is_invalid:
continue

result, value = comm.run(input_data)
result, list_of_messages, metadata = comm.run(input_data)
if not result:
continue

output_queue.queue.put(value)
output_queue.queue.put(metadata)
message_output_queue.queue.put(metadata)

for message in list_of_messages:

time.sleep(period)

output_queue.queue.put(message)
message_output_queue.queue.put(message)
5 changes: 4 additions & 1 deletion modules/flight_interface/flight_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def get_home_position(self) -> position_global.PositionGlobal:
"""
return self.__home_position

def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":
def run(self, message: bytes | None) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":
"""
Returns a possible OdometryAndTime with current timestamp.
"""
Expand Down Expand Up @@ -103,6 +103,9 @@ def run(self) -> "tuple[bool, odometry_and_time.OdometryAndTime | None]":

self.__logger.info(str(odometry_and_time_object), True)

if message:
self.controller.send_statustext_msg(message)

return True, odometry_and_time_object

def apply_decision(self, cmd: decision_command.DecisionCommand) -> bool:
Expand Down
9 changes: 8 additions & 1 deletion modules/flight_interface/flight_interface_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import os
import pathlib
import queue
import time

from utilities.workers import queue_proxy_wrapper
Expand All @@ -18,6 +19,7 @@ def flight_interface_worker(
baud_rate: int,
period: float,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
communications_output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
Expand Down Expand Up @@ -62,7 +64,12 @@ def flight_interface_worker(

time.sleep(period)

result, value = interface.run()
try:
coordinate = coordinates_input_queue.queue.get_nowait()
except queue.Empty:
coordinate = None

result, value = interface.run(coordinate)
if not result:
continue

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_flight_interface_hardware.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def main() -> int:
assert interface is not None

# Run
result, odometry_time = interface.run()
result, odometry_time = interface.run(None)

# Test
assert result
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/test_flight_interface_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller


MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550"
FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds
FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate
Expand Down Expand Up @@ -105,6 +106,7 @@ def main() -> int:
out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
communications_in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)

worker = mp.Process(
target=flight_interface_worker.flight_interface_worker,
Expand All @@ -114,6 +116,7 @@ def main() -> int:
FLIGHT_INTERFACE_BAUD_RATE,
FLIGHT_INTERFACE_WORKER_PERIOD,
in_queue, # Added input_queue
communications_in_queue,
out_queue,
home_position_out_queue,
controller,
Expand Down