Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ omit =
src/scenic/simulators/carla/*
src/scenic/simulators/gta/*
src/scenic/simulators/lgsvl/*
src/scenic/simulators/metadrive/*
src/scenic/simulators/webots/*
src/scenic/simulators/xplane/*

Expand Down
10 changes: 10 additions & 0 deletions src/scenic/domains/driving/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def canBeTakenBy(self, agent):
class SetThrottleAction(SteeringAction):
"""Set the throttle.

The throttle setting will remain constant until this action is taken again.

Arguments:
throttle: Throttle value between 0 and 1.
"""
Expand All @@ -138,6 +140,8 @@ def applyTo(self, obj, sim):
class SetSteerAction(SteeringAction):
"""Set the steering 'angle'.

The steering setting will remain constant until this action is taken again.

Arguments:
steer: Steering 'angle' between -1 and 1. Positive values steer to the right.
"""
Expand All @@ -154,6 +158,8 @@ def applyTo(self, obj, sim):
class SetBrakeAction(SteeringAction):
"""Set the amount of brake.

The brake setting will remain constant until this action is taken again.

Arguments:
brake: Amount of braking between 0 and 1.
"""
Expand All @@ -170,6 +176,8 @@ def applyTo(self, obj, sim):
class SetHandBrakeAction(SteeringAction):
"""Set or release the hand brake.

The handbrake setting will remain constant until this action is taken again.

Arguments:
handBrake: Whether or not the hand brake is set.
"""
Expand All @@ -186,6 +194,8 @@ def applyTo(self, obj, sim):
class SetReverseAction(SteeringAction):
"""Engage or release reverse gear.

The reverse setting will remain constant until this action is taken again.

Arguments:
reverse: Whether or not the car is in reverse.
"""
Expand Down
51 changes: 38 additions & 13 deletions src/scenic/simulators/metadrive/model.scenic
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ from scenic.simulators.metadrive.sensors import MetaDriveSSSensor as SSSensor

try:
from scenic.simulators.metadrive.simulator import MetaDriveSimulator
from scenic.simulators.metadrive.utils import scenicToMetaDriveHeading
from scenic.simulators.metadrive.utils import scenicToMetaDriveHeading, scenicToMetaDrivePosition
except ModuleNotFoundError:
# for convenience when testing without the metadrive package
from scenic.core.simulators import SimulatorInterfaceWarning
Expand Down Expand Up @@ -117,30 +117,55 @@ class MetaDriveActor(DrivingObject):
"""
metaDriveActor: None

def setPosition(self, pos, elevation):
position = scenicToMetaDrivePosition(pos, simulation().scenic_offset)
self.metaDriveActor.set_position(position)

def setVelocity(self, vel):
self.metaDriveActor.set_velocity(vel)


class Vehicle(Vehicle, Steers, MetaDriveActor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._control = {"steering": 0, "throttle": 0, "brake": 0}

def _reset_control(self):
self._control = {"steering": 0, "throttle": 0, "brake": 0}
self._control = {"steer": 0.0, "throttle": 0.0, "brake": 0.0}
self._reverse = False
self._handbrake = False

def setThrottle(self, throttle):
self._control["throttle"] = throttle

def setSteering(self, steering):
self._control["steering"] = steering
self._control["steer"] = steering

def setBraking(self, braking):
self._control["brake"] = braking

def _collect_action(self):
steering = -self._control["steering"] # Invert the steering to match MetaDrive's convention
action = [
steering,
self._control["throttle"] - self._control["brake"],
]
return action
def setReverse(self, reverse):
self._reverse = reverse

def setHandbrake(self, handbrake):
self._handbrake = handbrake

def _prepare_action(self):
# MetaDrive uses the opposite convention: positive steer turns left
steer = -self._control["steer"]

# Handbrake overrides everything: disable reverse, full brake.
if self._handbrake:
self.metaDriveActor.enable_reverse = False
return [steer, -1.0]

# MetaDrive uses a single combined throttle/brake value
throttle_brake = self._control["throttle"] - self._control["brake"]

# Enable reverse only if requested AND there’s forward drive effort.
enable_reverse = self._reverse and (throttle_brake > 0.0)
self.metaDriveActor.enable_reverse = enable_reverse
if enable_reverse:
throttle_brake = -throttle_brake # negative drives backward
return [steer, throttle_brake]


class Car(Vehicle):
@property
Expand Down
8 changes: 3 additions & 5 deletions src/scenic/simulators/metadrive/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,8 @@ def executeActions(self, allActions):
if obj is self.objects[0]: # Skip ego vehicle (it is handled separately)
continue
if obj.isVehicle:
action = obj._collect_action()
action = obj._prepare_action()
obj.metaDriveActor.before_step(action)
obj._reset_control()
else:
# For Pedestrians
if obj._walking_direction is None:
Expand All @@ -294,9 +293,8 @@ def step(self):

# Special handling for the ego vehicle
ego_obj = self.objects[0]
action = ego_obj._collect_action()
self.client.step(action) # Apply action in the simulator
ego_obj._reset_control()
action = ego_obj._prepare_action()
self.client.step(action)

# Render the scene in 2D if needed
if self.render and not self.render3D:
Expand Down
194 changes: 176 additions & 18 deletions tests/simulators/metadrive/test_metadrive.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
import os
from urllib.error import URLError

Expand Down Expand Up @@ -58,11 +59,14 @@ def test_pickle(loadLocalScenario):
def getMetadriveSimulator(getAssetPath):
base = getAssetPath("maps/CARLA")

def _getMetadriveSimulator(town, *, render=False, render3D=False):
def _getMetadriveSimulator(town, *, render=False, render3D=False, **kwargs):
openDrivePath = os.path.join(base, f"{town}.xodr")
sumoPath = os.path.join(base, f"{town}.net.xml")
simulator = MetaDriveSimulator(
sumo_map=sumoPath, render=render, render3D=render3D
sumo_map=sumoPath,
render=render,
render3D=render3D,
**kwargs,
)
return simulator, openDrivePath, sumoPath

Expand Down Expand Up @@ -124,39 +128,103 @@ def test_brake(getMetadriveSimulator):
assert finalSpeed == pytest.approx(0.0, abs=1e-1)


def test_pedestrian_movement(getMetadriveSimulator):
def test_reverse_and_brake(getMetadriveSimulator):
simulator, openDrivePath, sumoPath = getMetadriveSimulator("Town01")
code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

behavior WalkForward():
behavior Reverse():
while True:
take SetWalkingDirectionAction(self.heading), SetWalkingSpeedAction(0.5)
take SetReverseAction(True), SetThrottleAction(1), SetBrakeAction(0)

behavior StopWalking():
behavior Brake():
while True:
take SetWalkingSpeedAction(0)
take SetThrottleAction(0), SetBrakeAction(1)

behavior WalkThenStop():
do WalkForward() for 2 steps
do StopWalking() for 2 steps
behavior ReverseThenBrake():
do Reverse() for 3 steps
do Brake() for 10 steps

ego = new Car at (30, 2)
pedestrian = new Pedestrian at (50, 6), with behavior WalkThenStop
ego = new Car at (369, -326), with behavior ReverseThenBrake

record pedestrian.position as Pos
terminate after 4 steps
record initial ego.heading as Heading
record ego.velocity as Vel
record final ego.speed as Speed

terminate after 13 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario)
simulation = simulator.simulate(scene)
series = simulation.result.records["Pos"]
initialPos = series[0][1]
finalPos = series[-1][1]
assert initialPos != finalPos

h = simulation.result.records["Heading"]
fwd = (-math.sin(h), math.cos(h))
vx, vy, _ = simulation.result.records["Vel"][2][1]
proj = vx * fwd[0] + vy * fwd[1]
assert proj < -0.02, f"Expected reverse velocity (negative proj), got {proj}"

finalSpeed = simulation.result.records["Speed"]
assert finalSpeed == pytest.approx(0.0, abs=0.5)


def test_handbrake(getMetadriveSimulator):
simulator, openDrivePath, sumoPath = getMetadriveSimulator("Town01")
code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

behavior HandbrakeAndThrottle():
while True:
take SetHandBrakeAction(True), SetThrottleAction(1)

ego = new Car at (369, -326), with behavior HandbrakeAndThrottle
record initial ego.position as InitialPos
record final ego.position as FinalPos
record final ego.speed as Speed
terminate after 6 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario)
simulation = simulator.simulate(scene)

p0 = simulation.result.records["InitialPos"]
p1 = simulation.result.records["FinalPos"]
finalSpeed = simulation.result.records["Speed"]

assert p0 == pytest.approx(p1, abs=0.05)
assert finalSpeed == pytest.approx(0.0, abs=0.1)


def test_set_position(getMetadriveSimulator):
simulator, openDrivePath, sumoPath = getMetadriveSimulator("Town01")
code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

behavior Teleport():
wait
take SetPositionAction(Vector(120, -56))

ego = new Car at (30, 2), with behavior Teleport
record initial ego.position as InitialPos
record final ego.position as FinalPos
terminate after 2 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario)
simulation = simulator.simulate(scene)
p0 = simulation.result.records["InitialPos"]
p1 = simulation.result.records["FinalPos"]

assert p0 != p1
assert p1 == pytest.approx((120, -56, 0), abs=0.01)


def test_initial_velocity_movement(getMetadriveSimulator):
Expand All @@ -182,6 +250,41 @@ def test_initial_velocity_movement(getMetadriveSimulator):
assert dx < -0.1, f"Expected car to move west (negative dx), but got dx = {dx}"


def test_pedestrian_movement(getMetadriveSimulator):
simulator, openDrivePath, sumoPath = getMetadriveSimulator("Town01")
code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

behavior WalkForward():
while True:
take SetWalkingDirectionAction(self.heading), SetWalkingSpeedAction(0.5)

behavior StopWalking():
while True:
take SetWalkingSpeedAction(0)

behavior WalkThenStop():
do WalkForward() for 2 steps
do StopWalking() for 2 steps

ego = new Car at (30, 2)
pedestrian = new Pedestrian at (50, 6), with behavior WalkThenStop

record pedestrian.position as Pos
terminate after 4 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario)
simulation = simulator.simulate(scene)
series = simulation.result.records["Pos"]
initialPos = series[0][1]
finalPos = series[-1][1]
assert initialPos != finalPos


@pytest.mark.slow
@pytest.mark.graphical
def test_static_pedestrian(getMetadriveSimulator):
Expand Down Expand Up @@ -305,3 +408,58 @@ def test_composed_scenario(getMetadriveSimulator):
assert len(traj[0]) == 2, f"expected 2 objects, got {len(traj[0])}"
assert traj[0][0] != traj[-1][0], f"ego car did not move."
assert traj[0][1] != traj[-1][1], f"subscenario car did not move."


def test_render_2D_saves_gif(getMetadriveSimulator, tmp_path):
outdir = tmp_path / "md_gifs"
outdir.mkdir()

simulator, openDrivePath, sumoPath = getMetadriveSimulator(
"Town01",
render=True,
screen_record=True,
screen_record_filename="render2d.gif",
screen_record_path=str(outdir),
)

code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

ego = new Car at (30, 2)
terminate after 2 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario)
simulator.simulate(scene)

assert (outdir / "render2d.gif").exists()


def test_follow_lane(getMetadriveSimulator):
# Exercise MetaDrive's getLaneFollowingControllers via FollowLaneBehavior:
# car should stay on a lane and actually accelerate.
simulator, openDrivePath, sumoPath = getMetadriveSimulator("Town01")
code = f"""
param map = r'{openDrivePath}'
param sumo_map = r'{sumoPath}'

model scenic.simulators.metadrive.model

ego = new Car with behavior FollowLaneBehavior(target_speed=8)

record final (ego._lane is not None) as OnLane
record final ego.speed as FinalSpeed
terminate after 8 steps
"""
scenario = compileScenic(code, mode2D=True)
scene = sampleScene(scenario, maxIterations=1000)
simulation = simulator.simulate(scene)
assert simulation.result.records[
"OnLane"
], "Vehicle left the lane under FollowLaneBehavior."
assert (
simulation.result.records["FinalSpeed"] > 0.2
), "Vehicle did not accelerate along the lane."
Loading