From ef9d735fa83b3d024d7615e6db8b2b5888d52a3f Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Wed, 6 Nov 2024 16:49:04 -0800 Subject: [PATCH 1/2] Initial simulator stepping implementation. --- src/scenic/core/simulators.py | 351 +++++++++++++++++++++------------- 1 file changed, 223 insertions(+), 128 deletions(-) diff --git a/src/scenic/core/simulators.py b/src/scenic/core/simulators.py index 832b03632..18c1fa5c8 100644 --- a/src/scenic/core/simulators.py +++ b/src/scenic/core/simulators.py @@ -11,7 +11,8 @@ """ import abc -from collections import defaultdict +from collections import OrderedDict, defaultdict +from contextlib import contextmanager import enum import math import numbers @@ -135,9 +136,8 @@ def simulate( (rarely) and its security implications. Returns: - A `Simulation` object representing the completed simulation, or `None` if no - simulation satisfying the requirements could be found within - **maxIterations** iterations. + An initialized simulation, or `None` if no simulation satisfying + the requirements could be found within **maxIterations** iterations. Raises: SimulationCreationError: if an error occurred while trying to run a @@ -191,6 +191,53 @@ def simulate( ) return simulation + @contextmanager + def simulateStepped( + self, + scene, + maxSteps=None, + *, + name="SteppedSimulation", + timestep=None, + verbosity=None, + replay=None, + enableReplay=True, + enableDivergenceCheck=False, + divergenceTolerance=0, + continueAfterDivergence=False, + allowPickle=False, + ): + if self._destroyed: + raise RuntimeError( + "simulator cannot run additional simulations " + "(the destroy() method has already been called)" + ) + if verbosity is None: + verbosity = errors.verbosityLevel + + simulation = self.createSimulation( + scene, + maxSteps=maxSteps, + name=name, + verbosity=verbosity, + timestep=timestep, + replay=replay, + enableReplay=enableReplay, + enableDivergenceCheck=enableDivergenceCheck, + divergenceTolerance=divergenceTolerance, + continueAfterDivergence=continueAfterDivergence, + allowPickle=allowPickle, + ) + try: + yield simulation + except (RejectSimulationException, RejectionException, GuardViolation) as e: + # This simulation will be thrown out, but attach it to the exception + # to aid in debugging. + e.simulation = self + raise + finally: + simulation.cleanup() + def replay(self, scene, replay, **kwargs): """Replay a simulation. @@ -207,13 +254,15 @@ def _runSingleSimulation( if verbosity >= 2: print(f" Starting simulation {name}...") try: - simulation = self.createSimulation( + with self.simulateStepped( scene, maxSteps=maxSteps, name=name, verbosity=verbosity, **kwargs, - ) + ) as simulation: + simulation._run() + except (RejectSimulationException, RejectionException, GuardViolation) as e: if verbosity >= 2: print( @@ -339,11 +388,14 @@ def __init__( self.currentTime = 0 self.timestep = 1 if timestep is None else float(timestep) self.verbosity = verbosity + self.maxSteps = maxSteps self.name = name self.worker_num = 0 self.actionSequence = [] + self._cleaned = False + # Prepare to save or load a replay. self.initializeReplay(replay, enableReplay, enableDivergenceCheck, allowPickle) self.divergenceTolerance = divergenceTolerance @@ -356,153 +408,192 @@ def __init__( import scenic.syntax.veneer as veneer veneer.beginSimulation(self) - dynamicScenario = self.scene.dynamicScenario + self.dynamicScenario = self.scene.dynamicScenario # Create objects and perform simulator-specific initialization. self.setup() # Initialize the top-level dynamic scenario. - dynamicScenario._start() + self.dynamicScenario._start() # Update all objects in case the simulator has adjusted any dynamic # properties during setup. self.updateObjects() - # Run the simulation. - terminationType, terminationReason = self._run(dynamicScenario, maxSteps) + # Set terminationType and terminationReason to default None + self.terminationType = None + self.terminationReason = None - # Stop all remaining scenarios. - # (and reject if some 'require eventually' condition was never satisfied) - for scenario in tuple(reversed(veneer.runningScenarios)): - scenario._stop("simulation terminated") - - # Record finally-recorded values. - values = dynamicScenario._evaluateRecordedExprs(RequirementType.recordFinal) - for name, val in values.items(): - self.records[name] = val - - # Package up simulation results into a compact object. - result = SimulationResult( - self.trajectory, - self.actionSequence, - terminationType, - terminationReason, - self.records, - ) - self.result = result except (RejectSimulationException, RejectionException, GuardViolation) as e: # This simulation will be thrown out, but attach it to the exception # to aid in debugging. + self.cleanup() e.simulation = self raise - finally: - self.destroy() - for obj in self.objects: - disableDynamicProxyFor(obj) - for agent in self.agents: - if agent.behavior and agent.behavior._isRunning: - agent.behavior._stop() - # If the simulation was terminated by an exception (including rejections), - # some scenarios may still be running; we need to clean them up without - # checking their requirements, which could raise rejection exceptions. - for scenario in tuple(reversed(veneer.runningScenarios)): - scenario._stop("exception", quiet=True) - veneer.endSimulation(self) - - def _run(self, dynamicScenario, maxSteps): + + def _run(self): assert self.currentTime == 0 while True: - if self.verbosity >= 3: - print(f" Time step {self.currentTime}:") - - # Run compose blocks of compositional scenarios - # (and check if any requirements defined therein fail) - # N.B. if the top-level scenario completes, we don't immediately end - # the simulation since we need to check if any monitors reject first. - terminationReason = dynamicScenario._step() - terminationType = TerminationType.scenarioComplete - - # Record current state of the simulation - self.recordCurrentState() - - # Run monitors - newReason = dynamicScenario._runMonitors() - if newReason is not None: - terminationReason = newReason - terminationType = TerminationType.terminatedByMonitor - - # "Always" and scenario-level requirements have been checked; - # now safe to terminate if the top-level scenario has finished, - # a monitor requested termination, or we've hit the timeout - if terminationReason is not None: - return terminationType, terminationReason - terminationReason = dynamicScenario._checkSimulationTerminationConditions() - if terminationReason is not None: - return TerminationType.simulationTerminationCondition, terminationReason - if maxSteps and self.currentTime >= maxSteps: - return TerminationType.timeLimit, f"reached time limit ({maxSteps} steps)" - - # Clear lastActions for all objects - for obj in self.objects: - obj.lastActions = tuple() - - # Update agents with any objects that now have behaviors (and are not already agents) - self.agents += [ - obj for obj in self.objects if obj.behavior and obj not in self.agents - ] - - # Compute the actions of the agents in this time step - allActions = defaultdict(tuple) - schedule = self.scheduleForAgents() - if not set(self.agents) == set(schedule): - raise RuntimeError("Simulator schedule does not contain all agents") - for agent in schedule: - # If agent doesn't have a behavior right now, continue - if not agent.behavior: - continue - - # Run the agent's behavior to get its actions - actions = agent.behavior._step() - - # Handle pseudo-actions marking the end of a simulation/scenario - if isinstance(actions, _EndSimulationAction): - return TerminationType.terminatedByBehavior, str(actions) - elif isinstance(actions, _EndScenarioAction): - scenario = actions.scenario - if scenario._isRunning: - scenario._stop(actions) - if scenario is dynamicScenario: - # Top-level scenario was terminated, so whole simulation will end. - return TerminationType.terminatedByBehavior, str(actions) - actions = () - - # Check ordinary actions for compatibility - assert isinstance(actions, tuple) - if len(actions) == 1 and isinstance(actions[0], (list, tuple)): - actions = tuple(actions[0]) - if not self.actionsAreCompatible(agent, actions): - raise InvalidScenarioError( - f"agent {agent} tried incompatible action(s) {actions}" + self.advance() + + if self.terminationType: + return + + def advance(self): + if self.terminationType or self._cleaned: + raise TerminatedSimulationException() + + if self.verbosity >= 3: + print(f" Time step {self.currentTime}:") + + # Run compose blocks of compositional scenarios + # (and check if any requirements defined therein fail) + # N.B. if the top-level scenario completes, we don't immediately end + # the simulation since we need to check if any monitors reject first. + terminationReason = self.dynamicScenario._step() + terminationType = TerminationType.scenarioComplete + + # Record current state of the simulation + self.recordCurrentState() + + # Run monitors + newReason = self.dynamicScenario._runMonitors() + if newReason is not None: + terminationReason = newReason + terminationType = TerminationType.terminatedByMonitor + + # "Always" and scenario-level requirements have been checked; + # now safe to terminate if the top-level scenario has finished, + # a monitor requested termination, or we've hit the timeout + if terminationReason is not None: + return self.terminateSimulation(terminationType, terminationReason) + terminationReason = self.dynamicScenario._checkSimulationTerminationConditions() + if terminationReason is not None: + return self.terminateSimulation( + TerminationType.simulationTerminationCondition, terminationReason + ) + if self.maxSteps and self.currentTime >= self.maxSteps: + return self.terminateSimulation( + TerminationType.timeLimit, f"reached time limit ({self.maxSteps} steps)" + ) + + # Clear lastActions for all objects + for obj in self.objects: + obj.lastActions = tuple() + + # Update agents with any objects that now have behaviors (and are not already agents) + self.agents += [ + obj for obj in self.objects if obj.behavior and obj not in self.agents + ] + + # Compute the actions of the agents in this time step + allActions = defaultdict(tuple) + schedule = self.scheduleForAgents() + if not set(self.agents) == set(schedule): + raise RuntimeError("Simulator schedule does not contain all agents") + for agent in schedule: + # If agent doesn't have a behavior right now, continue + if not agent.behavior: + continue + # Run the agent's behavior to get its actions + actions = agent.behavior._step() + + # Handle pseudo-actions marking the end of a simulation/scenario + if isinstance(actions, _EndSimulationAction): + return self.terminateSimulation( + TerminationType.terminatedByBehavior, str(actions) + ) + elif isinstance(actions, _EndScenarioAction): + scenario = actions.scenario + if scenario._isRunning: + scenario._stop(actions) + if scenario is self.dynamicScenario: + # Top-level scenario was terminated, so whole simulation will end. + return self.terminateSimulation( + TerminationType.terminatedByBehavior, str(actions) ) + actions = () + + # Check ordinary actions for compatibility + assert isinstance(actions, tuple) + if len(actions) == 1 and isinstance(actions[0], (list, tuple)): + actions = tuple(actions[0]) + if not self.actionsAreCompatible(agent, actions): + raise InvalidScenarioError( + f"agent {agent} tried incompatible action(s) {actions}" + ) - # Save actions for execution below - allActions[agent] = actions + # Save actions for execution below + allActions[agent] = actions - # Log lastActions + # Log lastActions + agent.lastActions = actions + + # Execute the actions + if self.verbosity >= 3: + for agent, actions in allActions.items(): + print(f" Agent {agent} takes action(s) {actions}") agent.lastActions = actions + self.actionSequence.append(allActions) + self.executeActions(allActions) - # Execute the actions - if self.verbosity >= 3: - for agent, actions in allActions.items(): - print(f" Agent {agent} takes action(s) {actions}") - self.actionSequence.append(allActions) - self.executeActions(allActions) + # Run the simulation for a single step and read its state back into Scenic + self.step() + self.currentTime += 1 + self.updateObjects() - # Run the simulation for a single step and read its state back into Scenic - self.step() - self.currentTime += 1 - self.updateObjects() + def terminateSimulation(self, terimnationType, terminationReason): + import scenic.syntax.veneer as veneer + + # Log terminationType and terminationReason + self.terminationType = terimnationType + self.terminationReason = terminationReason + + # Stop all remaining scenarios. + # (and reject if some 'require eventually' condition was never satisfied) + for scenario in tuple(reversed(veneer.runningScenarios)): + scenario._stop("simulation terminated") + + # Record finally-recorded values. + values = self.dynamicScenario._evaluateRecordedExprs(RequirementType.recordFinal) + for name, val in values.items(): + self.records[name] = val + + # Package up simulation results into a compact object. + result = SimulationResult( + self.trajectory, + self.actionSequence, + self.terminationType, + self.terminationReason, + self.records, + ) + self.result = result + + self.cleanup() + + def cleanup(self): + # No need to repeat cleanup if we've already done it + if self._cleaned: + return + + # Remember that we have cleaned up. + self._cleaned = True + + import scenic.syntax.veneer as veneer + + self.destroy() + for obj in self.objects: + disableDynamicProxyFor(obj) + for agent in self.agents: + if agent.behavior and agent.behavior._isRunning: + agent.behavior._stop() + # If the simulation was terminated by an exception (including rejections), + # some scenarios may still be running; we need to clean them up without + # checking their requirements, which could raise rejection exceptions. + for scenario in tuple(reversed(veneer.runningScenarios)): + scenario._stop("exception", quiet=True) + veneer.endSimulation(self) def setup(self): """Set up the simulation to run in the simulator. @@ -911,3 +1002,7 @@ def __init__(self, trajectory, actions, terminationType, terminationReason, reco self.terminationType = terminationType self.terminationReason = str(terminationReason) self.records = dict(records) + + +class TerminatedSimulationException(Exception): + pass From a452067a6131eb27a9ff97d8b1248d01f7f68c35 Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Wed, 6 Nov 2024 17:07:06 -0800 Subject: [PATCH 2/2] Added stepped simulation test. --- src/scenic/core/simulators.py | 18 +++++------------- tests/core/test_simulators.py | 30 +++++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/src/scenic/core/simulators.py b/src/scenic/core/simulators.py index 18c1fa5c8..57574767d 100644 --- a/src/scenic/core/simulators.py +++ b/src/scenic/core/simulators.py @@ -420,10 +420,6 @@ def __init__( # properties during setup. self.updateObjects() - # Set terminationType and terminationReason to default None - self.terminationType = None - self.terminationReason = None - except (RejectSimulationException, RejectionException, GuardViolation) as e: # This simulation will be thrown out, but attach it to the exception # to aid in debugging. @@ -437,11 +433,11 @@ def _run(self): while True: self.advance() - if self.terminationType: + if self.result: return def advance(self): - if self.terminationType or self._cleaned: + if self.result or self._cleaned: raise TerminatedSimulationException() if self.verbosity >= 3: @@ -543,13 +539,9 @@ def advance(self): self.currentTime += 1 self.updateObjects() - def terminateSimulation(self, terimnationType, terminationReason): + def terminateSimulation(self, terminationType, terminationReason): import scenic.syntax.veneer as veneer - # Log terminationType and terminationReason - self.terminationType = terimnationType - self.terminationReason = terminationReason - # Stop all remaining scenarios. # (and reject if some 'require eventually' condition was never satisfied) for scenario in tuple(reversed(veneer.runningScenarios)): @@ -564,8 +556,8 @@ def terminateSimulation(self, terimnationType, terminationReason): result = SimulationResult( self.trajectory, self.actionSequence, - self.terminationType, - self.terminationReason, + terminationType, + terminationReason, self.records, ) self.result = result diff --git a/tests/core/test_simulators.py b/tests/core/test_simulators.py index 149c1cad1..5358a8cec 100644 --- a/tests/core/test_simulators.py +++ b/tests/core/test_simulators.py @@ -1,6 +1,11 @@ import pytest -from scenic.core.simulators import DummySimulation, DummySimulator, Simulation +from scenic.core.simulators import ( + DummySimulation, + DummySimulator, + Simulation, + TerminatedSimulationException, +) from tests.utils import compileScenic, sampleResultFromScene, sampleSceneFrom @@ -35,6 +40,29 @@ def test_simulator_destruction(): assert "destroy() called twice" in str(e) +def test_simulator_stepped(): + simulator = DummySimulator() + scene = sampleSceneFrom("ego = new Object") + + with simulator.simulateStepped(scene, maxSteps=5) as simulation: + while simulation.result is None: + simulation.advance() + + assert simulation.result is not None + assert simulation.currentTime == 5 + + # advance() should do nothing but raise an exception + # if the simulation is already terminated + with pytest.raises(TerminatedSimulationException): + simulation.advance() + + assert simulation.currentTime == 5 + + # Ensure all values are preserved after leaving the context manager + assert simulation.result is not None + assert simulation.currentTime == 5 + + def test_simulator_set_property(): class TestSimulation(DummySimulation): def createObjectInSimulator(self, obj):