Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions tests/integration/test_communications_to_ground_station.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
Test MAVLink integration test
"""

import multiprocessing as mp
import time

from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller

from modules.common.modules import position_global
from modules.common.modules.data_encoding import message_encoding_decoding
from modules.flight_interface import flight_interface_worker


MAVLINK_CONNECTION_ADDRESS = "tcp:localhost:14550"
FLIGHT_INTERFACE_TIMEOUT = 30.0 # seconds
FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate
FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds


def apply_communications_test(
communications_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
) -> bool:
"""
Method to send in hardcoded GPS coordinates to the flight interface worker
"""
gps_coordinates = [
position_global.PositionGlobal.create(43.47321268948186, -80.53950244232878, 10), # E7
position_global.PositionGlobal.create(37.7749, 122.4194, 30), # San Francisco
position_global.PositionGlobal.create(40.7128, 74.0060, -5.6), # New York
position_global.PositionGlobal.create(51.5072, 0.1276, 20.1), # London UK
]

# Place the GPS coordinates
print(f"Inserting list of gps coordinates, length {len(gps_coordinates)}")

for success, gps_coordinate in gps_coordinates:
if not success:
print("ERROR: GPS Coordinate not successfully generated")
return False

success, message = message_encoding_decoding.encode_position_global(
"FLIGHT_INTERFACE_WORKER", gps_coordinate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wait until Herman does his common change, and you'll use the worker enum here later

)

if not success:
print("ERROR: Conversion from PositionGlobal to bytes failed")
return False

communications_input_queue.queue.put(message)

# Wait for processing
time.sleep(10)

# Verify that stuff is sending
print(
"TEST OPERATOR ACTION REQUIRED: Open mission planner's MAVLink inspector or the groundside repo (https://github.com/UWARG/statustext-parser-2025) to check for MAVLink messages"
)
return True


# pylint: disable=duplicate-code
def main() -> int:
"""
Main function
"""
# Setup
controller = worker_controller.WorkerController()

mp_manager = mp.Manager()

in_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
communications_input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
home_position_out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)

worker = mp.Process(
target=flight_interface_worker.flight_interface_worker,
args=(
MAVLINK_CONNECTION_ADDRESS,
FLIGHT_INTERFACE_TIMEOUT,
FLIGHT_INTERFACE_BAUD_RATE,
FLIGHT_INTERFACE_WORKER_PERIOD,
in_queue,
communications_input_queue,
out_queue,
home_position_out_queue,
controller,
),
)

worker.start()

time.sleep(3)

# Test
home_position = home_position_out_queue.queue.get()
assert home_position is not None

# Run the apply_communication tests
test_result = apply_communications_test(communications_input_queue)
if not test_result:
print("apply_communications test failed.")
worker.terminate()
return -1

# Teardown
controller.request_exit()
worker.join()

return 0


if __name__ == "__main__":
result_main = main()
if result_main < 0:
print(f"ERROR: Status code: {result_main}")

print("Done!")