Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.

Commit a5a6d34

Browse files
authored
Add config for enabling logging of worker timings (#258)
1 parent 9a1644c commit a5a6d34

File tree

9 files changed

+111
-86
lines changed

9 files changed

+111
-86
lines changed

config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Global constants for main with CUDA
22

33
queue_max_size: 10
4+
log_timings: false # Enable logging of setup and iteration times of workers
45

56
video_input:
67
worker_period: 1.0 # seconds

main_2025.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ def main() -> int:
8585
# Local constants
8686
# pylint: disable=invalid-name
8787
QUEUE_MAX_SIZE = config["queue_max_size"]
88+
LOG_TIMINGS = config["log_timings"]
8889

8990
VIDEO_INPUT_WORKER_PERIOD = config["video_input"]["worker_period"]
9091
VIDEO_INPUT_OPTION = camera_factory.CameraOption(config["video_input"]["camera_enum"])
@@ -248,6 +249,7 @@ def main() -> int:
248249
VIDEO_INPUT_CAMERA_CONFIG,
249250
VIDEO_INPUT_IMAGE_NAME,
250251
VIDEO_INPUT_WORKER_PERIOD,
252+
LOG_TIMINGS,
251253
),
252254
input_queues=[],
253255
output_queues=[video_input_to_detect_target_queue],
@@ -269,6 +271,7 @@ def main() -> int:
269271
DETECT_TARGET_SHOW_ANNOTATED,
270272
DETECT_TARGET_OPTION,
271273
DETECT_TARGET_CONFIG,
274+
LOG_TIMINGS,
272275
),
273276
input_queues=[video_input_to_detect_target_queue],
274277
output_queues=[detect_target_to_data_merge_queue],
@@ -290,6 +293,7 @@ def main() -> int:
290293
FLIGHT_INTERFACE_TIMEOUT,
291294
FLIGHT_INTERFACE_BAUD_RATE,
292295
FLIGHT_INTERFACE_WORKER_PERIOD,
296+
LOG_TIMINGS,
293297
),
294298
input_queues=[
295299
flight_interface_decision_queue,
@@ -312,7 +316,7 @@ def main() -> int:
312316
result, data_merge_worker_properties = worker_manager.WorkerProperties.create(
313317
count=1,
314318
target=data_merge_worker.data_merge_worker,
315-
work_arguments=(DATA_MERGE_TIMEOUT,),
319+
work_arguments=(DATA_MERGE_TIMEOUT, LOG_TIMINGS),
316320
input_queues=[
317321
detect_target_to_data_merge_queue,
318322
flight_interface_to_data_merge_queue,
@@ -334,6 +338,7 @@ def main() -> int:
334338
work_arguments=(
335339
camera_intrinsics,
336340
camera_extrinsics,
341+
LOG_TIMINGS,
337342
),
338343
input_queues=[data_merge_to_geolocation_queue],
339344
output_queues=[geolocation_to_cluster_estimation_queue],
@@ -355,6 +360,7 @@ def main() -> int:
355360
MIN_NEW_POINTS_TO_RUN,
356361
MAX_NUM_COMPONENTS,
357362
RANDOM_STATE,
363+
LOG_TIMINGS,
358364
MIN_POINTS_PER_CLUSTER,
359365
),
360366
input_queues=[geolocation_to_cluster_estimation_queue],
@@ -372,7 +378,7 @@ def main() -> int:
372378
result, communications_worker_properties = worker_manager.WorkerProperties.create(
373379
count=1,
374380
target=communications_worker.communications_worker,
375-
work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD),
381+
work_arguments=(COMMUNICATIONS_TIMEOUT, COMMUNICATIONS_WORKER_PERIOD, LOG_TIMINGS),
376382
input_queues=[
377383
flight_interface_to_communications_queue,
378384
cluster_estimation_to_communications_queue,

modules/cluster_estimation/cluster_estimation_worker.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def cluster_estimation_worker(
1818
min_new_points_to_run: int,
1919
max_num_components: int,
2020
random_state: int,
21+
log_timings: bool,
2122
min_points_per_cluster: int,
2223
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
2324
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
@@ -40,6 +41,9 @@ def cluster_estimation_worker(
4041
random_state: int
4142
Seed for randomizer, to get consistent results.
4243
44+
log_timings: bool
45+
Whether to log setup and iteration times.
46+
4347
input_queue: queue_proxy_wrapper.QueuePRoxyWrapper
4448
Data queue.
4549
@@ -49,7 +53,7 @@ def cluster_estimation_worker(
4953
worker_controller: worker_controller.WorkerController
5054
How the main process communicates to this worker process.
5155
"""
52-
setup_start_time = time.time()
56+
setup_start_time = time.time() if log_timings else None
5357

5458
worker_name = pathlib.Path(__file__).stem
5559
process_id = os.getpid()
@@ -77,14 +81,16 @@ def cluster_estimation_worker(
7781
# Get Pylance to stop complaining
7882
assert estimator is not None
7983

80-
setup_end_time = time.time()
81-
82-
local_logger.info(
83-
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
84-
)
84+
# Logging and controller is identical to detect_target_worker.py
85+
# pylint: disable=duplicate-code
86+
if log_timings:
87+
setup_end_time = time.time()
88+
local_logger.info(
89+
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
90+
)
8591

8692
while not controller.is_exit_requested():
87-
iteration_start_time = time.time()
93+
iteration_start_time = time.time() if log_timings else None
8894

8995
controller.check_pause()
9096

@@ -93,6 +99,8 @@ def cluster_estimation_worker(
9399
local_logger.info("Recieved type None, exiting.")
94100
break
95101

102+
# pylint: enable=duplicate-code
103+
96104
is_invalid = False
97105

98106
for single_input in input_data:
@@ -113,8 +121,8 @@ def cluster_estimation_worker(
113121

114122
output_queue.queue.put(value)
115123

116-
iteration_end_time = time.time()
117-
118-
local_logger.info(
119-
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
120-
)
124+
if log_timings:
125+
iteration_end_time = time.time()
126+
local_logger.info(
127+
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
128+
)

modules/communications/communications_worker.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
def communications_worker(
1818
timeout: float,
1919
period: float,
20+
log_timings: bool,
2021
home_position_queue: queue_proxy_wrapper.QueueProxyWrapper,
2122
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
2223
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
@@ -30,7 +31,7 @@ def communications_worker(
3031
input_queue and output_queue are data queues.
3132
controller is how the main process communicates to this worker process.
3233
"""
33-
setup_start_time = time.time()
34+
setup_start_time = time.time() if log_timings else None
3435

3536
worker_name = pathlib.Path(__file__).stem
3637
process_id = os.getpid()
@@ -61,14 +62,14 @@ def communications_worker(
6162
# Get Pylance to stop complaining
6263
assert comm is not None
6364

64-
setup_end_time = time.time()
65-
66-
local_logger.info(
67-
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
68-
)
65+
if log_timings:
66+
setup_end_time = time.time()
67+
local_logger.info(
68+
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
69+
)
6970

7071
while not controller.is_exit_requested():
71-
iteration_start_time = time.time()
72+
iteration_start_time = time.time() if log_timings else None
7273

7374
controller.check_pause()
7475

@@ -105,8 +106,8 @@ def communications_worker(
105106
output_queue.queue.put(message)
106107
message_output_queue.queue.put(message)
107108

108-
iteration_end_time = time.time()
109-
110-
local_logger.info(
111-
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
112-
)
109+
if log_timings:
110+
iteration_end_time = time.time()
111+
local_logger.info(
112+
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
113+
)

modules/data_merge/data_merge_worker.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
def data_merge_worker(
1919
timeout: float,
20+
log_timings: bool,
2021
detections_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
2122
odometry_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
2223
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
@@ -32,7 +33,7 @@ def data_merge_worker(
3233
Merge work is done in the worker process as the queues and control mechanisms
3334
are naturally available.
3435
"""
35-
setup_start_time = time.time()
36+
setup_start_time = time.time() if log_timings else None
3637

3738
worker_name = pathlib.Path(__file__).stem
3839
process_id = os.getpid()
@@ -55,14 +56,14 @@ def data_merge_worker(
5556
local_logger.error("Queue timed out on startup", True)
5657
return
5758

58-
setup_end_time = time.time()
59-
60-
local_logger.info(
61-
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
62-
)
59+
if log_timings:
60+
setup_end_time = time.time()
61+
local_logger.info(
62+
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
63+
)
6364

6465
while not controller.is_exit_requested():
65-
iteration_start_time = time.time()
66+
iteration_start_time = time.time() if log_timings else None
6667

6768
controller.check_pause()
6869

@@ -119,8 +120,8 @@ def data_merge_worker(
119120

120121
output_queue.queue.put(merged)
121122

122-
iteration_end_time = time.time()
123-
124-
local_logger.info(
125-
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
126-
)
123+
if log_timings:
124+
iteration_end_time = time.time()
125+
local_logger.info(
126+
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
127+
)

modules/detect_target/detect_target_worker.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def detect_target_worker(
2323
detect_target_brightspot.DetectTargetBrightspotConfig
2424
| detect_target_ultralytics.DetectTargetUltralyticsConfig
2525
),
26+
log_timings: bool,
2627
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
2728
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
2829
controller: worker_controller.WorkerController,
@@ -35,7 +36,7 @@ def detect_target_worker(
3536
input_queue and output_queue are data queues.
3637
controller is how the main process communicates to this worker process.
3738
"""
38-
setup_start_time = time.time()
39+
setup_start_time = time.time() if log_timings else None
3940

4041
worker_name = pathlib.Path(__file__).stem
4142
process_id = os.getpid()
@@ -63,14 +64,16 @@ def detect_target_worker(
6364
# Get Pylance to stop complaining
6465
assert detector is not None
6566

66-
setup_end_time = time.time()
67-
68-
local_logger.info(
69-
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
70-
)
67+
# Logging and controller is identical to cluster_estimation_worker.py
68+
# pylint: disable=duplicate-code
69+
if log_timings:
70+
setup_end_time = time.time()
71+
local_logger.info(
72+
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
73+
)
7174

7275
while not controller.is_exit_requested():
73-
iteration_start_time = time.time()
76+
iteration_start_time = time.time() if log_timings else None
7477

7578
controller.check_pause()
7679

@@ -79,6 +82,8 @@ def detect_target_worker(
7982
local_logger.info("Recieved type None, exiting.")
8083
break
8184

85+
# pylint: enable=duplicate-code
86+
8287
if not isinstance(input_data, image_and_time.ImageAndTime):
8388
local_logger.warning(f"Skipping unexpected input: {input_data}")
8489
continue
@@ -89,8 +94,8 @@ def detect_target_worker(
8994

9095
output_queue.queue.put(value)
9196

92-
iteration_end_time = time.time()
93-
94-
local_logger.info(
95-
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
96-
)
97+
if log_timings:
98+
iteration_end_time = time.time()
99+
local_logger.info(
100+
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
101+
)

modules/flight_interface/flight_interface_worker.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def flight_interface_worker(
1818
timeout: float,
1919
baud_rate: int,
2020
period: float,
21+
log_timings: bool,
2122
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
2223
coordinates_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
2324
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
@@ -33,7 +34,7 @@ def flight_interface_worker(
3334
controller is how the main process communicates to this worker process.
3435
"""
3536
# TODO: Error handling
36-
setup_start_time = time.time()
37+
setup_start_time = time.time() if log_timings else None
3738

3839
worker_name = pathlib.Path(__file__).stem
3940
process_id = os.getpid()
@@ -60,14 +61,14 @@ def flight_interface_worker(
6061
home_position = interface.get_home_position()
6162
communications_output_queue.queue.put(home_position)
6263

63-
setup_end_time = time.time()
64-
65-
local_logger.info(
66-
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
67-
)
64+
if log_timings:
65+
setup_end_time = time.time()
66+
local_logger.info(
67+
f"{time.time()}: Worker setup took {setup_end_time - setup_start_time} seconds."
68+
)
6869

6970
while not controller.is_exit_requested():
70-
iteration_start_time = time.time()
71+
iteration_start_time = time.time() if log_timings else None
7172

7273
controller.check_pause()
7374

@@ -90,8 +91,8 @@ def flight_interface_worker(
9091
# Pass the decision command to the flight controller
9192
interface.apply_decision(command)
9293

93-
iteration_end_time = time.time()
94-
95-
local_logger.info(
96-
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
97-
)
94+
if log_timings:
95+
iteration_end_time = time.time()
96+
local_logger.info(
97+
f"{time.time()}: Worker iteration took {iteration_end_time - iteration_start_time} seconds."
98+
)

0 commit comments

Comments
 (0)