diff --git a/src/scenic/core/simulators.py b/src/scenic/core/simulators.py index 3e9c0308f..578d71d83 100644 --- a/src/scenic/core/simulators.py +++ b/src/scenic/core/simulators.py @@ -11,7 +11,8 @@ """ import abc -from collections import defaultdict +from collections import OrderedDict, defaultdict +from contextlib import contextmanager import enum import math import numbers @@ -138,9 +139,8 @@ def simulate( (rarely) and its security implications. Returns: - A `Simulation` object representing the completed simulation, or `None` if no - simulation satisfying the requirements could be found within - **maxIterations** iterations. + An initialized simulation, or `None` if no simulation satisfying + the requirements could be found within **maxIterations** iterations. Raises: SimulationCreationError: if an error occurred while trying to run a @@ -198,6 +198,53 @@ def simulate( ) return simulation + @contextmanager + def simulateStepped( + self, + scene, + maxSteps=None, + *, + name="SteppedSimulation", + timestep=None, + verbosity=None, + replay=None, + enableReplay=True, + enableDivergenceCheck=False, + divergenceTolerance=0, + continueAfterDivergence=False, + allowPickle=False, + ): + if self._destroyed: + raise RuntimeError( + "simulator cannot run additional simulations " + "(the destroy() method has already been called)" + ) + if verbosity is None: + verbosity = errors.verbosityLevel + + simulation = self.createSimulation( + scene, + maxSteps=maxSteps, + name=name, + verbosity=verbosity, + timestep=timestep, + replay=replay, + enableReplay=enableReplay, + enableDivergenceCheck=enableDivergenceCheck, + divergenceTolerance=divergenceTolerance, + continueAfterDivergence=continueAfterDivergence, + allowPickle=allowPickle, + ) + try: + yield simulation + except (RejectSimulationException, RejectionException, GuardViolation) as e: + # This simulation will be thrown out, but attach it to the exception + # to aid in debugging. + e.simulation = self + raise + finally: + simulation.cleanup() + def replay(self, scene, replay, **kwargs): """Replay a simulation. @@ -214,13 +261,15 @@ def _runSingleSimulation( if verbosity >= 2: print(f" Starting simulation {name}...") try: - simulation = self.createSimulation( + with self.simulateStepped( scene, maxSteps=maxSteps, name=name, verbosity=verbosity, **kwargs, - ) + ) as simulation: + simulation._run() + except (RejectSimulationException, RejectionException, GuardViolation) as e: if verbosity >= 2: print( @@ -347,11 +396,14 @@ def __init__( self.currentTime = 0 self.timestep = 1 if timestep is None else float(timestep) self.verbosity = verbosity + self.maxSteps = maxSteps self.name = name self.worker_num = 0 self.actionSequence = [] + self._cleaned = False + # Prepare to save or load a replay. self.initializeReplay(replay, enableReplay, enableDivergenceCheck, allowPickle) self.divergenceTolerance = divergenceTolerance @@ -364,170 +416,201 @@ def __init__( import scenic.syntax.veneer as veneer veneer.beginSimulation(self) - dynamicScenario = self.scene.dynamicScenario + self.dynamicScenario = self.scene.dynamicScenario # Create objects and perform simulator-specific initialization. self.setup() # Initialize the top-level dynamic scenario. - dynamicScenario._start() + self.dynamicScenario._start() # Update all objects in case the simulator has adjusted any dynamic # properties during setup. self.updateObjects() - # Run the simulation. - terminationType, terminationReason = self._run(dynamicScenario, maxSteps) - - # Stop all remaining scenarios. - # (and reject if some 'require eventually' condition was never satisfied) - for scenario in tuple(reversed(veneer.runningScenarios)): - scenario._stop("simulation terminated") - - # Record finally-recorded values. - values = dynamicScenario._evaluateRecordedExprs( - RequirementType.recordFinal, self.currentTime - ) - for name, val in values.items(): - self.records[name] = val - - # Package up simulation results into a compact object. - result = SimulationResult( - self.trajectory, - self.actionSequence, - terminationType, - terminationReason, - self.records, - ) - self.result = result except (RejectSimulationException, RejectionException, GuardViolation) as e: # This simulation will be thrown out, but attach it to the exception # to aid in debugging. + self.cleanup() e.simulation = self raise - finally: - self.destroy() - for obj in self.objects: - disableDynamicProxyFor(obj) - for agent in self.agents: - if agent.behavior and agent.behavior._isRunning: - agent.behavior._stop() - # If the simulation was terminated by an exception (including rejections), - # some scenarios may still be running; we need to clean them up without - # checking their requirements, which could raise rejection exceptions. - for scenario in tuple(reversed(veneer.runningScenarios)): - scenario._stop("exception", quiet=True) - veneer.endSimulation(self) - - def _run(self, dynamicScenario, maxSteps): + + def _run(self): assert self.currentTime == 0 while True: - if self.verbosity >= 3: - print(f" Time step {self.currentTime}:") - - # Run compose blocks of compositional scenarios - # (and check if any requirements defined therein fail) - # N.B. if the top-level scenario completes, we don't immediately end - # the simulation since we need to check if any monitors reject first. - terminationReason = dynamicScenario._step() - terminationType = TerminationType.scenarioComplete - - # Update observations of objects with sensors - for obj in self.objects: - if not obj.sensors: - continue - obj.observations.update( - {key: sensor.getObservation() for key, sensor in obj.sensors.items()} - ) + self.advance() - # Record current state of the simulation - self.recordCurrentState() + if self.result: + return - # Run monitors - newReason = dynamicScenario._runMonitors() - if newReason is not None: - terminationReason = newReason - terminationType = TerminationType.terminatedByMonitor + def advance(self): + if self.result or self._cleaned: + raise TerminatedSimulationException() - # Check if users manually closed out display for simulator - if "Dead" in str(self.screen): - return ( - TerminationType.terminatedByUser, - "user manually terminated simulation", - ) + if self.verbosity >= 3: + print(f" Time step {self.currentTime}:") + + # Run compose blocks of compositional scenarios + # (and check if any requirements defined therein fail) + # N.B. if the top-level scenario completes, we don't immediately end + # the simulation since we need to check if any monitors reject first. + terminationReason = self.dynamicScenario._step() + terminationType = TerminationType.scenarioComplete + + # Update observations of objects with sensors + for obj in self.objects: + if not obj.sensors: + continue + obj.observations.update( + {key: sensor.getObservation() for key, sensor in obj.sensors.items()} + ) - # "Always" and scenario-level requirements have been checked; - # now safe to terminate if the top-level scenario has finished, - # a monitor requested termination, or we've hit the timeout - if terminationReason is not None: - return terminationType, terminationReason - terminationReason = dynamicScenario._checkSimulationTerminationConditions() - if terminationReason is not None: - return TerminationType.simulationTerminationCondition, terminationReason - if maxSteps and self.currentTime >= maxSteps: - return TerminationType.timeLimit, f"reached time limit ({maxSteps} steps)" - - # Clear lastActions for all objects - for obj in self.objects: - obj.lastActions = tuple() - - # Update agents with any objects that now have behaviors (and are not already agents) - self.agents += [ - obj for obj in self.objects if obj.behavior and obj not in self.agents - ] - - # Compute the actions of the agents in this time step - allActions = defaultdict(tuple) - schedule = self.scheduleForAgents() - if not set(self.agents) == set(schedule): - raise RuntimeError("Simulator schedule does not contain all agents") - for agent in schedule: - # If agent doesn't have a behavior right now, continue - if not agent.behavior: - continue - - # Run the agent's behavior to get its actions - actions = agent.behavior._step() - - # Handle pseudo-actions marking the end of a simulation/scenario - if isinstance(actions, _EndSimulationAction): - return TerminationType.terminatedByBehavior, str(actions) - elif isinstance(actions, _EndScenarioAction): - scenario = actions.scenario - if scenario._isRunning: - scenario._stop(actions) - if scenario is dynamicScenario: - # Top-level scenario was terminated, so whole simulation will end. - return TerminationType.terminatedByBehavior, str(actions) - actions = () - - # Check ordinary actions for compatibility - assert isinstance(actions, tuple) - if len(actions) == 1 and isinstance(actions[0], (list, tuple)): - actions = tuple(actions[0]) - if not self.actionsAreCompatible(agent, actions): - raise InvalidScenarioError( - f"agent {agent} tried incompatible action(s) {actions}" + # Record current state of the simulation + self.recordCurrentState() + + # Run monitors + newReason = self.dynamicScenario._runMonitors() + if newReason is not None: + terminationReason = newReason + terminationType = TerminationType.terminatedByMonitor + + # Check if users manually closed out display for simulator + if "Dead" in str(self.screen): + return ( + TerminationType.terminatedByUser, + "user manually terminated simulation", + ) + + # "Always" and scenario-level requirements have been checked; + # now safe to terminate if the top-level scenario has finished, + # a monitor requested termination, or we've hit the timeout + if terminationReason is not None: + return self.terminateSimulation(terminationType, terminationReason) + terminationReason = self.dynamicScenario._checkSimulationTerminationConditions() + if terminationReason is not None: + return self.terminateSimulation( + TerminationType.simulationTerminationCondition, terminationReason + ) + if self.maxSteps and self.currentTime >= self.maxSteps: + return self.terminateSimulation( + TerminationType.timeLimit, f"reached time limit ({self.maxSteps} steps)" + ) + + # Clear lastActions for all objects + for obj in self.objects: + obj.lastActions = tuple() + + # Update agents with any objects that now have behaviors (and are not already agents) + self.agents += [ + obj for obj in self.objects if obj.behavior and obj not in self.agents + ] + + # Compute the actions of the agents in this time step + allActions = defaultdict(tuple) + schedule = self.scheduleForAgents() + if not set(self.agents) == set(schedule): + raise RuntimeError("Simulator schedule does not contain all agents") + for agent in schedule: + # If agent doesn't have a behavior right now, continue + if not agent.behavior: + continue + # Run the agent's behavior to get its actions + actions = agent.behavior._step() + + # Handle pseudo-actions marking the end of a simulation/scenario + if isinstance(actions, _EndSimulationAction): + return self.terminateSimulation( + TerminationType.terminatedByBehavior, str(actions) + ) + elif isinstance(actions, _EndScenarioAction): + scenario = actions.scenario + if scenario._isRunning: + scenario._stop(actions) + if scenario is self.dynamicScenario: + # Top-level scenario was terminated, so whole simulation will end. + return self.terminateSimulation( + TerminationType.terminatedByBehavior, str(actions) ) + actions = () + + # Check ordinary actions for compatibility + assert isinstance(actions, tuple) + if len(actions) == 1 and isinstance(actions[0], (list, tuple)): + actions = tuple(actions[0]) + if not self.actionsAreCompatible(agent, actions): + raise InvalidScenarioError( + f"agent {agent} tried incompatible action(s) {actions}" + ) + + # Save actions for execution below + allActions[agent] = actions - # Save actions for execution below - allActions[agent] = actions + # Log lastActions + agent.lastActions = actions - # Log lastActions + # Execute the actions + if self.verbosity >= 3: + for agent, actions in allActions.items(): + print(f" Agent {agent} takes action(s) {actions}") agent.lastActions = actions + self.actionSequence.append(allActions) + self.executeActions(allActions) - # Execute the actions - if self.verbosity >= 3: - for agent, actions in allActions.items(): - print(f" Agent {agent} takes action(s) {actions}") - self.actionSequence.append(allActions) - self.executeActions(allActions) + # Run the simulation for a single step and read its state back into Scenic + self.step() + self.currentTime += 1 + self.updateObjects() - # Run the simulation for a single step and read its state back into Scenic - self.step() - self.currentTime += 1 - self.updateObjects() + def terminateSimulation(self, terminationType, terminationReason): + import scenic.syntax.veneer as veneer + + # Stop all remaining scenarios. + # (and reject if some 'require eventually' condition was never satisfied) + for scenario in tuple(reversed(veneer.runningScenarios)): + scenario._stop("simulation terminated") + + # Record finally-recorded values. + values = self.dynamicScenario._evaluateRecordedExprs( + RequirementType.recordFinal, self.currentTime + ) + for name, val in values.items(): + self.records[name] = val + + # Package up simulation results into a compact object. + result = SimulationResult( + self.trajectory, + self.actionSequence, + terminationType, + terminationReason, + self.records, + ) + self.result = result + + self.cleanup() + + def cleanup(self): + # No need to repeat cleanup if we've already done it + if self._cleaned: + return + + # Remember that we have cleaned up. + self._cleaned = True + + import scenic.syntax.veneer as veneer + + self.destroy() + for obj in self.objects: + disableDynamicProxyFor(obj) + for agent in self.agents: + if agent.behavior and agent.behavior._isRunning: + agent.behavior._stop() + # If the simulation was terminated by an exception (including rejections), + # some scenarios may still be running; we need to clean them up without + # checking their requirements, which could raise rejection exceptions. + for scenario in tuple(reversed(veneer.runningScenarios)): + scenario._stop("exception", quiet=True) + veneer.endSimulation(self) def setup(self): """Set up the simulation to run in the simulator. @@ -942,3 +1025,7 @@ def __init__(self, trajectory, actions, terminationType, terminationReason, reco self.terminationType = terminationType self.terminationReason = str(terminationReason) self.records = dict(records) + + +class TerminatedSimulationException(Exception): + pass diff --git a/tests/core/test_simulators.py b/tests/core/test_simulators.py index 149c1cad1..5358a8cec 100644 --- a/tests/core/test_simulators.py +++ b/tests/core/test_simulators.py @@ -1,6 +1,11 @@ import pytest -from scenic.core.simulators import DummySimulation, DummySimulator, Simulation +from scenic.core.simulators import ( + DummySimulation, + DummySimulator, + Simulation, + TerminatedSimulationException, +) from tests.utils import compileScenic, sampleResultFromScene, sampleSceneFrom @@ -35,6 +40,29 @@ def test_simulator_destruction(): assert "destroy() called twice" in str(e) +def test_simulator_stepped(): + simulator = DummySimulator() + scene = sampleSceneFrom("ego = new Object") + + with simulator.simulateStepped(scene, maxSteps=5) as simulation: + while simulation.result is None: + simulation.advance() + + assert simulation.result is not None + assert simulation.currentTime == 5 + + # advance() should do nothing but raise an exception + # if the simulation is already terminated + with pytest.raises(TerminatedSimulationException): + simulation.advance() + + assert simulation.currentTime == 5 + + # Ensure all values are preserved after leaving the context manager + assert simulation.result is not None + assert simulation.currentTime == 5 + + def test_simulator_set_property(): class TestSimulation(DummySimulation): def createObjectInSimulator(self, obj):