Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Global constants for main with CUDA

queue_max_size: 10
log_timings: false # Enable logging of setup and iteration times of workers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a nitpick: add 2 spaces between end of line and the comment


video_input:
worker_period: 1.0 # seconds
Expand Down
10 changes: 8 additions & 2 deletions main_2025.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def main() -> int:
# Local constants
# pylint: disable=invalid-name
QUEUE_MAX_SIZE = config["queue_max_size"]
LOG_TIMINGS = config["log_timings"]

VIDEO_INPUT_WORKER_PERIOD = config["video_input"]["worker_period"]
VIDEO_INPUT_OPTION = camera_factory.CameraOption(config["video_input"]["camera_enum"])
Expand Down Expand Up @@ -248,6 +249,7 @@ def main() -> int:
VIDEO_INPUT_CAMERA_CONFIG,
VIDEO_INPUT_IMAGE_NAME,
VIDEO_INPUT_WORKER_PERIOD,
LOG_TIMINGS,
),
input_queues=[],
output_queues=[video_input_to_detect_target_queue],
Expand All @@ -269,6 +271,7 @@ def main() -> int:
DETECT_TARGET_SHOW_ANNOTATED,
DETECT_TARGET_OPTION,
DETECT_TARGET_CONFIG,
LOG_TIMINGS,
),
input_queues=[video_input_to_detect_target_queue],
output_queues=[detect_target_to_data_merge_queue],
Expand All @@ -290,6 +293,7 @@ def main() -> int:
FLIGHT_INTERFACE_TIMEOUT,
FLIGHT_INTERFACE_BAUD_RATE,
FLIGHT_INTERFACE_WORKER_PERIOD,
LOG_TIMINGS,
),
input_queues=[
flight_interface_decision_queue,
Expand All @@ -312,7 +316,7 @@ def main() -> int:
result, data_merge_worker_properties = worker_manager.WorkerProperties.create(
count=1,
target=data_merge_worker.data_merge_worker,
work_arguments=(DATA_MERGE_TIMEOUT,),
work_arguments=(DATA_MERGE_TIMEOUT, LOG_TIMINGS),
input_queues=[
detect_target_to_data_merge_queue,
flight_interface_to_data_merge_queue,
Expand All @@ -334,6 +338,7 @@ def main() -> int:
work_arguments=(
camera_intrinsics,
camera_extrinsics,
LOG_TIMINGS,
),
input_queues=[data_merge_to_geolocation_queue],
output_queues=[geolocation_to_cluster_estimation_queue],
Expand All @@ -355,6 +360,7 @@ def main() -> int:
MIN_NEW_POINTS_TO_RUN,
MAX_NUM_COMPONENTS,
RANDOM_STATE,
LOG_TIMINGS,
MIN_POINTS_PER_CLUSTER,
),
input_queues=[geolocation_to_cluster_estimation_queue],
Expand All @@ -372,7 +378,7 @@ def main() -> int:
result, communications_worker_properties = worker_manager.WorkerProperties.create(
count=1,
target=communications_worker.communications_worker,
work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD),
work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD, LOG_TIMINGS),
input_queues=[
flight_interface_to_communications_queue,
cluster_estimation_to_communications_queue,
Expand Down
32 changes: 20 additions & 12 deletions modules/cluster_estimation/cluster_estimation_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def cluster_estimation_worker(
min_new_points_to_run: int,
max_num_components: int,
random_state: int,
log_timings: bool,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
Expand All @@ -40,6 +41,9 @@ def cluster_estimation_worker(
random_state: int
Seed for randomizer, to get consistent results.

log_timings: bool
Whether to log setup and iteration times.

input_queue: queue_proxy_wrapper.QueuePRoxyWrapper
Data queue.

Expand All @@ -49,7 +53,7 @@ def cluster_estimation_worker(
worker_controller: worker_controller.WorkerController
How the main process communicates to this worker process.
"""
setup_start_time = time.time()
setup_start_time = time.time() if log_timings else None

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand Down Expand Up @@ -77,14 +81,16 @@ def cluster_estimation_worker(
# Get Pylance to stop complaining
assert estimator is not None

setup_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)
# Logging and controller is identical to detect_target_worker.py
# pylint: disable=duplicate-code
if log_timings:
setup_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)

while not controller.is_exit_requested():
iteration_start_time = time.time()
iteration_start_time = time.time() if log_timings else None

controller.check_pause()

Expand All @@ -93,6 +99,8 @@ def cluster_estimation_worker(
local_logger.info("Recieved type None, exiting.")
break

# pylint: enable=duplicate-code

is_invalid = False

for single_input in input_data:
Expand All @@ -113,8 +121,8 @@ def cluster_estimation_worker(

output_queue.queue.put(value)

iteration_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
if log_timings:
iteration_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
25 changes: 13 additions & 12 deletions modules/communications/communications_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
def communications_worker(
timeout: float,
period: float,
log_timings: bool,
home_position_queue: queue_proxy_wrapper.QueueProxyWrapper,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
Expand All @@ -30,7 +31,7 @@ def communications_worker(
input_queue and output_queue are data queues.
controller is how the main process communicates to this worker process.
"""
setup_start_time = time.time()
setup_start_time = time.time() if log_timings else None

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand Down Expand Up @@ -61,14 +62,14 @@ def communications_worker(
# Get Pylance to stop complaining
assert comm is not None

setup_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)
if log_timings:
setup_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)

while not controller.is_exit_requested():
iteration_start_time = time.time()
iteration_start_time = time.time() if log_timings else None

controller.check_pause()

Expand Down Expand Up @@ -105,8 +106,8 @@ def communications_worker(
output_queue.queue.put(message)
message_output_queue.queue.put(message)

iteration_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
if log_timings:
iteration_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
25 changes: 13 additions & 12 deletions modules/data_merge/data_merge_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

def data_merge_worker(
timeout: float,
log_timings: bool,
detections_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
odometry_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
Expand All @@ -32,7 +33,7 @@ def data_merge_worker(
Merge work is done in the worker process as the queues and control mechanisms
are naturally available.
"""
setup_start_time = time.time()
setup_start_time = time.time() if log_timings else None

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand All @@ -55,14 +56,14 @@ def data_merge_worker(
local_logger.error("Queue timed out on startup", True)
return

setup_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)
if log_timings:
setup_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)

while not controller.is_exit_requested():
iteration_start_time = time.time()
iteration_start_time = time.time() if log_timings else None

controller.check_pause()

Expand Down Expand Up @@ -119,8 +120,8 @@ def data_merge_worker(

output_queue.queue.put(merged)

iteration_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
if log_timings:
iteration_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
29 changes: 17 additions & 12 deletions modules/detect_target/detect_target_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def detect_target_worker(
detect_target_brightspot.DetectTargetBrightspotConfig
| detect_target_ultralytics.DetectTargetUltralyticsConfig
),
log_timings: bool,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
Expand All @@ -35,7 +36,7 @@ def detect_target_worker(
input_queue and output_queue are data queues.
controller is how the main process communicates to this worker process.
"""
setup_start_time = time.time()
setup_start_time = time.time() if log_timings else None

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand Down Expand Up @@ -63,14 +64,16 @@ def detect_target_worker(
# Get Pylance to stop complaining
assert detector is not None

setup_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)
# Logging and controller is identical to cluster_estimation_worker.py
# pylint: disable=duplicate-code
if log_timings:
setup_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)

while not controller.is_exit_requested():
iteration_start_time = time.time()
iteration_start_time = time.time() if log_timings else None

controller.check_pause()

Expand All @@ -79,6 +82,8 @@ def detect_target_worker(
local_logger.info("Recieved type None, exiting.")
break

# pylint: enable=duplicate-code

if not isinstance(input_data, image_and_time.ImageAndTime):
local_logger.warning(f"Skipping unexpected input: {input_data}")
continue
Expand All @@ -89,8 +94,8 @@ def detect_target_worker(

output_queue.queue.put(value)

iteration_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
if log_timings:
iteration_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
25 changes: 13 additions & 12 deletions modules/flight_interface/flight_interface_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def flight_interface_worker(
timeout: float,
baud_rate: int,
period: float,
log_timings: bool,
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
Expand All @@ -33,7 +34,7 @@ def flight_interface_worker(
controller is how the main process communicates to this worker process.
"""
# TODO: Error handling
setup_start_time = time.time()
setup_start_time = time.time() if log_timings else None

worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
Expand All @@ -60,14 +61,14 @@ def flight_interface_worker(
home_position = interface.get_home_position()
communications_output_queue.queue.put(home_position)

setup_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)
if log_timings:
setup_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
)

while not controller.is_exit_requested():
iteration_start_time = time.time()
iteration_start_time = time.time() if log_timings else None

controller.check_pause()

Expand All @@ -90,8 +91,8 @@ def flight_interface_worker(
# Pass the decision command to the flight controller
interface.apply_decision(command)

iteration_end_time = time.time()

local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
if log_timings:
iteration_end_time = time.time()
local_logger.info(
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
)
Loading
Loading