diff --git a/examples/graph-builder-path.py b/examples/graph-builder-path.py new file mode 100644 index 0000000..2054078 --- /dev/null +++ b/examples/graph-builder-path.py @@ -0,0 +1,511 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: Copyright (c) 2025 +# SPDX-License-Identifier: Apache-2.0 + +""" +Graph Builder with Coverage Path Planning + +This script demonstrates incremental graph building with a planned exploration path +that covers the entire warehouse map. The robot follows a lawnmower pattern to +systematically explore all navigable areas. +""" + +import os +import sys +import math +import cv2 +import numpy as np +import json +import logging +import warnings +import time +from PIL import Image + +# Suppress pandas warnings +warnings.filterwarnings('ignore', category=UserWarning, module='pandas') + +# Suppress logging from SWAGGER library +logging.getLogger('swagger.waypoint_graph_generator').setLevel(logging.ERROR) + +# Set matplotlib backend before importing pyplot +import matplotlib +if os.environ.get('DISPLAY') is None or os.environ.get('DISPLAY') == '': + matplotlib.use('Agg') +else: + try: + matplotlib.use('TkAgg') + except: + matplotlib.use('Agg') + +import matplotlib.pyplot as plt + +from swagger import ( + WaypointGraphGenerator, + WaypointGraphGeneratorConfig, + Point, +) +from visualize_global_graph_on_image import visualize_global_graph_on_image +import networkx as nx +from swagger.global_graph_generator import GlobalGraphGenerator +from networkx.readwrite import json_graph + + +# ============================================================ +# Configuration +# ============================================================ + +RESOLUTION = 0.039 # meters per pixel +CROP_SIZE = 300 # Size of local frame (pixels) +SAFE_DIST = 0.30 # Robot safety distance (meters) + +# Visualization settings +ENABLE_LIVE_VISUALIZATION = True +SAVE_GIF = True # Save animation as GIF +GIF_FILENAME = "results/graph_building_animation.gif" +GIF_DURATION = 500 # Duration per frame in milliseconds + +# Path planning parameters +OVERLAP_PIXELS = 120 # 40% overlap for good connectivity +VERTICAL_STEP = 150 # Balanced vertical steps to cover entire map + + +# ============================================================ +# Coverage Path Planner +# ============================================================ + +def plan_coverage_path(map_image, crop_size, overlap, vertical_step): + """ + Plan a systematic grid coverage path that explores the entire map. + + Returns list of (x_px, y_px, rotation_rad) tuples representing robot poses. + """ + H, W = map_image.shape[:2] + path = [] + + # Calculate horizontal step (with overlap) + horizontal_step = crop_size - overlap + + y = 0 + + # Simple left-to-right, top-to-bottom grid pattern + while y + crop_size <= H: + x = 0 + while x + crop_size <= W: + path.append((x, y, 0.0)) # All frames facing same direction + x += horizontal_step + + # Move down to next row + y += vertical_step + + return path + + +# ============================================================ +# Frame Generator with Path +# ============================================================ + +def generate_frames_from_path(input_image_path: str, output_dir: str, + path_waypoints: list, crop_size: int): + """ + Generator that yields frames following a planned coverage path. + + Yields: (frame_idx, image_crop, filename, x_px, y_px, rotation_rad) + """ + os.makedirs(output_dir, exist_ok=True) + + # Load base map + image = cv2.imread(input_image_path, cv2.IMREAD_GRAYSCALE) + if image is None: + raise FileNotFoundError(f"Could not read image at {input_image_path}") + + H, W = image.shape[:2] + + for i, (x_px, y_px, rotation_rad) in enumerate(path_waypoints): + # Extract crop + cropped = image[y_px:y_px + crop_size, x_px:x_px + crop_size] + + # Save frame + crop_filename = os.path.join(output_dir, f"frame_{i:03d}.png") + cv2.imwrite(crop_filename, cropped) + + yield i, cropped, crop_filename, x_px, y_px, rotation_rad + + +# ============================================================ +# Visualization +# ============================================================ + +def visualize_graph_incremental(current_graph: nx.Graph, previous_graph: nx.Graph | None, + frame_idx: int, ax, base_image=None, resolution=0.039, + save_path=None, robot_pose_px=None, all_robot_poses=None, + previous_robot_pose=None): + """ + Visualize the graph with robot position marker. + """ + ax.clear() + + # Show base image + if base_image is not None: + ax.imshow(base_image, cmap='gray', origin='upper') + ax.set_aspect('equal') + + if current_graph is None or current_graph.number_of_nodes() == 0: + ax.set_title(f"Frame {frame_idx}: No nodes yet") + return + + # Determine known vs new nodes + known_nodes = set() + new_nodes = set() + + if previous_graph is not None and previous_graph.number_of_nodes() > 0: + known_nodes = set(previous_graph.nodes()) + + for node in current_graph.nodes(): + if node not in known_nodes: + new_nodes.add(node) + + # Convert world coordinates to pixel coordinates + # World origin (0,0) is at the center of the first frame, which was at pixel position (0,0) + # So world (0,0) maps to pixel (CROP_SIZE/2, CROP_SIZE/2) in the full map + pos = {} + + world_origin_x_px = CROP_SIZE / 2 # World (0,0) is at this pixel position in full map + world_origin_y_px = CROP_SIZE / 2 + + for node, data in current_graph.nodes(data=True): + if "world" in data: + world = data["world"] + x_m, y_m = world[0], world[1] + + # Reverse the transform: + # We pass dy_m = -(y_px * res) to generator + # Generator does: y_world = -(row - center) * res + dy_m + # = -(row - center) * res - y_px * res + # For frame center (row=center): y_world = -y_px * res + # So: y_px = -y_world / res + # Then add world_origin offset: pixel_y = world_origin_y - y_world/res + + x_px = (x_m / resolution) + world_origin_x_px + y_px = world_origin_y_px - (y_m / resolution) + + pos[node] = (x_px, y_px) + + # Debug: Print sample positions (only if needed for debugging) + # if len(pos) > 0 and frame_idx == 7: # Final frame + # ... debug code commented out + + # Draw edges + known_edges = [(u, v) for u, v in current_graph.edges() if u in known_nodes and v in known_nodes] + new_edges = [(u, v) for u, v in current_graph.edges() if u not in known_nodes or v not in known_nodes] + + for u, v in known_edges: + if u in pos and v in pos: + ax.plot([pos[u][0], pos[v][0]], [pos[u][1], pos[v][1]], + 'm-', linewidth=1.5, alpha=0.6, zorder=1) + + for u, v in new_edges: + if u in pos and v in pos: + ax.plot([pos[u][0], pos[v][0]], [pos[u][1], pos[v][1]], + 'b-', linewidth=2.0, alpha=0.8, zorder=2) + + # Draw nodes + if known_nodes and pos: + known_x = [pos[n][0] for n in known_nodes if n in pos] + known_y = [pos[n][1] for n in known_nodes if n in pos] + if known_x: + ax.scatter(known_x, known_y, c='magenta', s=30, alpha=0.8, + edgecolors='black', linewidths=0.5, zorder=3, label='Known') + + if new_nodes and pos: + new_x = [pos[n][0] for n in new_nodes if n in pos] + new_y = [pos[n][1] for n in new_nodes if n in pos] + if new_x: + ax.scatter(new_x, new_y, c='red', s=50, alpha=0.9, + edgecolors='black', linewidths=1.0, zorder=4, label='New') + + # Draw robot position + if robot_pose_px is not None: + x_px, y_px, rotation = robot_pose_px + # Robot at center of its local frame + robot_x = x_px + CROP_SIZE / 2 + robot_y = y_px + CROP_SIZE / 2 + + # Calculate actual movement direction if we have previous pose + if previous_robot_pose is not None: + prev_x, prev_y, _ = previous_robot_pose + prev_center_x = prev_x + CROP_SIZE / 2 + prev_center_y = prev_y + CROP_SIZE / 2 + + # Calculate angle from previous to current position + dx = robot_x - prev_center_x + dy = robot_y - prev_center_y + if abs(dx) > 1 or abs(dy) > 1: # Only update if moved significantly + rotation = math.atan2(-dy, dx) # Negative dy for image coordinates + + # Draw robot as triangle + size = 20 + dx = size * math.cos(rotation) + dy = -size * math.sin(rotation) # Negative for image coordinates + + triangle = plt.Polygon([ + [robot_x + dx, robot_y + dy], + [robot_x - dx/2 - dy/2, robot_y - dy/2 + dx/2], + [robot_x - dx/2 + dy/2, robot_y - dy/2 - dx/2] + ], color='green', alpha=0.8, zorder=10, label='Robot') + ax.add_patch(triangle) + + # Draw field of view rectangle + fov_rect = plt.Rectangle((x_px, y_px), CROP_SIZE, CROP_SIZE, + linewidth=2, edgecolor='green', facecolor='none', + alpha=0.5, zorder=9) + ax.add_patch(fov_rect) + + ax.set_title(f"Frame {frame_idx}: {len(current_graph.nodes())} nodes " + f"({len(new_nodes)} new, {len(known_nodes)} known)") + ax.legend(loc='upper right', fontsize=8) + ax.axis('equal') + + if base_image is not None: + ax.set_xlim(0, base_image.shape[1]) + ax.set_ylim(base_image.shape[0], 0) # Inverted Y-axis like graph-builder.py + + plt.tight_layout() + + if save_path: + plt.savefig(save_path, dpi=100, bbox_inches='tight') + + # Always update display if interactive + if not matplotlib.get_backend() == 'Agg': + plt.draw() + plt.pause(0.5) # Increased pause to observe frame transitions + + +# ============================================================ +# Main +# ============================================================ + +def main(): + # Clean up old results + output_dir = "results" + maps_dir = "results/simulated_route" + + if os.path.exists(maps_dir): + import shutil + shutil.rmtree(maps_dir) + + os.makedirs(output_dir, exist_ok=True) + + # Load base map for path planning + base_map_path = "../maps/carter_warehouse_navigation.png" + base_map = cv2.imread(base_map_path, cv2.IMREAD_GRAYSCALE) + if base_map is None: + raise FileNotFoundError(f"Could not load: {base_map_path}") + + # Plan coverage path + coverage_path = plan_coverage_path( + base_map, + crop_size=CROP_SIZE, + overlap=OVERLAP_PIXELS, + vertical_step=VERTICAL_STEP + ) + + # Configure SWAGGER - Adjusted for less dense graph + config = WaypointGraphGeneratorConfig( + skeleton_sample_distance=0.25, # Increased from 0.20 (fewer skeleton nodes) + boundary_inflation_factor=2, + boundary_sample_distance=0.40, # Increased from 0.30 (fewer boundary nodes) + free_space_sampling_threshold=0.65, # Increased from 0.50 (more selective sampling) + merge_node_distance=0.50, # Increased to better merge nearby nodes (was 0.30) + min_subgraph_length=0.50, # Increased from 0.35 (prune shorter paths) + use_skeleton_graph=False, + use_boundary_sampling=True, + use_free_space_sampling=True, + use_delaunay_shortcuts=True, # Re-enabled for cross-connections between regions + prune_graph=True, + debug=False, + ) + + # Create generator with WARNING level logging to suppress INFO messages + generator = WaypointGraphGenerator(config, logger_level=logging.WARNING) + global_builder = GlobalGraphGenerator( + merge_distance=0.50, # Increased to better merge overlapping frames (0.5m = ~12.8 pixels) + retention_factor=1.0 + ) + + persisted_known_points = [] + + # Setup visualization + fig, ax = None, None + is_headless = matplotlib.get_backend() == 'Agg' + gif_frames = [] # Store frames for GIF creation + + if ENABLE_LIVE_VISUALIZATION: + if is_headless or SAVE_GIF: + os.makedirs("results/viz_frames", exist_ok=True) + if not is_headless: + plt.ion() + + # Set figure size to match map aspect ratio (480W x 776H = 0.618 aspect) + map_aspect = 480 / 776 # width / height + fig_height = 16 + fig_width = fig_height * map_aspect + fig, ax = plt.subplots(figsize=(fig_width, fig_height)) + + previous_global_graph = None + previous_robot_pose = None # Track previous robot position for direction + + # Generate frames and build graph + frame_generator = generate_frames_from_path( + base_map_path, + maps_dir, + coverage_path, + CROP_SIZE + ) + + for i, occupancy, img_path, x_px, y_px, rotation_rad in frame_generator: + # Only print frame number and position + print(f"Frame {i}/{len(coverage_path)-1}: position=({x_px}, {y_px})px") + + # Compute world offset + # CRITICAL: Generator uses ROS convention where Y+ points UP + # But image pixel Y+ points DOWN + # So we need to NEGATE y_px when converting to world offset + dx_m = x_px * RESOLUTION + dy_m = -(y_px * RESOLUTION) # NEGATE for ROS convention + + # Build local graph + robot_pose = (dx_m, dy_m, 0.0) # Force rotation=0 for world coordinate alignment + + # Build local graph (library debug output suppressed via config.debug=False) + local_start = time.time() + local_graph = generator.build_graph_from_grid_map( + image=occupancy, + resolution=RESOLUTION, + x_offset=robot_pose[0], + y_offset=robot_pose[1], + rotation=robot_pose[2], + known_points=persisted_known_points, + safety_distance=SAFE_DIST, + occupancy_threshold=127, + ) + local_time = time.time() - local_start + print(f" └─ Local graph: {local_time*1000:.1f}ms ({local_graph.number_of_nodes()} nodes, {local_graph.number_of_edges()} edges)") + + if local_graph is None: + continue + + # Debug: Check local graph node coordinates (commented out) + # if i == 5 and local_graph.number_of_nodes() > 0: + # ... debug code + + # Merge into global graph + global_start = time.time() + global_builder.add_local_graph(local_graph) + current_global = global_builder.get_global_graph() + global_time = time.time() - global_start + print(f" └─ Global merge: {global_time*1000:.1f}ms (total: {current_global.number_of_nodes()} nodes, {current_global.number_of_edges()} edges)\n") + + # Debug: Check global graph node coordinates (commented out) + # if i == 5 and current_global.number_of_nodes() > 0: + # ... debug code + + # Visualize + if ENABLE_LIVE_VISUALIZATION and ax is not None: + save_path = f"results/viz_frames/frame_{i:03d}.png" if (is_headless or SAVE_GIF) else None + + visualize_graph_incremental( + current_graph=current_global, + previous_graph=previous_global_graph, + frame_idx=i, + ax=ax, + base_image=base_map, + resolution=RESOLUTION, + save_path=save_path, + robot_pose_px=(x_px, y_px, rotation_rad), + previous_robot_pose=previous_robot_pose + ) + + # Update previous pose for next iteration + previous_robot_pose = (x_px, y_px, rotation_rad) + + # If saving GIF, capture frame + if SAVE_GIF: + # Convert matplotlib figure to PIL Image + fig.canvas.draw() + img_array = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8) + img_array = img_array.reshape(fig.canvas.get_width_height()[::-1] + (3,)) + gif_frames.append(Image.fromarray(img_array)) + + previous_global_graph = current_global.copy() + + # Save final results + current_global = global_builder.get_global_graph() + + # Save visualization + if isinstance(current_global, nx.Graph) and current_global.number_of_nodes() > 0: + overlay_path = os.path.join(output_dir, "global_graph_overlay.png") + visualize_global_graph_on_image( + current_global, + base_image_path=base_map_path, + output_path=overlay_path, + ) + + # Save JSON + def _make_json_serializable(obj): + if obj is None or isinstance(obj, (str, bool, int, float)): + return obj + try: + import numpy as np + if isinstance(obj, (np.integer, np.floating)): + return obj.item() + if isinstance(obj, np.ndarray): + return obj.tolist() + except: + pass + if isinstance(obj, dict): + return {str(k): _make_json_serializable(v) for k, v in obj.items()} + if isinstance(obj, (list, tuple)): + return [_make_json_serializable(v) for v in obj] + try: + return float(obj) + except: + return str(obj) + + mapping = {n: i for i, n in enumerate(current_global.nodes())} + G_export = nx.relabel_nodes(current_global, mapping, copy=True) + + json_path = os.path.join(output_dir, "global_graph.json") + node_link = json_graph.node_link_data(G_export, edges="links") + with open(json_path, "w") as f: + json.dump(_make_json_serializable(node_link), f, indent=2) + + # Print final summary + print(f"\nComplete! {len(coverage_path)} frames → {len(current_global.nodes())} nodes, {len(current_global.edges())} edges") + + # Check for potential issues + degrees = [d for n, d in current_global.degree()] + max_degree = max(degrees) if degrees else 0 + avg_degree = sum(degrees) / len(degrees) if degrees else 0 + print(f" └─ Max degree: {max_degree}, Avg degree: {avg_degree:.2f}") + print(f" └─ Self-loops: {len(list(nx.selfloop_edges(current_global)))}") + + # Save GIF animation + if SAVE_GIF and gif_frames: + print(f"\nSaving animation GIF with {len(gif_frames)} frames...") + gif_frames[0].save( + GIF_FILENAME, + save_all=True, + append_images=gif_frames[1:], + duration=GIF_DURATION, + loop=0 + ) + print(f" └─ Saved to {GIF_FILENAME}") + + if ENABLE_LIVE_VISUALIZATION and fig is not None and not is_headless: + plt.ioff() + plt.show() + + +if __name__ == "__main__": + main() diff --git a/examples/graph-builder.py b/examples/graph-builder.py new file mode 100644 index 0000000..03bfadb --- /dev/null +++ b/examples/graph-builder.py @@ -0,0 +1,724 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: Copyright (c) 2025 +# SPDX-License-Identifier: Apache-2.0 + +""" +Graph Builder Example - Incremental waypoint graph generation with rotation support + +This script demonstrates incremental graph building from simulated robot exploration. +For Isaac Lab integration, replace the frame generator with: + + # In Isaac Lab simulation loop: + robot_x, robot_y, robot_yaw = get_robot_pose() # Get from Isaac Lab + elevation_map = get_elevation_map_crop(robot_x, robot_y, crop_size=256) # Get from Isaac Lab + + # Convert to occupancy grid (threshold elevation, inflate obstacles, etc.) + occupancy_grid = elevation_to_occupancy(elevation_map) + + # Pass to graph generator with actual robot pose + robot_pose_meters = (robot_x, robot_y, robot_yaw) + local_graph = generator.build_graph_from_grid_map( + image=occupancy_grid, + resolution=0.039, # meters per pixel + x_offset=robot_x, + y_offset=robot_y, + rotation=robot_yaw, # IMPORTANT: Include rotation! + known_points=known_points_from_previous_frames + ) + + # Merge into global graph + global_builder.add_local_graph(local_graph) +""" + +import os +import glob +import math +from pathlib import Path + +import cv2 +import numpy as np +import json +import os +import sys + +# Set matplotlib backend before importing pyplot +import matplotlib +# Check if running in headless environment +if os.environ.get('DISPLAY') is None or os.environ.get('DISPLAY') == '': + matplotlib.use('Agg') # Non-interactive backend for headless systems +else: + try: + matplotlib.use('TkAgg') + except: + matplotlib.use('Agg') + +import matplotlib.pyplot as plt + +from swagger import ( + WaypointGraphGenerator, + WaypointGraphGeneratorConfig, + Point, +) +from swagger.graph_manager import GraphManager +from simulate_robot_graph_route import simulate_robot_graph_route +from visualize_global_graph_on_image import visualize_global_graph_on_image +import networkx as nx +from swagger.global_graph_generator import GlobalGraphGenerator +from networkx.readwrite import json_graph + + +# ============================================================ +# Configuration +# ============================================================ + +RESOLUTION = 0.039 # meters per pixel +STEP_PIXELS = 100 # how far robot moves per frame (px) +MAX_EDGE_DIST = 2.0 # not always used +SAFE_DIST = 0.30 # used for sampling/collision distance +USE_LEGACY_GENERATOR = True # toggle between current and legacy sampling logic + +# Visualization settings +ENABLE_LIVE_VISUALIZATION = True # Set to False to disable real-time plotting + +# Export toggles for JSON output (controls what gets streamed) +# If you set these to True, the exporter will include pixel/pos/origin in the +# id_map; otherwise they are omitted to save bandwidth. +EXPORT_PIXEL = False +EXPORT_POS = False +EXPORT_ORIGIN = False + + +# ============================================================ +# Utility Functions +# ============================================================ + +def generate_robot_frames(input_image_path: str, output_dir: str, num_frames: int, + crop_width: int, crop_height: int, step_size: int, + motion_direction: str = "x", rotation: float = 0.0): + """ + Generator that yields frames one at a time instead of pre-generating all. + Useful for simulating real-time exploration. + + Calculates valid frames based on map size and yields: (frame_idx, image, filename, x_offset_px, y_offset_px, rotation) + + Args: + rotation: Rotation in radians (default 0.0 for pure translation) + """ + os.makedirs(output_dir, exist_ok=True) + + # Load base map + image = cv2.imread(input_image_path, cv2.IMREAD_GRAYSCALE) + if image is None: + raise FileNotFoundError(f"Could not read image at {input_image_path}") + H, W = image.shape[:2] + + # Calculate how many valid frames we can actually generate + if motion_direction == "x": + max_frames = (W - crop_width) // step_size + 1 + elif motion_direction == "y": + max_frames = (H - crop_height) // step_size + 1 + elif motion_direction == "xy": + max_frames = min((W - crop_width) // step_size, (H - crop_height) // step_size) + 1 + else: + raise ValueError("motion_direction must be 'x', 'y', or 'xy'") + + actual_frames = min(num_frames, max_frames) + print(f"[INFO] Map size: {W}x{H}, crop: {crop_width}x{crop_height}, step: {step_size}") + print(f"[INFO] Requested {num_frames} frames, generating {actual_frames} valid frames") + + for i in range(actual_frames): + # Calculate crop region + if motion_direction == "x": + x_start, y_start = i * step_size, 0 + elif motion_direction == "y": + x_start, y_start = 0, i * step_size + elif motion_direction == "xy": + x_start, y_start = i * step_size, i * step_size + else: + raise ValueError("motion_direction must be 'x', 'y', or 'xy'") + + cropped = image[y_start:y_start + crop_height, x_start:x_start + crop_width] + crop_filename = os.path.join(output_dir, f"frame_{i:03d}.png") + cv2.imwrite(crop_filename, cropped) + + print(f"[INFO] Generated frame {i}/{actual_frames-1} (x_offset={x_start}px, y_offset={y_start}px)") + + # Yield frame index, image, filename, AND pixel offsets, AND rotation + yield i, cropped, crop_filename, x_start, y_start, rotation + + +def offset_graph(local_graph: nx.Graph, dx_m: float, dy_m: float): + """ + Apply world-coordinate offset to every node in the local graph. + """ + for n, data in local_graph.nodes(data=True): + if "world" in data: + x, y, z = data["world"] + data["world"] = (x + dx_m, y + dy_m, z) + return local_graph + + +def offset_known_points(points, dx_m, dy_m): + """ + Returns a NEW list of Point() with world-coordinate offsets applied. + """ + out = [] + for p in points: + out.append(Point(x=p.x + dx_m, y=p.y + dy_m, z=p.z)) + return out + + +def ensure_graph_pos_consistent(graph: nx.Graph): + """ + Forces every graph node to have a consistent 'pos' = (x, y) in meters. + """ + for node, data in graph.nodes(data=True): + if "world" in data: + world = data["world"] + if len(world) == 3: + x, y, _ = world + elif len(world) == 2: + x, y = world + else: + # Skip malformed entries but leave debug breadcrumbs. + print(f"[WARN] Node {node} has unexpected world data: {world}") + continue + data["pos"] = (float(x), float(y)) + elif "pixel" in data: + px, py = data["pixel"] + data["pos"] = (px * RESOLUTION, py * RESOLUTION) + return graph + + +def smart_merge_graphs(global_graph: nx.Graph | None, local_graph: nx.Graph): + """ + Merge local_graph into global_graph WITHOUT duplicating nodes or + creating self-loops. This is the same logic you were using before. + """ + if global_graph is None or len(global_graph) == 0: + return local_graph.copy() + + # Add nodes + for n, d in local_graph.nodes(data=True): + if n not in global_graph: + global_graph.add_node(n, **d) + + # Add edges + for u, v, ed in local_graph.edges(data=True): + if not global_graph.has_edge(u, v): + global_graph.add_edge(u, v, **ed) + + # Clean up bad edges + global_graph.remove_edges_from(nx.selfloop_edges(global_graph)) + + return global_graph + + +def visualize_graph_incremental(current_graph: nx.Graph, previous_graph: nx.Graph | None, + frame_idx: int, ax, base_image=None, resolution=0.039, + save_path=None): + """ + Visualize the graph with color coding: + - Magenta: Known nodes from previous iterations + - Red: Newly added nodes + - Magenta: Edges between known nodes + - Blue: New edges (at least one endpoint is new) + + If save_path is provided, saves the figure instead of showing it interactively. + """ + import matplotlib.pyplot as plt + + ax.clear() + + # Show base image if provided + if base_image is not None: + ax.imshow(base_image, cmap='gray', origin='upper') + ax.set_aspect('equal') + + if current_graph is None or current_graph.number_of_nodes() == 0: + ax.set_title(f"Frame {frame_idx}: No nodes yet") + return + + # Determine which nodes are known vs new + known_nodes = set() + new_nodes = set() + + if previous_graph is not None and previous_graph.number_of_nodes() > 0: + known_nodes = set(previous_graph.nodes()) + + for node in current_graph.nodes(): + if node not in known_nodes: + new_nodes.add(node) + + # Prepare positions for plotting + # The generator uses a ROS-style centered coordinate system where world (0,0) is at the center of each frame + # We need to convert world coordinates (meters) to pixel coordinates accounting for this + pos = {} + + # Each frame is 256x256, so center is at (128, 128) in local frame coordinates + # But we're displaying on the full base map, so we need to account for centering in each frame + frame_height = 256 + frame_width = 256 + + for node, data in current_graph.nodes(data=True): + if "world" in data: + world = data["world"] + x_m, y_m = world[0], world[1] + + # Convert world coordinates (meters) to pixels + # The generator's coordinate system has (0,0) at the center of the frame + # x is horizontal (left-to-right), y is vertical (but flipped: positive is up, negative is down) + x_px = x_m / resolution + y_px = -y_m / resolution # Flip y-axis: positive y in world = up = negative in image coords + + # Add center offset: world (0,0) is at pixel (128, 128) in the local frame + x_px += frame_width / 2 + y_px += frame_height / 2 + + pos[node] = (x_px, y_px) + elif "pos" in data: + x_m, y_m = data["pos"] + x_px = x_m / resolution + y_px = -y_m / resolution + x_px += frame_width / 2 + y_px += frame_height / 2 + pos[node] = (x_px, y_px) + elif "pixel" in data: + pos[node] = data["pixel"] + + # Debug: print coordinate ranges + if pos and frame_idx % 1 == 0: + x_coords = [p[0] for p in pos.values()] + y_coords = [p[1] for p in pos.values()] + print(f"[DEBUG Frame {frame_idx}] X range: {min(x_coords):.1f} to {max(x_coords):.1f} px") + print(f"[DEBUG Frame {frame_idx}] Y range: {min(y_coords):.1f} to {max(y_coords):.1f} px") + if base_image is not None: + print(f"[DEBUG Frame {frame_idx}] Image shape: {base_image.shape} (H={base_image.shape[0]}, W={base_image.shape[1]})") + + + # Draw edges with color coding + known_edges = [] + new_edges = [] + + for u, v in current_graph.edges(): + if u in known_nodes and v in known_nodes: + known_edges.append((u, v)) + else: + new_edges.append((u, v)) + + # Draw known edges (magenta) + if known_edges and pos: + for u, v in known_edges: + if u in pos and v in pos: + x_coords = [pos[u][0], pos[v][0]] + y_coords = [pos[u][1], pos[v][1]] + ax.plot(x_coords, y_coords, 'm-', linewidth=1.5, alpha=0.6, zorder=1) + + # Draw new edges (blue) + if new_edges and pos: + for u, v in new_edges: + if u in pos and v in pos: + x_coords = [pos[u][0], pos[v][0]] + y_coords = [pos[u][1], pos[v][1]] + ax.plot(x_coords, y_coords, 'b-', linewidth=2.0, alpha=0.8, zorder=2) + + # Draw known nodes (magenta) + if known_nodes and pos: + known_x = [pos[n][0] for n in known_nodes if n in pos] + known_y = [pos[n][1] for n in known_nodes if n in pos] + if known_x: + ax.scatter(known_x, known_y, c='magenta', s=50, alpha=0.8, + edgecolors='black', linewidths=0.5, zorder=3, label='Known nodes') + + # Draw new nodes (red) + if new_nodes and pos: + new_x = [pos[n][0] for n in new_nodes if n in pos] + new_y = [pos[n][1] for n in new_nodes if n in pos] + if new_x: + ax.scatter(new_x, new_y, c='red', s=80, alpha=0.9, + edgecolors='black', linewidths=1.0, zorder=4, label='New nodes') + + # Add shading for boundary nodes if available + boundary_nodes = [n for n, data in current_graph.nodes(data=True) + if data.get('node_type') == 'boundary'] + if boundary_nodes and pos: + boundary_x = [pos[n][0] for n in boundary_nodes if n in pos] + boundary_y = [pos[n][1] for n in boundary_nodes if n in pos] + if boundary_x: + ax.scatter(boundary_x, boundary_y, c='yellow', s=30, alpha=0.3, zorder=0) + + ax.set_title(f"Frame {frame_idx}: {len(current_graph.nodes())} nodes " + f"({len(new_nodes)} new, {len(known_nodes)} known)") + ax.legend(loc='upper right', fontsize=8) + ax.axis('equal') + + if base_image is not None: + ax.set_xlim(0, base_image.shape[1]) + ax.set_ylim(base_image.shape[0], 0) + + plt.tight_layout() + + if save_path: + # Save to file in headless mode + plt.savefig(save_path, dpi=100, bbox_inches='tight') + print(f"[DEBUG] Saved visualization frame → {save_path}") + else: + # Interactive display + plt.draw() + plt.pause(0.5) # Pause to allow display update + + +# ============================================================ +# Main Builder +# ============================================================ + +def main(): + + # ---------------------------------------------- + # 0. Clean up old results to keep only latest + # ---------------------------------------------- + output_dir = "results" + maps_dir = "results/simulated_route" + + # Remove old simulated_route if it exists + if os.path.exists(maps_dir): + import shutil + shutil.rmtree(maps_dir) + print(f"[INFO] Cleaned up old simulated route") + + os.makedirs(output_dir, exist_ok=True) + + # ---------------------------------------------- + # 1. Setup parameters for simulation + # ---------------------------------------------- + num_frames = 5 + crop_width = 256 + crop_height = 256 + + # ---------------------------------------------- + # 1b. Calculate actual valid frames based on map size + # ---------------------------------------------- + base_map_path = "../maps/carter_warehouse_navigation.png" + temp_img = cv2.imread(base_map_path, cv2.IMREAD_GRAYSCALE) + if temp_img is None: + raise FileNotFoundError(f"Could not load base map: {base_map_path}") + + map_H, map_W = temp_img.shape[:2] + max_frames_x = (map_W - crop_width) // STEP_PIXELS + 1 + actual_frames = min(num_frames, max_frames_x) + + print(f"[INFO] Base map: {map_W}x{map_H}, requested {num_frames} frames, will generate {actual_frames} valid frames") + + # ---------------------------------------------- + # 2. Build a global occupancy canvas and load the full base image + # ---------------------------------------------- + H, W = crop_height, crop_width + # Use actual_frames to calculate the global map width + global_w = W + (actual_frames - 1) * STEP_PIXELS + global_occupancy = np.zeros((H, global_w), dtype=np.uint8) + global_map_path = os.path.join(output_dir, "global_map.png") + out_overlay = os.path.join(output_dir, "global_graph_overlay.png") + + # Load the FULL base map for visualization background + full_base_map = cv2.imread(base_map_path, cv2.IMREAD_GRAYSCALE) + if full_base_map is None: + print("[WARN] Could not load full base map for visualization") + full_base_map = global_occupancy + else: + # Crop the base map to match the area being explored (using actual_frames) + explore_width = (actual_frames - 1) * STEP_PIXELS + crop_width + full_base_map = full_base_map[:crop_height, :explore_width] + print(f"[INFO] Cropped base map to exploration area: {full_base_map.shape}") + + # ---------------------------------------------- + # 3. Configure SWAGGER generation + # ---------------------------------------------- + # Use the main WaypointGraphGenerator (not the V2 variant) + ConfigClass = WaypointGraphGeneratorConfig + GeneratorClass = WaypointGraphGenerator + + config = ConfigClass( + skeleton_sample_distance=0.20, + boundary_inflation_factor=2, + boundary_sample_distance=0.30, + free_space_sampling_threshold=0.50, + merge_node_distance=0.30, + min_subgraph_length=0.35, + use_skeleton_graph=False, + use_boundary_sampling=True, + use_free_space_sampling=True, + use_delaunay_shortcuts=True, + prune_graph=True, + debug=True, # Enable debug output for this example + ) + + generator = GeneratorClass(config) + global_graph = None + persisted_known_points: list[tuple[float, float]] = list(getattr(generator, "_known_points", [])) + + # Use a merge distance that matches the graph generator's merge_node_distance + # This prevents duplicate nodes in overlapping regions between frames + global_builder = GlobalGraphGenerator( + merge_distance=0.30, # Match the generator's merge_node_distance + retention_factor=1.0 # Keep all nodes (don't decay unseen nodes) + ) + + # ---------------------------------------------- + # Setup live visualization if enabled + # ---------------------------------------------- + fig, ax = None, None + is_headless = matplotlib.get_backend() == 'Agg' + + if ENABLE_LIVE_VISUALIZATION: + if is_headless: + print("[INFO] Headless mode detected - saving visualization frames instead of displaying") + os.makedirs("results/viz_frames", exist_ok=True) + else: + plt.ion() # Turn on interactive mode only if not headless + print("[INFO] Live visualization enabled") + + fig, ax = plt.subplots(figsize=(12, 8)) + + previous_global_graph = None + + # ---------------------------------------------- + # 4. Build + merge graph for each frame (generate on-the-fly) + # ---------------------------------------------- + frame_generator = generate_robot_frames( + input_image_path="../maps/carter_warehouse_navigation.png", + output_dir=maps_dir, + num_frames=num_frames, + crop_width=crop_width, + crop_height=crop_height, + step_size=STEP_PIXELS, + motion_direction="x", + rotation=0.0 # For Isaac Lab, you would get this from robot's actual yaw + ) + + for i, occupancy, img_path, x_offset_px, y_offset_px, rotation_rad in frame_generator: + print(f"\n=== Processing frame {i} ===") + # Place the occupancy tile using ACTUAL offsets from generator + x_px = x_offset_px + y_px = y_offset_px + global_occupancy[y_px:y_px + H, x_px:x_px + W] = occupancy + cv2.imwrite(global_map_path, global_occupancy) + + + # Compute world offsets for this frame using ACTUAL pixel offsets + # Use top-left corner of the crop for world offset + dx_m = x_px * RESOLUTION + dy_m = y_px * RESOLUTION + + print(f"[DEBUG OFFSETS] Frame {i}: x_offset={x_px}px, y_offset={y_px}px, rotation={rotation_rad:.3f}rad → dx_m={dx_m:.4f}m, dy_m={dy_m:.4f}m") + + # Will print the world coordinates of the first node after graph generation below + + # Known points accumulated so far, provided in world coordinates + known_points_world = list(persisted_known_points) + + # ---------------------------------------------- + # 4A. Generate local graph from occupancy grid + # ---------------------------------------------- + robot_pose = (dx_m, dy_m, rotation_rad) # Pass actual rotation from generator + build_kwargs = dict( + image=occupancy, + resolution=RESOLUTION, + safety_distance=SAFE_DIST, + occupancy_threshold=127, + ) + known_arg = known_points_world if known_points_world else [] + + if hasattr(generator, "build_local_and_global_graphs"): + local_graph, global_graph = generator.build_local_and_global_graphs( + robot_pose=robot_pose, + known_points=known_arg, + **build_kwargs, + ) + else: + local_graph = generator.build_graph_from_grid_map( + x_offset=robot_pose[0], + y_offset=robot_pose[1], + rotation=robot_pose[2], + known_points=known_arg, + **build_kwargs, + ) + human_readable_global = getattr(generator, "_global_graph", None) + if isinstance(human_readable_global, nx.Graph) and human_readable_global.number_of_nodes() > 0: + global_graph = human_readable_global + else: + global_graph = local_graph + + # Debug: print world coordinates of a few nodes for this frame + sample_nodes = list(local_graph.nodes(data=True))[:5] + print(f"[DEBUG] Frame {i} sample node world coords:") + for n, data in sample_nodes: + if 'world' in data: + print(f" Node {n}: world={data['world']}") + elif 'pos' in data: + print(f" Node {n}: pos={data['pos']}") + else: + print(f" Node {n}: no world/pos info") + if local_graph is None: + print("[WARN] Failed to generate graph for frame") + continue + + + print(f"local graph node number: {len(local_graph)}") + # Print world coordinates of the first node for this frame + first_node = next(iter(local_graph.nodes(data=True)), None) + if first_node: + n, data = first_node + if 'world' in data: + print(f"[DEBUG] Frame {i} FIRST NODE: {n} world={data['world']}") + elif 'pos' in data: + print(f"[DEBUG] Frame {i} FIRST NODE: {n} pos={data['pos']}") + else: + print(f"[DEBUG] Frame {i} FIRST NODE: {n} no world/pos info") + + local_graph = ensure_graph_pos_consistent(local_graph) + global_builder.add_local_graph(local_graph) + print(f"global graph node number: {len(global_builder.get_global_graph())}") + + # ---------------------------------------------- + # Live visualization of graph building + # ---------------------------------------------- + if ENABLE_LIVE_VISUALIZATION and ax is not None: + current_global = global_builder.get_global_graph() + save_path = f"results/viz_frames/frame_{i:03d}.png" if is_headless else None + + # Use full base map for background, not the growing global_occupancy + visualize_graph_incremental( + current_graph=current_global, + previous_graph=previous_global_graph, + frame_idx=i, + ax=ax, + base_image=full_base_map, # Show full map, not stitched + resolution=RESOLUTION, + save_path=save_path + ) + + previous_global_graph = current_global.copy() + + # ---------------------------------------------- + # Export latest global graph as JSON + # ---------------------------------------------- + try: + def _make_json_serializable(obj): + """Recursively convert numpy types, tuples, and other non-JSON-serializable + objects into native Python types (lists, floats, ints, strs). + """ + if obj is None or isinstance(obj, (str, bool, int, float)): + return obj + try: + import numpy as _np + if isinstance(obj, (_np.integer, _np.floating)): + return obj.item() + if isinstance(obj, _np.ndarray): + return obj.tolist() + except Exception: + pass + if isinstance(obj, dict): + return {str(k): _make_json_serializable(v) for k, v in obj.items()} + if isinstance(obj, list): + return [_make_json_serializable(v) for v in obj] + if isinstance(obj, tuple): + return [_make_json_serializable(v) for v in obj] + try: + return float(obj) + except Exception: + return str(obj) + + # Get the current global graph + current_global = global_builder.get_global_graph() + + # Reindex nodes to simple integer ids + mapping = {} + id_map = {} + for idx, n in enumerate(current_global.nodes()): + mapping[n] = idx + data = current_global.nodes[n] + entry = { + "orig_id": list(n) if isinstance(n, (tuple, list)) else n, + "world": _make_json_serializable(data.get("world")), + "pos": _make_json_serializable(data.get("pos")), + "node_type": data.get("node_type"), + } + id_map[idx] = entry + + G_export = nx.relabel_nodes(current_global, mapping, copy=True) + + # Save latest global graph JSON (overwrites each iteration) + json_path = os.path.join(output_dir, "latest_global_graph.json") + node_link = json_graph.node_link_data(G_export) + serializable = _make_json_serializable(node_link) + with open(json_path, "w") as jf: + json.dump(serializable, jf, indent=2) + + # Save id_map (overwrites each iteration) + idmap_path = os.path.join(output_dir, "latest_global_graph_id_map.json") + with open(idmap_path, "w") as imf: + json.dump(id_map, imf, indent=2) + + print(f"[OK] Updated latest global graph JSON → {json_path}") + except Exception as e: + print(f"[WARN] Failed to export JSON graph: {e}") + + # Some generator implementations may not expose `global_graph` as a public + # attribute. Use getattr() with a fallback to the global builder to avoid + # AttributeError on generators that don't provide it. + + # ---------------------------------------------- + # Generate final overlay after all frames processed + # ---------------------------------------------- + if ENABLE_LIVE_VISUALIZATION and fig is not None: + # Keep the last visualization visible + print("[INFO] Generating final overlay...") + + current_global = global_builder.get_global_graph() + if isinstance(current_global, nx.Graph) and current_global.number_of_nodes() > 0: + cv2.imwrite(global_map_path, global_occupancy) + overlay_path = os.path.join(output_dir, "global_graph_overlay.png") + visualize_global_graph_on_image( + current_global, + base_image_path=global_map_path, + output_path=overlay_path, + ) + print(f"[OK] Global graph overlay saved → {overlay_path}") + + # ============================================================ + # 5. Final global graph saved in loop above + # ============================================================ + + print("[OK] Processing complete.") + print(f" - Simulated route frames: {maps_dir}/") + print(f" - Global graph overlay: {output_dir}/global_graph_overlay.png") + print(f" - Latest global graph JSON: {output_dir}/latest_global_graph.json") + if is_headless and ENABLE_LIVE_VISUALIZATION: + print(f" - Visualization frames: results/viz_frames/") + + # Keep visualization window open if enabled (only in non-headless mode) + if ENABLE_LIVE_VISUALIZATION and fig is not None and not is_headless: + plt.ioff() # Turn off interactive mode + print("[INFO] Close the visualization window to continue...") + plt.show() # This will block until the window is closed + + # ============================================================ + # 6. Example route query using pixel → world conversion + # ============================================================ + + start_px = (50, 150) + goal_px = (450, 150) + + start = Point(x=start_px[0] * RESOLUTION, y=start_px[1] * RESOLUTION, z=0.0) + goal = Point(x=goal_px[0] * RESOLUTION, y=goal_px[1] * RESOLUTION, z=0.0) + + route = generator.find_route(start, goal) + if route: + print(f"[OK] Found route with {len(route)} waypoints.") + else: + print("[ERR] No route found.") + + +if __name__ == "__main__": + main() diff --git a/examples/simulate_robot_graph_route.py b/examples/simulate_robot_graph_route.py new file mode 100644 index 0000000..666155f --- /dev/null +++ b/examples/simulate_robot_graph_route.py @@ -0,0 +1,70 @@ +import os +import cv2 +import time +import numpy as np +from pathlib import Path +from swagger import WaypointGraphGenerator, WaypointGraphGeneratorConfig +from swagger.models import Point + + +def simulate_robot_graph_route( + input_image_path: str, + output_dir: str = "results/simulated_route", + num_frames: int = 10, + crop_width: int = 256, + crop_height: int = 256, + step_size: int = 100, + motion_direction: str = "x", +): + """ + Simulates a robot moving through a large environment by cropping submaps, + generating graphs for each, and combining them into a global graph. + + Args: + input_image_path (str): Path to the large occupancy map. + output_dir (str): Directory to save results. + num_frames (int): Number of cropped regions (steps). + crop_width (int): Width of each cropped map (pixels). + crop_height (int): Height of each cropped map (pixels). + step_size (int): Pixel distance between consecutive crops. + motion_direction (str): 'x', 'y', or 'xy' for movement direction. + offset_distance (float): Distance offset between graphs (meters). + visualize_route (bool): Whether to visualize the robot’s movement. + + Returns: + generator (WaypointGraphGenerator): The graph generator object with all subgraphs. + """ + os.makedirs(output_dir, exist_ok=True) + + # Load base map + image = cv2.imread(input_image_path, cv2.IMREAD_GRAYSCALE) + if image is None: + raise FileNotFoundError(f"Could not read image at {input_image_path}") + H, W = image.shape[:2] + print(f"[INFO] Loaded base map: {W}x{H}") + + # Iterate through simulated motion + for i in range(num_frames): + # Calculate crop region + if motion_direction == "x": + x_start, y_start = i * step_size, 0 + elif motion_direction == "y": + x_start, y_start = 0, i * step_size + elif motion_direction == "xy": + x_start, y_start = i * step_size, i * step_size + else: + raise ValueError("motion_direction must be 'x', 'y', or 'xy'") + + x_start = min(x_start, W - crop_width) + y_start = min(y_start, H - crop_height) + x_offset = i * step_size * 0.039 + print(f"[DEBUG] Frame {i}: x_start={x_start}px, x_offset={x_offset:.3f}m") + + cropped = image[y_start:y_start + crop_height, x_start:x_start + crop_width] + crop_filename = os.path.join(output_dir, f"frame_{i+1:03d}.png") + cv2.imwrite(crop_filename, cropped) + + print(f"[INFO] Saved cropped frame {i+1}/{num_frames} to {crop_filename}") + + print(f"\n[INFO] Generated {num_frames} cropped maps in '{output_dir}'") + diff --git a/examples/visualize_dynamic_path_planning.py b/examples/visualize_dynamic_path_planning.py new file mode 100644 index 0000000..7cbd829 --- /dev/null +++ b/examples/visualize_dynamic_path_planning.py @@ -0,0 +1,300 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: Copyright (c) 2025 +# SPDX-License-Identifier: Apache-2.0 + +""" +Dynamic Path Planning Visualization + +This script visualizes path planning with dynamic graph updates. As the robot +moves, it periodically reloads the graph and replans the path if the graph changes. +""" + +import os +import json +import cv2 +import numpy as np +import matplotlib.pyplot as plt +import networkx as nx +from networkx.readwrite import json_graph +from matplotlib.animation import FuncAnimation +import time + +# Configuration +GRAPH_PATH = "results/global_graph.json" +MAP_PATH = "../maps/carter_warehouse_navigation.png" +RESOLUTION = 0.039 # meters per pixel +REPLAN_INTERVAL = 30 # Replan every N frames (~1.5 seconds at 50ms interval) + +class DynamicPathPlanner: + def __init__(self, graph_path, map_path, start_pixel, goal_pixel): + self.graph_path = graph_path + self.map_path = map_path + self.start_pixel = start_pixel + self.goal_pixel = goal_pixel + + # Load initial data + self.graph = self.load_graph() + self.map_image = cv2.imread(map_path, cv2.IMREAD_GRAYSCALE) + + # Find start and goal nodes + self.start_node = self.find_nearest_node(start_pixel) + self.goal_node = self.find_nearest_node(goal_pixel) + + # Current state + self.current_path = None + self.current_positions = [] + self.position_index = 0 + self.replan_counter = 0 + self.trail_x = [] + self.trail_y = [] + + # Initial path planning + self.replan() + + print(f"Initial path: {len(self.current_path)} nodes" if self.current_path else "No initial path found") + + def load_graph(self): + """Load the navigation graph from JSON.""" + try: + with open(self.graph_path, 'r') as f: + data = json.load(f) + G = json_graph.node_link_graph(data, edges="links") + return G + except Exception as e: + print(f"Error loading graph: {e}") + return nx.Graph() + + def world_to_pixel(self, world_coords, crop_size=300): + """Convert world coordinates to pixel coordinates.""" + x_m, y_m = world_coords[0], world_coords[1] + world_origin_x_px = crop_size / 2 + world_origin_y_px = crop_size / 2 + + x_px = (x_m / RESOLUTION) + world_origin_x_px + y_px = world_origin_y_px - (y_m / RESOLUTION) + + return x_px, y_px + + def find_nearest_node(self, target_pixel): + """Find the nearest graph node to a target pixel location.""" + min_dist = float('inf') + nearest_node = None + + for node, data in self.graph.nodes(data=True): + world = data.get('world') + if world is None: + continue + + node_px = self.world_to_pixel(world) + dist = np.hypot(node_px[0] - target_pixel[0], node_px[1] - target_pixel[1]) + + if dist < min_dist: + min_dist = dist + nearest_node = node + + return nearest_node + + def compute_path(self, start_node, goal_node): + """Compute shortest path using A* algorithm.""" + def heuristic(n1, n2): + w1 = self.graph.nodes[n1]['world'] + w2 = self.graph.nodes[n2]['world'] + return np.hypot(w1[0] - w2[0], w1[1] - w2[1]) + + try: + path = nx.astar_path(self.graph, start_node, goal_node, heuristic=heuristic, weight='weight') + return path + except (nx.NetworkXNoPath, KeyError): + return None + + def interpolate_path(self, path_nodes, num_points=20): + """Interpolate smooth positions along the path.""" + positions = [] + + for i in range(len(path_nodes) - 1): + n1 = path_nodes[i] + n2 = path_nodes[i + 1] + + p1 = self.world_to_pixel(self.graph.nodes[n1]['world']) + p2 = self.world_to_pixel(self.graph.nodes[n2]['world']) + + for t in np.linspace(0, 1, num_points, endpoint=False): + x = p1[0] * (1 - t) + p2[0] * t + y = p1[1] * (1 - t) + p2[1] * t + positions.append((x, y)) + + # Add final position + final_pos = self.world_to_pixel(self.graph.nodes[path_nodes[-1]]['world']) + positions.append(final_pos) + + return positions + + def replan(self): + """Reload graph and replan path.""" + print("Replanning...") + + # Reload graph + old_num_nodes = len(self.graph.nodes) if self.graph else 0 + self.graph = self.load_graph() + new_num_nodes = len(self.graph.nodes) + + if new_num_nodes != old_num_nodes: + print(f"Graph updated: {old_num_nodes} → {new_num_nodes} nodes") + + # Update start node to current position if we're already moving + if self.current_positions and self.position_index < len(self.current_positions): + current_pos = self.current_positions[self.position_index] + self.start_node = self.find_nearest_node(current_pos) + else: + self.start_node = self.find_nearest_node(self.start_pixel) + + # Always update goal node in case it changed + self.goal_node = self.find_nearest_node(self.goal_pixel) + + # Compute new path + self.current_path = self.compute_path(self.start_node, self.goal_node) + + if self.current_path: + self.current_positions = self.interpolate_path(self.current_path) + self.position_index = 0 + print(f"New path: {len(self.current_path)} nodes") + else: + print("No path found during replan") + + def get_current_position(self): + """Get the current robot position.""" + if not self.current_positions or self.position_index >= len(self.current_positions): + return None + return self.current_positions[self.position_index] + + def step(self): + """Advance one step in the animation.""" + self.replan_counter += 1 + + # Check if we need to replan + if self.replan_counter >= REPLAN_INTERVAL: + self.replan() + self.replan_counter = 0 + + # Advance position + if self.current_positions and self.position_index < len(self.current_positions): + self.position_index += 1 + + # If reached end, loop back + if self.position_index >= len(self.current_positions): + self.position_index = 0 + self.trail_x.clear() + self.trail_y.clear() + +def visualize_dynamic_planning(planner): + """Create animated visualization with dynamic replanning.""" + + fig, ax = plt.subplots(figsize=(12, 10)) + + # Robot marker and trail + robot, = ax.plot([], [], 'o', color='lime', markersize=20, zorder=10, + markeredgecolor='darkgreen', markeredgewidth=2, label='Robot') + trail, = ax.plot([], [], 'g-', linewidth=2, alpha=0.5, zorder=9) + path_line, = ax.plot([], [], 'b-', linewidth=3, alpha=0.7, zorder=3, label='Current Path') + + # Static elements + ax.imshow(planner.map_image, cmap='gray', origin='upper') + ax.set_aspect('equal') + ax.set_xlim(0, planner.map_image.shape[1]) + ax.set_ylim(planner.map_image.shape[0], 0) + + # Draw start and goal + ax.plot(planner.start_pixel[0], planner.start_pixel[1], 'go', markersize=15, + label='Start', zorder=5, markeredgecolor='darkgreen', markeredgewidth=2) + ax.plot(planner.goal_pixel[0], planner.goal_pixel[1], 'ro', markersize=15, + label='Goal', zorder=5, markeredgecolor='darkred', markeredgewidth=2) + + ax.legend(loc='upper right', fontsize=10) + title = ax.set_title('Dynamic Path Planning') + plt.tight_layout() + + def init(): + robot.set_data([], []) + trail.set_data([], []) + path_line.set_data([], []) + return robot, trail, path_line, title + + def animate(frame): + # Step the planner + planner.step() + + # Update graph visualization (edges - light) + ax.clear() + ax.imshow(planner.map_image, cmap='gray', origin='upper') + ax.set_aspect('equal') + ax.set_xlim(0, planner.map_image.shape[1]) + ax.set_ylim(planner.map_image.shape[0], 0) + + # Draw graph edges + for u, v in planner.graph.edges(): + w1 = planner.graph.nodes[u].get('world') + w2 = planner.graph.nodes[v].get('world') + if w1 and w2: + p1 = planner.world_to_pixel(w1) + p2 = planner.world_to_pixel(w2) + ax.plot([p1[0], p2[0]], [p1[1], p2[1]], 'gray', alpha=0.2, linewidth=0.5, zorder=1) + + # Draw current path + if planner.current_path: + path_pixels = [planner.world_to_pixel(planner.graph.nodes[n]['world']) + for n in planner.current_path] + path_x = [p[0] for p in path_pixels] + path_y = [p[1] for p in path_pixels] + ax.plot(path_x, path_y, 'b-', linewidth=3, alpha=0.7, zorder=3, label='Current Path') + + # Draw waypoints + for px in path_pixels: + ax.plot(px[0], px[1], 'o', color='blue', markersize=4, zorder=4) + + # Draw start and goal + ax.plot(planner.start_pixel[0], planner.start_pixel[1], 'go', markersize=15, + label='Start', zorder=5, markeredgecolor='darkgreen', markeredgewidth=2) + ax.plot(planner.goal_pixel[0], planner.goal_pixel[1], 'ro', markersize=15, + label='Goal', zorder=5, markeredgecolor='darkred', markeredgewidth=2) + + # Update robot position + pos = planner.get_current_position() + if pos: + x, y = pos + ax.plot([x], [y], 'o', color='lime', markersize=20, zorder=10, + markeredgecolor='darkgreen', markeredgewidth=2, label='Robot') + + planner.trail_x.append(x) + planner.trail_y.append(y) + ax.plot(planner.trail_x, planner.trail_y, 'g-', linewidth=2, alpha=0.5, zorder=9) + + # Update title with stats + num_nodes = len(planner.graph.nodes) if planner.graph else 0 + path_len = len(planner.current_path) if planner.current_path else 0 + ax.set_title(f'Dynamic Path Planning (Graph: {num_nodes} nodes, Path: {path_len} waypoints)') + + ax.legend(loc='upper right', fontsize=10) + plt.tight_layout() + + return [] + + # Create animation + anim = FuncAnimation(fig, animate, init_func=init, frames=None, + interval=50, blit=False, repeat=True) + + plt.show() + +def main(): + # Define start and goal positions (pixel coordinates) + start_pixel = (150, 200) # Upper left area + goal_pixel = (400, 600) # Lower right area + + print("Initializing dynamic path planner...") + planner = DynamicPathPlanner(GRAPH_PATH, MAP_PATH, start_pixel, goal_pixel) + + print(f"Starting visualization (replanning every {REPLAN_INTERVAL} frames)...") + visualize_dynamic_planning(planner) + +if __name__ == "__main__": + main() diff --git a/examples/visualize_global_graph_on_image.py b/examples/visualize_global_graph_on_image.py new file mode 100644 index 0000000..4fa7e55 --- /dev/null +++ b/examples/visualize_global_graph_on_image.py @@ -0,0 +1,62 @@ +import cv2 +import numpy as np +import os + +def visualize_global_graph_on_image(global_graph, base_image_path, output_path="results/global_graph_overlay.png"): + """ + Overlay a combined graph onto the given base map image. + Draws edges in red and nodes in green. + """ + + # --- Load the base map image --- + base_image = cv2.imread(base_image_path, cv2.IMREAD_COLOR) + if base_image is None: + raise FileNotFoundError(f"[ERROR] Could not load base image: {base_image_path}") + + # Create a copy to draw on + vis = base_image.copy() + H, W, _ = vis.shape + resolution = 0.05 + x_offset_m = 0.0 + y_offset_m = 0.0 + + # --- Get all coordinates --- + xs = [] + ys = [] + for _, n in global_graph.nodes(data=True): + world = n.get("world", [0.0, 0.0, 0.0]) + xs.append(float(world[0])) + ys.append(float(world[1])) + + # --- Auto scale factor --- + x_scale = W / (max(xs) - min(xs) + 1e-6) + y_scale = H / (max(ys) - min(ys) + 1e-6) + scale = min(x_scale, y_scale) + + # --- Helper: convert world (m) → image (px) coordinates --- + def world_to_pixel(x_m, y_m): + x_px = int((x_m - min(xs)) * scale) + y_px = int((y_m - min(ys)) * scale) + return x_px, H - y_px # flip Y so it’s upright + + # --- Draw edges --- + for u, v, edge_data in global_graph.edges(data=True): + node_u = global_graph.nodes[u] + node_v = global_graph.nodes[v] + + wu = node_u.get("world", [0.0, 0.0, 0.0]) + wv = node_v.get("world", [0.0, 0.0, 0.0]) + x1, y1 = world_to_pixel(wu[0], wu[1]) + x2, y2 = world_to_pixel(wv[0], wv[1]) + cv2.line(vis, (x1, y1), (x2, y2), (0, 0, 255), 2) + + # --- Draw nodes --- + for node_id, node_data in global_graph.nodes(data=True): + world = node_data.get("world", [0.0, 0.0, 0.0]) + x, y = world_to_pixel(world[0], world[1]) + cv2.circle(vis, (x, y), 4, (0, 255, 0), -1) # Green nodes + + # --- Save the result --- + os.makedirs(os.path.dirname(output_path), exist_ok=True) + cv2.imwrite(output_path, vis) + print(f"Global graph visualization saved to: {output_path}") diff --git a/examples/visualize_path_planning.py b/examples/visualize_path_planning.py new file mode 100644 index 0000000..9e2afa2 --- /dev/null +++ b/examples/visualize_path_planning.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: Copyright (c) 2025 +# SPDX-License-Identifier: Apache-2.0 + +""" +Path Planning Visualization + +This script loads a saved navigation graph and visualizes A* path planning +with an animated robot moving from start to goal positions. +""" + +import os +import json +import cv2 +import numpy as np +import matplotlib.pyplot as plt +import matplotlib.patches as patches +from matplotlib.animation import FuncAnimation +import networkx as nx +from networkx.readwrite import json_graph + +# Configuration +GRAPH_PATH = "results/global_graph.json" +MAP_PATH = "../maps/carter_warehouse_navigation.png" +RESOLUTION = 0.039 # meters per pixel + +def load_graph(graph_path): + """Load the navigation graph from JSON.""" + with open(graph_path, 'r') as f: + data = json.load(f) + G = json_graph.node_link_graph(data, edges="links") + return G + +def world_to_pixel(world_coords, resolution, crop_size=300): + """Convert world coordinates to pixel coordinates for visualization.""" + x_m, y_m = world_coords[0], world_coords[1] + world_origin_x_px = crop_size / 2 + world_origin_y_px = crop_size / 2 + + x_px = (x_m / resolution) + world_origin_x_px + y_px = world_origin_y_px - (y_m / resolution) + + return x_px, y_px + +def find_nearest_node(graph, target_pixel): + """Find the nearest graph node to a target pixel location.""" + min_dist = float('inf') + nearest_node = None + + for node, data in graph.nodes(data=True): + world = data.get('world') + if world is None: + continue + + node_px = world_to_pixel(world, RESOLUTION) + dist = np.hypot(node_px[0] - target_pixel[0], node_px[1] - target_pixel[1]) + + if dist < min_dist: + min_dist = dist + nearest_node = node + + return nearest_node + +def compute_path(graph, start_node, goal_node): + """Compute shortest path using A* algorithm.""" + def heuristic(n1, n2): + w1 = graph.nodes[n1]['world'] + w2 = graph.nodes[n2]['world'] + return np.hypot(w1[0] - w2[0], w1[1] - w2[1]) + + try: + path = nx.astar_path(graph, start_node, goal_node, heuristic=heuristic, weight='weight') + return path + except nx.NetworkXNoPath: + print("No path found!") + return None + +def interpolate_path(path_nodes, graph, num_points=50): + """Interpolate smooth positions along the path for animation.""" + positions = [] + + for i in range(len(path_nodes) - 1): + n1 = path_nodes[i] + n2 = path_nodes[i + 1] + + p1 = world_to_pixel(graph.nodes[n1]['world'], RESOLUTION) + p2 = world_to_pixel(graph.nodes[n2]['world'], RESOLUTION) + + for t in np.linspace(0, 1, num_points, endpoint=False): + x = p1[0] * (1 - t) + p2[0] * t + y = p1[1] * (1 - t) + p2[1] * t + positions.append((x, y)) + + # Add final position + final_pos = world_to_pixel(graph.nodes[path_nodes[-1]]['world'], RESOLUTION) + positions.append(final_pos) + + return positions + +def visualize_path_planning(graph, map_image, start_pixel, goal_pixel): + """Create animated visualization of path planning.""" + + # Find nearest nodes + start_node = find_nearest_node(graph, start_pixel) + goal_node = find_nearest_node(graph, goal_pixel) + + if start_node is None or goal_node is None: + print("Could not find start or goal node") + return + + print(f"Start node: {start_node}") + print(f"Goal node: {goal_node}") + + # Compute path + path = compute_path(graph, start_node, goal_node) + + if path is None: + return + + print(f"Path length: {len(path)} nodes") + + # Interpolate positions for smooth animation + positions = interpolate_path(path, graph, num_points=20) + + # Setup figure + fig, ax = plt.subplots(figsize=(12, 10)) + ax.imshow(map_image, cmap='gray', origin='upper') + ax.set_aspect('equal') + ax.set_xlim(0, map_image.shape[1]) + ax.set_ylim(map_image.shape[0], 0) + + # Draw graph edges (light) + for u, v in graph.edges(): + w1 = graph.nodes[u].get('world') + w2 = graph.nodes[v].get('world') + if w1 and w2: + p1 = world_to_pixel(w1, RESOLUTION) + p2 = world_to_pixel(w2, RESOLUTION) + ax.plot([p1[0], p2[0]], [p1[1], p2[1]], 'gray', alpha=0.3, linewidth=0.5, zorder=1) + + # Draw graph nodes (small) + for node, data in graph.nodes(data=True): + world = data.get('world') + if world: + px = world_to_pixel(world, RESOLUTION) + ax.plot(px[0], px[1], 'o', color='lightblue', markersize=2, alpha=0.5, zorder=2) + + # Draw planned path (highlighted) + path_pixels = [world_to_pixel(graph.nodes[n]['world'], RESOLUTION) for n in path] + path_x = [p[0] for p in path_pixels] + path_y = [p[1] for p in path_pixels] + ax.plot(path_x, path_y, 'b-', linewidth=3, alpha=0.7, zorder=3, label='Planned Path') + + # Draw waypoints on path + for px in path_pixels: + ax.plot(px[0], px[1], 'o', color='blue', markersize=6, zorder=4) + + # Draw start and goal + ax.plot(start_pixel[0], start_pixel[1], 'go', markersize=15, label='Start', zorder=5) + ax.plot(goal_pixel[0], goal_pixel[1], 'ro', markersize=15, label='Goal', zorder=5) + + # Robot marker (to be animated) + robot, = ax.plot([], [], 'o', color='lime', markersize=20, zorder=10, + markeredgecolor='darkgreen', markeredgewidth=2, label='Robot') + trail, = ax.plot([], [], 'g-', linewidth=2, alpha=0.5, zorder=9) + + # Trail storage + trail_x = [] + trail_y = [] + + ax.legend(loc='upper right', fontsize=10) + ax.set_title('Path Planning Visualization') + plt.tight_layout() + + def init(): + robot.set_data([], []) + trail.set_data([], []) + return robot, trail + + def animate(frame): + if frame < len(positions): + x, y = positions[frame] + robot.set_data([x], [y]) + + trail_x.append(x) + trail_y.append(y) + trail.set_data(trail_x, trail_y) + + return robot, trail + + # Create animation + anim = FuncAnimation(fig, animate, init_func=init, frames=len(positions), + interval=50, blit=True, repeat=True) + + plt.show() + +def main(): + # Load graph and map + print("Loading graph...") + graph = load_graph(GRAPH_PATH) + print(f"Loaded graph with {len(graph.nodes)} nodes and {len(graph.edges)} edges") + + print("Loading map...") + map_image = cv2.imread(MAP_PATH, cv2.IMREAD_GRAYSCALE) + + # Define start and goal positions (pixel coordinates) + # You can modify these to test different paths + start_pixel = (150, 200) # Upper left area + goal_pixel = (400, 600) # Lower right area + + print(f"Planning path from {start_pixel} to {goal_pixel}") + + # Visualize + visualize_path_planning(graph, map_image, start_pixel, goal_pixel) + +if __name__ == "__main__": + main() diff --git a/examples/waypoint_graph_generation_example.py b/examples/waypoint_graph_generation_example.py index 6fc52f9..faeaddf 100644 --- a/examples/waypoint_graph_generation_example.py +++ b/examples/waypoint_graph_generation_example.py @@ -18,14 +18,37 @@ import cv2 +# To add in timing to the example +import time + from swagger import Point, WaypointGraphGenerator, WaypointGraphGeneratorConfig def main(): # Prepare the required data occupancy_grid = cv2.imread( - Path(__file__).parent.parent / "data" / "carter_warehouse_navigation.png", cv2.IMREAD_GRAYSCALE + Path(__file__).parent.parent / "maps" / "carter_warehouse_navigation.png", cv2.IMREAD_GRAYSCALE ) + occupany_grid = cv2.imread("results/simulated_route/frame_001.png", cv2.IMREAD_GRAYSCALE) + + if occupancy_grid is None: + print("Error: Image could not be loaded. Check the file path.") + return + else: + height, width = occupancy_grid.shape + print(f"Image size: {width}x{height} pixels") + print(f"Image shape: {occupancy_grid.shape}") + + occupancy_grid = cv2.resize(occupancy_grid, (250, 250), interpolation=cv2.INTER_AREA) + + if occupancy_grid is None: + print("Error: Image could not be loaded. Check the file path.") + return + else: + height, width = occupancy_grid.shape + print(f"Image size: {width}x{height} pixels") + print(f"Image shape: {occupancy_grid.shape}") + occupancy_threshold = 127 safety_distance = 0.3 resolution = 0.05 @@ -35,12 +58,12 @@ def main(): # Create a custom configuration config = WaypointGraphGeneratorConfig( - skeleton_sample_distance=1.0, # Distance between waypoints (in meters) - boundary_inflation_factor=2.0, # Larger margins around obstacles - boundary_sample_distance=1.5, # Distance between boundary samples (in meters) - free_space_sampling_threshold=2.0, # Distance from obstacles for free space sampling (in meters) - merge_node_distance=0.3, # Distance to merge nearby nodes (in meters) - min_subgraph_length=1.0, # Minimum subgraph length to keep (in meters) + skeleton_sample_distance=0.5, # Distance between waypoints (in meters) + boundary_inflation_factor=1.4, # Larger margins around obstacles + boundary_sample_distance=0.8, # Distance between boundary samples (in meters) + free_space_sampling_threshold=0.3, # Distance from obstacles for free space sampling (in meters) + merge_node_distance=0.2, # Distance to merge nearby nodes (in meters) + min_subgraph_length=0.5, # Minimum subgraph length to keep (in meters) use_skeleton_graph=True, # Create a graph along the medial axis use_boundary_sampling=True, # Sample nodes along boundaries use_free_space_sampling=True, # Sample nodes in open areas @@ -62,8 +85,8 @@ def main(): rotation=rotation, ) - print(f"Node 1: {graph.nodes[1]}") - print(f"Edge (0, 1): {graph.edges[(0, 1)]}") + #print(f"Node 1: {graph.nodes[1]}") + #print(f"Edge (0, 1): {graph.edges[(0, 1)]}") # Visualize the graph output_dir = "results" @@ -75,13 +98,28 @@ def main(): start = Point(x=13.0, y=18.0) goal = Point(x=15.0, y=10.0) route = generator.find_route(start, goal) - print(f"Route: {[(point.x, point.y) for point in route]}") + #print(f"Route: {[(point.x, point.y) for point in route]}") # Get node IDs for a list of points points = [Point(x=13.0, y=18.0), Point(x=15.0, y=10.0), Point(x=0.0, y=0.0)] node_ids = generator.get_node_ids(points) - print(f"Node IDs: {node_ids}") + #print(f"Node IDs: {node_ids}") if __name__ == "__main__": - main() + # Replaced main() with this to loop 10 times and to time each instance + N = 10 + times = [] + for i in range(N): + start = time.perf_counter() + main() + end = time.perf_counter() + elapsed = end - start + times.append(elapsed) + + print("\n --- Timing Results ---") + for i, t in enumerate(times, 1): + print(f"\nRun {i}/{N}: {t:.3f} seconds") + avg_time = sum(times) / N + print(f"\nAverage time over {N} runs: {avg_time:.3f} seconds") + diff --git a/swagger/global_graph_generator.py b/swagger/global_graph_generator.py new file mode 100644 index 0000000..df85d9e --- /dev/null +++ b/swagger/global_graph_generator.py @@ -0,0 +1,297 @@ +"""Global Graph Assembler for SWAGGER. + +This helper consumes per-frame/local graphs (such as the ones produced by +``WaypointGraphGenerator``) and incrementally builds a stitched, world-frame +networkx graph. It handles: + +* merging nearby free-space nodes (ignores skeleton nodes) +* persisting/freeing nodes using a retention factor (0 → only current frame, + 1 → keep everything forever) +* maintaining a probabilistic heatmap of boundary observations. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Dict, Tuple, Iterable, Set +import math + +import cv2 +import numpy as np +import networkx as nx + + +@dataclass +class GlobalGraphGenerator: + """Incrementally build a stitched global graph. + + Args: + merge_distance: Maximum world-distance between two free-space nodes + to consider them the "same" node when merging (meters). + retention_factor: Controls how long unseen nodes stay around. + 0.0 → drop nodes immediately if not observed this frame. + 1.0 → keep nodes forever. + boundary_increment: How much to increment the obstacle probability + whenever a boundary node is re-observed. + boundary_decay: How quickly unseen obstacle probabilities decay. + """ + + merge_distance: float = 0.05 + retention_factor: float = 0.5 + boundary_increment: float = 0.2 + boundary_decay: float = 0.9 + boundary_cell_size: float = 0.05 + + global_graph: nx.Graph = field(default_factory=nx.Graph, init=False) + _next_node_id: int = field(default=0, init=False) + _node_usage: Dict[int, float] = field(default_factory=dict, init=False) + _boundary_probs: Dict[Tuple[int, int], float] = field(default_factory=dict, init=False) + + def add_local_graph(self, local_graph: nx.Graph) -> None: + """Merge a local graph into the persistent global graph.""" + + if not 0.0 <= self.retention_factor <= 1.0: + raise ValueError("retention_factor must be between 0 and 1") + + local_to_global: Dict = {} + touched_nodes: Set[int] = set() + seen_boundary_cells: Set[Tuple[int, int]] = set() + + for node, data in local_graph.nodes(data=True): + node_type = str(data.get("node_type", "")).lower() + if node_type == "skeleton": + continue # never ingest skeleton nodes + + world = data.get("world") + if world is None: + continue + world_xy = (float(world[0]), float(world[1])) + + if node_type == "boundary": + key = self._quantize(world_xy) + seen_boundary_cells.add(key) + prob = self._boundary_probs.get(key, 0.0) + prob = min(1.0, prob + self.boundary_increment) + self._boundary_probs[key] = prob + continue + + if node_type not in {"free_space", "known"}: + continue + + global_id = self._merge_or_add_node(world_xy, data) + local_to_global[node] = global_id + touched_nodes.add(global_id) + + # Add edges only between nodes mapped above + local_edge_pairs: Set[Tuple[int, int]] = set() + for src, dst, edata in local_graph.edges(data=True): + g_src = local_to_global.get(src) + g_dst = local_to_global.get(dst) + if g_src is None or g_dst is None or g_src == g_dst: + continue + pair = tuple(sorted((g_src, g_dst))) + if pair in local_edge_pairs: + continue # keep only one connection per pair from this local graph + local_edge_pairs.add(pair) + + if self.global_graph.has_edge(*pair): + n1 = self.global_graph.nodes[g_src]["world"] + n2 = self.global_graph.nodes[g_dst]["world"] + dist = math.hypot(float(n1[0]) - float(n2[0]), float(n1[1]) - float(n2[1])) + self.global_graph[g_src][g_dst]["weight"] = dist + continue + + n1 = self.global_graph.nodes[g_src]["world"] + n2 = self.global_graph.nodes[g_dst]["world"] + dist = math.hypot(float(n1[0]) - float(n2[0]), float(n1[1]) - float(n2[1])) + self.global_graph.add_edge(g_src, g_dst, weight=dist) + + # Clean up redundant edges after merging + self._prune_redundant_edges() + + self._decay_nodes(touched_nodes) + self._decay_boundaries(seen_boundary_cells) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + def _merge_or_add_node(self, world_xy: Tuple[float, float], attrs: dict) -> int: + """Merge with an existing node or create a new one.""" + best_node = None + best_dist = self.merge_distance + for node_id, data in self.global_graph.nodes(data=True): + existing_world = data.get("world") + if existing_world is None: + continue + dist = math.hypot(world_xy[0] - float(existing_world[0]), world_xy[1] - float(existing_world[1])) + if dist < best_dist: + best_dist = dist + best_node = node_id + + if best_node is None: + node_id = self._next_node_id + self._next_node_id += 1 + z = 0.0 + if isinstance(attrs.get("world"), (tuple, list)) and len(attrs["world"]) >= 3: + z = float(attrs["world"][2]) + node_data = { + "world": (world_xy[0], world_xy[1], z), + "node_type": "free_space", + "origin": attrs.get("origin", "new"), + } + self.global_graph.add_node(node_id, **node_data) + self._node_usage[node_id] = 1.0 + return node_id + + # Merge into existing node + data = self.global_graph.nodes[best_node] + existing_world = data.get("world", (world_xy[0], world_xy[1], 0.0)) + data["world"] = (world_xy[0], world_xy[1], existing_world[2]) + data["origin"] = attrs.get("origin", data.get("origin", "new")) + self._node_usage[best_node] = 1.0 + return best_node + + def _decay_nodes(self, touched: Set[int]) -> None: + threshold = 1e-3 + to_remove = [] + for node_id in list(self._node_usage.keys()): + if node_id in touched: + self._node_usage[node_id] = 1.0 + continue + self._node_usage[node_id] *= self.retention_factor + if self._node_usage[node_id] < threshold: + to_remove.append(node_id) + for node_id in to_remove: + self.global_graph.remove_node(node_id) + self._node_usage.pop(node_id, None) + + def _prune_redundant_edges(self) -> None: + """Limit node degree by keeping only the closest neighbors.""" + max_degree = 8 # Reduced back to 8 for cleaner graph + + # Only prune every N frames to reduce overhead + if not hasattr(self, '_prune_counter'): + self._prune_counter = 0 + self._prune_counter += 1 + + # Only run every 10 frames (less frequent) + if self._prune_counter % 10 != 0: + return + + edges_to_remove = set() # Use set to avoid duplicates + + for node in self.global_graph.nodes(): + neighbors = list(self.global_graph.neighbors(node)) + degree = len(neighbors) + + # Only prune if extremely over-connected + if degree <= max_degree: + continue + + # Use list comprehension for efficiency + neighbor_weights = [(self.global_graph[node][neighbor]["weight"], neighbor) + for neighbor in neighbors] + + neighbor_weights.sort() # Sort by distance (shortest first) + + # Keep only the max_degree closest neighbors + for weight, neighbor in neighbor_weights[max_degree:]: + edges_to_remove.add(tuple(sorted((node, neighbor)))) + + # Batch remove edges + self.global_graph.remove_edges_from(edges_to_remove) + + def _decay_boundaries(self, seen: Set[Tuple[int, int]]) -> None: + for key in list(self._boundary_probs.keys()): + if key in seen: + continue + self._boundary_probs[key] *= self.boundary_decay + if self._boundary_probs[key] < 1e-3: + del self._boundary_probs[key] + + def _quantize(self, world_xy: Tuple[float, float]) -> Tuple[int, int]: + cell = self.boundary_cell_size + return (int(math.floor(world_xy[0] / cell)), int(math.floor(world_xy[1] / cell))) + + def _cell_center(self, key: Tuple[int, int]) -> Tuple[float, float]: + cell = self.boundary_cell_size + return (key[0] * cell + cell / 2.0, key[1] * cell + cell / 2.0) + + # ------------------------------------------------------------------ + # Accessors + # ------------------------------------------------------------------ + + def get_global_graph(self) -> nx.Graph: + """Return the stitched global graph.""" + return self.global_graph + + def boundary_probabilities(self) -> Dict[Tuple[int, int], float]: + """Return the current obstacle probability map.""" + return dict(self._boundary_probs) + + def debug_visualize(self, path: str, scale: float = 100.0) -> None: + """Render a simple 2D visualization of the global graph.""" + + if len(self.global_graph) == 0: + blank = np.zeros((512, 512, 3), dtype=np.uint8) + cv2.imwrite(path, blank) + return + + xs, ys = [], [] + for _, data in self.global_graph.nodes(data=True): + world = data.get("world") + if world is None: + continue + xs.append(float(world[0])) + ys.append(float(world[1])) + if not xs or not ys: + blank = np.zeros((512, 512, 3), dtype=np.uint8) + cv2.imwrite(path, blank) + return + + min_x, max_x = min(xs), max(xs) + min_y, max_y = min(ys), max(ys) + width = max(64, int((max_x - min_x) * scale) + 64) + height = max(64, int((max_y - min_y) * scale) + 64) + canvas = np.zeros((height, width, 3), dtype=np.uint8) + + def to_px(wx: float, wy: float) -> Tuple[int, int]: + x = int((wx - min_x) * scale) + 32 + y = int((max_y - wy) * scale) + 32 + return x, y + + # Draw boundary probability shading (probabilities updated via exponential decay) + if self._boundary_probs: + overlay = canvas.copy() + for key, prob in self._boundary_probs.items(): + wx, wy = self._cell_center(key) + x, y = to_px(wx, wy) + radius = max(2, int(self.boundary_cell_size * scale * 0.8)) + clamped = max(0.0, min(1.0, prob)) + color_val = int(clamped * 255) + color = (0, color_val, 255 - color_val) + cv2.circle(overlay, (x, y), radius, color, -1) + canvas = cv2.addWeighted(overlay, 0.35, canvas, 0.65, 0) + + # Draw edges + for u, v in self.global_graph.edges(): + n1 = self.global_graph.nodes[u] + n2 = self.global_graph.nodes[v] + if "world" not in n1 or "world" not in n2: + continue + x1, y1 = to_px(float(n1["world"][0]), float(n1["world"][1])) + x2, y2 = to_px(float(n2["world"][0]), float(n2["world"][1])) + cv2.line(canvas, (x1, y1), (x2, y2), (0, 255, 255), 1) + + # Draw nodes + for _, data in self.global_graph.nodes(data=True): + world = data.get("world") + if world is None: + continue + x, y = to_px(float(world[0]), float(world[1])) + origin = str(data.get("origin", "new")) + color = (0, 0, 255) if origin == "known" else (255, 0, 0) + cv2.circle(canvas, (x, y), 3, color, -1) + + cv2.imwrite(path, canvas) diff --git a/swagger/utils.py b/swagger/utils.py index 046af52..158081c 100644 --- a/swagger/utils.py +++ b/swagger/utils.py @@ -19,8 +19,93 @@ from swagger.models import CsrGraph, Point - def pixel_to_world( + row: float, col: float, resolution: float, x_offset: float, y_offset: float, cos_rot: float, sin_rot: float, image_shape: Tuple[int, int] = None +) -> Point: + """ + Convert pixel coordinates to world coordinates with the given transform. + + Coordinate system mapping: + - In image frame: origin at top-left, x-axis points down (rows), y-axis points right (columns) + - In ROS world frame: origin at bottom-left, x-axis points right, y-axis points up + Args: + row: Row coordinate in image frame (down direction) + col: Column coordinate in image frame (right direction) + resolution: Meters per pixel in the map + x_offset: Translation in the x direction (meters) + y_offset: Translation in the y direction (meters). When using ROS coordinates, + this should include the image height in meters (image_height * resolution) + cos_rot: Cosine of the rotation angle + sin_rot: Sine of the rotation angle + + Returns: + Point object with world coordinates + """ + # Convert from image coordinates to ROS world coordinates + # In ROS: origin at bottom-left, x right, y up + # In image: origin at top-left, row down, col right + H, W = image_shape + cx_m = W / 2.0 + cy_m = H / 2.0 + + # --- interpret pixel coords (swapping row/col) --- + x_local = (col - cx_m) * resolution # row = forward + y_local = -(row - cy_m) * resolution # col = lateral + + #x_fix = y_local + #y_fix = -x_local + x_fix = x_local + y_fix = y_local + + # --- rotate 90° CW and mirror by swapping x/y + sign --- + x_rot = cos_rot * x_fix - sin_rot * y_fix + y_rot = sin_rot * x_fix + cos_rot * y_fix + + x_world = x_rot + x_offset + y_world = y_rot + y_offset + + return (x_world, y_world) + +def world_to_pixel( + point: Point, resolution: float, x_offset: float, y_offset: float, cos_rot: float, sin_rot: float, image_shape: Tuple[int, int] = None +) -> Tuple[int, int]: + """ + Inverse rigid-body transform: world → pixel coordinates. + """ + + # --- robust unpacking for both Point or tuple --- + if hasattr(point, "x") and hasattr(point, "y"): + x_world, y_world = point.x, point.y + else: + x_world, y_world = point + + H, W = image_shape + cx_m = W / 2.0 + cy_m = H / 2.0 + + final_x = x_world + final_y = y_world + + # translate into local centered frame + x_local = final_x - x_offset + y_local = final_y - y_offset + + # inverse rotate (undo 90° CW + mirror) + x_unrot = cos_rot * x_local + sin_rot * y_local + y_unrot = -sin_rot * x_local + cos_rot * y_local + + #x_fix = -y_unrot + #y_fix = x_unrot + x_fix = x_unrot + y_fix = y_unrot + + # swap back row/col semantics + col = (x_fix / resolution) + cx_m + row = -(y_fix / resolution) + cy_m + + return int(round(row)), int(round(col)) + +def pixel_to_world1( row: float, col: float, resolution: float, x_offset: float, y_offset: float, cos_rot: float, sin_rot: float ) -> Point: """ @@ -57,10 +142,10 @@ def pixel_to_world( x_world = x_rot + x_offset y_world = y_rot + y_offset - return Point(x=x_world, y=y_world, z=0.0) + #return Point(x=x_world, y=y_world, z=0.0) + return (x_world, y_world) - -def world_to_pixel( +def world_to_pixel1( point: Point, resolution: float, x_offset: float, y_offset: float, cos_rot: float, sin_rot: float ) -> Tuple[int, int]: """ @@ -82,9 +167,14 @@ def world_to_pixel( Returns: Tuple of (row, column) pixel coordinates in image frame """ + if isinstance(point, tuple): + x_w, y_w = point + else: + x_w, y_w = point.x, point.y + # Remove translation (y_offset includes image height in meters) - x = point.x - x_offset - y = point.y - y_offset + x = x_w - x_offset + y = y_w - y_offset # Remove rotation (use negative angle) x_unrot = x * cos_rot + y * sin_rot diff --git a/swagger/waypoint_graph_generator.py b/swagger/waypoint_graph_generator.py index 5b1e928..36fafca 100644 --- a/swagger/waypoint_graph_generator.py +++ b/swagger/waypoint_graph_generator.py @@ -33,6 +33,8 @@ from swagger.models import Point from swagger.utils import pixel_to_world, world_to_pixel +# Modified --- added math +import math @dataclass class Color: @@ -77,6 +79,9 @@ class WaypointGraphGeneratorConfig: use_free_space_sampling: bool = True use_delaunay_shortcuts: bool = True prune_graph: bool = True + + # Debug flag to control debug file output + debug: bool = False def __post_init__(self): """Validate configuration parameters after initialization.""" @@ -123,6 +128,12 @@ def __init__( self._node_map: np.ndarray | None = ( None # Map of nodes for quick lookup, pixel values are node indices if not NaN ) + self._known_points: list[tuple[float, float]] = [] + + def _debug_print(self, *args, **kwargs): + """Print debug messages only if debug flag is enabled.""" + if self._config.debug: + print(*args, **kwargs) @property def graph(self) -> nx.Graph: @@ -137,6 +148,7 @@ def build_graph_from_grid_map( x_offset: float = 0.0, y_offset: float = 0.0, rotation: float = 0.0, + known_points: list = None, # Modified --- added the concept of known points ) -> nx.Graph: """ Build a waypoint graph from an occupancy grid map. @@ -153,6 +165,8 @@ def build_graph_from_grid_map( Returns: NetworkX.Graph object containing the waypoint graph """ + self._image_shape = image.shape[:2] + self._logger.info("Building graph from grid map...") self._resolution = resolution self._safety_distance = safety_distance @@ -160,10 +174,14 @@ def build_graph_from_grid_map( self._occupancy_threshold = occupancy_threshold self._x_offset = x_offset + self._debug_print(f"[DEBUG internal] received x_offset={x_offset}, self._resolution={resolution}") + self._debug_print(f"[DEBUG internal] effective world offset applied? {x_offset / resolution} pixels worth") + # When using ROS coordinates, we need to add the image height to the y_offset # This shifts the origin from top-left to bottom-left image_height = image.shape[0] - self._y_offset = y_offset + (image_height * resolution) # Add image height in meters + #self._y_offset = y_offset + (image_height * resolution) # Add image height in meters + self._y_offset = y_offset self._cos_rot = np.cos(rotation) self._sin_rot = np.sin(rotation) @@ -171,6 +189,8 @@ def build_graph_from_grid_map( # Check if map is completely free before performing distance transform free_map = (self._original_map > self._occupancy_threshold).astype(np.uint8) + self._debug_save_stage("01_free_map", free_map, color=True) + if np.all(free_map): # Map is completely free (all values > threshold), create grid graph directly self._graph = self._create_grid_graph( @@ -181,44 +201,447 @@ def build_graph_from_grid_map( # Map has obstacles, perform distance transform self._distance_transform(free_map) + self._debug_save_stage("02_inflated_map", self._inflated_map) + + # Modified: mask known points before skeletonization + #if not hasattr(self, "_visited_mask") or self._visited_mask is None: + # Initialize the persistent visited-mask once (same size as map) + #self._visited_mask = np.zeros_like(self._inflated_map, dtype=np.uint8) + + self._visited_mask = np.zeros_like(self._inflated_map, dtype=np.uint8) + + inflated_for_skeleton = self._inflated_map.copy() + self._debug_save_stage("03a1_inflated_for_skeleton", inflated_for_skeleton) + known_points_px: list[tuple[int, int]] = [] + visible_known_world: list[tuple[float, float]] = [] + pending_known_nodes: list[tuple[int, int, tuple[float, float, float]]] = [] + if not known_points: + known_points = list(getattr(self, "_known_points", [])) + + self._debug_print(f"[DEBUG mask] known pixels: {len(known_points) if known_points else 0}") + if known_points: + for (x, y) in known_points: + pix = self._world_to_pixel(Point(x=x, y=y, z=0.0)) + if pix is None: + continue + row, col = pix + if 0 <= row < inflated_for_skeleton.shape[0] and 0 <= col < inflated_for_skeleton.shape[1]: + if not self._is_free_pixel(row, col): + self._debug_print(f"[DEBUG] Skipping known point {(x, y)} — falls in occupied space.") + continue + known_points_px.append((row, col)) + visible_known_world.append((x, y)) + world = self._pixel_to_world(row, col) + world_tuple = self._point_to_tuple(world) + pending_known_nodes.append((row, col, world_tuple)) + else: + self._debug_print(f"[DEBUG] Skipping known point {(x, y)} — outside current frame bounds.") + + self._debug_print(f"[DEBUG] Converted {len(known_points_px)} known points inside frame.") + + inflated_tmp = inflated_for_skeleton.copy() + + graph = nx.Graph() + + # ------------------------------------------------------------ # Build graph from the skeleton of the map if self._config.use_skeleton_graph: - graph = self._build_graph_from_skeleton( + skeleton_graph = self._build_graph_from_skeleton( skeleton_sample_distance=self._to_pixels_int(self._config.skeleton_sample_distance) ) else: graph = nx.Graph() + for row, col, world_tuple in pending_known_nodes: + if (row, col) not in graph: + graph.add_node( + (row, col), + node_type="known", + pixel=(row, col), + world=world_tuple, + origin="known", + ) + # Sample nodes along obstacle boundaries if self._config.use_boundary_sampling: self._sample_obstacle_boundaries( graph, sample_distance=self._to_pixels_int(self._config.boundary_sample_distance) ) - # Sample nodes in free space + new_points_px = [] + # Modified --- sampling known points, else sample points in free space if self._config.use_free_space_sampling: - self._sample_free_space( - graph, distance_threshold=self._to_pixels_int(self._config.free_space_sampling_threshold) - ) + if known_points is not None and len(known_points) > 0: + # Convert known world points → pixel coordinates + known_points_px = [ + self._world_to_pixel(Point(x=x, y=y, z=0.0)) + for (x, y) in known_points + ] + + # Filter out any None or malformed points (outside current map) + known_points_px = [ + p for p in known_points_px + if p is not None and len(p) == 2 + ] + + if len(known_points_px) == 0: + self._logger.warning( + "[WARN] All known points were out of frame — skipping incremental sampling." + ) + new_points_px = [] + else: + new_points_px = self._sample_free_space_incremental( + graph, + visible_known_world, + distance_threshold=self._to_pixels_int( + self._config.free_space_sampling_threshold + ) + ) + else: + self._debug_print(f"[DEBUG] Entered sample_free_space") + new_points_px = self._sample_free_space( + graph, + distance_threshold=self._to_pixels_int( + self._config.free_space_sampling_threshold + ), + ) + + # Remove any stray samples that landed on inflated cells before visualization + self._remove_invalid_free_nodes(graph) + new_points_px = [pt for pt in new_points_px if self._is_free_pixel(pt[0], pt[1])] + known_points_px = [pt for pt in known_points_px if self._is_free_pixel(pt[0], pt[1])] + + self._debug_print(f"[DEBUG] Frame offsets: x_off={x_offset:.2f}, y_off={y_offset:.2f}") + self._debug_print(f"[DEBUG] Known pts (world→pixel): {len(known_points_px)} visible this frame") + self._debug_print(f"[DEBUG] New pts: {len(new_points_px)} total") + + if True: + free_img = (inflated_for_skeleton * 255).astype(np.uint8) + free_img = cv2.cvtColor(free_img, cv2.COLOR_GRAY2BGR) + + # color-code: + # known points → magenta + # free-space samples → red + # skeleton pixels (if any) → green overlay + if hasattr(self, "_last_skeleton") and self._last_skeleton is not None: + skel_vis = (self._last_skeleton > 0).astype(np.uint8) + free_img[skel_vis == 1] = (0, 255, 0) + + # optionally overlay known points in magenta (same scale) + if known_points_px is not None and len(known_points_px) > 0: + for (y, x) in known_points_px: + if 0 <= y < free_img.shape[0] and 0 <= x < free_img.shape[1]: + cv2.circle(free_img, (int(x), int(y)), 2, (255, 0, 255), -1) + + # overlay persisted known nodes (magenta) --- + for node, data in graph.nodes(data=True): + ntype = str(data.get("node_type", "")).lower() + if "known" in ntype: + pix = data.get("pixel") + if pix is None and isinstance(node, tuple) and len(node) == 2: + pix = node + if pix is None: + continue + y, x = pix + if 0 <= y < free_img.shape[0] and 0 <= x < free_img.shape[1]: + cv2.circle(free_img, (int(x), int(y)), 2, (255, 0, 255), -1) + + # overlay boundary nodes (cyan) --- + for node, data in graph.nodes(data=True): + ntype = str(data.get("node_type", "")).lower() + if "boundary" in ntype: + y, x = node + if 0 <= y < free_img.shape[0] and 0 <= x < free_img.shape[1]: + cv2.circle(free_img, (int(x), int(y)), 2, (255, 255, 0), -1) + + self._debug_save_stage("05a_free_space_samples", free_img, color=True) + + # visualize new free-space points + self._debug_print(f"[DEBUG free-space] total new nodes in graph after sampling: {len(new_points_px)}") + + if new_points_px is not None and len(new_points_px) > 0: + free_img = (inflated_for_skeleton * 255).astype(np.uint8) + free_img = cv2.cvtColor(free_img, cv2.COLOR_GRAY2BGR) + + # color-code: + # known points → magenta + # free-space samples → red + # skeleton pixels (if any) → green overlay + if hasattr(self, "_last_skeleton") and self._last_skeleton is not None: + skel_vis = (self._last_skeleton > 0).astype(np.uint8) + free_img[skel_vis == 1] = (0, 255, 0) + + # draw free-space sample points as red dots + for (y, x) in new_points_px: + if 0 <= y < free_img.shape[0] and 0 <= x < free_img.shape[1]: + cv2.circle(free_img, (int(x), int(y)), 2, (0, 0, 255), -1) + + # optionally overlay known points in magenta (same scale) + if known_points_px is not None and len(known_points_px) > 0: + for (y, x) in known_points_px: + if 0 <= y < free_img.shape[0] and 0 <= x < free_img.shape[1]: + cv2.circle(free_img, (int(x), int(y)), 2, (255, 0, 255), -1) + + # overlay persisted known nodes (magenta) --- + for node, data in graph.nodes(data=True): + ntype = str(data.get("node_type", "")).lower() + if "known" in ntype: + pix = data.get("pixel") + if pix is None and isinstance(node, tuple) and len(node) == 2: + pix = node + if pix is None: + continue + y, x = pix + if 0 <= y < free_img.shape[0] and 0 <= x < free_img.shape[1]: + cv2.circle(free_img, (int(x), int(y)), 2, (255, 0, 255), -1) + + # overlay boundary nodes (cyan) --- + for node, data in graph.nodes(data=True): + ntype = str(data.get("node_type", "")).lower() + if "boundary" in ntype: + y, x = node + if 0 <= y < free_img.shape[0] and 0 <= x < free_img.shape[1]: + cv2.circle(free_img, (int(x), int(y)), 2, (255, 255, 0), -1) + + self._debug_save_stage("05_free_space_samples", free_img, color=True) + self._debug_print(f"[DEBUG] Saved {len(new_points_px)} free-space samples to debug_stages/05_free_space_samples.png") + else: + self._debug_print("[DEBUG] No free-space samples to visualize.") + + boundary_nodes = sum(1 for _, d in graph.nodes(data=True) if d.get("node_type") == "boundary") + free_nodes = sum( + 1 + for _, d in graph.nodes(data=True) + if d.get("node_type") in {"free_space", "known"} + ) + self._debug_print(f"[DEBUG] Node breakdown — boundary: {boundary_nodes}, free-space/known: {free_nodes}") + + # Remove skeleton nodes before running Delaunay + skeleton_nodes = [ + n for n, data in graph.nodes(data=True) + if str(data.get("node_type", "")).lower() == "skeleton" + ] + if skeleton_nodes: + graph.remove_nodes_from(skeleton_nodes) + self._logger.info(f"Removed {len(skeleton_nodes)} skeleton nodes before Delaunay.") + + # Lightly stitch inherited nodes to newly sampled ones before shortcuts + self._connect_free_nodes(graph) + + # Drop any samples that landed on inflated obstacles/edges + self._remove_invalid_free_nodes(graph) # Add shortcuts if self._config.use_delaunay_shortcuts: + + #edges_px = self.project_edges_to_current_frame(global_graph, inflated_for_skeleton.shape, x_offset, y_offset, rotation) + #for (p1, p2) in edges_px: + # cv2.line(inflated_for_skeleton, (p1[1], p1[0]), (p2[1], p2[0]), color=255, thickness=1) + + for node in list(graph.nodes()): + data = graph.nodes[node] + if "pixel" not in data: + data["pixel"] = node # assume (row, col) + if "world" not in data: + y, x = node + x_w, y_w = self._pixel_to_world(y, x) + data["world"] = (x_w, y_w) + self._add_delaunay_shortcuts(graph) + final_debug = cv2.cvtColor((self._inflated_map * 255).astype(np.uint8), cv2.COLOR_GRAY2BGR) + for (n1, n2, data) in graph.edges(data=True): + if "pixel" not in graph.nodes[n1] or "pixel" not in graph.nodes[n2]: + continue + y1, x1 = map(int, graph.nodes[n1]["pixel"]) + y2, x2 = map(int, graph.nodes[n2]["pixel"]) + + node1_type = str(graph.nodes[n1].get("node_type", "")).lower() + node2_type = str(graph.nodes[n2].get("node_type", "")).lower() + origin1 = graph.nodes[n1].get("origin") + origin2 = graph.nodes[n2].get("origin") + + if "boundary" in (node1_type, node2_type): + color = (255, 255, 0) # cyan for boundary edges + elif origin1 == origin2 == "known": + color = (255, 0, 255) # magenta for known↔known + else: + color = (0, 0, 255) # red for edges touching new nodes + + cv2.line(final_debug, (x1, y1), (x2, y2), color, 1) + + self._debug_save_stage("06_final_graph", final_debug, color=True) + self._debug_print(f"[DEBUG] Delaunay shortcut edges added: {len(graph.edges)} total") + + # Add world coordinates to nodes + #graph = self._to_world_coordinates(graph) + self._debug_transform_vectors() + + # Modified --- ensures every node has "world" + for _, data in graph.nodes(data=True): + if "pixel" not in data and "world" in data: + x_w, y_w = data["world"] + y_p, x_p = self._world_to_pixel(Point(x=x_w, y=y_w, z=0)) + data["pixel"] = (y_p, x_p) + elif "world" not in data and "pixel" in data: + y_p, x_p = data["pixel"] + x_w, y_w = self._pixel_to_world(y_p, x_p) + data["world"] = (x_w, y_w) + + if "pos" not in data and "pixel" in data: + data["pos"] = data["pixel"] + if "node_type" not in data: + data["node_type"] = "free_space" + + if len(graph.nodes) == 0: + self._logger.warning("[WARN] Graph has no nodes after world/pixel consistency check.") + else: + self._logger.info(f"[INFO] Graph consistency OK: {len(graph.nodes)} nodes.") + + # Modified --- update known points (added here for iterative use) + reusable_types = {"free_space", "known"} + raw_points = [ + (float(n["world"][0]), float(n["world"][1])) + for _, n in graph.nodes(data=True) + if "world" in n and str(n.get("node_type", "")).lower() in reusable_types + ] + + def _downsample(points: list[tuple[float, float]], spacing: float) -> list[tuple[float, float]]: + grid: dict[tuple[int, int], tuple[float, float]] = {} + for x, y in points: + key = (int(math.floor(x / spacing)), int(math.floor(y / spacing))) + if key not in grid: + grid[key] = (x, y) + if len(grid) < len(points) * 0.9 and spacing < self._config.free_space_sampling_threshold: + return _downsample(list(grid.values()), spacing * 1.05) + return list(grid.values()) + + spacing = self._config.free_space_sampling_threshold * 0.8 + self._known_points = _downsample(raw_points, spacing) # Prune unnecessary nodes and edges from the graph if self._config.prune_graph: self._prune_graph(graph) - # Add world coordinates to nodes - graph = self._to_world_coordinates(graph) - # Store and convert the graph to CSR format self._graph = graph self._logger.info(f"Final graph has {len(graph.nodes)} nodes and {len(graph.edges)} unique edges") + + # --- Clean up malformed pixel entries before building nearest node map --- + fixed_count = 0 + removed_count = 0 + + for node, data in self._graph.nodes(data=True): + pix = data.get("pixel") + if pix is None: + if isinstance(node, tuple) and len(node) == 2: + data["pixel"] = node + fixed_count += 1 + continue + + pix_arr = np.array(pix).flatten() + + # Validate shape + if pix_arr.shape[0] < 2: + self._debug_print(f"[WARN] Dropping malformed pixel for node {node}: {pix}") + data.pop("pixel", None) + removed_count += 1 + continue + + # Assign clean 2D int tuple + y, x = map(int, pix_arr[:2]) + data["pixel"] = (y, x) + fixed_count += 1 + + self._debug_print(f"[DEBUG] Normalized {fixed_count} pixels, removed {removed_count} malformed entries.") + # ------------------------------------------------------------------------- + self._build_nearest_node_map(self._graph) return self._graph + def project_edges_to_current_frame(self, graph: nx.Graph, inflated_map_shape, x_offset, y_offset, rotation=0.0): + """ + Convert all world-space edges to current local pixel frame for visualization or masking. + """ + edges_px = [] + + for n1, n2, data in graph.edges(data=True): + w1 = graph.nodes[n1].get("world") + w2 = graph.nodes[n2].get("world") + if w1 is None or w2 is None: + continue + + # Build temporary Point objects from stored world coords + p1_world = Point(x=w1[0], y=w1[1], z=0.0) + p2_world = Point(x=w2[0], y=w2[1], z=0.0) + + # Temporarily override offsets/rotation for the current local frame + prev_off_x, prev_off_y = self._x_offset, self._y_offset + prev_cos, prev_sin = self._cos_rot, self._sin_rot + + self._x_offset = x_offset + self._y_offset = y_offset + self._cos_rot = np.cos(rotation) + self._sin_rot = np.sin(rotation) + + try: + pix1 = self._world_to_pixel(p1_world) + pix2 = self._world_to_pixel(p2_world) + finally: + # restore transform parameters + self._x_offset = prev_off_x + self._y_offset = prev_off_y + self._cos_rot = prev_cos + self._sin_rot = prev_sin + + if pix1 is None or pix2 is None: + continue + + row1, col1 = pix1 + row2, col2 = pix2 + + # check bounds + h, w = inflated_map_shape + if not (0 <= row1 < h and 0 <= col1 < w and 0 <= row2 < h and 0 <= col2 < w): + continue + + edges_px.append(((row1, col1), (row2, col2))) + + return edges_px + + def _debug_save_stage(self, name: str, image: np.ndarray, color: bool = False, graph=None): + """Save intermediate debug images with consistent naming.""" + # Only save if debug flag is enabled + if not self._config.debug: + return + + import os + os.makedirs("debug_stages", exist_ok=True) + path = f"debug_stages/{name}.png"# Add shortcuts + + if image is None: + self._logger.warning(f"[WARN] No image provided for {name}") + return + + # Convert to color if needed for node overlays + img = image.copy() + if len(img.shape) == 2: + img = cv2.cvtColor((img * 255).astype(np.uint8), cv2.COLOR_GRAY2BGR) + else: + img = img.copy() + + # Optional: overlay graph nodes if provided + if graph is not None: + for node in graph.nodes(): + try: + y, x = map(int, node) + cv2.circle(img, (x, y), 2, (0, 0, 255), -1) + except Exception: + continue + + cv2.imwrite(path, img) + self._debug_print(f"[DEBUG] Saved {path}") + def _astar_heuristic(self, n1, n2): """A* heuristic for the graph.""" world_n1 = self._graph.nodes[n1]["world"] @@ -339,17 +762,32 @@ def _to_world_coordinates(self, graph: nx.Graph) -> nx.Graph: pixel_to_id_lookup = {} # Convert each node to world coordinates and store mapping for i, (node, data) in enumerate(graph.nodes(data=True)): - row, col = node - world_point = self._pixel_to_world(row, col) - world_point_array = tuple((world_point.x, world_point.y, world_point.z)) + # Modified: handles nodes as either tuple (row, col) or just int + if isinstance(node, tuple): + row, col = node + else: + row, col = data["pixel"] + + x_w, y_w = self._pixel_to_world(row, col) + world_point_array = (x_w, y_w, 0.0) + #world_point_array = tuple((world_point.x, world_point.y, world_point.z)) pixel = (int(row), int(col)) - world_graph.add_node(i, world=world_point_array, pixel=pixel, **data) + + # Modified: fix for duplicated 'pixel' + data_copy = data.copy() + data_copy.pop("pixel", None) + data_copy.pop("world", None) + data_copy["pixel"] = pixel + data_copy["world"] = world_point_array + world_graph.add_node(i, **data_copy) pixel_to_id_lookup[node] = i # Convert edge weights from pixels to meters and copy edges for src, dst, data in graph.edges(data=True): - data["weight"] = float(data["weight"]) * self._resolution - world_graph.add_edge(pixel_to_id_lookup[src], pixel_to_id_lookup[dst], **data) + edge_data = data.copy() + if "weight" in edge_data: + edge_data["weight"] = float(edge_data["weight"]) * self._resolution + world_graph.add_edge(pixel_to_id_lookup[src], pixel_to_id_lookup[dst], **edge_data) return world_graph @@ -357,12 +795,12 @@ def _build_graph_from_skeleton(self, skeleton_sample_distance: int) -> np.ndarra """Generate a graph from the skeleton of the inflated map.""" self._logger.info("Generating skeleton...") try: - skeleton_image = thin((1 - self._inflated_map).view(cp.uint8)) + skeleton_image = thin((1 - inflated_map).view(cp.uint8)) except RuntimeError as e: from skimage.morphology import skeletonize self._logger.warning(f"Failed to generate skeleton with cucim: {e}. Falling back to skimage.") - skeleton_image = skeletonize(1 - self._inflated_map) + skeleton_image = skeletonize(1 - inflated_map) if skeleton_image.sum() == 0: self._logger.warning("No skeleton found in map") @@ -390,6 +828,16 @@ def _build_graph_from_skeleton(self, skeleton_sample_distance: int) -> np.ndarra dist = np.linalg.norm(end - start) graph.add_edge((start[0], start[1]), (end[0], end[1]), weight=dist, edge_type="skeleton") self._logger.info(f"Created skeleton graph with {len(graph.nodes)} nodes and {len(graph.edges)} edges") + + for node in graph.nodes(): + data = graph.nodes[node] + data["node_type"] = "skeleton" + if isinstance(node, tuple) and len(node) == 2: + data.setdefault("pixel", (int(node[0]), int(node[1]))) + if "pixel" in data: + row, col = data["pixel"] + data["world"] = self._pixel_to_world(row, col) + return graph def _check_line_collision(self, p0, p1): @@ -403,9 +851,29 @@ def _check_line_collision(self, p0, p1): Returns: True if there is a collision, False otherwise """ + + # --- NEW: handle world-coordinate inputs --- + def ensure_pixel(pt): + # If already pixel-like (ints), keep them + if all(isinstance(v, (int, np.integer)) for v in pt): + return pt + # Otherwise convert from world → pixel + if isinstance(pt, np.ndarray): + x, y = pt + else: + x, y = pt[0], pt[1] + row, col = self._world_to_pixel(Point(x=float(x), y=float(y), z=0.0)) + return (int(row), int(col)) + + p0_px = ensure_pixel(p0) + p1_px = ensure_pixel(p1) + y0, x0 = p0_px + y1, x1 = p1_px + # --- rest of your collision logic below --- + # Bresenham's line algorithm to get the pixels the line passes through - y0, x0 = p0 - y1, x1 = p1 + #y0, x0 = p0 + #y1, x1 = p1 dx = abs(x1 - x0) dy = abs(y1 - y0) sx = 1 if x0 < x1 else -1 @@ -427,6 +895,22 @@ def _check_line_collision(self, p0, p1): return False # No collision detected + def _free_mask_from_map(self, occupancy: np.ndarray) -> np.ndarray: + """Return binary mask of free space (1 = free).""" + if occupancy is None: + raise RuntimeError("Occupancy map is not initialized.") + if occupancy.max() <= 1: + return (occupancy == 0).astype(np.uint8) + return (occupancy > self._occupancy_threshold).astype(np.uint8) + + def _is_free_pixel(self, row: int, col: int) -> bool: + if self._inflated_map is None: + return True + h, w = self._inflated_map.shape + if not (0 <= row < h and 0 <= col < w): + return False + return self._inflated_map[row, col] == 0 + def _sample_free_space(self, graph, distance_threshold: float): """ Iteratively sample free space in a map by identifying and adding nodes at local maxima @@ -436,19 +920,30 @@ def _sample_free_space(self, graph, distance_threshold: float): graph: The graph to which new nodes will be added. distance_threshold: The threshold to identify areas with large distances. """ - # Initialize distance map with a large value - distance_map = np.full(self._original_map.shape, np.inf, dtype=np.float64) - - # Set occupied pixels to 0 - distance_map[self._original_map <= self._occupancy_threshold] = 0 - - # Set node pixels to 0 - for node in graph.nodes(): - row, col = node - distance_map[row, col] = 0 - - # Initialize R-tree index - idx = index.Index() + new_points = [] + base_map = self._inflated_map if self._inflated_map is not None else self._original_map + free_mask = self._free_mask_from_map(base_map) + + distance_map = np.full(free_mask.shape, np.inf, dtype=np.float64) + distance_map[free_mask == 0] = 0 + + def _node_pixel(node, data): + pixel = data.get("pixel") + if pixel is None and isinstance(node, tuple) and len(node) == 2: + pixel = node + if pixel is None: + return None + row, col = int(pixel[0]), int(pixel[1]) + if 0 <= row < distance_map.shape[0] and 0 <= col < distance_map.shape[1]: + return row, col + return None + + for node, data in graph.nodes(data=True): + if str(data.get("node_type", "")).lower() == "skeleton": + continue + pix = _node_pixel(node, data) + if pix is not None: + distance_map[pix[0], pix[1]] = 0 # Initialize kernel for dilation kernel = np.ones((3, 3), np.uint8) @@ -459,6 +954,16 @@ def _sample_free_space(self, graph, distance_threshold: float): (distance_map > 0).astype(np.uint8), cv2.DIST_L2, cv2.DIST_MASK_PRECISE ) + # --- DEBUG INSERTION --- + if self._config.debug: + self.last_distance_map = distance_map.copy() + cv2.imwrite( + "debug_distance_map_free_space.png", + cv2.normalize(distance_map, None, 0, 255, cv2.NORM_MINMAX), + ) + self._debug_print("[DEBUG free_space] distance_map max (px):", float(distance_map.max())) + # ----------------------- + # Identify large distance areas large_distance_areas = (distance_map > distance_threshold).astype(np.uint8) @@ -471,18 +976,235 @@ def _sample_free_space(self, graph, distance_threshold: float): # Find local maxima local_maxima_mask = (distance_map == dilated) & (large_distance_areas > 0) local_maxima_coords = np.column_stack(np.where(local_maxima_mask)) + self._debug_print(f"[DEBUG sampler] found {len(local_maxima_coords)} local maxima") + + idx = index.Index() + idx_counter = 0 + for node, data in graph.nodes(data=True): + if str(data.get("node_type", "")).lower() == "skeleton": + continue + pix = _node_pixel(node, data) + if pix is None: + continue + row, col = pix + idx.insert(idx_counter, (col, row, col, row)) + idx_counter += 1 - # Add local maxima as nodes in the graph - for coord in local_maxima_coords: - row, col = coord - # Check if there are any nodes within distance_threshold/2 + added = False + for row, col in local_maxima_coords: + if free_mask[row, col] == 0: + continue half_threshold = distance_threshold / 2 bounding_box = (col - half_threshold, row - half_threshold, col + half_threshold, row + half_threshold) - intersections = list(idx.intersection(bounding_box)) - if len(intersections) == 0: - graph.add_node((row, col)) - idx.insert(len(graph.nodes) - 1, (col, row, col, row)) # Insert node into R-tree - distance_map[row, col] = 0 + if list(idx.intersection(bounding_box)): + continue + graph.add_node((row, col), node_type="free_space", pixel=(row, col), world=self._pixel_to_world(row, col)) + idx.insert(idx_counter, (col, row, col, row)) + idx_counter += 1 + distance_map[row, col] = 0 + new_points.append((row, col)) + added = True + + self._debug_print(f"[DEBUG sampler] distance_map max: {distance_map.max():.2f}, threshold(px): {distance_threshold}") + + if not added: + break + + return new_points + + # Moddified --- added a section to account for known points + def _sample_free_space_incremental(self, graph, known_points, distance_threshold): + """ + Incrementally sample free-space areas by treating known points as existing samples, + not as obstacles. This mirrors _sample_free_space() exactly, except it seeds the + distance map with known points. + """ + + # --- Safety mask of free space --- + if not hasattr(self, "_safe_mask") or self._safe_mask is None: + self._safe_mask = (self._original_map > self._occupancy_threshold).astype(np.uint8) + safe_mask = self._safe_mask + H, W = safe_mask.shape + free_mask = safe_mask.copy() + + distance_map = np.full((H, W), np.inf, dtype=np.float64) + distance_map[free_mask == 0] = 0 + + def _node_pixel(node, data): + pixel = data.get("pixel") + if pixel is None and isinstance(node, tuple) and len(node) == 2: + pixel = node + if pixel is None: + return None + row, col = int(pixel[0]), int(pixel[1]) + if 0 <= row < H and 0 <= col < W: + return row, col + return None + + known_pixels = [] + for (x, y) in known_points: + pix = self._world_to_pixel(Point(x=x, y=y, z=0.0)) + if pix is None: + continue + row, col = pix + if 0 <= row < H and 0 <= col < W and free_mask[row, col] == 1: + known_pixels.append((row, col)) + distance_map[row, col] = 0 + + for node, data in graph.nodes(data=True): + if str(data.get("node_type", "")).lower() == "skeleton": + continue + pix = _node_pixel(node, data) + if pix is not None: + distance_map[pix[0], pix[1]] = 0 + + kernel = np.ones((3, 3), np.uint8) + new_points = [] + + while True: + mask = free_mask.copy() + for row, col in known_pixels: + mask[row, col] = 0 + for node, data in graph.nodes(data=True): + if str(data.get("node_type", "")).lower() == "skeleton": + continue + pix = _node_pixel(node, data) + if pix is not None: + mask[pix[0], pix[1]] = 0 + + distance_map = cv2.distanceTransform(mask, cv2.DIST_L2, cv2.DIST_MASK_PRECISE) + + large_distance_areas = (distance_map > distance_threshold).astype(np.uint8) + if not np.any(large_distance_areas): + self._logger.info("[INFO] No new free-space regions above threshold.") + break + + dilated = cv2.dilate(distance_map, kernel) + local_maxima_mask = (distance_map == dilated) & (large_distance_areas > 0) + local_maxima_coords = np.column_stack(np.where(local_maxima_mask)) + + idx = index.Index() + idx_counter = 0 + for node, data in graph.nodes(data=True): + if str(data.get("node_type", "")).lower() == "skeleton": + continue + pix = _node_pixel(node, data) + if pix is None: + continue + row, col = pix + idx.insert(idx_counter, (col, row, col, row)) + idx_counter += 1 + + added = False + for row, col in local_maxima_coords: + if free_mask[row, col] == 0: + continue + + half_threshold = distance_threshold / 2 + bounding_box = (col - half_threshold, row - half_threshold, col + half_threshold, row + half_threshold) + if list(idx.intersection(bounding_box)): + continue + + graph.add_node((row, col), pixel=(row, col), node_type="free_space", world=self._pixel_to_world(row, col)) + idx.insert(idx_counter, (col, row, col, row)) + idx_counter += 1 + new_points.append((row, col)) + distance_map[row, col] = 0 + added = True + + if not added: + break + + return new_points + + def _remove_skeleton_nodes(self, graph: nx.Graph) -> None: + """Remove skeleton nodes so they do not affect downstream shortcuts.""" + nodes_to_remove = [ + n for n, data in graph.nodes(data=True) + if str(data.get("node_type", "")).lower() == "skeleton" + ] + if nodes_to_remove: + graph.remove_nodes_from(nodes_to_remove) + self._logger.info(f"Removed {len(nodes_to_remove)} skeleton nodes before Delaunay.") + + def _connect_free_nodes(self, graph: nx.Graph) -> None: + """Connect nearby free-space/boundary nodes to reduce seams.""" + candidates: list = [] + coords: list[tuple[float, float]] = [] + + for node, data in graph.nodes(data=True): + node_type = str(data.get("node_type", "")).lower() + if node_type not in {"free_space", "boundary", "known"}: + continue + + world = data.get("world") + if world is None: + pix = data.get("pixel") + if pix is None: + continue + world = self._point_to_tuple(self._pixel_to_world(pix[0], pix[1])) + data["world"] = world + + world_tuple = self._point_to_tuple(world) + candidates.append(node) + coords.append((float(world_tuple[0]), float(world_tuple[1]))) + + if len(coords) < 2: + return + + try: + tree = cKDTree(coords) + except Exception: + return + + radius = max(self._config.free_space_sampling_threshold * 1.5, self._config.merge_node_distance * 2.0) + pairs = tree.query_pairs(r=radius) + + for i, j in pairs: + n1 = candidates[i] + n2 = candidates[j] + if n1 == n2: + continue + + w1 = np.asarray(self._point_to_tuple(graph.nodes[n1]["world"])[:2], dtype=float) + w2 = np.asarray(self._point_to_tuple(graph.nodes[n2]["world"])[:2], dtype=float) + dist = float(np.linalg.norm(w2 - w1)) + if not np.isfinite(dist) or dist <= 0.0: + continue + + existing = graph.get_edge_data(n1, n2) + if existing: + current = existing.get("weight") + if current is not None and current <= dist: + continue + existing["weight"] = dist + existing["edge_type"] = existing.get("edge_type", "known") + else: + graph.add_edge(n1, n2, weight=dist, edge_type="known") + + def _remove_invalid_free_nodes(self, graph: nx.Graph) -> None: + if self._inflated_map is None: + return + + height, width = self._inflated_map.shape + to_remove: list = [] + for node, data in graph.nodes(data=True): + node_type = str(data.get("node_type", "")).lower() + if node_type not in {"free_space", "known"}: + continue + + pix = data.get("pixel") + if pix is None: + continue + row, col = int(pix[0]), int(pix[1]) + if not (0 <= row < height and 0 <= col < width): + to_remove.append(node) + continue + if self._inflated_map[row, col] != 0: + to_remove.append(node) + + if to_remove: + graph.remove_nodes_from(to_remove) def _add_delaunay_shortcuts(self, graph: nx.Graph): """ @@ -491,34 +1213,92 @@ def _add_delaunay_shortcuts(self, graph: nx.Graph): Args: graph: NetworkX graph to add shortcuts to """ - self._logger.info("Adding Delaunay shortcuts...") - nodes = list(graph.nodes()) - if len(nodes) < 3: # Need at least 3 points for triangulation + + self._logger.info("Adding Delaunay shortcuts (world-frame)...") + + node_worlds = [] + idx_to_node = [] + + # Collect world positions for all nodes + for n, data in graph.nodes(data=True): + world = data.get("world") + if world is None: + pix = data.get("pixel") + if pix is not None: + y, x = pix + point = self._pixel_to_world(y, x) + world = self._point_to_tuple(point) + data["world"] = world + + if world is None: + continue + + world = self._point_to_tuple(world) + if np.isfinite(world[0]) and np.isfinite(world[1]): + node_worlds.append((world[0], world[1])) + idx_to_node.append(n) + + if len(node_worlds) < 3: + self._logger.warning("[WARN] Not enough nodes for Delaunay triangulation — skipping.") return - # Convert nodes to numpy array and compute triangulation - node_coords = np.array(nodes) + node_coords = np.array(node_worlds, dtype=float) + try: tri = Delaunay(node_coords) except Exception as e: self._logger.error(f"Skipping Delaunay triangulation: {e}") return - # Collect edges for batch processing edge_candidates = set() for simplex in tri.simplices: for i in range(3): - n1, n2 = tuple(sorted([nodes[simplex[i]], nodes[simplex[(i + 1) % 3]]])) - if not graph.has_edge(n1, n2): - edge_candidates.add((n1, n2)) + a = idx_to_node[simplex[i]] + b = idx_to_node[simplex[(i + 1) % 3]] + edge_candidates.add(tuple(sorted((a, b)))) + + # Add edges if free of collision + added = 0 + max_delaunay_distance = max( + self._config.free_space_sampling_threshold * 1.5, # Reduced from 2.0 + self._config.merge_node_distance * 2.5, # Reduced from 3.0 + ) - if not edge_candidates: - return - # Add valid edges for n1, n2 in edge_candidates: - if not self._check_line_collision(n1, n2): - dist = np.sqrt((n2[0] - n1[0]) ** 2 + (n2[1] - n1[1]) ** 2) - graph.add_edge(n1, n2, weight=dist, edge_type="delaunay") + try: + w1 = np.asarray(self._point_to_tuple(graph.nodes[n1]["world"])[:2], dtype=float) + w2 = np.asarray(self._point_to_tuple(graph.nodes[n2]["world"])[:2], dtype=float) + dist = float(np.linalg.norm(w2 - w1)) + if not np.isfinite(dist) or dist <= 0.0 or dist > max_delaunay_distance: + continue + + # For collision check, project back into pixel space of this map + p1 = graph.nodes[n1].get("pixel") + p2 = graph.nodes[n2].get("pixel") + if p1 is None: + p1 = self._world_to_pixel(Point(x=w1[0], y=w1[1], z=0)) + if p2 is None: + p2 = self._world_to_pixel(Point(x=w2[0], y=w2[1], z=0)) + if p1 is None or p2 is None: + continue + + if not self._check_line_collision(p1, p2): + existing = graph.get_edge_data(n1, n2) + if existing: + current = existing.get("weight") + if current is not None and current <= dist: + continue + existing["weight"] = dist + existing["edge_type"] = "delaunay" + added += 1 + else: + graph.add_edge(n1, n2, weight=dist, edge_type="delaunay") + added += 1 + except Exception: + continue + + self._logger.info(f"[INFO] Delaunay shortcuts added: {added}") + def _sample_obstacle_boundaries(self, graph: nx.Graph, sample_distance: float = 50) -> list[tuple[int, int]]: """ @@ -529,13 +1309,14 @@ def _sample_obstacle_boundaries(self, graph: nx.Graph, sample_distance: float = sample_distance: Distance between samples along contour (in pixels). Defaults to 50 (pixels). """ + # Find contours of inflated obstacles contours = self._find_obstacle_contours( self._config.boundary_inflation_factor * self._safety_distance / self._resolution ) initial_num_nodes = len(graph.nodes()) - + # Process each contour for contour in contours: contour_nodes = [] @@ -548,7 +1329,12 @@ def _sample_obstacle_boundaries(self, graph: nx.Graph, sample_distance: float = # Convert first point to y,x format row_1, col_1 = int(p1[1]), int(p1[0]) contour_nodes.append((row_1, col_1)) - graph.add_node((row_1, col_1), node_type="boundary") + graph.add_node( + (row_1, col_1), + node_type="boundary", + pixel=(row_1, col_1), + world=self._pixel_to_world(row_1, col_1), + ) # Calculate distance to next vertex segment_length = np.linalg.norm(p2 - p1) @@ -559,24 +1345,27 @@ def _sample_obstacle_boundaries(self, graph: nx.Graph, sample_distance: float = for point in intermediate_points: # Interpolate point col, row = point - - # Add interpolated point contour_nodes.append((row, col)) - graph.add_node((row, col), node_type="boundary") + graph.add_node( + (row, col), + node_type="boundary", + pixel=(row, col), + world=self._pixel_to_world(row, col), + ) - # Connect consecutive nodes along the contour - self._connect_contour_nodes(contour_nodes, graph) + # Connect consecutive nodes along this contour + if len(contour_nodes) > 1: + self._connect_contour_nodes(contour_nodes, graph) num_nodes_added = len(graph.nodes()) - initial_num_nodes self._logger.info(f"Added {num_nodes_added} nodes along obstacle boundaries") def _find_obstacle_contours(self, boundary_inflation: float) -> list[np.ndarray]: """Find contours of inflated obstacles using the distance map.""" - # Use the distance transform to filter obstacles - filtered_obstacles = (self._dist_transform >= boundary_inflation).astype(np.uint8) - # Find contours + filtered_obstacles = (self._dist_transform >= boundary_inflation).astype(np.uint8) contours, _ = cv2.findContours(filtered_obstacles, cv2.RETR_LIST, cv2.CHAIN_APPROX_TC89_KCOS) + self._debug_print(f"[DEBUG] Found {len(contours)} contours from distance transform.") return contours def _is_within_bounds(self, row: int, col: int) -> bool: @@ -602,8 +1391,8 @@ def _prune_graph(self, graph: nx.Graph): self._logger.info("Pruning graph...") if len(graph.nodes()) > 0: - # Merge nodes that are close to each other - self._merge_close_nodes(graph, threshold=self._to_pixels_int(self._config.merge_node_distance)) + # Merge nodes that are close to each other (threshold expressed in meters) + self._merge_close_nodes(graph, threshold=self._config.merge_node_distance) # Remove isolated nodes and small subgraphs graph.remove_nodes_from(list(nx.isolates(graph))) @@ -617,10 +1406,62 @@ def _prune_graph(self, graph: nx.Graph): graph.remove_nodes_from(component) def _merge_close_nodes(self, graph: nx.Graph, threshold: float): - """Merge nodes that are within a certain distance threshold.""" + """Merge nodes that are within a certain distance threshold (meters).""" + + # --- SAFETY PATCH: ensure all nodes have 'pos' defined --- + for node, data in graph.nodes(data=True): + if "world" in data and isinstance(data["world"], (tuple, list)) and len(data["world"]) == 2: + data["pos"] = np.array(data["world"], dtype=float) + elif "pixel" in data and isinstance(data["pixel"], (tuple, list)) and len(data["pixel"]) == 2: + # convert pixel → world explicitly + y, x = data["pixel"] + x_w, y_w = self._pixel_to_world(y, x) + data["pos"] = np.array((x_w, y_w), dtype=float) + else: + # fallback to node key itself + data["pos"] = np.array(node, dtype=float) + # ---------------------------------------------------------- + while True: nodes = list(graph.nodes()) - node_coords = np.array(nodes) + + # Modified - debug before node_coords + self._debug_print("[DEBUG] Checking node positions before merging...") + for i, n in enumerate(graph.nodes): + pos = graph.nodes[n].get("pos", None) + #print(f"{i:03d} {n}: {pos}") + self._debug_print("[DEBUG] Total nodes:", len(graph.nodes)) + self._debug_print("[DEBUG] Threshold:", threshold) + + node_coords = [data["pos"] for _, data in graph.nodes(data=True) if "pos" in data] + + self._debug_print("[DEBUG] Building KDTree with node_coords:") + self._debug_print(f" Number of coords: {len(node_coords)}") + + bad_entries = [] + for i, coord in enumerate(node_coords): + if not isinstance(coord, (list, tuple)) or len(coord) != 2: + bad_entries.append((i, coord)) + continue + # ensure they are numeric + try: + float(coord[0]) + float(coord[1]) + except Exception as e: + bad_entries.append((i, coord)) + + if bad_entries: + self._debug_print("[ERROR] Found malformed coordinates:") + for idx, bad in bad_entries: + #print(f" Index {idx}: {bad} (type: {type(bad)})") + continue + else: + self._debug_print("[DEBUG] All coords look valid. Example few:", node_coords[:5]) + + #node_coords = np.array(nodes) + node_coords = np.array([[float(x), float(y)] for x, y in node_coords], dtype=float) + self._debug_print(f"[DEBUG] node_coords.shape after conversion: {node_coords.shape}, dtype: {node_coords.dtype}") + tree = cKDTree(node_coords) # Find pairs of nodes that are close to each other @@ -636,26 +1477,31 @@ def _merge_close_nodes(self, graph: nx.Graph, threshold: float): n2 = nodes[n2_idx] # Skip if n2 has already been removed - if not graph.has_node(n2): + if not graph.has_node(n1) or not graph.has_node(n2): continue + # Coordinates of n1 and n2 + n1_pos = graph.nodes[n1]["pos"] + n2_pos = graph.nodes[n2]["pos"] + # Merge n2 into n1 and update edges # Check if all n2's neighbors can connect to n1 without collision - neighbors = [n for n in graph.neighbors(n2) if n != n1] + neighbors = [n for n in graph.neighbors(n2) if n != n1 and graph.has_node(n)] if neighbors: collision = False - for dst in neighbors: - if self._check_line_collision(n1, dst): + for neighbor in neighbors: + neighbor_pos = graph.nodes[neighbor]["pos"] + if self._check_line_collision(n1_pos, neighbor_pos): collision = True break - # Only merge if all connections are valid if not collision: for neighbor in neighbors: - dist = np.sqrt((neighbor[0] - n1[0]) ** 2 + (neighbor[1] - n1[1]) ** 2) + neighbor_pos = graph.nodes[neighbor]["pos"] + dist = np.linalg.norm(np.array(neighbor_pos) - np.array(n1_pos)) graph.add_edge(n1, neighbor, weight=dist, edge_type="merge") graph.remove_node(n2) - merged = True # A merge occurred + merged = True else: # No neighbors to check, safe to remove graph.remove_node(n2) @@ -666,48 +1512,129 @@ def _merge_close_nodes(self, graph: nx.Graph, threshold: float): def _pixel_to_world(self, row: float, col: float) -> Point: """Convert pixel coordinates to world coordinates with the current transform.""" - return pixel_to_world(row, col, self._resolution, self._x_offset, self._y_offset, self._cos_rot, self._sin_rot) + return pixel_to_world(row, col, self._resolution, self._x_offset, self._y_offset, self._cos_rot, self._sin_rot, self._image_shape) def _world_to_pixel(self, point: Point) -> tuple[int, int]: """Convert world coordinates to pixel coordinates with the inverse transform.""" - return world_to_pixel(point, self._resolution, self._x_offset, self._y_offset, self._cos_rot, self._sin_rot) + return world_to_pixel(point, self._resolution, self._x_offset, self._y_offset, self._cos_rot, self._sin_rot, self._image_shape) + + def _point_to_tuple(self, value) -> tuple[float, float, float]: + """Normalize Point-like inputs to an (x, y, z) tuple.""" + if hasattr(value, "x") and hasattr(value, "y"): + x = float(value.x) + y = float(value.y) + z = float(getattr(value, "z", 0.0)) + return (x, y, z) + if isinstance(value, (tuple, list, np.ndarray)) and len(value) >= 2: + x = float(value[0]) + y = float(value[1]) + z = float(value[2]) if len(value) >= 3 else 0.0 + return (x, y, z) + # Fallback: treat as origin + return (0.0, 0.0, 0.0) + + def _debug_transform_vectors(self): + """Print sanity checks for world↔pixel transforms.""" + self._debug_print("=== DEBUG TRANSFORM VECTORS ===") + + test_points = [ + (0, 0), + (1, 0), + (0, 1), + (1, 1) + ] + + for (x, y) in test_points: + px = self._world_to_pixel((x, y)) + if px is None: + self._debug_print(f"World ({x:.2f}, {y:.2f}) -> OUT OF FRAME") + continue + + # px = (row, col) = (y, x) + try: + wx = self._pixel_to_world(px[1], px[0]) + self._debug_print(f"World ({x:.2f}, {y:.2f}) -> Pixel {px} -> Reconstructed {wx}") + except Exception as e: + self._debug_print(f"[WARN] Failed to reconstruct for ({x:.2f}, {y:.2f}): {e}") + def transform_known_points(known_points_world, dx, dy, theta, to_pixel_fn): + """ + Apply frame-to-frame translation and rotation to known world points, then + convert to pixel coordinates for plotting. + Args: + known_points_world: list of (x, y) in world coordinates (previous frame) + dx, dy: translation (meters) between frames + theta: rotation (radians, positive CCW) + to_pixel_fn: function handle like self._world_to_pixel + + Returns: + known_points_px: list of (x_px, y_px) in pixel coords for the current frame + """ + cos_t, sin_t = math.cos(theta), math.sin(theta) + known_points_px = [] + + for x, y in known_points_world: + # Rotate about origin (0,0) + x_rot = cos_t * x - sin_t * y + y_rot = sin_t * x + cos_t * y + + # Translate + x_new = x_rot + dx + y_new = y_rot + dy + + # Convert to pixel space + known_points_px.append(to_pixel_fn((x_new, y_new))) + + return known_points_px + + # Modified - visualize the graph, and nodes def visualize_graph( self, output_dir: str = ".", output_filename: str = "waypoint_graph.png", ) -> None: - """Create and save a visualization of the graph and original map.""" + """ + Create and save a visualization of the graph with node IDs. + Uses the existing map, node_map, and config colors. + """ if self._graph is None: raise RuntimeError("No graph has been built yet") self._logger.info("Starting visualization...") - # Graph visualization using self._original_map + # Convert map to color map_vis = cv2.cvtColor(self._original_map, cv2.COLOR_GRAY2BGR) - # Draw edges in red - lines = [] - for src, dst in self._graph.edges(): - src_pixel = self._graph.nodes[src]["pixel"] - dst_pixel = self._graph.nodes[dst]["pixel"] - lines.append([[src_pixel[1], src_pixel[0]], [dst_pixel[1], dst_pixel[0]]]) - lines = np.array(lines) + # Ensure nearest node map exists + if self._node_map is None: + self._build_nearest_node_map(self._graph) + + # --- Draw edges --- edge_color = self._config.edge_color.to_tuple() - cv2.polylines(map_vis, lines, False, edge_color, 1) + for u, v in self._graph.edges(): + try: + src_y, src_x = self._graph.nodes[u]["pixel"] + dst_y, dst_x = self._graph.nodes[v]["pixel"] - # Draw nodes in blue - node_radius = 2 - node_color = self._config.node_color.to_tuple() - for _, pixel in self._graph.nodes(data="pixel"): - y, x = pixel - cv2.circle(map_vis, (x, y), node_radius, node_color, -1) + cv2.line(map_vis, (src_x, src_y), (dst_x, dst_y), edge_color, 1) + except KeyError: + continue + + # --- Optional: overlay node map mask (for debug) --- + overlay = cv2.applyColorMap( + ((self._node_map > -1).astype(np.uint8) * 180).astype(np.uint8), + cv2.COLORMAP_JET, + ) + map_vis = cv2.addWeighted(map_vis, 0.8, overlay, 0.2, 0) + # --- Save output --- + os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, output_filename) if not cv2.imwrite(output_path, map_vis): raise RuntimeError(f"Failed to save visualization to {output_path}") - self._logger.info(f"Saved visualization to {output_path}") + self._logger.info(f"Saved visualization with node IDs to {output_path}") + def get_node_ids(self, points: list[Point]) -> list[Optional[int]]: """ @@ -773,7 +1700,48 @@ def _build_nearest_node_map(self, graph: nx.Graph): if len(graph) == 0: # Empty graph return - graph_pixels = np.array([(i, y * width + x) for i, (y, x) in graph.nodes(data="pixel")], dtype=np.int32) + # --- SAFETY NORMALIZATION --- + cleaned_nodes = [] + for i, (node, pix) in enumerate(graph.nodes(data="pixel")): + if pix is None: + # fallback: use node key if it looks like coordinates + if isinstance(node, tuple) and len(node) == 2: + y, x = node + else: + self._debug_print(f"[WARN] Node {node} missing pixel info — skipped.") + continue + elif isinstance(pix, np.ndarray): + arr = np.array(pix).flatten() + if arr.size < 2: + skipped += 1 + continue + y, x = arr[:2] + elif isinstance(pix, list): + # handle [[y, x]] or [y, x] + if len(pix) == 1 and isinstance(pix[0], (list, tuple)): + y, x = pix[0][:2] + else: + y, x = pix[:2] + elif isinstance(pix, tuple) and len(pix) >= 2: + y, x = pix[:2] + else: + self._debug_print(f"[WARN] Unexpected pixel format for node {node}: {pix}") + continue + + try: + y, x = pix[:2] + except Exception as e: + self._debug_print(f"[ERROR] Could not clean node {node}: {pix} ({e})") + continue + + cleaned_nodes.append((i, y * width + x)) + # -------------------------------- + + if not cleaned_nodes: + self._logger.warning("[WARN] No valid pixel nodes for nearest-node map.") + return + + graph_pixels = np.array(cleaned_nodes, dtype=np.int32) node_map = self._node_map.reshape(-1) WaypointGraphGenerator._flood_fill_numba(node_map, graph_pixels, width) @@ -788,3 +1756,42 @@ def _to_pixels(self, meters: float) -> float: def _to_pixels_int(self, meters: float) -> int: """Convert a distance from meters to pixels and round to integer.""" return int(self._to_pixels(meters)) + + + def _debug_plot_with_known_points(self, known_points_px=None, new_points_px=None, title="Debug Known Points", save_path="debug_plots"): + """ + Save a debug visualization of the map, inflated map, and any known points overlaid. + Works in SSH/headless mode (no GUI). + + Args: + known_points_px: list of (x_px, y_px) pixel coordinates to plot (optional) + title: figure title + save_path: directory to save the plot + """ + import matplotlib.pyplot as plt + + # Filter out any None or malformed entries + known_points_px = [p for p in known_points_px if p is not None and len(p) == 2] + new_points_px = [p for p in new_points_px if p is not None and len(p) == 2] + + if len(known_points_px) == 0 and len(new_points_px) == 0: + self._logger.warning("[WARN] No valid points to plot in debug visualization.") + return + + ys = [p[0] for p in known_points_px] + xs = [p[1] for p in known_points_px] + new_ys = [p[0] for p in new_points_px] + new_xs = [p[1] for p in new_points_px] + + plt.figure(figsize=(6, 6)) + plt.imshow(self._original_map, cmap='gray') + plt.scatter(xs, ys, c='lime', s=20, label='Known Points') + plt.scatter(new_xs, new_ys, c='red', s=15, label='New Points') + plt.title(title) + plt.legend() + plt.tight_layout() + + save_path = f"debug_stages/{title}.png" + plt.savefig(save_path) + plt.close() + self._debug_print(f"[DEBUG] Saved {save_path}")