Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ omit =
src/scenic/simulators/carla/*
src/scenic/simulators/gta/*
src/scenic/simulators/lgsvl/*
src/scenic/simulators/metadrive/*
src/scenic/simulators/webots/*
src/scenic/simulators/xplane/*

Expand Down
10 changes: 10 additions & 0 deletions src/scenic/domains/driving/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def canBeTakenBy(self, agent):
class SetThrottleAction(SteeringAction):
"""Set the throttle.

The throttle setting will remain constant until this action is taken again.

Arguments:
throttle: Throttle value between 0 and 1.
"""
Expand All @@ -138,6 +140,8 @@ def applyTo(self, obj, sim):
class SetSteerAction(SteeringAction):
"""Set the steering 'angle'.

The steering setting will remain constant until this action is taken again.

Arguments:
steer: Steering 'angle' between -1 and 1.
"""
Expand All @@ -154,6 +158,8 @@ def applyTo(self, obj, sim):
class SetBrakeAction(SteeringAction):
"""Set the amount of brake.

The brake setting will remain constant until this action is taken again.

Arguments:
brake: Amount of braking between 0 and 1.
"""
Expand All @@ -170,6 +176,8 @@ def applyTo(self, obj, sim):
class SetHandBrakeAction(SteeringAction):
"""Set or release the hand brake.

The handbrake setting will remain constant until this action is taken again.

Arguments:
handBrake: Whether or not the hand brake is set.
"""
Expand All @@ -186,6 +194,8 @@ def applyTo(self, obj, sim):
class SetReverseAction(SteeringAction):
"""Engage or release reverse gear.

The reverse setting will remain constant until this action is taken again.

Arguments:
reverse: Whether or not the car is in reverse.
"""
Expand Down
51 changes: 38 additions & 13 deletions src/scenic/simulators/metadrive/model.scenic
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ from scenic.simulators.metadrive.sensors import MetaDriveSSSensor as SSSensor

try:
from scenic.simulators.metadrive.simulator import MetaDriveSimulator
from scenic.simulators.metadrive.utils import scenicToMetaDriveHeading
from scenic.simulators.metadrive.utils import scenicToMetaDriveHeading, scenicToMetaDrivePosition
except ModuleNotFoundError:
# for convenience when testing without the metadrive package
from scenic.core.simulators import SimulatorInterfaceWarning
Expand Down Expand Up @@ -117,30 +117,55 @@ class MetaDriveActor(DrivingObject):
"""
metaDriveActor: None

def setPosition(self, pos, elevation):
position = scenicToMetaDrivePosition(pos, simulation().scenic_offset)
self.metaDriveActor.set_position(position)

def setVelocity(self, vel):
self.metaDriveActor.set_velocity(vel)


class Vehicle(Vehicle, Steers, MetaDriveActor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._control = {"steering": 0, "throttle": 0, "brake": 0}

def _reset_control(self):
self._control = {"steering": 0, "throttle": 0, "brake": 0}
self._control = {"steer": 0.0, "throttle": 0.0, "brake": 0.0}
self._reverse = False
self._handbrake = False

def setThrottle(self, throttle):
self._control["throttle"] = throttle

def setSteering(self, steering):
self._control["steering"] = steering
self._control["steer"] = steering

def setBraking(self, braking):
self._control["brake"] = braking

def _collect_action(self):
steering = -self._control["steering"] # Invert the steering to match MetaDrive's convention
action = [
steering,
self._control["throttle"] - self._control["brake"],
]
return action
def setReverse(self, reverse):
self._reverse = reverse

def setHandbrake(self, handbrake):
self._handbrake = handbrake

def _prepare_action(self):
# MetaDrive uses the opposite convention: positive steer turns left
steer = -self._control["steer"]

# Handbrake overrides everything: disable reverse, full brake.
if self._handbrake:
self.metaDriveActor.enable_reverse = False
return [steer, -1.0]

# MetaDrive uses a single combined throttle/brake value
throttle_brake = self._control["throttle"] - self._control["brake"]

# Enable reverse only if requested AND there’s forward drive effort.
enable_reverse = self._reverse and (throttle_brake > 0.0)
self.metaDriveActor.enable_reverse = enable_reverse
if enable_reverse:
throttle_brake = -throttle_brake # negative drives backward
return [steer, throttle_brake]


class Car(Vehicle):
@property
Expand Down
8 changes: 3 additions & 5 deletions src/scenic/simulators/metadrive/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,8 @@ def executeActions(self, allActions):
# Apply control updates to vehicles and pedestrians
for obj in self.scene.objects[1:]: # Skip ego vehicle (it is handled separately)
if obj.isVehicle:
action = obj._collect_action()
action = obj._prepare_action()
obj.metaDriveActor.before_step(action)
obj._reset_control()
else:
# For Pedestrians
if obj._walking_direction is None:
Expand All @@ -292,9 +291,8 @@ def step(self):

# Special handling for the ego vehicle
ego_obj = self.scene.objects[0]
action = ego_obj._collect_action()
self.client.step(action) # Apply action in the simulator
ego_obj._reset_control()
action = ego_obj._prepare_action()
self.client.step(action)

# Render the scene in 2D if needed
if self.render and not self.render3D:
Expand Down
194 changes: 176 additions & 18 deletions tests/simulators/metadrive/test_metadrive.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
import os

import numpy as np
Expand Down Expand Up @@ -46,11 +47,14 @@ def test_pickle(loadLocalScenario):
def getMetadriveSimulator(getAssetPath):
base = getAssetPath("maps/CARLA")

def _getMetadriveSimulator(town, *, render=False, render3D=False):
def _getMetadriveSimulator(town, *, render=False, render3D=False, **kwargs):
openDrivePath = os.path.join(base, f"{town}.xodr")
sumoPath = os.path.join(base, f"{town}.net.xml")
simulator = MetaDriveSimulator(
sumo_map=sumoPath, render=render, render3D=render3D
sumo_map=sumoPath,
render=render,
render3D=render3D,
**kwargs,
)
return simulator, openDrivePath, sumoPath

Expand Down Expand Up @@ -112,39 +116,103 @@ def test_brake(getMetadriveSimulator):
assert finalSpeed == pytest.approx(0.0, abs=1e-1)


def test_pedestrian_movement(getMetadriveSimulator):
def test_reverse_and_brake(getMetadriveSimulator):
simulator, openDrivePath, sumoPath = getMetadriveSimulator("Town01")
code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

behavior WalkForward():
behavior Reverse():
while True:
take SetWalkingDirectionAction(self.heading), SetWalkingSpeedAction(0.5)
take SetReverseAction(True), SetThrottleAction(1), SetBrakeAction(0)

behavior StopWalking():
behavior Brake():
while True:
take SetWalkingSpeedAction(0)
take SetThrottleAction(0), SetBrakeAction(1)

behavior WalkThenStop():
do WalkForward() for 2 steps
do StopWalking() for 2 steps
behavior ReverseThenBrake():
do Reverse() for 3 steps
do Brake() for 10 steps

ego = new Car at (30, 2)
pedestrian = new Pedestrian at (50, 6), with behavior WalkThenStop
ego = new Car at (369, -326), with behavior ReverseThenBrake

record pedestrian.position as Pos
terminate after 4 steps
record initial ego.heading as Heading
record ego.velocity as Vel
record final ego.speed as Speed

terminate after 13 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario)
simulation = simulator.simulate(scene)
series = simulation.result.records["Pos"]
initialPos = series[0][1]
finalPos = series[-1][1]
assert initialPos != finalPos

h = simulation.result.records["Heading"]
fwd = (-math.sin(h), math.cos(h))
vx, vy, _ = simulation.result.records["Vel"][2][1]
proj = vx * fwd[0] + vy * fwd[1]
assert proj < -0.02, f"Expected reverse velocity (negative proj), got {proj}"

finalSpeed = simulation.result.records["Speed"]
assert finalSpeed == pytest.approx(0.0, abs=0.5)


def test_handbrake(getMetadriveSimulator):
simulator, openDrivePath, sumoPath = getMetadriveSimulator("Town01")
code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

behavior HandbrakeAndThrottle():
while True:
take SetHandBrakeAction(True), SetThrottleAction(1)

ego = new Car at (369, -326), with behavior HandbrakeAndThrottle
record initial ego.position as InitialPos
record final ego.position as FinalPos
record final ego.speed as Speed
terminate after 6 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario)
simulation = simulator.simulate(scene)

p0 = simulation.result.records["InitialPos"]
p1 = simulation.result.records["FinalPos"]
finalSpeed = simulation.result.records["Speed"]

assert p0 == pytest.approx(p1, abs=0.05)
assert finalSpeed == pytest.approx(0.0, abs=0.1)


def test_set_position(getMetadriveSimulator):
simulator, openDrivePath, sumoPath = getMetadriveSimulator("Town01")
code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

behavior Teleport():
wait
take SetPositionAction(Vector(120, -56))

ego = new Car at (30, 2), with behavior Teleport
record initial ego.position as InitialPos
record final ego.position as FinalPos
terminate after 2 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario)
simulation = simulator.simulate(scene)
p0 = simulation.result.records["InitialPos"]
p1 = simulation.result.records["FinalPos"]

assert p0 != p1
assert p1 == pytest.approx((120, -56, 0), abs=0.01)


def test_initial_velocity_movement(getMetadriveSimulator):
Expand All @@ -170,6 +238,41 @@ def test_initial_velocity_movement(getMetadriveSimulator):
assert dx < -0.1, f"Expected car to move west (negative dx), but got dx = {dx}"


def test_pedestrian_movement(getMetadriveSimulator):
simulator, openDrivePath, sumoPath = getMetadriveSimulator("Town01")
code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

behavior WalkForward():
while True:
take SetWalkingDirectionAction(self.heading), SetWalkingSpeedAction(0.5)

behavior StopWalking():
while True:
take SetWalkingSpeedAction(0)

behavior WalkThenStop():
do WalkForward() for 2 steps
do StopWalking() for 2 steps

ego = new Car at (30, 2)
pedestrian = new Pedestrian at (50, 6), with behavior WalkThenStop

record pedestrian.position as Pos
terminate after 4 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario)
simulation = simulator.simulate(scene)
series = simulation.result.records["Pos"]
initialPos = series[0][1]
finalPos = series[-1][1]
assert initialPos != finalPos


@pytest.mark.slow
@pytest.mark.graphical
def test_static_pedestrian(getMetadriveSimulator):
Expand Down Expand Up @@ -237,3 +340,58 @@ def test_duplicate_sensor_names(getMetadriveSimulator):
assert img0.shape == (64, 64, 3)
assert img1.shape == (64, 64, 3)
assert not np.array_equal(img0, img1)


def test_render_2D_saves_gif(getMetadriveSimulator, tmp_path):
outdir = tmp_path / "md_gifs"
outdir.mkdir()

simulator, openDrivePath, sumoPath = getMetadriveSimulator(
"Town01",
render=True,
screen_record=True,
screen_record_filename="render2d.gif",
screen_record_path=str(outdir),
)

code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

ego = new Car at (30, 2)
terminate after 2 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario)
simulator.simulate(scene)

assert (outdir / "render2d.gif").exists()


def test_follow_lane(getMetadriveSimulator):
# Exercise MetaDrive's getLaneFollowingControllers via FollowLaneBehavior:
# car should stay on a lane and actually accelerate.
simulator, openDrivePath, sumoPath = getMetadriveSimulator("Town01")
code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

ego = new Car with behavior FollowLaneBehavior(target_speed=8)

record final (ego._lane is not None) as OnLane
record final ego.speed as FinalSpeed
terminate after 8 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario, maxIterations=1000)
simulation = simulator.simulate(scene)
assert simulation.result.records[
"OnLane"
], "Vehicle left the lane under FollowLaneBehavior."
assert (
simulation.result.records["FinalSpeed"] > 0.2
), "Vehicle did not accelerate along the lane."