diff --git a/src/Autonomous_Navigation/airsim.py b/src/Autonomous_Navigation/airsim.py index 25de18d2e..913488ead 100644 --- a/src/Autonomous_Navigation/airsim.py +++ b/src/Autonomous_Navigation/airsim.py @@ -1,140 +1,306 @@ -# airsim_connection.py -import airsim +""" +AbandonedPark 无人机模拟控制模块 +使用 AirSim API 连接并控制无人机在废弃公园场景中执行任务 +""" + import time +import logging +from typing import List, Tuple, Optional +from dataclasses import dataclass +from pathlib import Path + +import airsim +import numpy as np +import cv2 + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +@dataclass +class ParkExplorerConfig: + """探索任务的配置参数""" + takeoff_altitude: float = 10.0 # 起飞高度(米) + movement_speed: float = 3.0 # 移动速度(m/s) + image_save_dir: str = "captures" # 图像保存目录 + camera_name: str = "0" # 相机名称 + waypoints: List[Tuple[float, float, float]] = None # 路径点列表 (x, y, z) + + def __post_init__(self): + if self.waypoints is None: + # 默认探索路径:围绕公园 + self.waypoints = [ + (20, 0, -10), # 向前20米,高度-10 + (20, 15, -10), # 向右15米 + (0, 15, -12), # 向后20米,下降2米 + (0, 0, -10), # 向左15米,回到起点 + ] class AbandonedParkSimulator: - def __init__(self): - print("连接到AbandonedPark模拟器...") + """ + 废弃公园无人机模拟器控制类 + 提供连接、起飞、移动、拍照、探索等功能 + """ + + def __init__(self, config: Optional[ParkExplorerConfig] = None): + """ + 初始化模拟器连接 - # 连接到本地的AbandonedPark模拟器 + Args: + config: 任务配置参数,若为None则使用默认配置 + """ + self.config = config or ParkExplorerConfig() self.client = airsim.MultirotorClient() - self.client.confirmConnection() + self._connected = False + self._api_control_enabled = False + + # 确保图像保存目录存在 + Path(self.config.image_save_dir).mkdir(parents=True, exist_ok=True) + + def __enter__(self): + """上下文管理器入口:自动连接""" + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """上下文管理器出口:自动清理""" + self.cleanup() - # 检查连接状态 - print(f"连接状态: {self.client.ping()}") + def connect(self, retries: int = 3, delay: float = 1.0) -> bool: + """ + 连接到AirSim模拟器 - print("模拟器已连接!") + Args: + retries: 重试次数 + delay: 重试间隔(秒) - def ensure_drone_mode(self): - """确保切换到无人机模式""" - print("切换到无人机模式...") + Returns: + 是否连接成功 + + Raises: + ConnectionError: 连接失败 + """ + logger.info("正在连接到 AbandonedPark 模拟器...") + for attempt in range(1, retries + 1): + try: + self.client.confirmConnection() + if self.client.ping(): + self._connected = True + logger.info(f"连接成功,ping: {self.client.ping()}") + return True + except Exception as e: + logger.warning(f"连接尝试 {attempt}/{retries} 失败: {e}") + if attempt < retries: + time.sleep(delay) + + raise ConnectionError("无法连接到 AirSim 模拟器,请确保模拟器已运行") + + def enable_drone_mode(self) -> bool: + """ + 启用无人机模式:获取API控制并解锁 + + Returns: + 是否成功启用 + """ + if not self._connected: + logger.error("未连接到模拟器,请先调用 connect()") + return False - # 尝试解锁无人机 try: + logger.info("启用API控制...") self.client.enableApiControl(True) + self._api_control_enabled = True + + logger.info("解锁无人机...") self.client.armDisarm(True) - print("无人机已解锁") + + logger.info("无人机模式已启用") return True except Exception as e: - print(f"切换模式时出错: {e}") - print("请确保模拟器中已选择无人机模式") + logger.error(f"启用无人机模式失败: {e}") return False - def takeoff_and_hover(self, altitude=10): - """起飞并悬停""" - print(f"起飞到 {altitude} 米高度...") + def takeoff(self, altitude: Optional[float] = None) -> bool: + """ + 起飞并悬停到指定高度 - # 起飞 - self.client.takeoffAsync().join() - time.sleep(2) + Args: + altitude: 目标高度(米),若为None则使用配置中的高度 - # 移动到指定高度 - self.client.moveToZAsync(-altitude, 3).join() - time.sleep(1) + Returns: + 是否成功 + """ + target_z = -(altitude or self.config.takeoff_altitude) + logger.info(f"起飞并爬升至 {target_z} 米高度...") - print(f"已在 {altitude} 米高度悬停") + try: + # 起飞 + self.client.takeoffAsync().join() + time.sleep(1) - def capture_park_image(self): - """捕获废弃公园图像""" - print("捕获图像...") + # 移动到目标高度 + self.client.moveToZAsync(target_z, self.config.movement_speed).join() + logger.info(f"已稳定在 {target_z} 米") + return True + except Exception as e: + logger.error(f"起飞失败: {e}") + return False - # 从相机获取图像 - responses = self.client.simGetImages([ - airsim.ImageRequest( - "0", # 前置摄像头 - airsim.ImageType.Scene, - False, False # 不压缩 - ) - ]) + def capture_image(self, filename: Optional[str] = None) -> Optional[np.ndarray]: + """ + 从指定相机捕获图像并保存 + + Args: + filename: 保存的文件名,若为None则自动生成时间戳文件名 + + Returns: + RGB图像数组,若失败则返回None + """ + logger.info("正在捕获图像...") + try: + responses = self.client.simGetImages([ + airsim.ImageRequest( + self.config.camera_name, + airsim.ImageType.Scene, + False, False + ) + ]) + + if not responses or len(responses) == 0: + logger.error("未收到图像响应") + return None - if responses and len(responses) > 0: response = responses[0] + if response.width == 0 or response.height == 0: + logger.error("图像数据无效") + return None - # 转换为numpy数组 - import numpy as np - img1d = np.frombuffer(response.image_data_uint8, dtype=np.uint8) - img_rgb = img1d.reshape(response.height, response.width, 3) + # 转换为numpy数组 (RGB) + img_1d = np.frombuffer(response.image_data_uint8, dtype=np.uint8) + img_rgb = img_1d.reshape(response.height, response.width, 3) # 保存图像 - import cv2 - timestamp = time.strftime("%Y%m%d_%H%M%S") - cv2.imwrite(f"park_capture_{timestamp}.jpg", img_rgb) - print(f"图像已保存: park_capture_{timestamp}.jpg") + if filename is None: + timestamp = time.strftime("%Y%m%d_%H%M%S") + filename = f"park_capture_{timestamp}.jpg" + + save_path = Path(self.config.image_save_dir) / filename + cv2.imwrite(str(save_path), cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)) + logger.info(f"图像已保存: {save_path}") return img_rgb - else: - print("未能捕获图像") + except Exception as e: + logger.error(f"捕获图像失败: {e}") return None - def explore_park(self): - """探索公园的简单路径""" - print("开始探索废弃公园...") + def move_to_position(self, x: float, y: float, z: float, + speed: Optional[float] = None) -> bool: + """ + 移动到指定位置 - # 定义探索路径(围绕公园) - waypoints = [ - (20, 0, -10), # 向前20米 - (20, 15, -10), # 向右15米 - (0, 15, -12), # 向后20米,下降2米 - (0, 0, -10), # 向左15米,回到起点 - ] + Args: + x, y, z: 目标坐标(z为负值表示高度) + speed: 移动速度,若为None则使用配置速度 - for x, y, z in waypoints: - print(f"飞往位置: ({x}, {y}, {z})") - self.client.moveToPositionAsync(x, y, z, 3).join() + Returns: + 是否成功 + """ + speed = speed or self.config.movement_speed + try: + logger.info(f"移动到 ({x}, {y}, {z})...") + self.client.moveToPositionAsync(x, y, z, speed).join() + return True + except Exception as e: + logger.error(f"移动失败: {e}") + return False - # 在每个位置捕获图像 - self.capture_park_image() - time.sleep(1) + def explore(self, waypoints: Optional[List[Tuple[float, float, float]]] = None) -> None: + """ + 沿路径点探索,并在每个点拍照 - print("探索完成!") + Args: + waypoints: 路径点列表,若为None则使用配置中的路径 + """ + points = waypoints or self.config.waypoints + logger.info(f"开始探索,共 {len(points)} 个路径点") - def cleanup(self): - """清理资源""" - print("正在降落...") - self.client.landAsync().join() - self.client.armDisarm(False) - self.client.enableApiControl(False) - print("无人机已降落") + for i, (x, y, z) in enumerate(points, 1): + logger.info(f"前往路径点 {i}/{len(points)}: ({x}, {y}, {z})") + if self.move_to_position(x, y, z): + self.capture_image(filename=f"waypoint_{i}.jpg") + time.sleep(1) # 短暂悬停 + logger.info("探索完成") -# 快速测试脚本 -if __name__ == "__main__": - print("=== AbandonedPark无人机测试 ===") + def cleanup(self) -> None: + """清理资源:降落、锁定、释放控制""" + logger.info("开始清理资源...") + try: + if self._api_control_enabled: + logger.info("正在降落...") + self.client.landAsync().join() + + logger.info("锁定无人机...") + self.client.armDisarm(False) + + logger.info("禁用API控制...") + self.client.enableApiControl(False) + self._api_control_enabled = False + except Exception as e: + logger.error(f"清理过程中出错: {e}") + finally: + self._connected = False + logger.info("清理完成") - # 1. 确保模拟器已经运行 - input("请确保AbandonedPark.exe已运行,然后按回车继续...") - # 2. 连接模拟器 - simulator = AbandonedParkSimulator() +def main(): + """主函数:演示如何使用 AbandonedParkSimulator""" + logger.info("=== AbandonedPark 无人机测试 ===") + # 创建配置(可自定义) + config = ParkExplorerConfig( + takeoff_altitude=10, + movement_speed=3, + image_save_dir="park_captures", + waypoints=[ + (20, 0, -10), + (20, 15, -10), + (0, 15, -12), + (0, 0, -10), + ] + ) + + # 使用上下文管理器自动处理连接和清理 try: - # 3. 切换到无人机模式 - if simulator.ensure_drone_mode(): - # 4. 起飞 - simulator.takeoff_and_hover(10) + with AbandonedParkSimulator(config) as simulator: + # 启用无人机模式 + if not simulator.enable_drone_mode(): + logger.error("无法启用无人机模式,退出") + return + + # 起飞 + if not simulator.takeoff(): + logger.error("起飞失败,退出") + return - # 5. 捕获初始图像 - simulator.capture_park_image() + # 捕获初始图像 + simulator.capture_image(filename="initial.jpg") - # 6. 简单探索 - simulator.explore_park() + # 执行探索 + simulator.explore() - # 7. 降落 - simulator.cleanup() + except ConnectionError as e: + logger.error(f"连接错误: {e}") except KeyboardInterrupt: - print("用户中断") - simulator.cleanup() + logger.info("用户中断") except Exception as e: - print(f"发生错误: {e}") - simulator.cleanup() \ No newline at end of file + logger.error(f"未预期的错误: {e}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/Autonomous_Navigation/config/settings.yaml b/src/Autonomous_Navigation/config/settings.yaml new file mode 100644 index 000000000..e098ab652 --- /dev/null +++ b/src/Autonomous_Navigation/config/settings.yaml @@ -0,0 +1,27 @@ +# 无人机导航系统配置文件 +mission: + default_time: 300 # 默认任务时长(秒) + takeoff_height: 15 # 起飞高度 + max_battery: 100.0 + low_battery_warning: 20 + critical_battery: 10 + +navigation: + speeds: + ruins: 2.0 + building: 2.0 + forest: 1.5 + road: 3.0 + sky: 4.0 + water: 2.0 + fire: 5.0 + animal: 1.0 + vehicle: 2.0 + default: 1.0 + +classifier: + # 可覆盖默认阈值 + thresholds: + fire_red: 0.25 + fire_bright: 200 + # ... 其他阈值 \ No newline at end of file diff --git a/src/Autonomous_Navigation/connect_test.py b/src/Autonomous_Navigation/connect_test.py index f74305cc8..256a17bf9 100644 --- a/src/Autonomous_Navigation/connect_test.py +++ b/src/Autonomous_Navigation/connect_test.py @@ -1,157 +1,251 @@ -# airsim_connection.py -# 该模块用于连接 AirSim 模拟器(特别是 AbandonedPark 场景),并控制无人机执行基本任务。 -# 包含连接、解锁、起飞、图像捕获、路径探索和降落等功能。 +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +airsim_connection.py - AbandonedPark 模拟器无人机控制模块 +提供连接、解锁、起飞、图像捕获、路径探索和降落等功能。 +""" import airsim import time +import os +import sys +import cv2 +import numpy as np +from datetime import datetime class AbandonedParkSimulator: """废弃公园场景的无人机模拟器控制类""" - def __init__(self): - """初始化:连接到本地 AirSim 模拟器并确认连接""" - print("连接到AbandonedPark模拟器...") - - # 创建 AirSim 多旋翼客户端对象(默认连接本地) - self.client = airsim.MultirotorClient() - # 确认连接,如果失败会抛出异常 - self.client.confirmConnection() - - # 使用 ping 检查连接状态,正常应返回 True - print(f"连接状态: {self.client.ping()}") - print("模拟器已连接!") + def __init__(self, ip="127.0.0.1", port=41451, auto_connect=True): + """ + 初始化控制器 + :param ip: 模拟器 IP 地址 + :param port: 通信端口 + :param auto_connect: 是否自动连接 + """ + self.ip = ip + self.port = port + self.client = None + self.connected = False + self.flying = False + self.image_save_dir = None # 图像保存目录,由 capture 方法自动创建 + + if auto_connect: + self.connect() + + def connect(self, max_retries=3, retry_delay=2): + """ + 连接到 AirSim 模拟器 + :param max_retries: 最大重试次数 + :param retry_delay: 重试间隔(秒) + :return: 是否连接成功 + """ + print(f"正在连接到 AbandonedPark 模拟器 {self.ip}:{self.port}...") + for attempt in range(1, max_retries + 1): + try: + self.client = airsim.MultirotorClient(ip=self.ip, port=self.port) + self.client.confirmConnection() + # 使用 ping 验证连接 + ping_time = self.client.ping() + print(f"✓ 连接成功!响应时间: {ping_time} ms (尝试 {attempt})") + self.connected = True + return True + except Exception as e: + print(f"✗ 连接尝试 {attempt} 失败: {e}") + if attempt < max_retries: + print(f"等待 {retry_delay} 秒后重试...") + time.sleep(retry_delay) + self.connected = False + print("✗ 无法连接到模拟器,请检查模拟器是否已启动。") + return False def ensure_drone_mode(self): """确保无人机处于受控模式(启用 API 控制并解锁)""" - print("切换到无人机模式...") + if not self.connected: + print("错误:未连接到模拟器") + return False + print("切换到无人机模式...") try: - # 启用 API 控制,允许通过代码控制无人机 self.client.enableApiControl(True) - # 解锁无人机(模拟器中的电机解锁) self.client.armDisarm(True) - print("无人机已解锁") + print("✓ 无人机已解锁") return True except Exception as e: - print(f"切换模式时出错: {e}") - print("请确保模拟器中已选择无人机模式") + print(f"✗ 切换模式失败: {e}") + print("提示:请确保模拟器中已选择无人机模式(按 F2 切换)") return False - def takeoff_and_hover(self, altitude=10): - """起飞到指定高度并悬停""" - print(f"起飞到 {altitude} 米高度...") - - # 起飞(异步操作,使用 join 等待完成) - self.client.takeoffAsync().join() - time.sleep(2) # 等待稳定 + def takeoff_and_hover(self, altitude=10, timeout=10): + """ + 起飞到指定高度并悬停 + :param altitude: 目标高度(米) + :param timeout: 起飞超时时间(秒) + """ + if not self.connected: + print("错误:未连接到模拟器") + return False - # 移动到指定高度(负值表示上升,因为 AirSim 中 Z 轴向下) - self.client.moveToZAsync(-altitude, 3).join() - time.sleep(1) # 等待稳定 + print(f"起飞到 {altitude} 米高度...") + try: + self.client.takeoffAsync().join(timeout=timeout) + time.sleep(2) + self.client.moveToZAsync(-altitude, 3).join(timeout=timeout) + time.sleep(1) + self.flying = True + print(f"✓ 已在 {altitude} 米高度悬停") + return True + except Exception as e: + print(f"✗ 起飞失败: {e}") + self.flying = False + return False - print(f"已在 {altitude} 米高度悬停") + def capture_park_image(self, camera_name="0", save_dir=None): + """ + 从指定摄像头捕获图像并保存 + :param camera_name: 摄像头名称,默认为前置 "0" + :param save_dir: 保存目录,若为 None 则自动创建带时间戳的目录 + :return: 图像 numpy 数组,失败返回 None + """ + if not self.connected: + print("错误:未连接到模拟器") + return None - def capture_park_image(self): - """从无人机前置摄像头捕获图像并保存为文件""" - print("捕获图像...") + # 设置保存目录 + if save_dir is None: + if self.image_save_dir is None: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self.image_save_dir = f"captures_{timestamp}" + save_dir = self.image_save_dir + os.makedirs(save_dir, exist_ok=True) - # 请求图像数据(前置摄像头 "0",场景类型,不压缩,不使用像素格式转换) - responses = self.client.simGetImages([ - airsim.ImageRequest( - "0", # 相机名称,0 通常代表前置摄像头 - airsim.ImageType.Scene, # 场景图像(彩色) - False, # 不压缩为 JPEG - False # 不进行像素格式转换 - ) - ]) + try: + responses = self.client.simGetImages([ + airsim.ImageRequest(camera_name, airsim.ImageType.Scene, False, False) + ]) + if not responses: + print("✗ 未收到图像响应") + return None - # 检查是否有图像返回 - if responses and len(responses) > 0: response = responses[0] - - # 将字节数据转换为 numpy 数组 - import numpy as np img1d = np.frombuffer(response.image_data_uint8, dtype=np.uint8) - # 根据图像高度和宽度重塑为三维数组 (H, W, 3) img_rgb = img1d.reshape(response.height, response.width, 3) - # 保存图像(使用 OpenCV,BGR 格式,但这里 RGB 也可以保存) - import cv2 - timestamp = time.strftime("%Y%m%d_%H%M%S") - filename = f"park_capture_{timestamp}.jpg" - cv2.imwrite(filename, img_rgb) - print(f"图像已保存: {filename}") - + # 生成文件名 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] + filename = os.path.join(save_dir, f"capture_{timestamp}.jpg") + cv2.imwrite(filename, cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)) + print(f"✓ 图像已保存: {filename}") return img_rgb - else: - print("未能捕获图像") + except Exception as e: + print(f"✗ 图像捕获失败: {e}") return None - def explore_park(self): - """执行一个简单的路径探索:依次飞往几个航点,并在每个航点拍照""" - print("开始探索废弃公园...") - - # 定义航点列表 (x, y, z) ,z 为负值表示高度 - waypoints = [ - (20, 0, -10), # 向前(x正方向)20米,保持高度 - (20, 15, -10), # 向右(y正方向)15米 - (0, 15, -12), # 向后20米,同时下降2米 - (0, 0, -10), # 向左15米回到起点,恢复高度 - ] - - for x, y, z in waypoints: - print(f"飞往位置: ({x}, {y}, {z})") - # 以速度 3 m/s 飞往目标点 - self.client.moveToPositionAsync(x, y, z, 3).join() + def explore_park(self, waypoints=None): + """ + 执行路径探索:依次飞往航点并在每个航点拍照 + :param waypoints: 航点列表,每个元素为 (x, y, z) 元组,z 为负值 + 若为 None,则使用默认路径 + """ + if not self.flying: + print("错误:无人机尚未起飞") + return False - # 到达后捕获一张图像 - self.capture_park_image() - time.sleep(1) # 等待一下再飞向下一个点 + if waypoints is None: + waypoints = [ + (20, 0, -10), + (20, 15, -10), + (0, 15, -12), + (0, 0, -10), + ] + print("开始探索废弃公园...") + for idx, (x, y, z) in enumerate(waypoints, 1): + print(f"[{idx}/{len(waypoints)}] 飞往位置: ({x}, {y}, {z})") + try: + self.client.moveToPositionAsync(x, y, z, 3).join() + self.capture_park_image() + time.sleep(1) + except Exception as e: + print(f"✗ 移动失败: {e}") + return False print("探索完成!") + return True + + def land(self): + """降落无人机并清理资源""" + if not self.connected: + return - def cleanup(self): - """清理资源:降落、锁定无人机、禁用 API 控制""" print("正在降落...") - # 降落并等待完成 - self.client.landAsync().join() - # 锁定无人机(上锁) - self.client.armDisarm(False) - # 禁用 API 控制,交还控制权给模拟器 - self.client.enableApiControl(False) - print("无人机已降落") + try: + if self.flying: + self.client.landAsync().join() + self.flying = False + self.client.armDisarm(False) + self.client.enableApiControl(False) + print("✓ 无人机已降落并锁定") + except Exception as e: + print(f"✗ 降落过程中出错: {e}") + def cleanup(self): + """清理资源(同 land,作为别名)""" + self.land() -# 快速测试脚本(当直接运行此文件时执行) -if __name__ == "__main__": - print("=== AbandonedPark无人机测试 ===") - # 1. 确保模拟器已经运行(用户需手动启动) - input("请确保AbandonedPark.exe已运行,然后按回车继续...") +# ---------------------------------------------------------------------- +# 交互式测试脚本 +# ---------------------------------------------------------------------- +def run_test(): + """运行交互式测试""" + print("=== AbandonedPark 无人机测试 ===") + print("请确保 AbandonedPark.exe 已运行并按 F2 切换到无人机模式。") + input("按回车键继续...") - # 2. 连接模拟器 - simulator = AbandonedParkSimulator() + sim = AbandonedParkSimulator(auto_connect=True) + if not sim.connected: + print("无法连接,退出测试。") + return try: - # 3. 切换到无人机模式(解锁) - if simulator.ensure_drone_mode(): - # 4. 起飞至10米高度 - simulator.takeoff_and_hover(10) - - # 5. 捕获初始图像 - simulator.capture_park_image() + while True: + print("\n" + "-" * 40) + print("请选择测试项:") + print("1. 切换无人机模式(解锁)") + print("2. 起飞并悬停") + print("3. 捕获图像") + print("4. 探索公园(默认路径)") + print("5. 降落") + print("0. 退出") + choice = input("请输入数字: ").strip() + + if choice == '1': + sim.ensure_drone_mode() + elif choice == '2': + alt = input("请输入高度(米,默认10): ").strip() + alt = int(alt) if alt.isdigit() else 10 + sim.takeoff_and_hover(alt) + elif choice == '3': + sim.capture_park_image() + elif choice == '4': + if not sim.flying: + print("请先起飞!") + else: + sim.explore_park() + elif choice == '5': + sim.land() + elif choice == '0': + break + else: + print("无效输入,请重新选择。") + except KeyboardInterrupt: + print("\n用户中断") + finally: + sim.cleanup() + print("测试结束。") - # 6. 执行简单探索(飞航点并拍照) - simulator.explore_park() - # 7. 降落并清理 - simulator.cleanup() - except KeyboardInterrupt: - # 捕获 Ctrl+C,安全降落 - print("用户中断") - simulator.cleanup() - except Exception as e: - # 其他异常处理 - print(f"发生错误: {e}") - simulator.cleanup() \ No newline at end of file +if __name__ == "__main__": + run_test() \ No newline at end of file diff --git a/src/Autonomous_Navigation/scripts/test_takeoff_land.py b/src/Autonomous_Navigation/scripts/test_takeoff_land.py new file mode 100644 index 000000000..98e421810 --- /dev/null +++ b/src/Autonomous_Navigation/scripts/test_takeoff_land.py @@ -0,0 +1,49 @@ +# scripts/test_takeoff_land.py +""" +测试无人机的起飞、悬停和降落功能。 +运行前请确保 AirSim 模拟器已启动。 +""" +import airsim +import time +import sys + + +def main(): + print("=" * 50) + print("起飞/降落测试") + print("=" * 50) + client = airsim.MultirotorClient() + client.confirmConnection() + print("✓ 已连接模拟器") + + # 解锁 + client.enableApiControl(True) + client.armDisarm(True) + print("✓ 无人机已解锁") + + # 起飞 + print("起飞至 5 米...") + client.takeoffAsync().join() + time.sleep(2) + client.moveToZAsync(-5, 2).join() + print("✓ 已到达 5 米高度,悬停 3 秒") + time.sleep(3) + + # 降落 + print("降落...") + client.landAsync().join() + print("✓ 降落完成") + + # 锁定 + client.armDisarm(False) + client.enableApiControl(False) + print("测试结束") + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n用户中断") + except Exception as e: + print(f"错误: {e}") \ No newline at end of file