diff --git a/src/Autonomous_Navigation/basic.py b/src/Autonomous_Navigation/basic.py new file mode 100644 index 000000000..169f1f3b3 --- /dev/null +++ b/src/Autonomous_Navigation/basic.py @@ -0,0 +1,80 @@ +# test_basic.py +""" +Basic connection test +""" + +import airsim +import cv2 +import numpy as np +import time + + +def test_connection(): + print("=" * 50) + print("BASIC CONNECTION TEST") + print("=" * 50) + + # Test AirSim connection + try: + print("Connecting to simulator...") + client = airsim.MultirotorClient() + client.confirmConnection() + print("✓ Connection successful!") + except Exception as e: + print(f"✗ Connection failed: {e}") + return False + + # Test image capture + try: + print("Testing image capture...") + responses = client.simGetImages([ + airsim.ImageRequest("0", airsim.ImageType.Scene) + ]) + + if responses: + response = responses[0] + print(f"✓ Image size: {response.width}x{response.height}") + + # Convert to numpy array + img1d = np.frombuffer(response.image_data_uint8, dtype=np.uint8) + img = img1d.reshape(response.height, response.width, 3) + + # Save test image + cv2.imwrite("test_capture.jpg", img) + print("✓ Test image saved: test_capture.jpg") + except Exception as e: + print(f"✗ Image capture failed: {e}") + + # Test drone control + try: + print("Testing drone control...") + client.enableApiControl(True) + client.armDisarm(True) + print("✓ Drone unlocked") + + print("Simple takeoff test...") + client.takeoffAsync().join() + time.sleep(2) + print("✓ Takeoff successful") + + print("Hovering for 2 seconds...") + time.sleep(2) + + print("Landing...") + client.landAsync().join() + print("✓ Landing successful") + + except Exception as e: + print(f"✗ Control test failed: {e}") + print("You may need to switch to drone mode in simulator") + + print("\n" + "=" * 50) + print("TEST COMPLETE!") + print("=" * 50) + + return True + + +if __name__ == "__main__": + test_connection() + input("\nPress Enter to exit...") \ No newline at end of file diff --git a/src/Autonomous_Navigation/connect_test.py b/src/Autonomous_Navigation/connect_test.py index f74305cc8..e5b21d95d 100644 --- a/src/Autonomous_Navigation/connect_test.py +++ b/src/Autonomous_Navigation/connect_test.py @@ -1,157 +1,269 @@ -# airsim_connection.py -# 该模块用于连接 AirSim 模拟器(特别是 AbandonedPark 场景),并控制无人机执行基本任务。 -# 包含连接、解锁、起飞、图像捕获、路径探索和降落等功能。 +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +airsim_connection.py - AbandonedPark 模拟器无人机连接与控制模块 + +提供与 AirSim 模拟器(AbandonedPark 场景)的连接、解锁、起飞、 +图像捕获、路径探索和降落等基础功能。 +""" import airsim import time +import os +import sys +from typing import Optional, Tuple, List + +# 常量定义 +DEFAULT_ALTITUDE = 10 # 默认起飞高度(米) +MOVE_SPEED = 3.0 # 默认移动速度 (m/s) +STABILIZE_DELAY = 2.0 # 起飞后稳定等待时间(秒) +POST_MOVE_DELAY = 1.0 # 移动后等待时间(秒) +IMAGE_SAVE_DIR = "captures" # 默认图像保存目录 class AbandonedParkSimulator: """废弃公园场景的无人机模拟器控制类""" - def __init__(self): - """初始化:连接到本地 AirSim 模拟器并确认连接""" - print("连接到AbandonedPark模拟器...") - - # 创建 AirSim 多旋翼客户端对象(默认连接本地) - self.client = airsim.MultirotorClient() - # 确认连接,如果失败会抛出异常 - self.client.confirmConnection() - - # 使用 ping 检查连接状态,正常应返回 True - print(f"连接状态: {self.client.ping()}") - print("模拟器已连接!") + def __init__(self, ip: str = "127.0.0.1", port: int = 41451): + """ + 初始化控制器,可选择自动连接 + + :param ip: 模拟器 IP 地址,默认本地 + :param port: 模拟器端口,默认 41451 + """ + self.ip = ip + self.port = port + self.client: Optional[airsim.MultirotorClient] = None + self.is_connected = False + self.is_armed = False # 是否已解锁 + self.is_flying = False # 是否在飞行 + + print(f"初始化 AbandonedPark 控制器,目标 {ip}:{port}") + self._connect() + + # ------------------------------------------------------------------ + # 私有连接方法 + # ------------------------------------------------------------------ + def _connect(self, timeout: float = 10.0) -> bool: + """ + 连接到模拟器(内部调用) + + :param timeout: 连接超时时间(秒) + :return: 是否连接成功 + """ + try: + self.client = airsim.MultirotorClient(ip=self.ip, port=self.port) + self.client.confirmConnection() + # 使用 ping 测试连接 + ping_time = self.client.ping() + print(f"✓ 连接成功!响应时间: {ping_time} ms") + self.is_connected = True + return True + except Exception as e: + print(f"✗ 连接失败: {e}") + self.is_connected = False + return False - def ensure_drone_mode(self): - """确保无人机处于受控模式(启用 API 控制并解锁)""" - print("切换到无人机模式...") + # ------------------------------------------------------------------ + # 公共接口 + # ------------------------------------------------------------------ + def ensure_drone_mode(self) -> bool: + """ + 确保无人机处于受控模式(启用 API 控制并解锁) + + :return: 是否成功切换 + """ + if not self.is_connected: + print("错误:未连接到模拟器") + return False + print("尝试切换到无人机模式...") try: - # 启用 API 控制,允许通过代码控制无人机 self.client.enableApiControl(True) - # 解锁无人机(模拟器中的电机解锁) self.client.armDisarm(True) - print("无人机已解锁") + self.is_armed = True + print("✓ 无人机已解锁") return True except Exception as e: - print(f"切换模式时出错: {e}") - print("请确保模拟器中已选择无人机模式") + print(f"✗ 切换失败: {e}") + print("提示:请确保模拟器中已选择无人机模式(按 F2 切换)") + return False + + def takeoff_and_hover(self, altitude: float = DEFAULT_ALTITUDE) -> bool: + """ + 起飞到指定高度并悬停 + + :param altitude: 目标高度(米) + :return: 是否成功 + """ + if not self.is_connected: + print("错误:未连接到模拟器") + return False + if not self.is_armed: + print("错误:无人机未解锁,请先调用 ensure_drone_mode()") return False - def takeoff_and_hover(self, altitude=10): - """起飞到指定高度并悬停""" print(f"起飞到 {altitude} 米高度...") + try: + # 起飞 + self.client.takeoffAsync().join() + time.sleep(STABILIZE_DELAY) - # 起飞(异步操作,使用 join 等待完成) - self.client.takeoffAsync().join() - time.sleep(2) # 等待稳定 + # 上升到指定高度(AirSim 中 Z 轴向下,负值表示上升) + self.client.moveToZAsync(-altitude, MOVE_SPEED).join() + time.sleep(POST_MOVE_DELAY) + + self.is_flying = True + print(f"✓ 已在 {altitude} 米高度悬停") + return True + except Exception as e: + print(f"✗ 起飞失败: {e}") + return False - # 移动到指定高度(负值表示上升,因为 AirSim 中 Z 轴向下) - self.client.moveToZAsync(-altitude, 3).join() - time.sleep(1) # 等待稳定 + def capture_park_image(self, save_dir: str = IMAGE_SAVE_DIR) -> Optional[airsim.ImageRequest]: + """ + 从无人机前置摄像头捕获图像并保存为文件 - print(f"已在 {altitude} 米高度悬停") + :param save_dir: 保存图像的目录(自动创建) + :return: 图像对象(numpy 数组),失败返回 None + """ + if not self.is_connected: + print("错误:未连接到模拟器") + return None - def capture_park_image(self): - """从无人机前置摄像头捕获图像并保存为文件""" - print("捕获图像...") + # 确保保存目录存在 + os.makedirs(save_dir, exist_ok=True) - # 请求图像数据(前置摄像头 "0",场景类型,不压缩,不使用像素格式转换) - responses = self.client.simGetImages([ - airsim.ImageRequest( - "0", # 相机名称,0 通常代表前置摄像头 - airsim.ImageType.Scene, # 场景图像(彩色) - False, # 不压缩为 JPEG - False # 不进行像素格式转换 - ) - ]) + try: + responses = self.client.simGetImages([ + airsim.ImageRequest("0", airsim.ImageType.Scene, False, False) + ]) + if not responses: + print("✗ 未收到图像响应") + return None - # 检查是否有图像返回 - if responses and len(responses) > 0: response = responses[0] - - # 将字节数据转换为 numpy 数组 + # 转换为 numpy 数组 import numpy as np img1d = np.frombuffer(response.image_data_uint8, dtype=np.uint8) - # 根据图像高度和宽度重塑为三维数组 (H, W, 3) img_rgb = img1d.reshape(response.height, response.width, 3) - # 保存图像(使用 OpenCV,BGR 格式,但这里 RGB 也可以保存) + # 保存图像(使用 OpenCV) import cv2 timestamp = time.strftime("%Y%m%d_%H%M%S") - filename = f"park_capture_{timestamp}.jpg" - cv2.imwrite(filename, img_rgb) - print(f"图像已保存: {filename}") + filename = os.path.join(save_dir, f"park_capture_{timestamp}.jpg") + cv2.imwrite(filename, cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)) + print(f"✓ 图像已保存: {filename}") return img_rgb - else: - print("未能捕获图像") + except Exception as e: + print(f"✗ 图像捕获失败: {e}") return None - def explore_park(self): - """执行一个简单的路径探索:依次飞往几个航点,并在每个航点拍照""" - print("开始探索废弃公园...") + def explore_park(self, waypoints: Optional[List[Tuple[float, float, float]]] = None) -> bool: + """ + 执行路径探索:依次飞往航点并在每个航点拍照 - # 定义航点列表 (x, y, z) ,z 为负值表示高度 - waypoints = [ - (20, 0, -10), # 向前(x正方向)20米,保持高度 - (20, 15, -10), # 向右(y正方向)15米 - (0, 15, -12), # 向后20米,同时下降2米 - (0, 0, -10), # 向左15米回到起点,恢复高度 - ] + :param waypoints: 航点列表,每个元素为 (x, y, z) 元组(z 为负值表示高度) + 若为 None,则使用默认路径 + :return: 是否全部成功 + """ + if not self.is_flying: + print("错误:无人机未起飞,请先调用 takeoff_and_hover()") + return False - for x, y, z in waypoints: - print(f"飞往位置: ({x}, {y}, {z})") - # 以速度 3 m/s 飞往目标点 - self.client.moveToPositionAsync(x, y, z, 3).join() + if waypoints is None: + waypoints = [ + (20, 0, -10), + (20, 15, -10), + (0, 15, -12), + (0, 0, -10), + ] - # 到达后捕获一张图像 - self.capture_park_image() - time.sleep(1) # 等待一下再飞向下一个点 + print("开始探索废弃公园...") + for idx, (x, y, z) in enumerate(waypoints, 1): + print(f"[{idx}/{len(waypoints)}] 飞往位置: ({x}, {y}, {z})") + try: + self.client.moveToPositionAsync(x, y, z, MOVE_SPEED).join() + time.sleep(POST_MOVE_DELAY) + self.capture_park_image() + except Exception as e: + print(f"✗ 移动失败: {e}") + return False print("探索完成!") + return True - def cleanup(self): + def cleanup(self) -> None: """清理资源:降落、锁定无人机、禁用 API 控制""" + if not self.is_connected: + return + print("正在降落...") - # 降落并等待完成 - self.client.landAsync().join() - # 锁定无人机(上锁) - self.client.armDisarm(False) - # 禁用 API 控制,交还控制权给模拟器 - self.client.enableApiControl(False) - print("无人机已降落") + try: + if self.is_flying: + self.client.landAsync().join() + self.is_flying = False + if self.is_armed: + self.client.armDisarm(False) + self.is_armed = False + self.client.enableApiControl(False) + print("✓ 无人机已降落并锁定") + except Exception as e: + print(f"✗ 清理过程中出错: {e}") -# 快速测试脚本(当直接运行此文件时执行) -if __name__ == "__main__": - print("=== AbandonedPark无人机测试 ===") +# ---------------------------------------------------------------------- +# 交互式测试脚本(当直接运行此文件时执行) +# ---------------------------------------------------------------------- +def interactive_test(): + """交互式测试菜单""" + print("=== AbandonedPark 无人机测试 ===") + print("请确保 AbandonedPark.exe 已运行并按 F2 切换到无人机模式。") + input("按回车键继续...") - # 1. 确保模拟器已经运行(用户需手动启动) - input("请确保AbandonedPark.exe已运行,然后按回车继续...") + # 创建控制器对象(自动连接) + sim = AbandonedParkSimulator() - # 2. 连接模拟器 - simulator = AbandonedParkSimulator() + if not sim.is_connected: + print("无法连接,测试终止。") + return try: - # 3. 切换到无人机模式(解锁) - if simulator.ensure_drone_mode(): - # 4. 起飞至10米高度 - simulator.takeoff_and_hover(10) - - # 5. 捕获初始图像 - simulator.capture_park_image() + while True: + print("\n" + "-" * 40) + print("请选择操作:") + print("1. 切换到无人机模式(解锁)") + print("2. 起飞并悬停") + print("3. 捕获图像") + print("4. 探索公园(默认路径)") + print("5. 降落并清理") + print("0. 退出") + choice = input("请输入数字: ").strip() + + if choice == '1': + sim.ensure_drone_mode() + elif choice == '2': + alt = input("请输入高度(米,默认10): ").strip() + alt = float(alt) if alt else 10.0 + sim.takeoff_and_hover(alt) + elif choice == '3': + sim.capture_park_image() + elif choice == '4': + sim.explore_park() + elif choice == '5': + sim.cleanup() + elif choice == '0': + break + else: + print("无效输入,请重新选择。") + except KeyboardInterrupt: + print("\n用户中断") + finally: + sim.cleanup() + print("测试结束。") - # 6. 执行简单探索(飞航点并拍照) - simulator.explore_park() - # 7. 降落并清理 - simulator.cleanup() - except KeyboardInterrupt: - # 捕获 Ctrl+C,安全降落 - print("用户中断") - simulator.cleanup() - except Exception as e: - # 其他异常处理 - print(f"发生错误: {e}") - simulator.cleanup() \ No newline at end of file +if __name__ == "__main__": + interactive_test() \ No newline at end of file diff --git a/src/Autonomous_Navigation/test_connection.py b/src/Autonomous_Navigation/test_connection.py index 9ccd5adc5..85b144b40 100644 --- a/src/Autonomous_Navigation/test_connection.py +++ b/src/Autonomous_Navigation/test_connection.py @@ -1,14 +1,90 @@ import airsim import time +import logging -client = airsim.MultirotorClient() -client.confirmConnection() -client.enableApiControl(True) -client.armDisarm(True) +# 配置日志 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) -print("连接成功!当前无人机状态:", client.getMultirotorState()) -# 简单起飞和降落测试(可选) -client.takeoffAsync().join() -time.sleep(2) -client.landAsync().join() \ No newline at end of file +def connect_and_arm() -> airsim.MultirotorClient: + """ + 连接到 AirSim 模拟器,启用 API 控制并解锁无人机。 + + Returns: + 已连接的客户端对象 + + Raises: + Exception: 如果连接、启用控制或解锁失败 + """ + client = airsim.MultirotorClient() + client.confirmConnection() + logger.info("已连接到模拟器") + + # 启用 API 控制 + if not client.enableApiControl(True): + raise RuntimeError("启用 API 控制失败") + logger.info("API 控制已启用") + + # 解锁无人机 + if not client.armDisarm(True): + raise RuntimeError("解锁失败") + logger.info("无人机已解锁") + + return client + + +def takeoff_and_land(client: airsim.MultirotorClient, hover_time: float = 2.0): + """ + 执行起飞、悬停和降落测试。 + + Args: + client: AirSim 客户端对象 + hover_time: 悬停时间(秒) + """ + # 获取起飞前状态 + state = client.getMultirotorState() + logger.info(f"起飞前状态 - 位置: {state.kinematics_estimated.position}, 速度: {state.speed:.2f} m/s") + + # 起飞 + logger.info("正在起飞...") + client.takeoffAsync().join() + logger.info("起飞完成,开始悬停") + + # 悬停 + time.sleep(hover_time) + + # 降落 + logger.info("正在降落...") + client.landAsync().join() + logger.info("降落完成") + + +def disarm_and_release(client: airsim.MultirotorClient): + """锁定无人机并释放 API 控制(清理操作)""" + try: + client.armDisarm(False) + client.enableApiControl(False) + logger.info("已锁定并释放控制") + except Exception as e: + logger.warning(f"清理过程中出现异常: {e}") + + +def main(): + """主函数:连接、起飞、降落""" + client = None + try: + client = connect_and_arm() + takeoff_and_land(client, hover_time=2.0) + except KeyboardInterrupt: + logger.info("用户中断操作") + except Exception as e: + logger.exception("执行过程中发生错误") + finally: + if client: + disarm_and_release(client) + logger.info("脚本结束") + + +if __name__ == "__main__": + main() \ No newline at end of file