diff --git a/critical/undergraduate/content/appendix.tex b/critical/undergraduate/content/appendix.tex
index 4bc8e7d..c295c91 100644
--- a/critical/undergraduate/content/appendix.tex
+++ b/critical/undergraduate/content/appendix.tex
@@ -1,1296 +1,357 @@
-\%!TEX root = ../csuthesis\_main.tex
-\% \begin{appendix} \% 无章节编号
-\chapter{附录代码}
-
-附录部分用于存放这里用来存放不适合放置在正文的大篇幅内容、典型如代码、图纸、完整数学证明过程等内容。
-
-\section{chatscene}
-
-Step 1: Setup conda environment
-
-conda create -n chatscene python=3.8
-conda activate chatscene
-Step 2: Clone this git repo in an appropriate folder
-
-git clone git@github.com:javyduck/ChatScene.git
-Step 3: Enter the repo root folder and install the packages:
-
-cd ChatScene
-pip install -r requirements.txt
-pip install decorator==5.1.1
+%!TEX root = ../csuthesis\_main.tex
+% \begin{appendix} % unnumbered chapters
+ \chapter{Appendix Code}
+
+
+ \section{Additional Technical Support}
+ \begin{lstlisting}
+A traffic scenario refers to the diverse dynamic behaviors of traffic participants in the simulated world; through these behaviors, the autonomous vehicle running in the simulation is tested thoroughly. The richness of traffic scenarios depends on the types of traffic participants and the dynamic behaviors they can perform. CARLA supports many kinds of dynamic participants, such as sedans, SUVs, buses, trucks, motorcycles, bicycles, and pedestrians, as well as static participants such as traffic cones and vending machines; the behavior of dynamic participants can be controlled through predefined scenarios or through online traffic flow.
+
+The Traffic Manager module in CARLA can simulate both scenarios and traffic flow, but since scenario simulation based on the OpenSCENARIO format is more general, we use the ScenarioRunner (hereafter SR) provided by CARLA to run scenarios. SR's installation and usage are described below.
+
+01. Installing ScenarioRunner
+ScenarioRunner is an official CARLA tool for parsing and executing scenarios alongside CARLA. It supports several kinds of predefined scenario files, including CARLA's own scenario format, the route format, and the OpenSCENARIO format; here we mainly use its OpenSCENARIO support. OpenSCENARIO has released versions up to 1.2 and 2.0, of which versions 1.0 and 2.0 are both supported by ScenarioRunner.
+
+OpenSCENARIO is a standard format for describing dynamic scenarios published by ASAM, the German Association for Standardization of Automation and Measuring Systems; for the format itself, see the earlier introduction to OpenSCENARIO and its worked examples.
+
+SR is installed as follows:
+
+(1) Download the source code
+
+SR's GitHub page provides releases matched to CARLA versions [https://github.com/carla-simulator/scenario\_runner/releases]; for example, SR 0.9.13 pairs with CARLA 0.9.13. The pairing matters because SR uses the PythonAPI to query CARLA and to control its traffic participants, weather, and so on, and mismatched versions make these operations fail. To get the latest features, we install from source here.
+
+SR can be cloned to any location; for convenience we place it inside the CARLA folder.
+
+cd /path/to/carla
+git clone https://github.com/carla-simulator/scenario\_runner.git
+(2) Install the dependencies
+
+Enter the scenario\_runner folder and install the dependencies listed in requirements.txt:
+
+cd scenario\_runner
+sudo apt remove python3-networkx \# remove any previously installed networkx first
+pip3 install -r requirements.txt
+After installing the dependencies this way, if the local numpy is newer than 1.20, you may see runtime errors containing ......networkx/readwrite/graphml.py......module 'numpy' has no attribute 'int'....... This is because networkx 2.2, the version pinned in requirements.txt, uses np.int, which is no longer supported from numpy 1.20 on. Depending on your situation, install a newer networkx or an older numpy.
+
+Either of the following two fixes works:
+1. Uninstall networkx and reinstall a newer version
+ pip3 uninstall networkx
+ pip3 install networkx
+2. Uninstall numpy and reinstall an older version
+ pip3 uninstall numpy
+ pip3 install numpy==1.20
+(3) Set environment variables
+
+So that the relevant files can be found at runtime, a few environment variables are needed. Open ~/.bashrc and append the following:
+
+export CARLA\_ROOT=/path/to/carla
+
+export SCENARIO\_RUNNER\_ROOT=\$CARLA\_ROOT/scenario\_runner
+
+export PYTHONPATH=\$PYTHONPATH:\$CARLA\_ROOT/PythonAPI/carla
+
+Be sure to change the paths to the actual paths on your machine, then run source ~/.bashrc to make the settings take effect.
+
+At this point everything needed to run OpenSCENARIO 1.0 files (hereafter xosc files) is installed, and you can try running one as described in the next part. To run OpenSCENARIO 2.0 files (hereafter osc files), the following extra steps are required.
+
+(4) Install the OpenSCENARIO 2.0 dependencies
+
+Install the JDK
+
+sudo apt install openjdk-17-jdk
+
+Install ANTLR
+
+curl -O https://www.antlr.org/download/antlr-4.10.1-complete.jar
+
+sudo cp antlr-4.10.1-complete.jar /usr/local/lib/
+
+pip3 install antlr4-python3-runtime==4.10
+
+Open ~/.bashrc, append the following, then run source ~/.bashrc to make the settings take effect.
+
+export CLASSPATH=".:/usr/local/lib/antlr-4.10.1-complete.jar:\$CLASSPATH"
+
+alias antlr4='java -jar /usr/local/lib/antlr-4.10.1-complete.jar'
+
+alias grun='java org.antlr.v4.gui.TestRig'
+
+02. Running OpenSCENARIO files
+Running an xosc/osc file with SR is straightforward: start CARLA first, then run SR with the xosc/osc file specified:
+
+Start CARLA:
+
+cd /path/to/carla
+
+./CarlaUE4.sh
+
+Configure the ego vehicle
+
+In real testing the ego vehicle should be controlled by the algorithm under test; here, for demonstration, we enable autopilot for the ego vehicle through SR's bundled manual\_control.py:
+
+cd /path/to/scenario\_runner
+
+When loading the xosc example, use
+
+python3 manual\_control.py -a
+
+When loading the osc example, use
+
+python3 manual\_control.py -a --rolename ego\_vehicle
+
+Note that manual\_control.py looks up the ego vehicle by rolename and enables autopilot for it. The default ego rolename is "hero". In the xosc example below the ego vehicle's rolename happens to be "hero", so "--rolename" need not be set, whereas in the osc example the ego rolename is "ego\_vehicle", so "--rolename" must be given.
+
+Run ScenarioRunner
+
+cd /path/to/scenario\_runner
+
+Run the xosc example
+
+python3 scenario\_runner.py --output --openscenario srunner/examples/FollowLeadingVehicle.xosc
+
+Run the osc example
+
+python3 scenario\_runner.py --output --openscenario2 srunner/examples/cut\_in\_and\_slow\_range.osc
+
+After running the commands above, you can observe in the CARLA rendering window that the map changes to the one defined in the xosc file, and that the ego vehicle and the obstacle vehicle ahead of it are spawned.
+
+\end{lstlisting}
+
+
+\section{ASIL Algorithm Operations and Code}
+ \begin{lstlisting}
+ 1. Environment setup
+Install the CARLA Simulator
+
+Clone and build from carla-simulator/carla (or download a prebuilt release).
+
+Confirm that the CARLA server runs normally in the background:
+
+
+./CarlaUE4.sh \# Linux
+\# or double-click CarlaUE4.exe on Windows
+Install Scenario Runner
+
+
+git clone https://github.com/carla-simulator/scenario\_runner.git
+cd scenario\_runner
+pip install -e .
-(you can ignore the error after installing the decorator)
-
-Step 4: Install the Scenic package:
-
-cd Scenic
-python -m pip install -e .
-Step 5: Download our CARLA\_0.9.13 and extract it to your folder.
-
-Step 6: Run sudo apt install libomp5 as per this git issue.
-
-Step 7: Add the python API of CARLA to the PYTHONPATH environment variable. You can add the following commands to your ~/.bashrc:
-
-export CARLA\_ROOT={path/to/your/carla}
-export PYTHONPATH=$PYTHONPATH:${CARLA\_ROOT}/PythonAPI/carla/dist/carla-0.9.13-py3.8-linux-x86\_64.egg
-export PYTHONPATH=$PYTHONPATH:${CARLA\_ROOT}/PythonAPI/carla/agents
-export PYTHONPATH=$PYTHONPATH:${CARLA\_ROOT}/PythonAPI/carla
-export PYTHONPATH=$PYTHONPATH:${CARLA\_ROOT}/PythonAPI
-Then, do source ~/.bashrc to update the environment variable.
-
-1. Desktop Users
-Enter the CARLA root folder, launch the CARLA server and run our platform with
-
-\# Launch CARLA
-./CarlaUE4.sh -prefernvidia -windowed -carla-port=2000
-2. Remote Server Users
-Enter the CARLA root folder, launch the CARLA server with headless mode, and run our platform with
-
-\# Launch CARLA
-./CarlaUE4.sh -prefernvidia -RenderOffScreen -carla-port=2000
-(Optional) You can also visualize the pygame window using TurboVNC. First, launch CARLA with headless mode, and run our platform on a virtual display.
-
-\# Launch CARLA
-./CarlaUE4.sh -prefernvidia -RenderOffScreen -carla-port=2000
-
-\# Run a remote VNC-Xserver. This will create a virtual display "8".
-/opt/TurboVNC/bin/vncserver :8 -noxstartup
-You can use the TurboVNC client on your local machine to connect to the virtual display.
-
-\# Use the built-in SSH client of TurboVNC Viewer
-/opt/TurboVNC/bin/vncviewer -via user@host localhost:n
-
-\# Or you can manually forward connections to the remote server by
-ssh -L fp:localhost:5900+n user@host
-\# Open another terminal on local machine
-/opt/TurboVNC/bin/vncviewer localhost::fp
-where user@host is your remote server, fp is a free TCP port on the local machine, and n is the display port specified when you started the VNC server on the remote server ("8" in our example).
-
-ChatScene
-In ChatScene, we ensure a fair comparison with the baselines by using the same eight scenarios, sampling five behaviors for each scenario from the database. The corresponding generated comeplete Scenic files, with some modifications, have been provided in safebench/scenario/scenario\_data/scenic\_data (with some manual modifications to use the same fixed 10 routes for the ego agent to ensure fair comparison with the baselines).
-
-The ego agent is controlled by a default RL model, while the surrounding adversarial agent is controlled by Scenic.
-
-The agent configuration is provided in safebench/agent/config/adv\_scenic.yaml. By default, it loads a pretrained RL model from Safebench-v1.
-
-Modes in ChatScene:
-train\_scenario: Select the most challenging scenes for the same behavior under the same scenario.
-
-Configuration can be found in safebench/scenario/config/train\_agent\_scenic.yaml.
-
-The sample\_num = 50, opt\_step = 10, select\_num = 2 settings in the file mean we sample 50 scenes and select the 2 most challenging ones for evaluation. The default setting is to choose scenes that lead to a collision of the ego agent and provide the lowest overall score.
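The selection rule just described (keep scenes that made the ego agent collide, then take the lowest-scoring ones) can be sketched in plain Python. This is an illustrative stand-in rather than ChatScene's actual implementation; `select_adv_scenes`, `collision`, and `score` are hypothetical names:

```python
def select_adv_scenes(results, select_num=2):
    """Pick the `select_num` most challenging scene ids.

    results maps scene_id -> {"collision": bool, "score": float}.
    Collision scenes with the lowest overall score are preferred;
    non-collision scenes only fill any remaining slots.
    """
    collide = sorted((r["score"], i) for i, r in results.items() if r["collision"])
    others = sorted((r["score"], i) for i, r in results.items() if not r["collision"])
    picked = [i for _, i in collide[:select_num]]
    picked += [i for _, i in others[:select_num - len(picked)]]
    return sorted(picked)

scores = {0: {"collision": True, "score": 0.62},
          1: {"collision": False, "score": 0.10},
          2: {"collision": True, "score": 0.35},
          3: {"collision": True, "score": 0.88}}
print(select_adv_scenes(scores))  # scenes 2 and 0 collide with the lowest scores -> [0, 2]
```

With sample_num = 50 and select_num = 2, the same rule would simply be applied to a dictionary of 50 entries.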
We optimize the range of parameters, like speed, every 10 steps based on collision statistics from previously sampled scenes.
-
-Example command for optimizing the scene:
-
-python scripts/run\_train.py --agent\_cfg=adv\_scenic.yaml --scenario\_cfg=train\_scenario\_scenic.yaml --mode train\_scenario --scenario\_id 1
-Use the following command if you are using a TurboVNC client on your local machine to connect to the virtual display:
-
-DISPLAY=:8 python scripts/run\_train.py --agent\_cfg=adv\_scenic.yaml --scenario\_cfg=train\_scenario\_scenic.yaml --mode train\_scenario --scenario\_id 1
-The IDs for the final selected scenes will be stored in safebench/scenario/scenario\_data/scenic\_data/scenario\_1/scenario\_1.json.
-
-train\_agent: Train the agent based on the selected challenging scenes:
-
-python scripts/run\_train.py --agent\_cfg=adv\_scenic.yaml --scenario\_cfg=train\_agent\_scenic.yaml --mode train\_agent --scenario\_id 1
-We have a total of 10 routes for each scenario. We use the first 8 for training and the last 2 for testing (route IDs: [0,1,2,3,4,5,6,7]). The configuration, including scenario\_1.json, will train the agent based on the most challenging scenes (the ones leading to a collision of the ego agent).
-
-eval: Evaluate the trained agent on the last 2 routes (route IDs: [8,9]), the test\_epoch is for loading a finetuned model after a specific training epoch:
-
-python scripts/run\_eval.py --agent\_cfg=adv\_scenic.yaml --scenario\_cfg=eval\_scenic.yaml --mode eval --scenario\_id 1 --test\_epoch -1
-The -1 here is for loading our provided fine-tuned agent in each scenario based on our Scenic scenarios in safebench/agent/model\_ckpt/adv\_train/sac/scenic/scenario\_1/model.sac.-001.torch.
-
-Dynamic Mode
-The above part ensures using the same scenario and routes for fair comparison with baselines. However, ChatScene can generate scenarios and scenes freely without any constraints.
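The parameter-range optimization described earlier in this mode (parameter ranges such as speed re-optimized every 10 steps from collision statistics) is not shown in this chunk. As a rough, hypothetical sketch of narrowing a range around values that produced collisions — the function name and data layout are invented for illustration, not taken from the codebase:

```python
def refine_range(samples, lo, hi, keep=0.5):
    """Shrink [lo, hi] toward parameter values that led to collisions.

    samples is a list of (value, collided) pairs from previously
    sampled scenes; the range is left unchanged until something collides.
    """
    hits = sorted(v for v, collided in samples if collided)
    if not hits:
        return lo, hi
    center = hits[len(hits) // 2]   # median colliding value
    half = (hi - lo) * keep / 2     # halve the search width by default
    return max(lo, center - half), min(hi, center + half)

# speeds of 8 and 9 m/s collided, so the next sampling round
# draws from a narrower window around them:
print(refine_range([(5.0, False), (8.0, True), (9.0, True)], 0.0, 10.0))  # (6.5, 10.0)
```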
Simply provide a text description, such as "The ego vehicle is driving on a straight road; the adversarial pedestrian suddenly crosses the road from the right front and suddenly stops in front of the ego." is enough for the training. We are currently integrating our database with GPT-4o for generating more diverse scenarios based on our pre-built retrieval database, and will upload both soonly.
-
-Please first install openai and sentence\_transformers packages following the requirements.
-
-Put your description under file retrieve/scenario\_descriptions.txt
-
-run python retrieve.py to get the corresponding scenic code under safebench/scenario/scenario\_data/scenic\_data/dynamic\_scenario
-
-Then, for running the dynamic scenarios, just replace the run\_train.py or run\_eval.py with run\_train\_dynamic.py or run\_eval\_dynamic.py, and use dynamic\_scenic.yaml (please specify your settings there), an exmaple could be:
-
-python scripts/run\_train\_dynamic.py --agent\_cfg=adv\_scenic.yaml --scenario\_cfg=dynamic\_scenic.yaml --mode train\_scenario
-Integrate GPT-4o with our retrieval database (v1) and commit to the dynamic mode. Some mechanisms have been changed based on the previous version to incorporate more adversarial behavior, geometry, and spawn point definitions. Currently, it is still in beta. If you encounter any problems, please submit an issue, and I will address potential errors in the new retrieval pipeline.
-Some snippets are still under cleaning of the updated framework (i.e., incorprating GPT-4o to generate more diverse scenarios), the new retrieve database v2 will be pushed based on the new design.
-Finetune an LLM for generating snippets end-to-end based on the data constructed from our database.
-
-\section{ASIL-Gen}
-
-ASIL-Gen: Automotive Scenario Generation and ASIL Classification Toolkit
-A toolkit for generating automotive scenarios, evaluating their safety levels (ASIL), and comparing optimization algorithms (NSGA-II vs. Random Search) for scenario selection.
-
-Overview
-This repository contains tools for:
-
-Generating variations of driving scenarios
-Executing scenarios in CARLA simulator
-Selecting critical scenarios using NSGA and Random Search algorithms
-Determining Automotive Safety Integrity Levels (ASIL) of selected scenarios
-Analyzing results with statistical methods
-Prerequisites
-CARLA Simulator:
-Install CARLA from carla-simulator/carla.
-Scenario Runner:
-Install the CARLA Scenario Runner from carla-simulator/scenario\_runner.
-Setup
-Clone this repository:
-
-git clone https://github.com/Fizza129/ASIL-Gen.git
-Install Python dependencies (if required):
-Ensure , , (for NSGA-II), and other standard libraries are installed.numpypandasdeap
-
-Usage
-1. Running Scenarios
-Pre-generated Scenarios:
-
-Extract .Scenario Dataset/Scenario\_Dataset\_1-4.zip
-Copy scenario files to ..pyscenario\_runner/srunner/scenarios/
-Copy configuration files to ..xmlscenario\_runner/srunner/examples/
-Follow Scenario Runner documentation to execute scenarios.
-Custom Scenarios:
-Use scripts in to generate new variations. Example:Scenario Generation Scripts/
-
-python script\_change\_lane.py \# Generates new lane-change variations
-2. Scenario Selection
-NSGA-II Optimization:
-Run scripts in the folder on scenario execution results: NSGA/
-python "NSGA/NSGA\_choice.py"
-Random Search:
-Execute scripts in the folder: The selected scenarios will be saved in JSON format.Random Search/
-python "Random Search/random\_search\_choice.py"
-3. ASIL Classification
-Use scripts in the folder to compute ASIL levels (A/B/C/D/QM) for selected scenarios: ASIL/
-python "ASIL/ASIL.py" \# Calculates ASIL levels
-python "ASIL/ASIL\_percentages.py" \# Calculates ASIL distribution percentages
-4. Statistical Comparison (NSGA vs.
Random Search)
-Run Mann-Whitney U Test scripts in : Mann Whitney Test/
-python "Mann Whitney Test/Mann Whitney and Effect Size.py"
-Repository Structure
-ASIL-Gen/
-├── ASIL/ \# ASIL classification and percentage calculation
-├── Mann Whitney Test/ \# Statistical tests for comparing NSGA and Random Search
-├── NSGA/ \# NSGA-II optimization for scenario selection
-├── Random Search/ \# Random Search-based scenario selection
-├── Scenario Dataset/ \# Pre-generated scenario variations (Python + XML)
-├── Scenario Generation Scripts/ \# Scripts to generate new scenario variations
-└── Scenario Results/ \# Output results
-Scenario Types
-The repository contains 13 unique scenarios:
-
-Lane change
-Cut-in with static vehicle
-Dynamic object crossing
-Following leading vehicle
-Following leading vehicle with obstacle
-Hazard at side lane
-No signal junction crossing
-Opposite vehicle running red light
-Other leading vehicle
-Parked obstacle
-Parking crossing pedestrian
-Vehicle opens door
-Vehicle turning right
-Results
-The folder contains the outputs from executed scenarios.Scenario Results
-
-Notes
-Dataset Usage: The contains 1,000 variations for each unique scenario. Extract and copy files as instructed.Scenario Dataset
-Custom Generation: Modify scripts in to create new scenario variations.Scenario Generation Scripts/
-
-\section{scene\_runner}
-
-'''
-Date: 2023-01-31 22:23:17
-LastEditTime: 2023-03-07 12:28:17
-Description:
-Copyright (c) 2022-2023 Safebench Team
-
-This work is licensed under the terms of the MIT license.
-For a copy, see
-'''
-
-import copy
+Clone the ASIL-Gen repository and install its dependencies
+
+
+git clone https://github.com/Fizza129/ASIL-Gen.git
+cd ASIL-Gen
+\# a virtual environment is recommended
+python3 -m venv venv
+source venv/bin/activate
+pip install -r requirements.txt
+requirements.txt should include numpy, pandas, deap, scipy, matplotlib, and the other libraries used.
+
+2. Loading the pre-generated scenarios
+Unzip the dataset:
+
+unzip "Scenario Dataset/Scenario\_Dataset\_1-4.zip" -d Scenario\_Dataset/
+Copy it into Scenario Runner:
+
+cp -r Scenario\_Dataset/* ../scenario\_runner/srunner/scenarios/
+
+cp -r xml/ ../scenario\_runner/srunner/examples/
+3. Custom scenario generation
+All of the scripts live under Scenario Generation Scripts/. Taking script\_change\_lane.py as an example:
+
+
+\# Scenario Generation Scripts/script\_change\_lane.py
 import os
-import json
-import glob
 import random
-
-import numpy as np
-import carla
-import pygame
-from tqdm import tqdm
-
-from safebench.gym\_carla.env\_wrapper import VectorWrapper
-from safebench.gym\_carla.envs.render import BirdeyeRender
-from safebench.gym\_carla.replay\_buffer import RouteReplayBuffer, PerceptionReplayBuffer
-
-from safebench.agent import AGENT\_POLICY\_LIST
-from safebench.scenario import SCENARIO\_POLICY\_LIST
-
-from safebench.scenario.scenario\_manager.carla\_data\_provider import CarlaDataProvider
-from safebench.scenario.scenario\_data\_loader import ScenarioDataLoader, ScenicDataLoader
-from safebench.scenario.tools.scenario\_utils import scenario\_parse, scenic\_parse
-
-from safebench.util.logger import Logger, setup\_logger\_kwargs
-from safebench.util.metric\_util import get\_route\_scores, get\_perception\_scores
-from safebench.util.scenic\_utils import ScenicSimulator
-
-class ScenicRunner:
-def \_\_init\_\_(self, agent\_config, scenario\_config):
-self.scenario\_config = scenario\_config
-self.agent\_config = agent\_config
-
-self.seed = scenario\_config['seed']
-self.exp\_name = scenario\_config['exp\_name']
-self.output\_dir = scenario\_config['output\_dir']
-self.mode = scenario\_config['mode']
-self.save\_video = scenario\_config['save\_video']
-
-self.render = scenario\_config['render']
-self.num\_scenario = scenario\_config['num\_scenario']
-self.fixed\_delta\_seconds = scenario\_config['fixed\_delta\_seconds']
-self.scenario\_category = scenario\_config['scenario\_category']
-
-\# continue training flag
-self.continue\_agent\_training =
scenario\_config['continue\_agent\_training'] -self.continue\_scenario\_training = scenario\_config['continue\_scenario\_training'] - -\# apply settings to carla -self.client = carla.Client('localhost', scenario\_config['port']) -self.client.set\_timeout(10.0) -self.world = None -self.env = None - -self.env\_params = { - 'auto\_ego': scenario\_config['auto\_ego'], - 'obs\_type': agent\_config['obs\_type'], - 'scenario\_category': self.scenario\_category, - 'ROOT\_DIR': scenario\_config['ROOT\_DIR'], - 'warm\_up\_steps': 9, \# number of ticks after spawning the vehicles - 'disable\_lidar': True, \# show bird-eye view lidar or not - 'display\_size': 128, \# screen size of one bird-eye view window - 'obs\_range': 32, \# observation range (meter) - 'd\_behind': 12, \# distance behind the ego vehicle (meter) - 'max\_past\_step': 1, \# the number of past steps to draw - 'discrete': False, \# whether to use discrete control space - 'discrete\_acc': [-3.0, 0.0, 3.0], \# discrete value of accelerations - 'discrete\_steer': [-0.2, 0.0, 0.2], \# discrete value of steering angles - 'continuous\_accel\_range': [-3.0, 3.0], \# continuous acceleration range - 'continuous\_steer\_range': [-0.3, 0.3], \# continuous steering angle range - 'max\_episode\_step': scenario\_config['max\_episode\_step'], \# maximum timesteps per episode - 'max\_waypt': 12, \# maximum number of waypoints - 'lidar\_bin': 0.125, \# bin size of lidar sensor (meter) - 'out\_lane\_thres': 4, \# threshold for out of lane (meter) - 'desired\_speed': 8, \# desired speed (m/s) - 'image\_sz': 1024, \# TODO: move to config of od scenario -} - - -\# pass config from scenario to agent -agent\_config['mode'] = scenario\_config['mode'] -agent\_config['ego\_action\_dim'] = scenario\_config['ego\_action\_dim'] -agent\_config['ego\_state\_dim'] = scenario\_config['ego\_state\_dim'] -agent\_config['ego\_action\_limit'] = scenario\_config['ego\_action\_limit'] - -\# define logger -logger\_kwargs = setup\_logger\_kwargs( 
-self.exp\_name, -self.output\_dir, -self.seed, -agent=agent\_config['policy\_type'], -scenario=scenario\_config['policy\_type'], -scenario\_category=self.scenario\_category -) -self.logger = Logger(**logger\_kwargs) - -\# prepare parameters -if self.mode == 'train\_agent': -self.buffer\_capacity = agent\_config['buffer\_capacity'] -self.eval\_in\_train\_freq = agent\_config['eval\_in\_train\_freq'] -self.save\_freq = agent\_config['save\_freq'] -self.train\_episode = agent\_config['train\_episode'] -self.current\_episode = -1 -self.logger.save\_config(agent\_config) -self.logger.create\_training\_dir() -elif self.mode == 'train\_scenario': -self.save\_freq = agent\_config['save\_freq'] -self.logger.create\_eval\_dir(load\_existing\_results=False) -elif self.mode == 'eval': -self.save\_freq = agent\_config['save\_freq'] -self.logger.log('>> Evaluation Mode, skip config saving', 'yellow') -self.logger.create\_eval\_dir(load\_existing\_results=False) -else: -raise NotImplementedError(f"Unsupported mode: {self.mode}.") - -\# define agent and scenario -self.logger.log('>> Agent Policy: ' + agent\_config['policy\_type']) -self.logger.log('>> Scenario Policy: ' + scenario\_config['policy\_type']) - -if self.scenario\_config['auto\_ego']: -self.logger.log('>> Using auto-polit for ego vehicle, action of policy will be ignored', 'yellow') -if scenario\_config['policy\_type'] == 'ordinary' and self.mode != 'train\_agent': -self.logger.log('>> Ordinary scenario can only be used in agent training', 'red') -raise Exception() -self.logger.log('>> ' + '-' * 40) - -\# define agent and scenario policy -self.agent\_policy = AGENT\_POLICY\_LIST[agent\_config['policy\_type']](agent\_config, logger=self.logger) -self.scenario\_policy = SCENARIO\_POLICY\_LIST[scenario\_config['policy\_type']](scenario\_config, logger=self.logger) -if self.save\_video: -assert self.mode == 'eval', "only allow video saving in eval mode" -self.logger.init\_video\_recorder() - -def \_init\_world(self): 
-self.logger.log(">> Initializing carla world") -self.world = self.client.get\_world() -settings = self.world.get\_settings() -settings.synchronous\_mode = True -settings.fixed\_delta\_seconds = self.fixed\_delta\_seconds -self.world.apply\_settings(settings) -CarlaDataProvider.set\_client(self.client) -CarlaDataProvider.set\_world(self.world) -CarlaDataProvider.set\_traffic\_manager\_port(self.scenario\_config['tm\_port']) - -def \_init\_scenic(self, config): -self.logger.log(f">> Initializing scenic simulator: {config.scenic\_file}") -self.scenic = ScenicSimulator(config.scenic\_file, config.extra\_params) - -def \_init\_renderer(self): -self.logger.log(">> Initializing pygame birdeye renderer") -pygame.init() -flag = pygame.HWSURFACE | pygame.DOUBLEBUF -if not self.render: -flag = flag | pygame.HIDDEN -if self.scenario\_category in ['planning', 'scenic']: -\# [bird-eye view, Lidar, front view] or [bird-eye view, front view] -if self.env\_params['disable\_lidar']: -window\_size = (self.env\_params['display\_size'] * 2, self.env\_params['display\_size'] * self.num\_scenario) -else: -window\_size = (self.env\_params['display\_size'] * 3, self.env\_params['display\_size'] * self.num\_scenario) -else: -window\_size = (self.env\_params['display\_size'], self.env\_params['display\_size'] * self.num\_scenario) -self.display = pygame.display.set\_mode(window\_size, flag) - -\# initialize the render for generating observation and visualization -pixels\_per\_meter = self.env\_params['display\_size'] / self.env\_params['obs\_range'] -pixels\_ahead\_vehicle = (self.env\_params['obs\_range'] / 2 - self.env\_params['d\_behind']) * pixels\_per\_meter -self.birdeye\_params = { - 'screen\_size': [self.env\_params['display\_size'], self.env\_params['display\_size']], - 'pixels\_per\_meter': pixels\_per\_meter, - 'pixels\_ahead\_vehicle': pixels\_ahead\_vehicle, -} -self.birdeye\_render = BirdeyeRender(self.world, self.birdeye\_params, logger=self.logger) - -def run\_scenes(self, 
scenes): -self.logger.log(f">> Begin to run the scene...") -\#\# currently there is only one scene in this list \#\# -for scene in scenes: -if self.scenic.setSimulation(scene): -self.scenic.update\_behavior = self.scenic.runSimulation() -next(self.scenic.update\_behavior) - -def train(self, data\_loader, start\_episode=0, replay\_buffer = None): -\# general buffer for both agent and scenario - -for \_ in tqdm(range(len(data\_loader))): -self.current\_episode += 1 -if self.current\_episode >= self.train\_episode: -return -if self.current\_episode < start\_episode: -continue -\# sample scenarios -sampled\_scenario\_configs, \_ = data\_loader.sampler() -\# reset the index counter to create endless loader -\# data\_loader.reset\_idx\_counter() - -scenes = [config.scene for config in sampled\_scenario\_configs] -\# begin to run the scene -self.run\_scenes(scenes) - -\# get static obs and then reset with init action -static\_obs = self.env.get\_static\_obs(sampled\_scenario\_configs) -self.scenario\_policy.load\_model(sampled\_scenario\_configs) -scenario\_init\_action, additional\_dict = self.scenario\_policy.get\_init\_action(static\_obs) -obs, infos = self.env.reset(sampled\_scenario\_configs, scenario\_init\_action) -replay\_buffer.store\_init([static\_obs, scenario\_init\_action], additional\_dict=additional\_dict) - -\# get ego vehicle from scenario -self.agent\_policy.set\_ego\_and\_route(self.env.get\_ego\_vehicles(), infos) - -\# start loop -episode\_reward = [] -while not self.env.all\_scenario\_done(): -\# get action from agent policy and scenario policy (assume using one batch) -ego\_actions = self.agent\_policy.get\_action(obs, infos, deterministic=False) -scenario\_actions = self.scenario\_policy.get\_action(obs, infos, deterministic=False) - -\# apply action to env and get obs -next\_obs, rewards, dones, infos = self.env.step(ego\_actions=ego\_actions, scenario\_actions=scenario\_actions) -replay\_buffer.store([ego\_actions, scenario\_actions, obs, 
next\_obs, rewards, dones], additional\_dict=infos) -obs = copy.deepcopy(next\_obs) -episode\_reward.append(np.mean(rewards)) - -\# train off-policy agent or scenario -if self.mode == 'train\_agent' and self.agent\_policy.type == 'offpolicy': -loss = self.agent\_policy.train(replay\_buffer) -elif self.mode == 'train\_scenario' and self.scenario\_policy.type == 'offpolicy': -self.scenario\_policy.train(replay\_buffer) - -score\_function = get\_route\_scores if self.scenario\_category in ['planning', 'scenic'] else get\_perception\_scores -all\_scores = score\_function(self.env.running\_results) - -\# end up environment -self.env.clean\_up() -replay\_buffer.finish\_one\_episode() -self.logger.add\_training\_results('episode', self.current\_episode) -self.logger.add\_training\_results('episode\_reward', np.sum(episode\_reward)) -for key, value in all\_scores.items(): -self.logger.add\_training\_results(key, value) -if loss is not None: -critic\_loss, actor\_loss = loss -self.logger.add\_training\_results('critic\_loss', critic\_loss) -self.logger.add\_training\_results('actor\_loss', actor\_loss) -else: -critic\_loss, actor\_loss = 0, 0 -self.logger.add\_training\_results('critic\_loss', critic\_loss) -self.logger.add\_training\_results('actor\_loss', actor\_loss) -self.logger.log(f">> Episode: {self.current\_episode}, \#buffer\_len: {replay\_buffer.buffer\_len}, critic: {critic\_loss:.3f}, actor: {actor\_loss:.3f}") -self.logger.save\_training\_results() - -\# train on-policy agent or scenario -if self.mode == 'train\_agent' and self.agent\_policy.type == 'onpolicy': -self.agent\_policy.train(replay\_buffer) -elif self.mode == 'train\_scenario' and self.scenario\_policy.type in ['init\_state', 'onpolicy']: -self.scenario\_policy.train(replay\_buffer) - -\# eval during training -if (self.current\_episode+1) \% self.eval\_in\_train\_freq == 0: -\#self.eval(env, data\_loader) -pass - -\# save checkpoints -if (self.current\_episode+1) \% self.save\_freq == 0: -if 
self.mode == 'train\_agent': -self.agent\_policy.save\_model(self.current\_episode, replay\_buffer) -if self.mode == 'train\_scenario': -self.scenario\_policy.save\_model(self.current\_episode) - -self.scenic.destroy() - -def eval(self, data\_loader, select = False): -num\_finished\_scenario = 0 -data\_loader.reset\_idx\_counter() -\# recording the score and the id of corresponding selected scenes -map\_id\_score = {} -behavior\_name = data\_loader.behavior -route\_id = data\_loader.route\_id -opt\_step = data\_loader.opt\_step -opt\_time = 0 - -if route\_id is None: -log\_name = f'OPT\_{behavior\_name}' -else: -log\_name = f'OPT\_{behavior\_name}\_ROUTE-{route\_id}' - -if select: -self.scene\_map[log\_name] = {} -self.scene\_map[log\_name][f'opt\_time\_{opt\_time}'] = self.scenic.save\_params() - -while len(data\_loader) > 0: -\# sample scenarios -sampled\_scenario\_configs, num\_sampled\_scenario = data\_loader.sampler() -num\_finished\_scenario += num\_sampled\_scenario -assert num\_sampled\_scenario == 1, 'scenic can only run one scene at one time' - -scenes = [config.scene for config in sampled\_scenario\_configs] -\# begin to run the scene -self.run\_scenes(scenes) - -\# reset envs with new config, get init action from scenario policy, and run scenario -static\_obs = self.env.get\_static\_obs(sampled\_scenario\_configs) -self.scenario\_policy.load\_model(sampled\_scenario\_configs) -scenario\_init\_action, \_ = self.scenario\_policy.get\_init\_action(static\_obs, deterministic=True) -obs, infos = self.env.reset(sampled\_scenario\_configs, scenario\_init\_action) - -\# get ego vehicle from scenario -self.agent\_policy.set\_ego\_and\_route(self.env.get\_ego\_vehicles(), infos) - -score\_list = {s\_i: [] for s\_i in range(num\_sampled\_scenario)} -while not self.env.all\_scenario\_done(): -\# get action from agent policy and scenario policy (assume using one batch) -ego\_actions = self.agent\_policy.get\_action(obs, infos, deterministic=True) -scenario\_actions 
= self.scenario\_policy.get\_action(obs, infos, deterministic=True) - -\# apply action to env and get obs -obs, rewards, \_, infos = self.env.step(ego\_actions=ego\_actions, scenario\_actions=scenario\_actions) - -\# save video -if self.save\_video: -if self.scenario\_category in ['planning', 'scenic']: -self.logger.add\_frame(pygame.surfarray.array3d(self.display).transpose(1, 0, 2)) -else: -self.logger.add\_frame({s\_i['scenario\_id']: ego\_actions[n\_i]['annotated\_image'] for n\_i, s\_i in enumerate(infos)}) - -\# accumulate scores of corresponding scenario -reward\_idx = 0 -for s\_i in infos: -score = rewards[reward\_idx] if self.scenario\_category in ['planning', 'scenic'] else 1-infos[reward\_idx]['iou\_loss'] -score\_list[s\_i['scenario\_id']].append(score) -reward\_idx += 1 - -\# clean up all things -self.logger.log(">> All scenarios are completed. Clearning up all actors") -self.env.clean\_up() - -\# save video -if self.save\_video: -data\_ids = [config.data\_id for config in sampled\_scenario\_configs] -self.logger.save\_video(data\_ids=data\_ids, log\_name= log\_name) - -\# print score for ranking -self.logger.log(f'[{num\_finished\_scenario}/{data\_loader.num\_total\_scenario}] Ranking scores for batch scenario:', color='yellow') -for s\_i in score\_list.keys(): -self.logger.log('\t Env id ' + str(s\_i) + ': ' + str(np.mean(score\_list[s\_i])), color='yellow') - -\# calculate evaluation results -score\_function = get\_route\_scores if self.scenario\_category in ['planning', 'scenic'] else get\_perception\_scores -all\_running\_results = self.logger.add\_eval\_results(records=self.env.running\_results) -all\_scores = score\_function(all\_running\_results) -self.logger.add\_eval\_results(scores=all\_scores) -self.logger.print\_eval\_results() -if len(self.env.running\_results) \% self.save\_freq == 0: -self.logger.save\_eval\_results(log\_name) - -if infos[0]['collision']: -self.scenic.record\_params() -if select and (num\_finished\_scenario \% opt\_step 
== 0): -opt\_time += 1 -self.scenic.update\_params() -self.scene\_map[log\_name][f'opt\_time\_{opt\_time}'] = self.scenic.save\_params() -data\_loader.train\_scene(opt\_time) - -self.logger.save\_eval\_results(log\_name) - -if select: -self.scene\_map[log\_name]['select\_id'] = self.select\_adv\_scene(self.logger.eval\_records, score\_function, data\_loader.select\_num) -self.dump\_scene\_map(sampled\_scenario\_configs[0].scenario\_id) - -self.logger.clear() -self.scenic.destroy() - -def select\_adv\_scene(self, results, score\_function, select\_num): -\# define your own selection mechanism here -map\_id\_score\_collision = {} -map\_id\_score\_non\_collision = {} -for i in results.keys(): -score = score\_function({i:results[i]}) -if score['collision\_rate'] == 1: -map\_id\_score\_collision[i] = score['final\_score'] -else: -map\_id\_score\_non\_collision[i] = score['final\_score'] - -\# Sort the collision scenes by their scores -collision\_scenes\_sorted = sorted(map\_id\_score\_collision.items(), key=lambda x: x[1]) - -\# Get the number of scenes to select from the collision cases -num\_collision\_selected = min(select\_num, len(collision\_scenes\_sorted)) - -\# Select the lowest scored scenes with collision -selected\_scene\_id = [scene[0] for scene in collision\_scenes\_sorted[:num\_collision\_selected]] - -\# If not enough collision scenes, select remaining scenes -num\_non\_collision\_selected = select\_num - num\_collision\_selected -if num\_non\_collision\_selected > 0: -\# Sort the non-collision scenes by their scores -non\_collision\_scenes\_sorted = sorted(map\_id\_score\_non\_collision.items(), key=lambda x: x[1]) -\# Select the lowest scored scenes from the non-collision cases -selected\_scene\_id.extend([scene[0] for scene in non\_collision\_scenes\_sorted[:num\_non\_collision\_selected]]) -return sorted(selected\_scene\_id) - -def run(self, test\_epoch = None): -\# get scenario data of different maps -config\_list = 
scenic\_parse(self.scenario\_config, self.logger) - - -\#\#\# load rl model \#\# -if self.mode == 'train\_scenario': -\#\# we only need the pretrained surrogate model here \#\# -pass -elif self.mode == 'train\_agent': -\#\# initlize buffer \#\#\# -Buffer = RouteReplayBuffer if self.scenario\_category in ['scenic', 'planning'] else PerceptionReplayBuffer -replay\_buffer = Buffer(self.num\_scenario, self.mode, self.buffer\_capacity) - -\#\#\# repeat the training, 20 is just a random placeholder -config\_list = config\_list * 20 - -\#\#\# check if resume \#\#\# -if self.continue\_agent\_training: -self.logger.load\_training\_results() -start\_episode = self.check\_continue\_training(self.agent\_policy, replay\_buffer) + 1 -if start\_episode >= self.train\_episode: -return -else: -self.clean\_cache(self.agent\_policy.model\_path) -start\_episode = -1 - -elif self.mode == 'eval': -\#\#\# load trained model for evaluation \#\#\# -if test\_epoch: -self.agent\_policy.load\_model(episode=test\_epoch) - -last\_town = None -for config in config\_list: - -\#\# set log name \#\# -if config.route\_id is None: -log\_name = f'OPT\_{config.behavior}' -else: -log\_name = f'OPT\_{config.behavior}\_ROUTE-{config.route\_id}' - -\#\# check if all done \#\# -if self.mode == 'eval': -if self.logger.check\_eval\_dir(log\_name) == config.select\_num: -self.logger.log(f">> This scenario and route have been done.") -continue -elif self.mode == 'train\_agent': -if self.current\_episode >= self.train\_episode - 1: -return - -if self.current\_episode + config.select\_num < start\_episode: -self.current\_episode += config.select\_num -continue - -\# initialize scenic -self.\_init\_scenic(config) - -\# initialize map and render -if last\_town != config.extra\_params['town']: -self.\_init\_world() -self.\_init\_renderer() -last\_town = config.extra\_params['town'] -self.world.scenic = self.scenic - -\# create scenarios within the vectorized wrapper -self.env = VectorWrapper( -self.env\_params, 
-self.scenario\_config, -self.world, -self.birdeye\_render, -self.display, -self.logger -) - -\# prepare data loader and buffer -data\_loader = ScenicDataLoader(self.scenic, config, self.num\_scenario) -\# run with different modes - -if self.mode == 'train\_scenario': -\#\#\# select hard scenic scenario config on the surrogate model \#\#\# -self.scene\_map = self.load\_scene\_map(config.scenario\_id) -self.agent\_policy.set\_mode('eval') -self.scenario\_policy.set\_mode('eval') -self.eval(data\_loader, select = True) -elif self.mode == 'train\_agent': -\#\#\# train the surrogate model on the selected hard scenrios \#\#\# -self.agent\_policy.set\_mode('train') -self.scenario\_policy.set\_mode('eval') -self.train(data\_loader, start\_episode, replay\_buffer) -elif self.mode == 'eval': -\#\#\# evaluate the trained agent on different test models \#\#\# -self.agent\_policy.set\_mode('eval') -self.scenario\_policy.set\_mode('eval') -self.eval(data\_loader) -else: -raise NotImplementedError(f"Unsupported mode: {self.mode}.") - -def check\_continue\_training(self, policy, replay\_buffer): -\# load previous checkpoint -policy.load\_model(replay\_buffer = replay\_buffer) -if policy.continue\_episode == 0: -start\_episode = 0 -self.logger.log('>> Previous checkpoint not found. 
Training from scratch.') -else: -start\_episode = policy.continue\_episode -self.logger.log(f'>> Continue training from previous checkpoint, epoch: {start\_episode}.') -return start\_episode - -def dump\_scene\_map(self, scenario\_id): -\# load previous checkpoint -scenic\_dir = os.path.join(self.scenario\_config['scenic\_dir'], f'scenario\_{scenario\_id}') -f = open(os.path.join(scenic\_dir, f"{scenic\_dir.split('/')[-1]}.json"), 'w') -json\_dumps\_str = json.dumps(self.scene\_map, indent=4) -print(json\_dumps\_str, file=f) -f.close() - -def load\_scene\_map(self, scenario\_id): -\# load previous checkpoint -scenic\_dir = os.path.join(self.scenario\_config['scenic\_dir'], f'scenario\_{scenario\_id}') -try: -with open(os.path.join(scenic\_dir, f"{scenic\_dir.split('/')[-1]}.json"), 'r') as f: -data = json.loads(f.read()) -except: -data = {} -return data - -def clean\_cache(self, path): -\# Get a list of all files in directory -all\_files = glob.glob(os.path.join(path, '*')) - -\# Specify the file to keep -file\_to\_keep = os.path.join(path, 'model.sac.-001.torch') - -\# Remove all files except the one to keep -for file in all\_files: -if file != file\_to\_keep: -os.remove(file) - -def close(self): -pygame.quit() \# close pygame renderer -if self.env: -self.env.clean\_up() - -\section{scene\_runner} -''' -Date: 2023-01-31 22:23:17 -LastEditTime: 2023-03-07 12:28:17 -Description: -Copyright (c) 2022-2023 Safebench Team - -This work is licensed under the terms of the MIT license. 
-For a copy, see
-'''
-
-import copy
-import os
+import os
+import random
+import xml.etree.ElementTree as ET
+
+INPUT\_TEMPLATE = "templates/change\_lane\_template.xml"
+OUTPUT\_DIR = "../scenario\_runner/srunner/scenarios/generated/change\_lane/"
+
+def random\_param():
+    return {
+        'start\_speed': random.uniform(5, 15),    \# 起始速度 m/s
+        'target\_lane': random.choice([1, 2, 3]), \# 目标车道
+        'start\_delay': random.uniform(0.5, 2.0)  \# 延迟秒数
+    }
+
+def gen\_scenario\_xml(idx, params):
+    tree = ET.parse(INPUT\_TEMPLATE)
+    root = tree.getroot()
+    \# 修改对应的 XML 节点
+    root.find(".//Vehicle/Speed").set('value', str(params['start\_speed']))
+    root.find(".//Action/TargetLane").set('value', str(params['target\_lane']))
+    root.find(".//Action/StartDelay").set('value', str(params['start\_delay']))
+    os.makedirs(OUTPUT\_DIR, exist\_ok=True)
+    tree.write(f"{OUTPUT\_DIR}change\_lane\_{idx:04d}.xml")
+
+if \_\_name\_\_ == "\_\_main\_\_":
+    for i in range(1000):
+        p = random\_param()
+        gen\_scenario\_xml(i, p)
+    print("已生成 1000 个变道场景。")
+运行:
+
+
+python "Scenario Generation Scripts/script\_change\_lane.py"
+4. 场景选择(NSGA-II vs.
Random Search) +4.1 NSGA-II + +python "NSGA/NSGA\_choice.py" \ +--input\_results Scenario\_Results/raw\_results.json \ +--pop\_size 50 --generations 40 \ +--output selected\_nsga.json +示例脚本结构(NSGA/NSGA\_choice.py): + + +from deap import base, creator, tools, algorithms import json -import glob -import random - -import numpy as np -import carla -import pygame -from tqdm import tqdm - -from safebench.gym\_carla.env\_wrapper import VectorWrapper -from safebench.gym\_carla.envs.render import BirdeyeRender -from safebench.gym\_carla.replay\_buffer import RouteReplayBuffer, PerceptionReplayBuffer - -from safebench.agent import AGENT\_POLICY\_LIST -from safebench.scenario import SCENARIO\_POLICY\_LIST - -from safebench.scenario.scenario\_manager.carla\_data\_provider import CarlaDataProvider -from safebench.scenario.scenario\_data\_loader import ScenarioDataLoader, ScenicDataLoader -from safebench.scenario.tools.scenario\_utils import scenario\_parse, dynamic\_scenic\_parse - -from safebench.util.logger import Logger, setup\_logger\_kwargs -from safebench.util.metric\_util import get\_route\_scores, get\_perception\_scores -from safebench.util.scenic\_utils import ScenicSimulator - -class ScenicRunner: -def \_\_init\_\_(self, agent\_config, scenario\_config): -self.scenario\_config = scenario\_config -self.agent\_config = agent\_config - -self.seed = scenario\_config['seed'] -self.exp\_name = scenario\_config['exp\_name'] -self.output\_dir = scenario\_config['output\_dir'] -self.mode = scenario\_config['mode'] -self.save\_video = scenario\_config['save\_video'] - -self.render = scenario\_config['render'] -self.num\_scenario = scenario\_config['num\_scenario'] -self.fixed\_delta\_seconds = scenario\_config['fixed\_delta\_seconds'] -self.scenario\_category = scenario\_config['scenario\_category'] - -\# continue training flag -self.continue\_agent\_training = scenario\_config['continue\_agent\_training'] -self.continue\_scenario\_training = 
scenario\_config['continue\_scenario\_training'] - -\# apply settings to carla -self.client = carla.Client('localhost', scenario\_config['port']) -self.client.set\_timeout(10.0) -self.world = None -self.env = None - -self.env\_params = { - 'auto\_ego': scenario\_config['auto\_ego'], - 'obs\_type': agent\_config['obs\_type'], - 'scenario\_category': self.scenario\_category, - 'ROOT\_DIR': scenario\_config['ROOT\_DIR'], - 'warm\_up\_steps': 9, \# number of ticks after spawning the vehicles - 'disable\_lidar': True, \# show bird-eye view lidar or not - 'display\_size': 128, \# screen size of one bird-eye view window - 'obs\_range': 32, \# observation range (meter) - 'd\_behind': 12, \# distance behind the ego vehicle (meter) - 'max\_past\_step': 1, \# the number of past steps to draw - 'discrete': False, \# whether to use discrete control space - 'discrete\_acc': [-3.0, 0.0, 3.0], \# discrete value of accelerations - 'discrete\_steer': [-0.2, 0.0, 0.2], \# discrete value of steering angles - 'continuous\_accel\_range': [-3.0, 3.0], \# continuous acceleration range - 'continuous\_steer\_range': [-0.3, 0.3], \# continuous steering angle range - 'max\_episode\_step': scenario\_config['max\_episode\_step'], \# maximum timesteps per episode - 'max\_waypt': 12, \# maximum number of waypoints - 'lidar\_bin': 0.125, \# bin size of lidar sensor (meter) - 'out\_lane\_thres': 4, \# threshold for out of lane (meter) - 'desired\_speed': 8, \# desired speed (m/s) - 'image\_sz': 1024, \# TODO: move to config of od scenario -} - - -\# pass config from scenario to agent -agent\_config['mode'] = scenario\_config['mode'] -agent\_config['ego\_action\_dim'] = scenario\_config['ego\_action\_dim'] -agent\_config['ego\_state\_dim'] = scenario\_config['ego\_state\_dim'] -agent\_config['ego\_action\_limit'] = scenario\_config['ego\_action\_limit'] - -\# define logger -logger\_kwargs = setup\_logger\_kwargs( -self.exp\_name, -self.output\_dir, -self.seed, -agent=agent\_config['policy\_type'], 
-scenario=scenario\_config['policy\_type'], -scenario\_category=self.scenario\_category -) -self.logger = Logger(**logger\_kwargs) - -\# prepare parameters -if self.mode == 'train\_agent': -self.buffer\_capacity = agent\_config['buffer\_capacity'] -self.eval\_in\_train\_freq = agent\_config['eval\_in\_train\_freq'] -self.save\_freq = agent\_config['save\_freq'] -self.train\_episode = agent\_config['train\_episode'] -self.current\_episode = -1 -self.logger.save\_config(agent\_config) -self.logger.create\_training\_dir() -elif self.mode == 'train\_scenario': -self.save\_freq = agent\_config['save\_freq'] -self.logger.create\_eval\_dir(load\_existing\_results=False) -elif self.mode == 'eval': -self.save\_freq = agent\_config['save\_freq'] -self.logger.log('>> Evaluation Mode, skip config saving', 'yellow') -self.logger.create\_eval\_dir(load\_existing\_results=False) -else: -raise NotImplementedError(f"Unsupported mode: {self.mode}.") - -\# define agent and scenario -self.logger.log('>> Agent Policy: ' + agent\_config['policy\_type']) -self.logger.log('>> Scenario Policy: ' + scenario\_config['policy\_type']) - -if self.scenario\_config['auto\_ego']: -self.logger.log('>> Using auto-polit for ego vehicle, action of policy will be ignored', 'yellow') -if scenario\_config['policy\_type'] == 'ordinary' and self.mode != 'train\_agent': -self.logger.log('>> Ordinary scenario can only be used in agent training', 'red') -raise Exception() -self.logger.log('>> ' + '-' * 40) - -\# define agent and scenario policy -self.agent\_policy = AGENT\_POLICY\_LIST[agent\_config['policy\_type']](agent\_config, logger=self.logger) -self.scenario\_policy = SCENARIO\_POLICY\_LIST[scenario\_config['policy\_type']](scenario\_config, logger=self.logger) -if self.save\_video: -assert self.mode == 'eval', "only allow video saving in eval mode" -self.logger.init\_video\_recorder() - -def \_init\_world(self): -self.logger.log(">> Initializing carla world") -self.world = self.client.get\_world() 
-settings = self.world.get\_settings() -settings.synchronous\_mode = True -settings.fixed\_delta\_seconds = self.fixed\_delta\_seconds -self.world.apply\_settings(settings) -CarlaDataProvider.set\_client(self.client) -CarlaDataProvider.set\_world(self.world) -CarlaDataProvider.set\_traffic\_manager\_port(self.scenario\_config['tm\_port']) - -def \_init\_scenic(self, config): -self.logger.log(f">> Initializing scenic simulator: {config.scenic\_file}") -self.scenic = ScenicSimulator(config.scenic\_file, config.extra\_params) - -def \_init\_renderer(self): -self.logger.log(">> Initializing pygame birdeye renderer") -pygame.init() -flag = pygame.HWSURFACE | pygame.DOUBLEBUF -if not self.render: -flag = flag | pygame.HIDDEN -if self.scenario\_category in ['planning', 'scenic']: -\# [bird-eye view, Lidar, front view] or [bird-eye view, front view] -if self.env\_params['disable\_lidar']: -window\_size = (self.env\_params['display\_size'] * 2, self.env\_params['display\_size'] * self.num\_scenario) -else: -window\_size = (self.env\_params['display\_size'] * 3, self.env\_params['display\_size'] * self.num\_scenario) -else: -window\_size = (self.env\_params['display\_size'], self.env\_params['display\_size'] * self.num\_scenario) -self.display = pygame.display.set\_mode(window\_size, flag) - -\# initialize the render for generating observation and visualization -pixels\_per\_meter = self.env\_params['display\_size'] / self.env\_params['obs\_range'] -pixels\_ahead\_vehicle = (self.env\_params['obs\_range'] / 2 - self.env\_params['d\_behind']) * pixels\_per\_meter -self.birdeye\_params = { - 'screen\_size': [self.env\_params['display\_size'], self.env\_params['display\_size']], - 'pixels\_per\_meter': pixels\_per\_meter, - 'pixels\_ahead\_vehicle': pixels\_ahead\_vehicle, -} -self.birdeye\_render = BirdeyeRender(self.world, self.birdeye\_params, logger=self.logger) - -def run\_scenes(self, scenes): -self.logger.log(f">> Begin to run the scene...") -\#\# currently there is only 
one scene in this list \#\# -for scene in scenes: -if self.scenic.setSimulation(scene): -self.scenic.update\_behavior = self.scenic.runSimulation() -next(self.scenic.update\_behavior) - -def train(self, data\_loader, start\_episode=0, replay\_buffer = None): -\# general buffer for both agent and scenario - -for \_ in tqdm(range(len(data\_loader))): -self.current\_episode += 1 -if self.current\_episode >= self.train\_episode: -return -if self.current\_episode < start\_episode: -continue -\# sample scenarios -sampled\_scenario\_configs, \_ = data\_loader.sampler() -\# reset the index counter to create endless loader -\# data\_loader.reset\_idx\_counter() - -scenes = [config.scene for config in sampled\_scenario\_configs] -\# begin to run the scene -self.run\_scenes(scenes) - -\# get static obs and then reset with init action -static\_obs = self.env.get\_static\_obs(sampled\_scenario\_configs) -self.scenario\_policy.load\_model(sampled\_scenario\_configs) -scenario\_init\_action, additional\_dict = self.scenario\_policy.get\_init\_action(static\_obs) -obs, infos = self.env.reset(sampled\_scenario\_configs, scenario\_init\_action) -replay\_buffer.store\_init([static\_obs, scenario\_init\_action], additional\_dict=additional\_dict) - -\# get ego vehicle from scenario -self.agent\_policy.set\_ego\_and\_route(self.env.get\_ego\_vehicles(), infos) - -\# start loop -episode\_reward = [] -while not self.env.all\_scenario\_done(): -\# get action from agent policy and scenario policy (assume using one batch) -ego\_actions = self.agent\_policy.get\_action(obs, infos, deterministic=False) -scenario\_actions = self.scenario\_policy.get\_action(obs, infos, deterministic=False) - -\# apply action to env and get obs -next\_obs, rewards, dones, infos = self.env.step(ego\_actions=ego\_actions, scenario\_actions=scenario\_actions) -replay\_buffer.store([ego\_actions, scenario\_actions, obs, next\_obs, rewards, dones], additional\_dict=infos) -obs = copy.deepcopy(next\_obs) 
-episode\_reward.append(np.mean(rewards)) - -\# train off-policy agent or scenario -if self.mode == 'train\_agent' and self.agent\_policy.type == 'offpolicy': -loss = self.agent\_policy.train(replay\_buffer) -elif self.mode == 'train\_scenario' and self.scenario\_policy.type == 'offpolicy': -self.scenario\_policy.train(replay\_buffer) - -score\_function = get\_route\_scores if self.scenario\_category in ['planning', 'scenic'] else get\_perception\_scores -all\_scores = score\_function(self.env.running\_results) - -\# end up environment -self.env.clean\_up() -replay\_buffer.finish\_one\_episode() -self.logger.add\_training\_results('episode', self.current\_episode) -self.logger.add\_training\_results('episode\_reward', np.sum(episode\_reward)) -for key, value in all\_scores.items(): -self.logger.add\_training\_results(key, value) -if loss is not None: -critic\_loss, actor\_loss = loss -self.logger.add\_training\_results('critic\_loss', critic\_loss) -self.logger.add\_training\_results('actor\_loss', actor\_loss) -else: -critic\_loss, actor\_loss = 0, 0 -self.logger.add\_training\_results('critic\_loss', critic\_loss) -self.logger.add\_training\_results('actor\_loss', actor\_loss) -self.logger.log(f">> Episode: {self.current\_episode}, \#buffer\_len: {replay\_buffer.buffer\_len}, critic: {critic\_loss:.3f}, actor: {actor\_loss:.3f}") -self.logger.save\_training\_results() - -\# train on-policy agent or scenario -if self.mode == 'train\_agent' and self.agent\_policy.type == 'onpolicy': -self.agent\_policy.train(replay\_buffer) -elif self.mode == 'train\_scenario' and self.scenario\_policy.type in ['init\_state', 'onpolicy']: -self.scenario\_policy.train(replay\_buffer) - -\# eval during training -if (self.current\_episode+1) \% self.eval\_in\_train\_freq == 0: -\#self.eval(env, data\_loader) -pass - -\# save checkpoints -if (self.current\_episode+1) \% self.save\_freq == 0: -if self.mode == 'train\_agent': -self.agent\_policy.save\_model(self.current\_episode, 
replay\_buffer) -if self.mode == 'train\_scenario': -self.scenario\_policy.save\_model(self.current\_episode) - -self.scenic.destroy() - -def eval(self, data\_loader, select = False): -num\_finished\_scenario = 0 -data\_loader.reset\_idx\_counter() -\# recording the score and the id of corresponding selected scenes -map\_id\_score = {} -behavior\_name = data\_loader.behavior -opt\_step = data\_loader.opt\_step -opt\_time = 0 - -log\_name = f'OPT\_{behavior\_name}' - -if select: -self.scene\_map[log\_name] = {} -self.scene\_map[log\_name][f'opt\_time\_{opt\_time}'] = self.scenic.save\_params() - -while len(data\_loader) > 0: -\# sample scenarios -sampled\_scenario\_configs, num\_sampled\_scenario = data\_loader.sampler() -num\_finished\_scenario += num\_sampled\_scenario -assert num\_sampled\_scenario == 1, 'scenic can only run one scene at one time' - -scenes = [config.scene for config in sampled\_scenario\_configs] -\# begin to run the scene -self.run\_scenes(scenes) - -\# reset envs with new config, get init action from scenario policy, and run scenario -static\_obs = self.env.get\_static\_obs(sampled\_scenario\_configs) -self.scenario\_policy.load\_model(sampled\_scenario\_configs) -scenario\_init\_action, \_ = self.scenario\_policy.get\_init\_action(static\_obs, deterministic=True) -obs, infos = self.env.reset(sampled\_scenario\_configs, scenario\_init\_action) - -\# get ego vehicle from scenario -self.agent\_policy.set\_ego\_and\_route(self.env.get\_ego\_vehicles(), infos) - -score\_list = {s\_i: [] for s\_i in range(num\_sampled\_scenario)} -while not self.env.all\_scenario\_done(): -\# get action from agent policy and scenario policy (assume using one batch) -ego\_actions = self.agent\_policy.get\_action(obs, infos, deterministic=True) -scenario\_actions = self.scenario\_policy.get\_action(obs, infos, deterministic=True) - -\# apply action to env and get obs -obs, rewards, \_, infos = self.env.step(ego\_actions=ego\_actions, 
scenario\_actions=scenario\_actions) - -\# save video -if self.save\_video: -self.logger.add\_frame(pygame.surfarray.array3d(self.display).transpose(1, 0, 2)) - -\# accumulate scores of corresponding scenario -reward\_idx = 0 -for s\_i in infos: -score = rewards[reward\_idx] if self.scenario\_category in ['planning', 'scenic'] else 1-infos[reward\_idx]['iou\_loss'] -score\_list[s\_i['scenario\_id']].append(score) -reward\_idx += 1 - -\# clean up all things -self.logger.log(">> All scenarios are completed. Clearning up all actors") -self.env.clean\_up() - -\# save video -if self.save\_video: -data\_ids = [config.data\_id for config in sampled\_scenario\_configs] -self.logger.save\_video(data\_ids=data\_ids, log\_name= log\_name) - -\# print score for ranking -self.logger.log(f'[{num\_finished\_scenario}/{data\_loader.num\_total\_scenario}] Ranking scores for batch scenario:', color='yellow') -for s\_i in score\_list.keys(): -self.logger.log('\t Env id ' + str(s\_i) + ': ' + str(np.mean(score\_list[s\_i])), color='yellow') - -\# calculate evaluation results -score\_function = get\_route\_scores if self.scenario\_category in ['planning', 'scenic'] else get\_perception\_scores -all\_running\_results = self.logger.add\_eval\_results(records=self.env.running\_results) -all\_scores = score\_function(all\_running\_results) -self.logger.add\_eval\_results(scores=all\_scores) -self.logger.print\_eval\_results() -if len(self.env.running\_results) \% self.save\_freq == 0: -self.logger.save\_eval\_results(log\_name) - -if infos[0]['collision']: -self.scenic.record\_params() - -if select and (num\_finished\_scenario \% opt\_step == 0): -opt\_time += 1 -self.scenic.update\_params() -self.scene\_map[log\_name][f'opt\_time\_{opt\_time}'] = self.scenic.save\_params() -data\_loader.train\_scene(opt\_time) - -self.logger.save\_eval\_results(log\_name) - -if select: -self.scene\_map[log\_name]['select\_id'] = self.select\_adv\_scene(self.logger.eval\_records, score\_function, 
data\_loader.select\_num) -self.dump\_scene\_map() - -self.logger.clear() -self.scenic.destroy() - -def select\_adv\_scene(self, results, score\_function, select\_num): -\# define your own selection mechanism here -map\_id\_score\_collision = {} -map\_id\_score\_non\_collision = {} -for i in results.keys(): -score = score\_function({i:results[i]}) -if score['collision\_rate'] == 1: -map\_id\_score\_collision[i] = score['final\_score'] -else: -map\_id\_score\_non\_collision[i] = score['final\_score'] - -\# Sort the collision scenes by their scores -collision\_scenes\_sorted = sorted(map\_id\_score\_collision.items(), key=lambda x: x[1]) - -\# Get the number of scenes to select from the collision cases -num\_collision\_selected = min(select\_num, len(collision\_scenes\_sorted)) - -\# Select the lowest scored scenes with collision -selected\_scene\_id = [scene[0] for scene in collision\_scenes\_sorted[:num\_collision\_selected]] - -\# If not enough collision scenes, select remaining scenes -num\_non\_collision\_selected = select\_num - num\_collision\_selected -if num\_non\_collision\_selected > 0: -\# Sort the non-collision scenes by their scores -non\_collision\_scenes\_sorted = sorted(map\_id\_score\_non\_collision.items(), key=lambda x: x[1]) -\# Select the lowest scored scenes from the non-collision cases -selected\_scene\_id.extend([scene[0] for scene in non\_collision\_scenes\_sorted[:num\_non\_collision\_selected]]) -return sorted(selected\_scene\_id) - -def run(self, test\_epoch = None): -\# get scenario data of different maps -config\_list = dynamic\_scenic\_parse(self.scenario\_config, self.logger) - - -\#\#\# load rl model \#\# -if self.mode == 'train\_scenario': -\#\# we only need the pretrained surrogate model here \#\# -pass -elif self.mode == 'train\_agent': -\#\# initlize buffer \#\#\# -Buffer = RouteReplayBuffer if self.scenario\_category in ['scenic', 'planning'] else PerceptionReplayBuffer -replay\_buffer = Buffer(self.num\_scenario, self.mode, 
self.buffer\_capacity) - -\#\#\# repeat the training, 20 is just a random placeholder -config\_list = config\_list * 20 - -\#\#\# check if resume \#\#\# -if self.continue\_agent\_training: -self.logger.load\_training\_results() -start\_episode = self.check\_continue\_training(self.agent\_policy, replay\_buffer) + 1 -if start\_episode >= self.train\_episode: -return -else: -self.clean\_cache(self.agent\_policy.model\_path) -start\_episode = -1 - -elif self.mode == 'eval' and test\_epoch: -\#\#\# load trained model for evaluation \#\#\# -self.agent\_policy.load\_model(episode=test\_epoch) - -last\_town = None -for config in config\_list: - -\#\# set log name \#\# -log\_name = f'OPT\_{config.behavior}' - -\#\# check if all done \#\# -if self.mode == 'eval': -if self.logger.check\_eval\_dir(log\_name) == config.select\_num: -self.logger.log(f">> This scenario and route have been done.") -continue -elif self.mode == 'train\_agent': -if self.current\_episode >= self.train\_episode - 1: -return - -if self.current\_episode + config.select\_num < start\_episode: -self.current\_episode += config.select\_num -continue - -\# initialize scenic -self.\_init\_scenic(config) - -\# initialize map and render -self.\_init\_world() -self.\_init\_renderer() -self.world.scenic = self.scenic - -\# create scenarios within the vectorized wrapper -self.env = VectorWrapper( -self.env\_params, -self.scenario\_config, -self.world, -self.birdeye\_render, -self.display, -self.logger -) - -\# prepare data loader and buffer -data\_loader = ScenicDataLoader(self.scenic, config, self.num\_scenario) -\# run with different modes - -if self.mode == 'train\_scenario': -\#\#\# select hard scenic scenario config on the surrogate model \#\#\# -self.scene\_map = self.load\_scene\_map() -self.agent\_policy.set\_mode('eval') -self.scenario\_policy.set\_mode('eval') -self.eval(data\_loader, select = True) -elif self.mode == 'train\_agent': -\#\#\# train the surrogate model on the selected hard scenrios \#\#\# 
-self.agent\_policy.set\_mode('train') -self.scenario\_policy.set\_mode('eval') -self.train(data\_loader, start\_episode, replay\_buffer) -elif self.mode == 'eval': -\#\#\# evaluate the trained agent on different test models \#\#\# -self.agent\_policy.set\_mode('eval') -self.scenario\_policy.set\_mode('eval') -self.eval(data\_loader) -else: -raise NotImplementedError(f"Unsupported mode: {self.mode}.") - -def check\_continue\_training(self, policy, replay\_buffer): -\# load previous checkpoint -policy.load\_model(replay\_buffer = replay\_buffer) -if policy.continue\_episode == 0: -start\_episode = 0 -self.logger.log('>> Previous checkpoint not found. Training from scratch.') -else: -start\_episode = policy.continue\_episode -self.logger.log(f'>> Continue training from previous checkpoint, epoch: {start\_episode}.') -return start\_episode - -def dump\_scene\_map(self): -\# load previous checkpoint -scenic\_dir = self.scenario\_config['scenic\_dir'] -f = open(os.path.join(scenic\_dir, f"dynamic\_scenario.json"), 'w') -json\_dumps\_str = json.dumps(self.scene\_map, indent=4) -print(json\_dumps\_str, file=f) -f.close() - -def load\_scene\_map(self): -\# load previous checkpoint -scenic\_dir = self.scenario\_config['scenic\_dir'] -try: -with open(os.path.join(scenic\_dir, f"dynamic\_scenario.json"), 'r') as f: -data = json.loads(f.read()) -except: -data = {} -return data - -def clean\_cache(self, path): -\# Get a list of all files in directory -all\_files = glob.glob(os.path.join(path, '*')) - -\# Specify the file to keep -file\_to\_keep = os.path.join(path, 'model.sac.-001.torch') - -\# Remove all files except the one to keep -for file in all\_files: -if file != file\_to\_keep: -os.remove(file) - -def close(self): -pygame.quit() \# close pygame renderer -if self.env: -self.env.clean\_up() - - - - -\% \end{appendix} + +\# 读取所有场景执行后的性能指标 +with open("Scenario\_Results/raw\_results.json") as f: +data = json.load(f) + +\# 定义多目标:安全距离最小化、碰撞风险最大化(示例) 
+import random  \# 脚本中使用了 random.choice,原示例遗漏此导入
+
+creator.create("FitnessMulti", base.Fitness, weights=(-1.0, 1.0))
+creator.create("Individual", list, fitness=creator.FitnessMulti)
+
+toolbox = base.Toolbox()
+toolbox.register("attr\_scenario", lambda: random.choice(list(data.keys())))
+toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr\_scenario, n=10)
+toolbox.register("population", tools.initRepeat, list, toolbox.individual)
+
+def eval\_scenarios(ind):
+    \# 返回一个二元组(distance, risk)
+    distances = [data[s]['min\_distance'] for s in ind]
+    risks = [data[s]['collision\_risk'] for s in ind]
+    return (sum(distances)/len(distances), sum(risks)/len(risks))
+
+toolbox.register("evaluate", eval\_scenarios)
+toolbox.register("mate", tools.cxTwoPoint)
+toolbox.register("mutate", tools.mutShuffleIndexes, indpb=0.1)
+toolbox.register("select", tools.selNSGA2)
+
+pop = toolbox.population(n=50)
+\# eaMuPlusLambda 的第二个位置参数必须是 toolbox
+algorithms.eaMuPlusLambda(pop, toolbox, mu=50, lambda\_=100, cxpb=0.7, mutpb=0.2, ngen=40)
+\# 保存 Pareto 前沿
+best = tools.sortNondominated(pop, k=50, first\_front\_only=True)[0]
+with open("selected\_nsga.json", "w") as f:
+    json.dump([list(ind) for ind in best], f, indent=2)
+4.2 随机搜索
+
+python "Random Search/random\_search\_choice.py" \
+--input\_results Scenario\_Results/raw\_results.json \
+--trials 5000 --select\_k 50 \
+--output selected\_random.json
+示例脚本(Random Search/random\_search\_choice.py)核心逻辑:
+
+
+import json, random
+
+with open("Scenario\_Results/raw\_results.json") as f:
+    data = json.load(f)
+
+all\_ids = list(data.keys())
+best = []
+for \_ in range(5000):
+    sample = random.sample(all\_ids, 10)
+    dist = sum(data[s]['min\_distance'] for s in sample)/10
+    risk = sum(data[s]['collision\_risk'] for s in sample)/10
+    best.append((sample, dist, risk))
+
+\# 按指标排序取前 50:安全距离升序、碰撞风险降序
+best.sort(key=lambda x: (x[1], -x[2]))
+selected = [s for s, \_, \_ in best[:50]]
+with open("selected\_random.json", "w") as f:
+    json.dump(selected, f, indent=2)
+5.
ASIL 等级分类
+
+
+python "ASIL/ASIL.py" \
+--input selected\_nsga.json \
+--output nsga\_asil\_levels.json
+
+python "ASIL/ASIL\_percentages.py" \
+--input nsga\_asil\_levels.json \
+--output nsga\_asil\_distribution.csv
+ASIL.py:根据 ISO 26262 定义的性能指标阈值,将场景分为 A/B/C/D/QM。
+
+ASIL\_percentages.py:统计各等级占比并导出 CSV。
+
+示例 ASIL/ASIL.py 中核心片段:
+
+
+import json
+
+THRESHOLDS = {
+    'A': {'max\_risk': 0.2, 'min\_distance': 5},
+    'B': {'max\_risk': 0.5, 'min\_distance': 3},
+    \# ...
+}
+
+def classify(scenario):
+    risk = scenario['collision\_risk']
+    dist = scenario['min\_distance']
+    for level, th in THRESHOLDS.items():
+        if risk <= th['max\_risk'] and dist >= th['min\_distance']:
+            return level
+    return 'QM'
+
+\# selected\_nsga.json 中保存的是若干场景 id 列表,各场景指标需回查 raw\_results.json
+selected = json.load(open("selected\_nsga.json"))
+metrics = json.load(open("Scenario\_Results/raw\_results.json"))
+ids = sorted({sid for ind in selected for sid in ind})
+result = {sid: classify(metrics[sid]) for sid in ids}
+with open("nsga\_asil\_levels.json", "w") as f:
+    json.dump(result, f, indent=2)
+6. 统计检验(Mann–Whitney U)
+
+
+python "Mann Whitney Test/Mann Whitney and Effect Size.py" \
+--input1 nsga\_asil\_levels.json \
+--input2 random\_asil\_levels.json \
+--output mannwhitney\_results.txt
+主要步骤:
+
+按 ASIL 级别将两个算法的分布抽取为数值(如 A→1, B→2…)。
+
+使用 scipy.stats.mannwhitneyu 完成检验。
+
+计算效果量(如 Cliff’s delta)。
+
+
+from scipy.stats import mannwhitneyu
+import json
+
+map\_level = {'QM': 0, 'A': 1, 'B': 2, 'C': 3, 'D': 4}
+d1 = [map\_level[v] for v in json.load(open("nsga\_asil\_levels.json")).values()]
+d2 = [map\_level[v] for v in json.load(open("random\_asil\_levels.json")).values()]
+
+stat, p = mannwhitneyu(d1, d2, alternative='two-sided')
+print(f"U={stat:.2f}, p-value={p:.4f}")
+\# 此外可自行计算 Cliff’s delta
+7. 可视化与结果解读
+分布柱状图:展示 NSGA vs. Random 在不同 ASIL 级别上的分布。
+
+Pareto 前沿图:展示 NSGA 选出的群体。
+
+示例(用 Matplotlib):
+
+
+import csv
+import matplotlib.pyplot as plt
+
+\# nsga\_asil\_distribution.csv 是 CSV 而非 JSON,此处假设其为"等级,数量"两列
+with open("nsga\_asil\_distribution.csv") as f:
+    dist = {row[0]: int(row[1]) for row in csv.reader(f)}
+levels = list(dist.keys())
+counts = list(dist.values())
+
+plt.bar(levels, counts)
+plt.xlabel("ASIL 级别")
+plt.ylabel("场景数")
+plt.title("NSGA-II 场景 ASIL 分布")
+plt.show()
+ \end{lstlisting}
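上文第 6 步提到"可自行计算 Cliff’s delta"。下面给出一个最小的纯 Python 实现示意:先按正文的 map\_level 把 ASIL 等级映射为序数,再对两组序数计算效应量。其中的两组等级序列仅为假设的示例数据,实际使用时应替换为两个算法各自的 ASIL 等级列表:

```python
def cliffs_delta(xs, ys):
    """Cliff's delta = (#{x>y} - #{x<y}) / (m*n),取值范围 [-1, 1]。"""
    greater = sum(1 for x in xs for y in ys if x > y)
    less = sum(1 for x in xs for y in ys if x < y)
    return (greater - less) / (len(xs) * len(ys))

# 与正文一致:先将 ASIL 等级映射为序数,再比较两组分布
map_level = {'QM': 0, 'A': 1, 'B': 2, 'C': 3, 'D': 4}
nsga_levels = ['D', 'C', 'D', 'B', 'C']      # 假设数据
random_levels = ['A', 'QM', 'B', 'A', 'B']   # 假设数据
d = cliffs_delta([map_level[v] for v in nsga_levels],
                 [map_level[v] for v in random_levels])
print(f"Cliff's delta = {d:.2f}")  # → 0.92
```

解读时常用的阈值(Romano 等人给出):|δ| < 0.147 可忽略,< 0.33 为小,< 0.474 为中,其余为大。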