Conversation
Code review fixes:
- Prefix unused variables with underscores (_exc_type, _vis, _ply_path, etc.)
- Rename the F alias to functional_nn (PEP 8 naming)
- Replace try-except-pass with contextlib.suppress
- Merge nested if statements
- Rename parameter K to lowercase k

Video processing fixes:
- Add --slice/--no-slice/--slice-radius options to the video process command
- Add enable_slice and slice_radius to VideoPipelineConfig
- Add point-cloud slicing to the video processing pipeline
- CLI auto-discovers video_config.yaml in the video directory

Detection tuning:
- Lower the default confidence threshold from 0.5 to 0.25 to improve vehicle detection
- Add more detailed detection log output

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
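The try-except-pass to contextlib.suppress change mentioned above follows this pattern (a minimal illustration, not the project's actual code; the file path is made up):

```python
import contextlib
import os

# Before: a silent try/except-pass block hides intent
try:
    os.remove("/tmp/aylm_scratch.ply")
except FileNotFoundError:
    pass

# After: the suppressed exception is explicit and the body stays flat
with contextlib.suppress(FileNotFoundError):
    os.remove("/tmp/aylm_scratch.ply")
```

contextlib.suppress only swallows the listed exception types, so unlike a bare `except: pass` it cannot accidentally hide unrelated errors.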
Use torch.backends.mps.is_available() instead of torch.mps.is_available() to ensure the MPS GPU is detected correctly on Apple Silicon. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
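A device-selection helper along these lines shows the pattern (hypothetical `select_device` name; it probes attributes so the fallback chain also works when a backend is absent):

```python
from types import SimpleNamespace

def select_device(torch_mod) -> str:
    """Pick a device string, probing torch.backends.mps.is_available()
    rather than torch.mps.is_available(): the backends variant is the
    stable API and reports Apple-Silicon MPS support correctly."""
    mps = getattr(getattr(torch_mod, "backends", None), "mps", None)
    if mps is not None and mps.is_available():
        return "mps"
    cuda = getattr(torch_mod, "cuda", None)
    if cuda is not None and cuda.is_available():
        return "cuda"
    return "cpu"

# Works against the real torch module, or a stand-in for illustration:
fake_torch = SimpleNamespace(
    backends=SimpleNamespace(mps=SimpleNamespace(is_available=lambda: True)),
    cuda=SimpleNamespace(is_available=lambda: False),
)
```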
- Add a --resolution option, defaulting to 1024 (originally 1536)
- Lower resolutions speed up processing at the cost of accuracy
- Supported by both the pipeline and video process commands
- Update run.sh to support the --resolution option

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- slice_radius: unified at 10.0 m
- semantic_confidence: unified at 0.25

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The SHARP model uses a ViT, which requires the input resolution to be a multiple of 384.
- Change the default from 1024 to 1152 (3x384)
- Valid values: 768, 1152, 1536

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
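At the point of this commit, the multiple-of-384 constraint could be enforced with a small helper like this (hypothetical name, a sketch rather than the project's actual validation code):

```python
def snap_resolution(requested: int, multiple: int = 384) -> int:
    """Round a requested resolution to the nearest valid multiple.
    ViT patching requires side lengths divisible by `multiple`, so
    e.g. a requested 1024 snaps to 1152 (3 x 384)."""
    return max(multiple, round(requested / multiple) * multiple)
```

This reproduces the default change described above: `snap_resolution(1024)` yields 1152.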
The SHARP model uses a pyramid structure (1536→768→384); its internal ViT requires the lowest level to be 384x384. The resolution therefore cannot be changed, and the --resolution option is removed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add mypy type checking (gradual, non-blocking)
- Expand the ruff rule set (B/C4/UP/SIM/TCH/RUF)
- Fix issues flagged by ruff (ClassVar, unused variables)
- Add a mypy step to CI

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- coordinate_utils: add None checks
- object_detector: defer the YOLO import with TYPE_CHECKING, relax ndarray types
- pipeline_processor: add type annotations and assert checks
- pointcloud_voxelizer: add int() conversions and type annotations
- semantic_fusion: fix tuple types and a loop-indentation error
- video_config: fix enum type annotations
- video_extractor: fix float/int types and None checks
- video_pipeline: add assert checks and None handling
- voxel_player: use type: ignore and union types

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- New multiframe_fusion.py module
- ICP point-cloud registration (coarse RANSAC alignment + fine point-to-plane)
- Pose-graph optimization (Levenberg-Marquardt)
- Global point-cloud fusion with voxel downsampling
- New CLI command `aylm fuse`
- Batch loading of point-cloud sequences from a directory
- Outputs a fused map PLY and a pose-trajectory JSON

Usage: aylm fuse -i output/voxelized -o fused_map.ply

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add PLY read/write methods to the PointCloud class
- Add a fuse.sh convenience script

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Refactor the pipeline processing logic:
- Inference (frame N): SHARP inference on the main thread
- Voxelization (frame N-1): thread 1 voxelizes in parallel
- Semantic detection (frame N-2): thread 2 runs semantic detection and navigation output in parallel

Each frame's navigation point cloud is emitted as soon as its semantic detection completes, for fast response. Also fixes the pose-accumulation direction in multi-frame fusion.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
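The three-stage overlap described above can be sketched with a thread pool: while frame N is inferred on the main thread, frame N-1 is voxelized and frame N-2 gets semantic detection. This is a minimal illustration with stand-in stage functions, not the project's actual scheduler:

```python
from concurrent.futures import ThreadPoolExecutor

def run_pipeline(frames, infer, voxelize, semantic):
    """Overlap stages across frames: infer(N) on the caller's thread,
    voxelize(N-1) and semantic(N-2) on worker threads."""
    results = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        vox_task = None  # voxelization of the previous frame
        sem_task = None  # semantic detection of the frame before that
        for frame in frames:
            out = infer(frame)                     # stage 1, main thread
            if sem_task is not None:
                results.append(sem_task.result())  # collect finished semantics
            sem_task = None
            if vox_task is not None:
                # hand the finished voxel grid to the semantic stage
                sem_task = pool.submit(semantic, vox_task.result())
            vox_task = pool.submit(voxelize, out)  # stage 2 for this frame
        # drain the tail of the pipeline
        if sem_task is not None:
            results.append(sem_task.result())
        if vox_task is not None:
            results.append(pool.submit(semantic, vox_task.result()).result())
    return results
```

Results come back in frame order, and each frame's final output is available as soon as its semantic stage finishes, which is the "fast response" property the commit describes.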
Track the task currently being voxelized in a dedicated current_voxel_task variable, so that overwriting pending_voxel_task can no longer leave a dangling reference. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
New features:
- object_tracker.py: multi-object tracker (ByteTrack + Hungarian algorithm)
- motion_estimator.py: motion-vector estimator (Kalman filter)
- track_id association across frames
- velocity/motion output
- predicted_position forecasting

Data-structure extensions:
- Detection2D gains track_id and frame_id
- ObstacleBox3D gains track_id, velocity, motion_vector, and timestamp
- JSON output includes full motion information

CLI updates:
- Add --track/--no-track options
- run.sh supports the tracking options

Documentation updates:
- Rewrite README.md as an academic-style technical document
- Update the project description in pyproject.toml
- Update the GitHub repository description and topics

Tests:
- All 46 unit tests pass
- Video pipeline end-to-end test passes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
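Conceptually, the predicted_position output reduces to constant-velocity extrapolation between the two most recent observations (the Kalman filter refines this with noise models). A bare-bones sketch, with a hypothetical helper name:

```python
def predict_position(p_prev, p_curr, dt_obs, dt):
    """Constant-velocity extrapolation: estimate velocity from the last
    two observed positions (dt_obs apart), then project dt ahead."""
    velocity = [(c - p) / dt_obs for p, c in zip(p_prev, p_curr)]
    return [c + v * dt for c, v in zip(p_curr, velocity)]
```

For a target seen at (0, 0, 0) and then (1, 0, 0) one second later, predicting one second ahead yields (2, 0, 0), which is the behavior the motion-estimator unit tests below assert.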
Reviewer's Guide

Rebrands the project around 'Geometric Constitutional AI', greatly expands the README with the A‑YLM concept and usage docs, and adds a full geometric safety/constitution stack: video/pipeline tracking & motion estimation, multi‑frame fusion, obstacle JSON with tracking/motion, a new constitution API layer, and tooling/CI/type‑safety upgrades.

Sequence diagram for the video pipeline with tracking, motion, and obstacle export:

sequenceDiagram
actor User
participant CLI as aylm_cli
participant VP as VideoPipelineProcessor
participant SH as SHARP_predictor
participant VX as PointCloudVoxelizer
participant DET as ObjectDetector
participant TR as MultiObjectTracker
participant ME as MotionEstimator
participant OM as ObstacleMarker
User->>CLI: aylm video process -i video.mp4 --semantic --track --slice
CLI->>VP: process(video_path, output_dir, config)
loop For_each_extracted_frame
VP->>SH: load_rgb(frame_path) and infer(image_resized, disparity_factor)
SH-->>VP: gaussians_ndc
VP->>VP: unproject_gaussians -> gaussians
VP->>VP: save_ply(gaussians, ply_path)
alt slicing_enabled
VP->>VP: _apply_slice(ply_path, frame_stem)
VP-->>VP: sliced_ply_path
else no_slicing
VP-->>VP: use ply_path as input_ply_path
end
VP->>VX: process(input_ply_path, voxel_path, remove_ground, transform_coords)
VX-->>VP: voxelized_ply
VP->>VP: delete temporary sliced_ply (if used)
alt semantic_enabled
VP->>DET: detect(image, return_masks=True)
DET-->>VP: list detections
VP->>DET: save_detection_image(image, detections, detection_image_path)
opt tracking_enabled
VP->>TR: update(detections, frame_id)
TR-->>VP: list tracked_objects
end
VP->>VP: project_semantics_to_3d(voxel_ply, detections)
VP->>VP: cluster_obstacles(points, labels) -> obstacles
opt tracking_enabled
VP->>ME: update(track_id, obstacle_center_cv, frame_id, timestamp)
ME-->>VP: MotionVector
VP->>ME: predict(track_id, dt=1.0)
ME-->>VP: predicted_position_cv
end
VP->>OM: export_to_json(obstacles_with_motion, json_path, frame_id, timestamp)
OM-->>VP: json_written
opt navigation_ply_enabled
VP->>VP: save_navigation_ply(semantic_pc, navigation_path)
end
end
end
VP-->>CLI: VideoProcessingStats
CLI-->>User: print summary (frames, fps, tracking_enabled)
Class diagram for the constitution and geometric safety layer:

classDiagram
class ConstitutionPrinciple {
<<abstract>>
+str name
+Severity severity
+str description
+bool enabled
+evaluate(state, decision) ViolationResult
}
class Severity {
<<enum>>
CRITICAL
HIGH
MEDIUM
LOW
}
class ViolationResult {
+bool violated
+Severity severity
+float confidence
+str description
+dict~str, float~ metrics
+dict~str, any~ correction_hint
+to_dict() dict~str, any~
}
class NoCollisionPrinciple {
-float safety_margin
-float prediction_horizon
+name
+severity
+description
+evaluate(state, decision) ViolationResult
}
class TTCSafetyPrinciple {
-float warning_threshold
-float critical_threshold
-float min_safe_distance
+name
+severity
+description
+evaluate(state, decision) ViolationResult
}
ConstitutionPrinciple <|-- NoCollisionPrinciple
ConstitutionPrinciple <|-- TTCSafetyPrinciple
class SafetyScore {
+float overall
+float collision_score
+float ttc_score
+float boundary_score
+list~str~ violations
+list~dict~ violation_details
+RecommendedAction recommended_action
+float confidence
+is_safe bool
+needs_intervention bool
+to_dict() dict~str, any~
}
class RecommendedAction {
<<enum>>
SAFE
CAUTION
WARNING
INTERVENTION
EMERGENCY_STOP
}
class SafetyScorer {
<<abstract>>
+score(obstacles, ego_state, ai_decision) SafetyScore
+score_from_scene(scene_data) SafetyScore
}
class TrainingSignal {
+float timestamp
+int frame_id
+SignalType signal_type
+float safety_score
+list~str~ violations
+dict~str, any~ scene_context
+dict~str, any~ ai_decision
+dict~str, any~ correction_target
+dict~str, any~ metadata
+to_dict() dict~str, any~
+from_dict(data) TrainingSignal
}
class SignalType {
<<enum>>
POSITIVE
NEGATIVE
CORRECTION
}
class TrainingSignalGenerator {
<<abstract>>
+generate(safety_result, scene_state, ai_decision) TrainingSignal
+export(signals, output_path, format) void
+should_generate_positive() bool
+filter_signals(signals, min_score, max_score, signal_types) list~TrainingSignal~
}
class PrincipleConfig {
+str name
+str severity
+bool enabled
+dict~str, any~ params
}
class ConstitutionConfig {
+list~PrincipleConfig~ principles
+float ttc_warning_threshold
+float ttc_critical_threshold
+float min_safe_distance
+float speed_distance_factor
+bool generate_positive_signals
+str signal_export_format
+float collision_weight
+float ttc_weight
+float boundary_weight
+from_dict(data) ConstitutionConfig
+from_json(path) ConstitutionConfig
+from_yaml(path) ConstitutionConfig
+to_dict() dict~str, any~
+save_json(path) void
+save_yaml(path) void
+get_principle(name) PrincipleConfig
+is_principle_enabled(name) bool
}
class ConstitutionRegistry {
<<utility>>
+register_principle(name)
+register_scorer(name)
+register_generator(name)
+get_principle(name) type
+get_scorer(name) type
+get_generator(name) type
+list_principles() list~str~
+list_scorers() list~str~
+list_generators() list~str~
+create_principle(name) ConstitutionPrinciple
+create_scorer(name) SafetyScorer
+create_generator(name) TrainingSignalGenerator
+clear() void
}
class EgoState {
+ndarray position
+ndarray velocity
+float heading
+float speed
+ndarray acceleration
+ndarray dimensions
+to_dict() dict~str, any~
}
class TrajectoryPoint {
+ndarray position
+ndarray velocity
+float timestamp
}
class AIDecision {
+str decision_type
+list~TrajectoryPoint~ trajectory
+dict~str, float~ control
+float target_speed
+float confidence
+dict~str, any~ metadata
+to_dict() dict~str, any~
}
class SceneState {
+int frame_id
+float timestamp
+EgoState ego_state
+list obstacles
+list lane_boundaries
+list traffic_signs
+dict~str, any~ metadata
+to_dict() dict~str, any~
}
SafetyScorer --> SafetyScore
SafetyScorer --> EgoState
SafetyScorer --> AIDecision
SafetyScorer --> "*" ViolationResult
TrainingSignalGenerator --> TrainingSignal
TrainingSignalGenerator --> SafetyScore
TrainingSignalGenerator --> SceneState
TrainingSignalGenerator --> AIDecision
SceneState --> EgoState
AIDecision --> "*" TrajectoryPoint
ConstitutionRegistry ..> ConstitutionPrinciple
ConstitutionRegistry ..> SafetyScorer
ConstitutionRegistry ..> TrainingSignalGenerator
Class diagram for the video and geometry pipeline with tracking, motion, and fusion:

classDiagram
class VideoPipelineConfig {
+VideoConfig video_config
+bool use_gpu
+str device
+int frame_queue_size
+bool enable_semantic
+str semantic_model
+float semantic_confidence
+bool colorize_semantic
+bool enable_slice
+float slice_radius
+bool output_navigation_ply
+int internal_resolution
+bool enable_tracking
+int tracker_max_age
+int tracker_min_hits
+float tracker_iou_threshold
+float motion_fps
+float motion_stationary_threshold
}
class VideoPipelineProcessor {
-VideoPipelineConfig config
-Predictor _predictor
-torch.device _device
-PointCloudVoxelizer _voxelizer
-ObjectDetector _detector
-MultiObjectTracker _tracker
-MotionEstimator _motion_estimator
+process(video_path, output_dir, progress_callback) VideoProcessingStats
+cleanup() void
-_load_model() bool
-_unload_model() void
-_load_detector() void
-_cleanup_detector() void
-_load_tracker() void
-_cleanup_tracker() void
-_apply_slice(ply_path, frame_stem) Path
-_apply_semantic_fusion(frame_path, voxel_ply_path, detections_dir, navigation_dir, focal_length, frame_id, timestamp) void
-_export_obstacles_with_tracking(obstacles, tracked_objects, detections, points, intrinsics, image_shape, frame_id, timestamp, output_path) void
-_compute_iou(bbox1, bbox2) float
-_process_frame(frame_info, ply_output_dir, voxel_output_dir, detections_dir, navigation_dir, frame_id) bool
}
class FrameInfo {
+int index
+float timestamp
+Path output_path
}
class MultiObjectTracker {
-list~_Track~ _tracks
-int _next_id
-int _frame_count
+update(detections, frame_id) list~TrackedObject~
+reset() void
+get_all_tracks() list~TrackedObject~
+int track_count
+int confirmed_track_count
}
class TrackedObject {
+int track_id
+ndarray bbox
+int class_id
+float confidence
+int age
+int hits
+int time_since_update
}
class MotionEstimator {
+float fps
+float stationary_threshold
+update(track_id, position_cv, frame_id, timestamp) MotionVector
+predict(track_id, dt) ndarray
+get_velocity(track_id) ndarray
+remove_track(track_id) bool
+clear() void
+list~int~ active_tracks
}
class MotionVector {
+ndarray velocity_cv
+ndarray velocity_robot
+float speed
+float heading
+bool is_stationary
}
class TrackedObject3D {
+int track_id
+ndarray center_cv
+ndarray center_robot
+ndarray dimensions
+MotionVector motion
+SemanticLabel semantic_label
+float confidence
+int frame_id
+float timestamp
}
class KalmanConfig {
+float process_noise
+float measurement_noise
+float initial_covariance
}
class ObjectDetector {
-YOLO _model
+DetectorConfig config
+load() void
+unload() void
+detect(image, return_masks) list~Detection2D~
+save_detection_image(image, detections, output_path, draw_masks) void
}
class DetectorConfig {
+str model_name
+float confidence_threshold
+float iou_threshold
+str device
+bool half_precision
+list~int~ classes
}
class Detection2D {
+ndarray bbox
+int class_id
+float confidence
+SemanticLabel semantic_label
+int track_id
+int frame_id
+float area
+float center_x
+float center_y
}
class PointCloud {
+ndarray points
+ndarray colors
+filter_by_mask(mask) PointCloud
+from_ply(path) PointCloud
+to_ply(path) void
}
class PointCloudVoxelizer {
+VoxelizerConfig config
+process(input_path, output_path, remove_ground, transform_coords) void
-_remove_outliers_o3d(pc, cfg) PointCloud
-_detect_ground_gpu(pc, cfg) tuple
-_voxel_downsample_numpy(pc) PointCloud
}
class VoxelizerConfig {
+float voxel_size
+int statistical_nb_neighbors
+float statistical_std_ratio
+float ransac_distance_threshold
+float ground_normal_threshold
}
class MultiframeFusion {
+RegistrationConfig config
+fuse_sequence(pointclouds, initial_poses) FusionResult
+fuse_from_directory(input_dir, pattern) FusionResult
+register_pair(source, target, initial_transform) RegistrationResult
+save_fused_map(result, output_path, include_poses) void
+save_poses(poses, output_path) void
}
class RegistrationConfig {
+float icp_max_correspondence_distance
+int icp_max_iteration
+float icp_relative_fitness
+float icp_relative_rmse
+float voxel_size_for_features
+float fpfh_radius
+int fpfh_max_nn
+float normal_radius
+float pose_graph_edge_prune_threshold
+float pose_graph_preference_loop_closure
+float fusion_voxel_size
+float min_fitness
+int min_points
}
class RegistrationResult {
+ndarray transformation
+float fitness
+float inlier_rmse
+int correspondence_count
}
class FramePose {
+int frame_index
+ndarray transformation
+float fitness
+float rmse
+bool is_keyframe
+to_dict() dict~str, any~
+from_dict(data) FramePose
}
class FusionResult {
+PointCloud fused_pointcloud
+list~FramePose~ frame_poses
+int total_frames
+int successful_registrations
+float fusion_time
}
class ObstacleBox3D {
+ndarray center
+ndarray dimensions
+SemanticLabel label
+float confidence
+ndarray point_indices
+int track_id
+int frame_id
+float timestamp
+tuple velocity
+tuple motion_vector
+bool is_movable
+to_dict() dict~str, any~
}
class ObstacleMarker {
+highlight_obstacles(points, labels) colors
+export_to_json(obstacles, output_path, frame_id, timestamp) void
}
class PipelineConfig {
+bool async_mode
+bool enable_slice
+float slice_radius
+bool enable_semantic
+int internal_resolution
+str semantic_model
+float semantic_confidence
+bool colorize_semantic
+bool output_navigation_ply
}
class PipelineProcessor {
-PipelineConfig config
-Module _predictor
-PointCloudVoxelizer _voxelizer
-ObjectDetector _detector
+run_pipeline(input_dir, output_dir) dict
-_predict_single(task, output_dir) bool
-_voxelize_single(task, output_dir, navigation_dir) Path
-_semantic_and_navigation(task, voxel_path, navigation_dir) bool
-_apply_slice(task) Path
-_apply_semantic_fusion(task, voxel_path, navigation_dir) void
-_execute_pipeline(output_dir, voxel_output_dir, navigation_dir) void
}
VideoPipelineProcessor --> VideoPipelineConfig
VideoPipelineProcessor --> FrameInfo
VideoPipelineProcessor --> PointCloudVoxelizer
VideoPipelineProcessor --> ObjectDetector
VideoPipelineProcessor --> MultiObjectTracker
VideoPipelineProcessor --> MotionEstimator
VideoPipelineProcessor --> ObstacleMarker
VideoPipelineProcessor --> Detection2D
VideoPipelineProcessor --> ObstacleBox3D
MultiObjectTracker --> TrackedObject
MotionEstimator --> MotionVector
MotionEstimator --> KalmanConfig
TrackedObject3D --> MotionVector
ObjectDetector --> DetectorConfig
ObjectDetector --> Detection2D
PointCloudVoxelizer --> VoxelizerConfig
MultiframeFusion --> RegistrationConfig
MultiframeFusion --> RegistrationResult
MultiframeFusion --> FramePose
MultiframeFusion --> FusionResult
PipelineProcessor --> PipelineConfig
PipelineProcessor --> PointCloudVoxelizer
PipelineProcessor --> ObjectDetector
Hey - I've found 6 issues, and left some high-level feedback:
- You now have two separate IoU implementations (`_compute_iou` in `VideoPipelineProcessor` and in `object_tracker`); consider centralizing this into a single shared helper to avoid divergence and keep matching behavior consistent.
- In the CLI video commands (`cmd_video_process`/`cmd_video_extract`) you use `print` to log discovery of `video_config.yaml`; for consistency with the rest of the toolchain and to support non-interactive environments, consider routing this through the existing logger instead.
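A shared IoU helper along the lines the reviewer suggests might look like this (a sketch with an assumed [x1, y1, x2, y2] box layout, not the repo's actual implementation):

```python
from typing import Sequence

def bbox_iou(bbox1: Sequence[float], bbox2: Sequence[float]) -> float:
    """Intersection-over-union of two [x1, y1, x2, y2] boxes.
    Returns 0.0 when the boxes do not overlap."""
    x1 = max(bbox1[0], bbox2[0])
    y1 = max(bbox1[1], bbox2[1])
    x2 = min(bbox1[2], bbox2[2])
    y2 = min(bbox1[3], bbox2[3])
    inter = max(0.0, x2 - x1) * max(0.0, y2 - y1)
    if inter == 0.0:
        return 0.0
    area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
    area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
    return float(inter / (area1 + area2 - inter))
```

With a single helper, both the tracker's matching step and the obstacle exporter compute IoU the same way, so a future change (e.g. a new bbox format) lands in one place.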
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- You now have two separate IoU implementations (`_compute_iou` in `VideoPipelineProcessor` and in `object_tracker`); consider centralizing this into a single shared helper to avoid divergence and keep matching behavior consistent.
- In the CLI video commands (`cmd_video_process`/`cmd_video_extract`) you use `print` to log discovery of `video_config.yaml`; for consistency with the rest of the toolchain and to support non-interactive environments, consider routing this through the existing logger instead.
## Individual Comments
### Comment 1
<location path="src/aylm/tools/video_pipeline.py" line_range="457-466" />
<code_context>
+ def _export_obstacles_with_tracking(
</code_context>
<issue_to_address>
**issue (bug_risk):** Detection-to-obstacle index mapping may be incorrect, leading to wrong track IDs and motion data on obstacles.
`det_to_track` is built using detection indices, but later you index it with `obs_idx` from `enumerate(obstacles)`. This implicitly assumes `obstacles` is in the same order and has the same cardinality as the original detections, which may not hold if `ObstacleMarker` filters, merges, or reorders them, and can silently attach wrong `track_id` and motion data.
Please introduce an explicit mapping between detections and obstacles (e.g., store the originating detection index/ID on each obstacle, or recompute the association via IoU or a similar metric) rather than relying on positional indices.
</issue_to_address>
### Comment 2
<location path="src/aylm/tools/video_pipeline.py" line_range="582-583" />
<code_context>
+ if not self.principles:
+ self.principles = self._default_principles()
+
+ @staticmethod
+ def _default_principles() -> list[PrincipleConfig]:
+ """默认宪法原则。"""
</code_context>
<issue_to_address>
**suggestion:** _compute_iou duplicates IoU logic already present in the object tracker, increasing maintenance overhead.
There are now two `_compute_iou` implementations (here and in `object_tracker.py`) with equivalent logic. This duplication increases the risk that future fixes or extensions (e.g., new bbox formats) will only be applied in one place. Please extract a shared utility or reuse the tracker’s implementation so IoU behavior is defined in a single location.
</issue_to_address>
### Comment 3
<location path="src/aylm/tools/multiframe_fusion.py" line_range="188-197" />
<code_context>
+ logger.error("有效帧数不足,无法配准")
+ return FusionResult(fused_pointcloud=PointCloud(np.zeros((0, 3))))
+
+ # 2. 相邻帧配准(前一帧 -> 后一帧)
+ logger.info("执行相邻帧配准...")
+ pairwise_results: list[RegistrationResult | None] = []
+ for i in range(len(processed) - 1):
+ source_pcd, source_fpfh, _ = processed[i]
+ target_pcd, target_fpfh, _ = processed[i + 1]
+
+ result = self._register_pair_o3d(
+ source_pcd, target_pcd, source_fpfh, target_fpfh
+ )
+
+ if result.fitness < self.config.min_fitness:
+ logger.warning(
+ f"帧 {valid_indices[i]} -> {valid_indices[i+1]} "
+ f"配准质量低 (fitness={result.fitness:.3f})"
+ )
+ pairwise_results.append(result)
+ logger.debug(
+ f"帧 {valid_indices[i]} -> {valid_indices[i+1]}: "
</code_context>
<issue_to_address>
**question:** Pairwise registration ignoring the optional initial_poses argument leaves that parameter unused.
`fuse_sequence` takes `initial_poses` but never uses it when building the pose graph. Either wire this into the pairwise registration/pose-graph initialization (e.g., as priors or initial transforms) or remove the parameter from the API to avoid a misleading, unused argument.
</issue_to_address>
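One way to honor this point is to let caller-supplied poses override the accumulated pairwise estimates. The sketch below uses a hypothetical helper name and assumes each pairwise transform T_i maps frame i+1 into frame i's coordinates; the composition direction depends on the registration convention actually used:

```python
import numpy as np

def chain_poses(pairwise, initial_poses=None):
    """Accumulate pairwise 4x4 transforms into global frame poses.
    Where initial_poses provides a non-None entry, it overrides the
    accumulated estimate (e.g. serving as a pose-graph prior)."""
    poses = [np.eye(4)]
    for T in pairwise:
        poses.append(poses[-1] @ T)  # compose onto the previous global pose
    if initial_poses is not None:
        for i, prior in enumerate(initial_poses):
            if prior is not None:
                poses[i] = prior
    return poses
```

Alternatively, if priors are never needed, dropping the `initial_poses` parameter from `fuse_sequence` entirely (as the reviewer notes) avoids the misleading unused argument.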
### Comment 4
<location path="tests/unit/test_object_tracker.py" line_range="174-180" />
<code_context>
+ assert 1 in track_ids
+ assert 2 in track_ids
+
+ def test_track_association(self):
+ """测试跨帧轨迹关联。"""
+ tracker = MultiObjectTracker(min_hits=1, iou_threshold=0.3)
+
+ # 第一帧
+ det1 = [self._create_detection([0, 0, 50, 50])]
+ tracked1 = tracker.update(det1, frame_id=0)
+ track_id = tracked1[0].track_id
+
+ # 第二帧,目标稍微移动
+ det2 = [self._create_detection([5, 5, 55, 55])]
+ tracked2 = tracker.update(det2, frame_id=1)
+
+ # 应该保持相同的 track_id
+ assert len(tracked2) == 1
+ assert tracked2[0].track_id == track_id
+
+ def test_new_track_creation(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test where IoU is below the threshold to verify that a new track is created instead of incorrectly reusing an existing one.
The current `test_track_association` only verifies the reuse of a `track_id` when IoU exceeds the threshold. To better protect the IoU threshold behavior, please add a test that:
- initializes a track in frame 0,
- in frame 1 supplies a detection with IoU below `iou_threshold`,
- checks that the existing track is not matched (`time_since_update` increments) and a new track is created for the low-overlap detection.
This will exercise `_hungarian_match` and the IoU cutoff in non-happy-path matching scenarios.
```suggestion
def test_iou_below_threshold_creates_new_track(self):
"""当 IoU 低于阈值时,应创建新轨迹而不是复用旧轨迹。"""
tracker = MultiObjectTracker(min_hits=1, iou_threshold=0.3)
# 第一帧:初始化一条轨迹
det1 = [self._create_detection([0, 0, 50, 50])]
tracked1 = tracker.update(det1, frame_id=0)
assert len(tracked1) == 1
original_track_id = tracked1[0].track_id
# 第二帧:给出一个与原轨迹 IoU < iou_threshold 的检测(这里完全不重叠)
det2 = [self._create_detection([100, 100, 150, 150])]
tracked2 = tracker.update(det2, frame_id=1)
# 应该为新检测创建新轨迹
assert len(tracked2) == 1
new_track_id = tracked2[0].track_id
assert new_track_id != original_track_id
# 原轨迹此帧未被匹配,time_since_update 应该递增
original_tracks = [t for t in tracker.tracks if t.track_id == original_track_id]
assert len(original_tracks) == 1
assert original_tracks[0].time_since_update == 1
# 新轨迹是刚更新的,time_since_update 应为 0
new_tracks = [t for t in tracker.tracks if t.track_id == new_track_id]
assert len(new_tracks) == 1
assert new_tracks[0].time_since_update == 0
def test_new_track_creation(self):
"""测试新目标出现时创建新轨迹。"""
tracker = MultiObjectTracker(min_hits=1)
# 第一帧:一个目标
det1 = [self._create_detection([0, 0, 50, 50])]
tracker.update(det1, frame_id=0)
```
</issue_to_address>
### Comment 5
<location path="tests/unit/test_motion_estimator.py" line_range="172-191" />
<code_context>
+ assert result is not None
+ assert result.is_stationary is False
+
+ def test_predict_future_position(self):
+ """测试位置预测。"""
+ estimator = MotionEstimator(fps=1.0)
+
+ # 建立轨迹
+ estimator.update(
+ track_id=1, position_cv=np.array([0.0, 0.0, 0.0]), frame_id=0, timestamp=0.0
+ )
+ estimator.update(
+ track_id=1, position_cv=np.array([1.0, 0.0, 0.0]), frame_id=1, timestamp=1.0
+ )
+
+ # 预测 1 秒后的位置
+ predicted = estimator.predict(track_id=1, dt=1.0)
+
+ assert predicted is not None
+ # 应该在 X 方向继续移动
+ assert predicted[0] > 1.0
+
+ def test_predict_nonexistent_track(self):
</code_context>
<issue_to_address>
**suggestion (testing):** You may want a test covering non-monotonic timestamps (e.g., second update with an earlier timestamp) to ensure the fallback `dt_default` path behaves correctly.
Current tests cover explicit timestamps and the `frame_id / fps` path, but only with increasing time. Since `MotionEstimator.update` has special handling for `dt <= 0`, adding a test like:
- first update with `timestamp=1.0`
- second update with `timestamp=0.5` or equal
- assert it doesn’t raise and returns a valid `MotionVector`
would ensure that branch stays covered and regression-free.
```suggestion
def test_predict_future_position(self):
"""测试位置预测。"""
estimator = MotionEstimator(fps=1.0)
# 建立轨迹
estimator.update(
track_id=1, position_cv=np.array([0.0, 0.0, 0.0]), frame_id=0, timestamp=0.0
)
estimator.update(
track_id=1, position_cv=np.array([1.0, 0.0, 0.0]), frame_id=1, timestamp=1.0
)
# 预测 1 秒后的位置
predicted = estimator.predict(track_id=1, dt=1.0)
assert predicted is not None
# 应该在 X 方向继续移动
assert predicted[0] > 1.0
def test_update_non_monotonic_timestamp(self):
"""测试非单调时间戳时的回退 dt_default 逻辑。"""
estimator = MotionEstimator(fps=1.0)
# 第一次更新使用较晚的时间戳
first = estimator.update(
track_id=1,
position_cv=np.array([0.0, 0.0, 0.0]),
frame_id=1,
timestamp=1.0,
)
assert first is not None
# 第二次更新使用更早的时间戳,触发 dt <= 0 分支
second = estimator.update(
track_id=1,
position_cv=np.array([1.0, 0.0, 0.0]),
frame_id=2,
timestamp=0.5,
)
# 确认不会报错,并且仍然返回有效的 MotionVector
assert second is not None
assert hasattr(second, "is_stationary")
def test_predict_nonexistent_track(self):
```
</issue_to_address>
### Comment 6
<location path="README.md" line_range="745" />
<code_context>
+ author={TRIP (appergb)},
+ year={2026},
+ url={https://github.com/appergb/A.YLM},
+ note={Self-supervised safety framework extending Constitutional AI to physical world}
+}
+```
</code_context>
<issue_to_address>
**nitpick (typo):** Minor grammar improvement in the BibTeX note field ('to the physical world').
In the BibTeX note, consider "Self-supervised safety framework extending Constitutional AI to the physical world" for grammatical completeness.
```suggestion
url={https://github.com/appergb/A.YLM},
note={Self-supervised safety framework extending Constitutional AI to the physical world}
```
</issue_to_address>
Restructured and updated the project as a whole, revising its overall goals and requirements, and introduced the 3D Constitution algorithm.
Summary by Sourcery
Introduce a geometric constitutional safety framework for A-YLM, extending the project from a 3D reconstruction/navigation tool into an embodied AI safety system with tracking, multi-frame fusion, and an open safety-constitution API.