Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion modules/communications/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def run(
) -> tuple[True, bytes, list[bytes]] | tuple[False, None, None]:

objects_in_world_global = []
objects_in_world = objects_in_world or []

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I realized that we will never receive None as an input, the worker takes care of that and makes sure the type coming in is the same as the type listed above. You can probably remove that test case

for object_in_world in objects_in_world:
# We assume detected objects are on the ground
north = object_in_world.location_x
Expand Down Expand Up @@ -91,7 +93,7 @@ def run(
self.__logger.info(f"{time.time()}: {objects_in_world_global}")

encoded_position_global_objects = []
for object in object_in_world_global:
for object in objects_in_world_global:

result, message = message_encoding_decoding.encode_position_global(
worker_enum.WorkerEnum.COMMUNICATIONS_WORKER, object
Expand Down
338 changes: 338 additions & 0 deletions tests/unit/test_communications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,338 @@
"""
Tests the communications class.
"""

import pytest

from modules.communications import communications
from modules.common.modules.logger import logger
from modules import object_in_world
from modules.common.modules import position_local
from modules.common.modules.mavlink import local_global_conversion
from modules.common.modules import position_global
from modules.common.modules.data_encoding import metadata_encoding_decoding
from modules.common.modules.data_encoding import message_encoding_decoding

# Test functions use test fixture signature names and access class privates
# No enable
# pylint: disable=protected-access,redefined-outer-name

LATITUDE_TOLERANCE = 0.000001
LONGITUDE_TOLERANCE = 0.000001
ALTITUDE_TOLERANCE = 7


@pytest.fixture
def home_position() -> position_global.PositionGlobal: # type: ignore
"""
Home position.
"""
# University of Waterloo WGS84 Coordinate
result, position = position_global.PositionGlobal.create(43.472978, -80.540103, 336.0)
assert result
assert position is not None

yield position


@pytest.fixture
def communications_maker(
home_position: position_global.PositionGlobal,
) -> communications.Communications: # type: ignore
"""
Construct a Communications instance with the Home position
"""
result, test_logger = logger.Logger.create("test_logger", False)

assert result
assert test_logger is not None

result, communications_instance = communications.Communications.create(
home_position, test_logger
)
assert result
assert communications_instance is not None

yield communications_instance # type: ignore


def object_in_world_from_position_local(
position_local: position_local.PositionLocal,
) -> object_in_world.ObjectInWorld:
"""
Convert position local to object_in_world as defined in Communications.py
"""
result, obj = object_in_world.ObjectInWorld.create(
position_local.north, position_local.east, 0.0
)
assert result
assert obj is not None

return obj


def assert_global_positions(
expected: position_global.PositionGlobal, actual: position_global.PositionGlobal
) -> None:
"""
Assert each values of the global positions using the Tolerances
"""
assert abs(expected.latitude - actual.latitude) < LATITUDE_TOLERANCE
assert abs(expected.longitude - actual.longitude) < LONGITUDE_TOLERANCE
assert abs(expected.altitude - actual.altitude) < ALTITUDE_TOLERANCE


class TestCommunications:
"""
Tests for the Communications.run() method.
"""

def test_run(
self,
home_position: position_global.PositionGlobal,
communications_maker: communications.Communications,
) -> None:
"""
Test if the Communications.run returns the correct instance
"""
# Setup
result, position = position_global.PositionGlobal.create(43.472978, -80.540103, 336.0)
assert result
assert position is not None

result, actual = local_global_conversion.position_local_from_position_global(
home_position, position
)
assert result
assert actual is not None

objects_in_world = [object_in_world_from_position_local(actual)]

# Run
result, metadata, generated_objects = communications_maker.run(objects_in_world)

# Test
assert result
assert isinstance(metadata, bytes)
for generated_object in generated_objects:
assert isinstance(generated_object, bytes)

def test_normal(
self,
home_position: position_global.PositionGlobal,
communications_maker: communications.Communications,
) -> None:
"""
Normal
"""
# Setup
result, global_position_1 = position_global.PositionGlobal.create(
43.472978, -80.540103, 336.0
)
assert result
assert global_position_1 is not None

result, local_position_1 = local_global_conversion.position_local_from_position_global(
home_position, global_position_1
)
assert result
assert local_position_1 is not None

result, global_position_2 = position_global.PositionGlobal.create(
43.472800, -80.539500, 330.0
)
assert result
assert global_position_2 is not None

result, local_position_2 = local_global_conversion.position_local_from_position_global(
home_position, global_position_2
)
assert result
assert local_position_2 is not None

global_positions = [global_position_1, global_position_2]

objects_in_world = [
object_in_world_from_position_local(local_position_1),
object_in_world_from_position_local(local_position_2),
]
number_of_messages = len(objects_in_world)

# Run
result, metadata, generated_objects = communications_maker.run(objects_in_world)
assert result
assert metadata is not None
assert generated_objects is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you go through and change all of these to is bytes/list[bytes] (the correct data type)


result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
metadata
)
assert result
assert worker_id is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you assert this is worker_enum.COMMUNICATIONS_WORKER? (Also from common's data_encoding module)

assert actual_number_of_messages is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably no need to duplicate this assert, just assert that it is == number_of_messages as you did below. I'm pretty sure if assert fails it'll tell you why (ie if it was None, or something else)


# Test
assert actual_number_of_messages == number_of_messages

# Conversion
for i, global_position in enumerate(global_positions):
result, worker_id, actual = message_encoding_decoding.decode_bytes_to_position_global(
generated_objects[i]
)
assert result
assert worker_id is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worker_enum.COMMUNICATIONS_WORKER

assert actual is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, no need to duplicate assert


assert_global_positions(global_position, actual)

def test_empty_objects(
self,
communications_maker: communications.Communications,
) -> None:
"""
When nothing is passed in
"""
objects_in_world = []

result, metadata, generated_objects = communications_maker.run(objects_in_world)
assert result
assert metadata is not None
assert generated_objects is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bytes, list[bytes]


result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
metadata
)

# it will encounter an error where metadata is failed to encode
assert result
assert worker_id is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

real value

assert actual_number_of_messages is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove


# Test
assert actual_number_of_messages == 0
assert len(generated_objects) == 0

def test_none(self, communications_maker: communications.Communications) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, no need for this lol

"""
When None is passed in
"""
objects_in_world = None

result, metadata, generated_objects = communications_maker.run(objects_in_world)
assert result
assert metadata is not None
assert generated_objects is not None

result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
metadata
)

