diff --git a/README.md b/README.md index 61df97c..daf2513 100644 --- a/README.md +++ b/README.md @@ -33,8 +33,10 @@ graph LR ### 1.1 常用对话 ``` 连接服务器 -生成行人 设置雨天天气条件 +生成车辆 +开启自动驾驶 +生成行人(默认开启自由移动) ``` diff --git a/llm/main_ai.py b/llm/main_ai.py index 58c217b..c4020d0 100644 --- a/llm/main_ai.py +++ b/llm/main_ai.py @@ -37,6 +37,8 @@ def __init__(self): self.client = None self.world = None self.actors = [] + self.tick_task = None + self.is_ticking = False async def connect(self, host='localhost', port=2000): """连接CARLA服务器""" @@ -50,22 +52,274 @@ async def connect(self, host='localhost', port=2000): app_logger.error(f"❌ 连接CARLA失败: {str(e)}") return False - async def spawn_vehicle(self, vehicle_type='model3'): - """生成车辆""" + async def load_world(self, map_name='Town05'): + """加载指定地图""" + try: + if self.client is None: + app_logger.error("❌ 未连接到CARLA服务器") + return False + self.world = self.client.load_world(map_name) + app_logger.info(f"✅ 地图加载成功: {map_name}") + return True + except Exception as e: + app_logger.error(f"❌ 加载地图失败: {str(e)}") + return False + + async def set_synchronous_mode(self, enabled=True, fixed_delta_seconds=0.05): + """设置同步模式 - 参考tuto_G_pedestrian_navigation.py""" try: - blueprint = self.world.get_blueprint_library().find(f'vehicle.tesla.{vehicle_type}') - spawn_point = self.world.get_map().get_spawn_points()[0] - vehicle = self.world.spawn_actor(blueprint, spawn_point) - self.actors.append(vehicle) - app_logger.info(f"🚗 生成车辆: {vehicle_type}") + if self.world is None: + app_logger.error("❌ 未连接到CARLA服务器") + return False + + settings = self.world.get_settings() + settings.synchronous_mode = enabled + if enabled: + settings.fixed_delta_seconds = fixed_delta_seconds + else: + settings.fixed_delta_seconds = None + self.world.apply_settings(settings) + + if enabled: + app_logger.info(f"✅ 同步模式已启用,固定时间步长: {fixed_delta_seconds}s") + else: + app_logger.info("✅ 同步模式已禁用") + return True + except Exception as e: + app_logger.error(f"❌ 设置同步模式失败: {str(e)}") + return False - return vehicle + async def start_tick_loop(self): + """启动后台tick循环,确保世界持续运行""" + import asyncio + if self.is_ticking: + app_logger.info("⚠️ tick循环已在运行") + return + + # 先启用同步模式 + await self.set_synchronous_mode(True, 0.05) + + self.is_ticking = True + app_logger.info("🔄 启动后台tick循环") + + async def tick_loop(): + while self.is_ticking and self.world: + try: + self.world.tick() + await asyncio.sleep(0.05) # 20 FPS + except Exception as e: + app_logger.warning(f"⚠️ tick时出错: {e}") + await asyncio.sleep(0.1) + + self.tick_task = asyncio.create_task(tick_loop()) + + async def stop_tick_loop(self): + """停止后台tick循环""" + self.is_ticking = False + if self.tick_task: + self.tick_task.cancel() + try: + await self.tick_task + except asyncio.CancelledError: + pass + self.tick_task = None + + # 禁用同步模式 + await self.set_synchronous_mode(False) + + app_logger.info("🛑 停止后台tick循环") + + async def spawn_vehicles(self, vehicle_type='model3', count=1): + """生成多辆车辆,返回生成的车辆列表和最后一辆车 + + 支持的车辆类型: + - tesla: model3 + - audi: a2, etron, tt + - bmw: grandtourer, i8, mini + - chevrolet: impala + - citroen: c3 + - dodge: charger_police, charger2020 + - ford: mustang, crown + - jeep: wrangler_rubicon + - lincoln: mkz_2017, mkz_2020 + - mercedes: benz_coupe, cabrio, ccc + - mini: cooper_s + - nissan: micra, patrol + - seat: leon + - volkswagen: t2, t3 + + 数据量支持: 取决于地图生成点数量,通常支持10-100+辆车 + """ + # 检查是否已连接到CARLA服务器 + if self.world is None: + app_logger.error("❌ 未连接到CARLA服务器,请先调用connect_carla") + return [] + + try: + # 车辆类型到蓝图路径的映射 + vehicle_blueprints = { + # Tesla + 'model3': 'vehicle.tesla.model3', + # Audi + 'a2': 'vehicle.audi.a2', + 'etron': 'vehicle.audi.etron', + 'tt': 'vehicle.audi.tt', + # BMW + 'grandtourer': 'vehicle.bmw.grandtourer', + 'i8': 'vehicle.bmw.i8', + 'mini': 'vehicle.bmw.mini', + # Chevrolet + 'impala': 'vehicle.chevrolet.impala', + # Citroen + 'c3': 'vehicle.citroen.c3', + # Dodge + 'charger_police': 'vehicle.dodge.charger_police', + 'charger2020': 'vehicle.dodge.charger2020', + # Ford + 'mustang': 'vehicle.ford.mustang', + 'crown': 'vehicle.ford.crown', + # Jeep + 'wrangler_rubicon': 'vehicle.jeep.wrangler_rubicon', + # Lincoln + 'mkz_2017': 'vehicle.lincoln.mkz_2017', + 'mkz_2020': 'vehicle.lincoln.mkz_2020', + # Mercedes + 'benz_coupe': 'vehicle.mercedes.benz_coupe', + 'cabrio': 'vehicle.mercedes.cabrio', + 'ccc': 'vehicle.mercedes.ccc', + # Mini + 'cooper_s': 'vehicle.mini.cooper_s', + # Nissan + 'micra': 'vehicle.nissan.micra', + 'patrol': 'vehicle.nissan.patrol', + # Seat + 'leon': 'vehicle.seat.leon', + # Volkswagen + 't2': 'vehicle.volkswagen.t2', + 't3': 'vehicle.volkswagen.t3', + } + + # 获取蓝图路径 + blueprint_path = vehicle_blueprints.get(vehicle_type.lower(), f'vehicle.tesla.{vehicle_type}') + + # 获取可用生成点 + spawn_points = self.world.get_map().get_spawn_points() + available_points = len(spawn_points) + + # 限制生成数量不超过可用生成点 + actual_count = min(count, available_points) + if count > available_points: + app_logger.warning(f"⚠️ 请求生成{count}辆车,但地图只有{available_points}个生成点,将生成{actual_count}辆") + + blueprint_library = self.world.get_blueprint_library() + + spawned_vehicles = [] + for i in range(actual_count): + try: + # 尝试查找指定蓝图,如果不存在则使用随机车辆蓝图 + blueprint = blueprint_library.find(blueprint_path) + if blueprint is None: + # 如果指定蓝图不存在,使用随机车辆蓝图 + blueprints = [bp for bp in blueprint_library.filter('vehicle.*') if bp.id.startswith('vehicle.')] + blueprint = blueprints[i % len(blueprints)] if blueprints else None + + if blueprint is None: + app_logger.error(f"❌ 无法找到车辆蓝图") + continue + + # 设置车辆颜色 - 修复颜色格式问题 + if blueprint.has_attribute('color'): + # 生成随机的RGB颜色值 (0-255范围) + import random + r = random.randint(0, 255) + g = random.randint(0, 255) + b = random.randint(0, 255) + color_str = f"{r},{g},{b}" + blueprint.set_attribute('color', color_str) + app_logger.info(f"🎨 设置车辆颜色: RGB({color_str})") + + # 设置role_name属性避免错误 + if blueprint.has_attribute('role_name'): + blueprint.set_attribute('role_name', 'autopilot') + + # 尝试多个位置生成车辆 + spawn_success = False + for attempt in range(10): # 尝试10个位置 + try: + # 使用随机道路位置 + spawn_location = self.world.get_random_location_from_navigation() + if not spawn_location: + # 如果无法获取随机道路位置,使用固定生成点 + import random + spawn_point = random.choice(spawn_points) + spawn_location = spawn_point.location + + # 为避免碰撞,对位置进行更大范围的随机偏移 + import random + offset_x = random.uniform(-3.0, 3.0) + offset_y = random.uniform(-3.0, 3.0) + offset_z = 0.1 # 稍微抬高一点,避免地面碰撞 + + # 创建新的变换,添加偏移 + from carla import Transform, Location + new_location = Location( + x=spawn_location.x + offset_x, + y=spawn_location.y + offset_y, + z=spawn_location.z + offset_z + ) + + # 随机旋转角度 + from carla import Rotation + random_yaw = random.uniform(0, 360) + new_rotation = Rotation(yaw=random_yaw) + new_transform = Transform(new_location, new_rotation) + + app_logger.info(f"📍 尝试生成车辆,位置: {new_location}, 朝向: {random_yaw:.1f}°") + + # 尝试生成车辆 + vehicle = self.world.try_spawn_actor(blueprint, new_transform) + if vehicle: + self.actors.append(vehicle) + spawned_vehicles.append(vehicle) + app_logger.info(f"🚗 生成第{i+1}辆车: {blueprint.id} (ID: {vehicle.id})") + + # 每生成一辆车就将镜头对准它 + self.set_spectator_view(vehicle) + spawn_success = True + break + else: + app_logger.warning(f"⚠️ 尝试 {attempt+1}/10: 生成车辆失败,位置可能被占用") + + except Exception as loc_error: + app_logger.error(f"❌ 位置生成时出错: {loc_error}") + continue + + if not spawn_success: + app_logger.error(f"❌ 生成第{i+1}辆车失败,已尝试10个位置") + + except Exception as e: + app_logger.error(f"❌ 生成第{i+1}辆车时出错: {str(e)}") + continue + + app_logger.info(f"✅ 共生成{len(spawned_vehicles)}辆车") + return spawned_vehicles + except Exception as e: app_logger.error(f"❌ 生成车辆失败: {str(e)}") - return None + return [] + + async def spawn_vehicle(self, vehicle_type='model3'): + """生成单辆车辆(兼容旧接口)""" + vehicles = await self.spawn_vehicles(vehicle_type, count=1) + return vehicles[0] if vehicles else None async def set_weather(self, weather_type='clear'): """设置天气""" + # 检查是否已连接到CARLA服务器 + if self.world is None: + app_logger.error("❌ 未连接到CARLA服务器,请先调用connect_carla") + return False + weather_presets = { 'clear': carla.WeatherParameters( cloudiness=0, precipitation=0, precipitation_deposits=0, @@ -90,19 +344,163 @@ async def set_weather(self, weather_type='clear'): async def get_traffic_lights(self): """获取交通灯状态""" + # 检查是否已连接到CARLA服务器 + if self.world is None: + app_logger.error("❌ 未连接到CARLA服务器,请先调用connect_carla") + return [] + lights = [light for light in self.world.get_actors() if 'traffic_light' in light.type_id] return lights[:5] # 只返回前5个 - async def spawn_pedestrian(self, pedestrian_type='walker'): - """生成行人""" + async def spawn_pedestrians(self, pedestrian_type='pedestrian', count=1): + """生成多个行人,返回生成的行人列表和最后一个行人 + + 支持的行人类型: + - pedestrian: 普通行人 + - elderly: 老年人 + - child: 儿童 + - construction_worker: 建筑工人 + - police: 警察 + - business: 商务人士 + - jogger: 慢跑者 + + 数据量支持: 取决于地图大小,通常支持10-100+个行人 + """ + # 检查是否已连接到CARLA服务器 + if self.world is None: + app_logger.error("❌ 未连接到CARLA服务器,请先调用connect_carla") + return [] + try: - subprocess.run(["python ", "tuto_G_pedestrian_navigation.py"], check=True) + # 检查地图是否支持行人导航 + spawn_location = self.world.get_random_location_from_navigation() + if spawn_location: + app_logger.info(f"✅ 地图支持行人导航,测试位置: {spawn_location}") + else: + app_logger.warning("⚠️ 警告: 地图可能不支持行人导航,get_random_location_from_navigation()返回None") + app_logger.warning("⚠️ 建议: 尝试加载Town05地图(client.load_world('Town05'))") + + # 获取行人蓝图 - 参考tuto_G_pedestrian_navigation.py + pedestrian_blueprints = self.world.get_blueprint_library().filter('*walker.pedestrian*') + if not pedestrian_blueprints: + # 尝试其他过滤模式 + pedestrian_blueprints = self.world.get_blueprint_library().filter('*walker*') + + if not pedestrian_blueprints: + app_logger.error("❌ 无法找到行人蓝图") + return [] + + # 获取控制器蓝图 + controller_bp = self.world.get_blueprint_library().find('controller.ai.walker') + if not controller_bp: + app_logger.error("❌ 无法找到行人控制器蓝图") + return [] + + spawned_pedestrians = [] + + for i in range(count): + try: + # 随机选择一个行人蓝图 + import random + pedestrian_bp = random.choice(pedestrian_blueprints) + app_logger.info(f"📋 尝试生成行人,蓝图: {pedestrian_bp.id}") + + # 尝试多个位置生成行人 + spawn_success = False + for attempt in range(3): # 尝试3次 + try: + # 随机生成位置 + spawn_location = self.world.get_random_location_from_navigation() + if not spawn_location: + # 如果无法获取随机位置,使用默认位置 + spawn_location = carla.Location(x=-134 + i*2, y=78.1, z=1.18) + + spawn_transform = carla.Transform(spawn_location) + app_logger.info(f"📍 尝试在位置生成: {spawn_location}") + + # 生成行人 + pedestrian = self.world.try_spawn_actor(pedestrian_bp, spawn_transform) + if pedestrian: + app_logger.info(f"✅ 行人生成成功: {pedestrian.id}") + + # 为行人设置AI控制器 - 参考tuto_G_pedestrian_navigation.py + try: + # 使用行人的变换作为控制器的生成位置 + controller = self.world.spawn_actor(controller_bp, pedestrian.get_transform(), pedestrian) + if controller: + app_logger.info(f"✅ 控制器生成成功: {controller.id}") + # 启动控制器并给它一个随机位置 + controller.start() + controller.go_to_location(self.world.get_random_location_from_navigation()) + app_logger.info(f"🎯 为行人设置随机目标位置") + + # 存储控制器和行人的关联关系 + self.actors.append(pedestrian) + self.actors.append(controller) + spawned_pedestrians.append(pedestrian) + app_logger.info(f"🚶 生成第{i+1}个行人: {pedestrian_bp.id} (ID: {pedestrian.id})") + + # 将世界移动几帧,让行人生成 - 参考tuto_G_pedestrian_navigation.py + for frame in range(0, 5): + try: + self.world.tick() + except Exception as tick_error: + app_logger.warning(f"⚠️ 推进世界时出错: {tick_error}") + continue + + # 每生成一个行人就将镜头对准它 + self.set_spectator_view(pedestrian) + spawn_success = True + break + else: + # 如果控制器生成失败,销毁行人 + if pedestrian.is_alive: + pedestrian.destroy() + app_logger.error(f"❌ 为第{i+1}个行人创建控制器失败") + except Exception as ctrl_error: + # 如果控制器生成失败,销毁行人 + if pedestrian.is_alive: + pedestrian.destroy() + app_logger.error(f"❌ 控制器生成异常: {ctrl_error}") + else: + app_logger.warning(f"⚠️ 尝试 {attempt+1}/3: 生成行人失败,位置可能被占用") + + except Exception as loc_error: + app_logger.error(f"❌ 位置生成时出错: {loc_error}") + continue + + if not spawn_success: + app_logger.error(f"❌ 生成第{i+1}个行人失败,已尝试3个位置") + + except Exception as e: + app_logger.error(f"❌ 生成第{i+1}个行人时出错: {str(e)}") + continue + + app_logger.info(f"✅ 共生成{len(spawned_pedestrians)}个行人") + + # 启动后台tick循环,确保行人持续移动 + if spawned_pedestrians and not self.is_ticking: + await self.start_tick_loop() + app_logger.info("🔄 已启动后台tick循环,行人将开始移动") + + return spawned_pedestrians + except Exception as e: app_logger.error(f"❌ 生成行人失败: {str(e)}") - return None + return [] + + async def spawn_pedestrian(self, pedestrian_type='walker'): + """生成单个行人(兼容旧接口)""" + pedestrians = await self.spawn_pedestrians(pedestrian_type, count=1) + return pedestrians[0] if pedestrians else None def set_spectator_view(self, target_actor): """将视角对准目标actor""" + # 检查是否已连接到CARLA服务器 + if self.world is None: + app_logger.error("❌ 未连接到CARLA服务器,无法设置视角") + return False + try: spectator = self.world.get_spectator() target_transform = target_actor.get_transform() @@ -131,8 +529,97 @@ def set_spectator_view(self, target_actor): app_logger.error(f"❌ 设置视角失败: {str(e)}") return False + async def setup_autopilot(self, enable=True, radius=0.0): + """设置车辆自动驾驶模式 + + Args: + enable: 是否启用自动驾驶 + radius: 自动驾驶范围半径(米),0表示全图 + """ + # 检查是否已连接到CARLA服务器 + if self.world is None: + app_logger.error("❌ 未连接到CARLA服务器,请先调用connect_carla") + return False + + try: + # 获取所有车辆 + vehicles = [actor for actor in self.world.get_actors() if 'vehicle' in actor.type_id] + + enabled_count = 0 + for vehicle in vehicles: + # 检查车辆是否在指定范围内 + if radius > 0: + # 以当前 spectator 位置为中心 + spectator = self.world.get_spectator() + spectator_location = spectator.get_location() + vehicle_location = vehicle.get_location() + distance = spectator_location.distance(vehicle_location) + + if distance > radius: + continue + + # 启用或禁用自动驾驶 + vehicle.set_autopilot(enable) + enabled_count += 1 + app_logger.info(f"🚗 车辆 {vehicle.id} 自动驾驶已{'启用' if enable else '禁用'}") + + app_logger.info(f"✅ {'启用' if enable else '禁用'}了 {enabled_count} 辆车的自动驾驶") + return True + + except Exception as e: + app_logger.error(f"❌ 设置自动驾驶失败: {str(e)}") + return False + + async def setup_pedestrian_movement(self, enable=True, radius=0.0): + """设置行人自动移动 + + Args: + enable: 是否启用行人移动 + radius: 移动范围半径(米),0表示全图 + """ + # 检查是否已连接到CARLA服务器 + if self.world is None: + app_logger.error("❌ 未连接到CARLA服务器,请先调用connect_carla") + return False + + try: + # 获取所有行人控制器 - 参考官方文档 + controllers = [actor for actor in self.world.get_actors() if 'controller.ai.walker' in actor.type_id] + + updated_count = 0 + for controller in controllers: + try: + if enable: + # 启用控制器并设置随机目标位置 + controller.start() + target_location = self.world.get_random_location_from_navigation() + if target_location: + controller.go_to_location(target_location) + app_logger.info(f"🚶 行人控制器 {controller.id} 已启用并设置目标: {target_location}") + updated_count += 1 + else: + app_logger.warning(f"⚠️ 无法获取随机目标位置") + else: + # 禁用控制器 + controller.stop() + app_logger.info(f"🚶 行人控制器 {controller.id} 已禁用") + updated_count += 1 + except Exception as ctrl_error: + app_logger.error(f"❌ 操作控制器 {controller.id} 时出错: {ctrl_error}") + continue + + app_logger.info(f"✅ {'启用' if enable else '禁用'}了 {updated_count} 个行人控制器") + return True + + except Exception as e: + app_logger.error(f"❌ 设置行人移动失败: {str(e)}") + return False + async def cleanup(self): """清理环境""" + # 停止后台tick循环 + await self.stop_tick_loop() + for actor in self.actors: if actor.is_alive: actor.destroy() @@ -149,16 +636,28 @@ async def connect_carla_impl(host: str = 'localhost', port: int = 2000) -> str: return "✅ CARLA服务器连接成功" if success else "❌ 连接CARLA服务器失败" -async def spawn_vehicle_impl(query: str, **kwargs) -> str: +async def spawn_vehicle_impl(query: str, count: int = 1, **kwargs) -> str: """(实际功能:生成车辆)""" - vehicle = await carla_client.spawn_vehicle(query) - if vehicle: - return f"✅ 已生成车辆: {query} (ID: {vehicle.id})" - return "❌ 车辆生成失败" + # 检查是否已连接到CARLA服务器 + if carla_client.world is None: + return "❌ 未连接到CARLA服务器,请先使用'连接CARLA服务器'命令进行连接" + + vehicles = await carla_client.spawn_vehicles(query, count=count) + if vehicles: + if len(vehicles) == 1: + return f"✅ 已生成1辆{query}车辆 (ID: {vehicles[0].id})" + else: + last_vehicle = vehicles[-1] + return f"✅ 已生成{len(vehicles)}辆{query}车辆,最后一辆车ID: {last_vehicle.id}" + return "❌ 车辆生成失败,请确保CARLA服务器已连接且地图有可用生成点" async def set_weather_impl(owner: str, repo: str) -> str: """(实际功能:设置天气)""" + # 检查是否已连接到CARLA服务器 + if carla_client.world is None: + return "❌ 未连接到CARLA服务器,请先使用'连接CARLA服务器'命令进行连接" + weather_types = {'clear': '晴天', 'rain': '雨天', 'fog': '雾天'} success = await carla_client.set_weather(repo.lower()) return f"✅ 天气已设置为 {weather_types.get(repo.lower(), repo)}" if success else "❌ 不支持的天气类型" @@ -166,7 +665,14 @@ async def set_weather_impl(owner: str, repo: str) -> str: async def get_traffic_lights_impl(query: str, **kwargs) -> str: """(实际功能:获取交通灯信息)""" + # 检查是否已连接到CARLA服务器 + if carla_client.world is None: + return "❌ 未连接到CARLA服务器,请先使用'连接CARLA服务器'命令进行连接" + lights = await carla_client.get_traffic_lights() + if not lights: + return "🚦 未找到交通灯或无法获取交通灯信息" + result = ["🚦 交通灯状态:"] for i, light in enumerate(lights, 1): state = "绿色" if light.state == carla.TrafficLightState.Green else \ @@ -182,12 +688,44 @@ async def cleanup_scene_impl(**kwargs) -> str: return "✅ 已清理所有车辆和物体" -async def spawn_pedestrian_impl(query: str, **kwargs) -> str: +async def spawn_pedestrian_impl(query: str, count: int = 1, **kwargs) -> str: """(实际功能:生成行人)""" - pedestrian = await carla_client.spawn_pedestrian(query) - if pedestrian: - return f"✅ 已生成行人: {query} (ID: {pedestrian.id})" - return "❌ 行人生成失败" + # 检查是否已连接到CARLA服务器 + if carla_client.world is None: + return "❌ 未连接到CARLA服务器,请先使用'连接CARLA服务器'命令进行连接" + + pedestrians = await carla_client.spawn_pedestrians(query, count=count) + if pedestrians: + if len(pedestrians) == 1: + return f"✅ 已生成1个{query}行人 (ID: {pedestrians[0].id})" + else: + last_pedestrian = pedestrians[-1] + return f"✅ 已生成{len(pedestrians)}个{query}行人,最后一个行人ID: {last_pedestrian.id}" + return "❌ 行人生成失败,请确保CARLA服务器已连接且地图有可用导航点" + + +async def setup_autopilot_impl(enable: bool = True, radius: float = 0.0, **kwargs) -> str: + """(实际功能:设置车辆自动驾驶)""" + # 检查是否已连接到CARLA服务器 + if carla_client.world is None: + return "❌ 未连接到CARLA服务器,请先使用'连接CARLA服务器'命令进行连接" + + success = await carla_client.setup_autopilot(enable, radius) + if success: + return f"✅ 车辆自动驾驶已{'启用' if enable else '禁用'}" + return "❌ 设置自动驾驶失败" + + +async def setup_pedestrian_movement_impl(enable: bool = True, radius: float = 0.0, **kwargs) -> str: + """(实际功能:设置行人自动移动)""" + # 检查是否已连接到CARLA服务器 + if carla_client.world is None: + return "❌ 未连接到CARLA服务器,请先使用'连接CARLA服务器'命令进行连接" + + success = await carla_client.setup_pedestrian_movement(enable, radius) + if success: + return f"✅ 行人自动移动已{'启用' if enable else '禁用'}" + return "❌ 设置行人移动失败" # ============ FastMCP 工具装饰器版本 ============ @@ -199,10 +737,9 @@ async def connect_carla(host: str = 'localhost', port: int = 2000) -> str: @mcp.tool() -async def spawn_vehicle(query: str, language: Optional[str] = None, - sort: str = "stars", limit: int = 8) -> str: +async def spawn_vehicle(query: str, count: int = 1) -> str: """(实际功能:生成车辆)""" - return await spawn_vehicle_impl(query) + return await spawn_vehicle_impl(query, count=count) @mcp.tool() @@ -224,9 +761,21 @@ async def cleanup_scene(language: Optional[str] = None, period: str = "daily") - @mcp.tool() -async def spawn_pedestrian(query: str, user_type: Optional[str] = None) -> str: +async def spawn_pedestrian(query: str, count: int = 1) -> str: """(实际功能:生成行人)""" - return await spawn_pedestrian_impl(query) + return await spawn_pedestrian_impl(query, count=count) + + +@mcp.tool() +async def setup_autopilot(enable: bool = True, radius: float = 0.0) -> str: + """(实际功能:设置车辆自动驾驶)""" + return await setup_autopilot_impl(enable, radius=radius) + + +@mcp.tool() +async def setup_pedestrian_movement(enable: bool = True, radius: float = 0.0) -> str: + """(实际功能:设置行人自动移动)""" + return await setup_pedestrian_movement_impl(enable, radius=radius) @@ -257,11 +806,12 @@ def __init__(self): "type": "function", "function": { "name": "spawn_vehicle", - "description": "生成指定类型的车辆(如model3, a2等)", + "description": "生成指定类型和数量的车辆。支持类型: model3(Tesla), a2/etron/tt(Audi), grandtourer/i8/mini(BMW), impala(Chevrolet), c3(Citroen), charger_police/charger2020(Dodge), mustang/crown(Ford), wrangler_rubicon(Jeep), mkz_2017/mkz_2020(Lincoln), benz_coupe/cabrio/ccc(Mercedes), cooper_s(Mini), micra/patrol(Nissan), leon(Seat), t2/t3(Volkswagen)。数据量: 取决于地图生成点数量,通常支持10-100+辆车", "parameters": { "type": "object", "properties": { - "query": {"type": "string", "description": "车辆型号", "enum": ["model3", "a2", "mustang"]} + "query": {"type": "string", "description": "车辆型号,如model3, mustang, a2等", "enum": ["model3", "a2", "etron", "tt", "grandtourer", "i8", "mini", "impala", "c3", "charger_police", "charger2020", "mustang", "crown", "wrangler_rubicon", "mkz_2017", "mkz_2020", "benz_coupe", "cabrio", "ccc", "cooper_s", "micra", "patrol", "leon", "t2", "t3"]}, + "count": {"type": "integer", "description": "生成车辆数量,默认为1", "default": 1} }, "required": ["query"] } @@ -311,16 +861,47 @@ def __init__(self): "type": "function", "function": { "name": "spawn_pedestrian", - "description": "生成行人", + "description": "生成指定类型和数量的行人。支持类型: pedestrian(普通行人), elderly(老年人), child(儿童), construction_worker(建筑工人), police(警察), business(商务人士), jogger(慢跑者)。数据量: 取决于地图大小,通常支持10-100+个行人", "parameters": { "type": "object", "properties": { - "query": {"type": "string", "description": "行人类型,默认为walker"} + "query": {"type": "string", "description": "行人类型,如pedestrian, elderly, child等", "enum": ["pedestrian", "elderly", "child", "construction_worker", "police", "business", "jogger"]}, + "count": {"type": "integer", "description": "生成行人数量,默认为1", "default": 1} }, "required": ["query"] } } }, + { + "type": "function", + "function": { + "name": "setup_autopilot", + "description": "设置车辆自动驾驶模式,可指定范围半径", + "parameters": { + "type": "object", + "properties": { + "enable": {"type": "boolean", "description": "是否启用自动驾驶,默认为true", "default": True}, + "radius": {"type": "number", "description": "自动驾驶范围半径(米),0表示全图,默认为0", "default": 0.0} + }, + "required": [] + } + } + }, + { + "type": "function", + "function": { + "name": "setup_pedestrian_movement", + "description": "设置行人自动移动,可指定范围半径", + "parameters": { + "type": "object", + "properties": { + "enable": {"type": "boolean", "description": "是否启用行人移动,默认为true", "default": True}, + "radius": {"type": "number", "description": "移动范围半径(米),0表示全图,默认为0", "default": 0.0} + }, + "required": [] + } + } + }, ] def process_markdown(self, text): @@ -392,9 +973,7 @@ async def execute_fastmcp_tool_call(self, tool_call): app_logger.info(f"spawn_vehicle参数详情: {arguments}") result = await spawn_vehicle_impl( query=arguments["query"], - language=arguments.get("language"), - sort=arguments.get("sort", "stars"), - limit=arguments.get("limit", 8) + count=arguments.get("count", 1) ) return { "success": True, @@ -434,7 +1013,25 @@ async def execute_fastmcp_tool_call(self, tool_call): elif function_name == "spawn_pedestrian": result = await spawn_pedestrian_impl( query=arguments["query"], - user_type=arguments.get("user_type") + count=arguments.get("count", 1) + ) + return { + "success": True, + "data": result + } + elif function_name == "setup_autopilot": + result = await setup_autopilot_impl( + enable=arguments.get("enable", True), + radius=arguments.get("radius", 50.0) + ) + return { + "success": True, + "data": result + } + elif function_name == "setup_pedestrian_movement": + result = await setup_pedestrian_movement_impl( + enable=arguments.get("enable", True), + radius=arguments.get("radius", 50.0) ) return { "success": True, @@ -494,8 +1091,136 @@ async def execute_fastmcp_tool_call(self, tool_call): "error": str(e) } + # 定义车辆和行人的类型信息 + VEHICLE_TYPES = { + "model3": "Tesla Model 3", + "a2": "Audi A2", + "etron": "Audi e-tron", + "tt": "Audi TT", + "grandtourer": "BMW Grand Tourer", + "i8": "BMW i8", + "mini": "BMW Mini", + "impala": "Chevrolet Impala", + "c3": "Citroen C3", + "charger_police": "Dodge Charger Police", + "charger2020": "Dodge Charger 2020", + "mustang": "Ford Mustang", + "crown": "Ford Crown", + "wrangler_rubicon": "Jeep Wrangler Rubicon", + "mkz_2017": "Lincoln MKZ 2017", + "mkz_2020": "Lincoln MKZ 2020", + "benz_coupe": "Mercedes-Benz Coupe", + "cabrio": "Mercedes-Benz Cabrio", + "ccc": "Mercedes-Benz CCC", + "cooper_s": "Mini Cooper S", + "micra": "Nissan Micra", + "patrol": "Nissan Patrol", + "leon": "Seat Leon", + "t2": "Volkswagen T2", + "t3": "Volkswagen T3" + } + + PEDESTRIAN_TYPES = { + "pedestrian": "普通行人", + "elderly": "老年人", + "child": "儿童", + "construction_worker": "建筑工人", + "police": "警察", + "business": "商务人士", + "jogger": "慢跑者" + } + + def _check_spawn_intent(self, message): + """检测用户是否有生成车辆或行人的意图,但缺少必要参数 + + 只有当缺少类型时才询问,数量默认为1,不询问 + """ + message = message.lower() + + # 车辆相关关键词 + vehicle_keywords = ['车', '车辆', '汽车', '生成车', '创建车', '来车', '加车', '添加车辆'] + # 行人相关关键词 + pedestrian_keywords = ['行人', '人', '生成行人', '创建行人', '来人', '加人', '添加行人', '路人'] + + # 数量相关模式 + import re + has_count = bool(re.search(r'\d+\s*[辆个]', message)) + + # 检测车辆类型 + has_vehicle_type = any(vtype in message for vtype in self.VEHICLE_TYPES.keys()) + # 检测行人类型 + has_pedestrian_type = any(ptype in message for ptype in self.PEDESTRIAN_TYPES.keys()) + + # 检查是否是模糊的车辆生成请求 + is_vehicle_request = any(kw in message for kw in vehicle_keywords) + # 检查是否是模糊的行人生成请求 + is_pedestrian_request = any(kw in message for kw in pedestrian_keywords) + + result = { + 'needs_vehicle_type': False, + 'needs_vehicle_count': False, + 'needs_pedestrian_type': False, + 'needs_pedestrian_count': False, + 'is_ambiguous': False + } + + # 如果是车辆请求但没有指定类型,需要询问 + if is_vehicle_request and not has_vehicle_type: + result['needs_vehicle_type'] = True + result['needs_vehicle_count'] = not has_count # 记录是否也缺少数量 + result['is_ambiguous'] = True + + # 如果是行人请求但没有指定类型,需要询问 + if is_pedestrian_request and not has_pedestrian_type: + result['needs_pedestrian_type'] = True + result['needs_pedestrian_count'] = not has_count # 记录是否也缺少数量 + result['is_ambiguous'] = True + + return result + + def _generate_spawn_prompt(self, check_result): + """生成参数询问提示""" + prompt_parts = [] + + if check_result['needs_vehicle_type']: + vehicle_list = "\n".join([f" • {key} - {name}" for key, name in self.VEHICLE_TYPES.items()]) + prompt_parts.append(f"🚗 **可用车辆类型:**\n{vehicle_list}") + + if check_result['needs_vehicle_count']: + prompt_parts.append("🚗 **车辆数量:** 支持生成 1-100+ 辆车(取决于地图可用生成点数量)") + + if check_result['needs_pedestrian_type']: + pedestrian_list = "\n".join([f" • {key} - {name}" for key, name in self.PEDESTRIAN_TYPES.items()]) + prompt_parts.append(f"🚶 **可用行人类型:**\n{pedestrian_list}") + + if check_result['needs_pedestrian_count']: + prompt_parts.append("🚶 **行人数量:** 支持生成 1-100+ 个行人(取决于地图大小)") + + if prompt_parts: + prompt_parts.insert(0, "请提供以下信息以完成生成:\n") + prompt_parts.append("\n💡 **示例指令:**") + if check_result['needs_vehicle_type'] or check_result['needs_vehicle_count']: + prompt_parts.append(' • "生成5辆model3"') + prompt_parts.append(' • "来10辆mustang"') + if check_result['needs_pedestrian_type'] or check_result['needs_pedestrian_count']: + prompt_parts.append(' • "生成3个police"') + prompt_parts.append(' • "来5个pedestrian"') + + return "\n\n".join(prompt_parts) + async def chat(self, user_message): """处理聊天请求 - 使用FastMCP工具的AI对话""" + + # 检查是否有生成意图但缺少参数 + spawn_check = self._check_spawn_intent(user_message) + if spawn_check['is_ambiguous']: + prompt = self._generate_spawn_prompt(spawn_check) + return { + "message": self.process_markdown(prompt), + "tool_calls": None, + "conversation": [{"role": "user", "content": user_message}] + } + # 初始消息 messages = [ { @@ -505,20 +1230,31 @@ async def chat(self, user_message): CARLA仿真功能: 5. connect_carla - 连接CARLA服务器(默认localhost:2000) -6. spawn_vehicle - 生成车辆(model3/a2/mustang) -7. spawn_pedestrian - 生成行人 -8. set_weather - 设置天气(clear/rain/fog) -9. get_traffic_lights - 查看交通灯状态 -10. cleanup_scene - 清理仿真场景 +6. spawn_vehicle - 生成车辆,支持参数:query(车型), count(数量)。支持车型:model3(Tesla), a2/etron/tt(Audi), grandtourer/i8/mini(BMW), impala(Chevrolet), c3(Citroen), charger_police/charger2020(Dodge), mustang/crown(Ford), wrangler_rubicon(Jeep), mkz_2017/mkz_2020(Lincoln), benz_coupe/cabrio/ccc(Mercedes), cooper_s(Mini), micra/patrol(Nissan), leon(Seat), t2/t3(Volkswagen) +7. spawn_pedestrian - 生成行人,支持参数:query(类型), count(数量)。支持类型:pedestrian(普通行人), elderly(老年人), child(儿童), construction_worker(建筑工人), police(警察), business(商务人士), jogger(慢跑者) +8. setup_autopilot - 设置车辆自动驾驶,支持参数:enable(是否启用), radius(范围半径) +9. setup_pedestrian_movement - 设置行人自动移动,支持参数:enable(是否启用), radius(范围半径) +10. set_weather - 设置天气(clear/rain/fog) +11. get_traffic_lights - 查看交通灯状态 +12. cleanup_scene - 清理仿真场景 CARLA相关: -- 如果用户提到"车辆"、"生成"、"创建汽车"等,使用spawn_vehicle -- 如果用户提到"行人"、"生成行人"、"创建行人"等,使用spawn_pedestrian -- 如果用户提到"天气"、"下雨"、"晴天"、"雾天"等,使用set_weather -- 如果用户提到"交通灯"、"信号灯"、"红绿灯"等,使用get_traffic_lights -- 如果用户提到"清理"、"重置"、"清除场景"等,使用cleanup_scene -- 如果用户明确要连接仿真器,使用connect_carla +- 当用户提到"连接"、"服务器"、"CARLA"等明确要求连接时,使用connect_carla +- 当用户提到"车辆"、"生成"、"创建汽车"、"车"等,使用spawn_vehicle,count参数默认为1 +- 当用户提到"多辆车"、"生成X辆车"、"几辆车"、指定数量(如5辆、10辆),必须设置count参数为对应数字 +- 当用户提到"行人"、"生成行人"、"创建行人"、"人"等,直接使用spawn_pedestrian,count参数默认为1 +- 当用户提到"多个行人"、"生成X个行人"、"几个行人"、指定数量(如5个、10个),必须设置count参数为对应数字 +- 当用户提到"自动驾驶"、"车辆运行"、"车自己开"等,使用setup_autopilot +- 当用户提到"行人移动"、"行人走路"、"行人运行"等,使用setup_pedestrian_movement +- 当用户提到"天气"、"下雨"、"晴天"、"雾天"等,使用set_weather +- 当用户提到"交通灯"、"信号灯"、"红绿灯"等,使用get_traffic_lights +- 当用户提到"清理"、"重置"、"清除场景"等,使用cleanup_scene + +重要规则: +- 如果用户已经连接过CARLA服务器,不要再重复调用connect_carla +- 当用户明确要求生成行人或车辆时,直接调用对应的生成工具,不要先调用connect_carla +- 只有当用户明确要求连接服务器时,才调用connect_carla 通用策略: - 首先判断用户意图是GitHub相关还是CARLA仿真相关 @@ -531,12 +1267,34 @@ async def chat(self, user_message): 用户指令示例: - "连接carla服务器" -> connect_carla(host="localhost", port=2000) -- "生成一辆model3" -> spawn_vehicle(vehicle_type="model3") -- "生成行人" -> spawn_pedestrian(pedestrian_type="walker") +- "生成一辆model3" -> spawn_vehicle(query="model3", count=1) +- "生成5辆mustang" -> spawn_vehicle(query="mustang", count=5) +- "生成10辆车" -> spawn_vehicle(query="model3", count=10) +- "给我来3辆奥迪a2" -> spawn_vehicle(query="a2", count=3) +- "创建20辆车" -> spawn_vehicle(query="model3", count=20) +- "生成一个行人" -> spawn_pedestrian(query="pedestrian", count=1) +- "生成5个行人" -> spawn_pedestrian(query="pedestrian", count=5) +- "生成3个老年人" -> spawn_pedestrian(query="elderly", count=3) +- "生成10个警察" -> spawn_pedestrian(query="police", count=10) +- "生成一个人" -> spawn_pedestrian(query="pedestrian", count=1) +- "生成5个人" -> spawn_pedestrian(query="pedestrian", count=5) +- "开启车辆自动驾驶" -> setup_autopilot(enable=True, radius=0.0) +- "让车辆自己开" -> setup_autopilot(enable=True) +- "开启行人移动" -> setup_pedestrian_movement(enable=True, radius=0.0) +- "让行人走路" -> setup_pedestrian_movement(enable=True) - "设置雨天" -> set_weather(weather_type="rain") - "查看交通灯" -> get_traffic_lights() - "清理场景" -> cleanup_scene() +重要提示: +- 当用户明确要求生成多辆车时(如"生成5辆车"、"来10辆车"),必须在spawn_vehicle的arguments中包含count参数 +- 当用户明确要求生成多个行人时(如"生成5个行人"、"来10个行人"),必须在spawn_pedestrian的arguments中包含count参数 +- count参数必须是整数,表示要生成的车辆或行人数量 +- 如果不指定count,默认为1 +- 当用户要求生成行人时,直接调用spawn_pedestrian,不要先调用connect_carla +- 当用户要求生成车辆时,直接调用spawn_vehicle,不要先调用connect_carla +- 只有当用户明确要求连接服务器时,才调用connect_carla + 本助手基于FastMCP框架构建,提供高效、类型安全的工具调用体验。""" }, {"role": "user", "content": user_message} diff --git a/llm/tuto_G_pedestrian_navigation.py b/llm/tuto_G_pedestrian_navigation.py index 8795797..2edabca 100644 --- a/llm/tuto_G_pedestrian_navigation.py +++ b/llm/tuto_G_pedestrian_navigation.py @@ -204,5 +204,4 @@ def build_skeleton(ped, sk_links, K): settings.synchronous_mode = False settings.no_rendering_mode = False settings.fixed_delta_seconds = None -world.apply_settings(settings) - +world.apply_settings(settings) \ No newline at end of file