2626IMAGE_HEIGHT = 640.0 # pixels
2727IMAGE_WIDTH = 640.0 # pixels
2828WORKER_PERIOD = 0.1 # seconds
29- DETECTION_STRATEGY = auto_landing .DetectionSelectionStrategy .NEAREST_TO_CENTER
29+ # The worker now defaults to HIGHEST_CONFIDENCE, so we test for that
30+ DETECTION_STRATEGY = auto_landing .DetectionSelectionStrategy .HIGHEST_CONFIDENCE
3031LOG_TIMINGS = False # Disable timing logging for the test
3132
3233# Ensure logs directory exists and create timestamped subdirectory
@@ -109,7 +110,6 @@ def main() -> int:
109110 mp_manager = mp .Manager ()
110111 input_queue = queue_proxy_wrapper .QueueProxyWrapper (mp_manager )
111112 output_queue = queue_proxy_wrapper .QueueProxyWrapper (mp_manager )
112- command_queue = queue_proxy_wrapper .QueueProxyWrapper (mp_manager )
113113
114114 # Create worker process
115115 worker = mp .Process (
@@ -123,7 +123,6 @@ def main() -> int:
123123 DETECTION_STRATEGY ,
124124 input_queue ,
125125 output_queue ,
126- command_queue ,
127126 controller ,
128127 ),
129128 )
@@ -134,7 +133,8 @@ def main() -> int:
134133 # Give worker time to initialize
135134 time .sleep (0.5 )
136135
137- # Test 1: Worker should not process detections when disabled (default state)
136+ # Test 1: Send a single detection and verify it's processed
137+ local_logger .info ("--- Test 1: Processing single detection ---" )
138138 detection1 = create_test_detection ([200 , 200 , 400 , 400 ], 1 , 0.9 )
139139 simulate_detection_input (
140140 input_queue ,
@@ -145,91 +145,49 @@ def main() -> int:
145145
146146 time .sleep (0.2 )
147147
148- # Should have no output since auto-landing is disabled by default
149- assert output_queue .queue .empty ()
150-
151- # Test 2: Enable auto-landing and verify it processes detections
152- enable_command = auto_landing_worker .AutoLandingCommand ("enable" )
153- command_queue .queue .put (enable_command )
154-
155- time .sleep (0.2 )
156-
157- # Now send the same detection - should be processed
158- detection2 = create_test_detection ([300 , 300 , 500 , 500 ], 2 , 0.85 )
159- simulate_detection_input (
160- input_queue ,
161- [detection2 ],
162- (10.0 , 5.0 , - 75.0 ), # 75 meters above ground
163- (0.1 , 0.0 , 0.0 ),
164- )
165-
166- time .sleep (0.2 )
167-
168148 # Should have output now
169149 assert not output_queue .queue .empty ()
170150 landing_info = output_queue .queue .get_nowait ()
171151 assert landing_info is not None
172152 assert hasattr (landing_info , "angle_x" )
173153 assert hasattr (landing_info , "angle_y" )
174154 assert hasattr (landing_info , "target_dist" )
155+ local_logger .info ("--- Test 1 Passed ---" )
175156
176- # Test 3: Test with multiple detections (should use NEAREST_TO_CENTER strategy)
177- detection3 = create_test_detection ([100 , 100 , 200 , 200 ], 1 , 0.7 ) # Far from center
178- detection4 = create_test_detection ([310 , 310 , 330 , 330 ], 2 , 0.8 ) # Close to center (320, 320)
157+ # Test 2: Test with multiple detections (should use HIGHEST_CONFIDENCE strategy)
158+ local_logger .info ("--- Test 2: Processing multiple detections with HIGHEST_CONFIDENCE ---" )
159+ detection_low_confidence = create_test_detection ([100 , 100 , 200 , 200 ], 1 , 0.7 )
160+ detection_high_confidence = create_test_detection ([320 , 320 , 320 , 320 ], 2 , 0.95 ) # This one should be chosen
179161
180162 simulate_detection_input (
181163 input_queue ,
182- [detection3 , detection4 ],
164+ [detection_low_confidence , detection_high_confidence ],
183165 (0.0 , 0.0 , - 100.0 ), # 100 meters above ground
184166 (0.0 , 0.0 , 0.0 ),
185167 )
186168
187169 time .sleep (0.2 )
188170
189- # Should have output for the detection closest to center
171+ # Should have output for the detection with the highest confidence
190172 assert not output_queue .queue .empty ()
191173 landing_info2 = output_queue .queue .get_nowait ()
192174 assert landing_info2 is not None
193175
194- # Test 4: Disable auto-landing and verify it stops processing
195- disable_command = auto_landing_worker .AutoLandingCommand ("disable" )
196- command_queue .queue .put (disable_command )
197-
198- time .sleep (0.2 )
199-
200- # Send another detection - should not be processed
201- detection5 = create_test_detection ([400 , 400 , 600 , 600 ], 3 , 0.95 )
202- simulate_detection_input (
203- input_queue ,
204- [detection5 ],
205- (0.0 , 0.0 , - 60.0 ),
206- (0.0 , 0.0 , 0.0 ),
207- )
208-
209- time .sleep (0.2 )
210-
211- # Should have no new output
212- assert output_queue .queue .empty ()
213-
214- # Test 5: Test invalid command handling
215- invalid_command = auto_landing_worker .AutoLandingCommand ("invalid_command" )
216- command_queue .queue .put (invalid_command )
217-
218- time .sleep (0.2 )
219-
220- # Worker should continue running despite invalid command
221- assert worker .is_alive ()
176+ # To verify the correct detection was chosen, we can check the calculated angles.
177+ # The high confidence detection is at (320, 320), which is the center of the 640x640 image.
178+ # Therefore, the angles should be 0.
179+ assert landing_info2 .angle_x == 0.0
180+ assert landing_info2 .angle_y == 0.0
181+ local_logger .info ("--- Test 2 Passed ---" )
222182
223- # Test 6: Test with no detections (empty detection list should not crash)
224- # This should not create a MergedOdometryDetections object since it requires non-empty detections
225- # So we just verify the worker continues running
183+ # The case of "no detections" is handled by the worker's queue.get_nowait() exception.
184+ # No specific test is needed for an empty detection list as the data structure does not allow it.
226185
227186 # Cleanup
228187 controller .request_exit ()
229188
230189 # Drain queues
231190 input_queue .fill_and_drain_queue ()
232- command_queue .fill_and_drain_queue ()
233191
234192 # Wait for worker to finish
235193 worker .join (timeout = 5.0 )
@@ -245,4 +203,4 @@ def main() -> int:
245203 if result_main < 0 :
246204 print (f"ERROR: Status code: { result_main } " )
247205
248- print ("Done!" )
206+ print ("Done!" )
0 commit comments