# it will encounter an error where metadata is failed to encode
assert result
assert worker_id is not None
assert actual_number_of_messages is not None

# Test
assert actual_number_of_messages == 0
assert len(generated_objects) == 0

def test_same_as_home(
self,
home_position: position_global.PositionGlobal,
communications_maker: communications.Communications,
) -> None:
"""
When the objects_in_world contains the home positions
"""
# Setup
result, local_position = local_global_conversion.position_local_from_position_global(
home_position, home_position
)
assert result
assert local_position is not None

actual = object_in_world_from_position_local(local_position)
objects_in_world = [actual]
number_of_messages = len(objects_in_world)

# Run
result, metadata, generated_objects = communications_maker.run(objects_in_world)
assert result
assert metadata is not None
assert generated_objects is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bytes, list[bytes]


# Conversion
result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
metadata
)
assert result
assert worker_id is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

real value

assert actual_number_of_messages is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove


# Test
assert actual_number_of_messages == number_of_messages

# Conversion
result, worker_id, actual = message_encoding_decoding.decode_bytes_to_position_global(
generated_objects[0]
)
assert result
assert worker_id is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

real value

assert actual is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove


# Test
assert_global_positions(home_position, actual)

def test_duplicate_coordinates(
self,
home_position: position_global.PositionGlobal,
communications_maker: communications.Communications,
) -> None:
"""
When the objects_in_world contains duplicate positions
"""
# Setup
result, global_position = position_global.PositionGlobal.create(
43.472978, -80.540103, 336.0
)
assert result
assert global_position is not None

result, local_position = local_global_conversion.position_local_from_position_global(
home_position, global_position
)
assert result
assert local_position is not None

position = object_in_world_from_position_local(local_position)

objects_in_world = [position, position, position]
number_of_messages = len(objects_in_world)

# Run
result, metadata, generated_objects = communications_maker.run(objects_in_world)
assert result
assert metadata is not None
assert generated_objects is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check types


result, worker_id, actual_number_of_messages = metadata_encoding_decoding.decode_metadata(
metadata
)
assert result
assert worker_id is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

real value

assert actual_number_of_messages is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove


# Test
assert actual_number_of_messages == number_of_messages

for generated_object in generated_objects:
# Conversion
result, worker_id, actual = message_encoding_decoding.decode_bytes_to_position_global(
generated_object
)
assert result
assert worker_id is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

real value

assert actual is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove


# Test
assert_global_positions(global_position, actual)