7 changes: 7 additions & 0 deletions docs/api.rst
@@ -40,6 +40,13 @@ the sampled values for all the global parameters and objects in the scene from t

ego has foo = 2.083099362726706

Utilities
---------

Scenic provides top-level utility functions for tasks such as setting Scenic's random seed.

.. autofunction:: scenic.setSeed
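
For example, a minimal sketch that seeds Scenic before sampling so that sampling is
reproducible (the scenario file name is illustrative)::

    import scenic

    scenic.setSeed(42)
    scenario = scenic.scenarioFromFile("example.scenic")
    scene, _ = scenario.generate()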

Running Dynamic Simulations
---------------------------

1 change: 1 addition & 0 deletions src/scenic/__init__.py
@@ -2,6 +2,7 @@

import scenic.core.errors as _errors
from scenic.core.errors import setDebuggingOptions
from scenic.core.utils import setSeed
from scenic.syntax.translator import scenarioFromFile, scenarioFromString

_errors.showInternalBacktrace = False # see comment in errors module
3 changes: 1 addition & 2 deletions src/scenic/__main__.py
@@ -185,8 +185,7 @@
if args.verbosity >= 1:
print(f"Using random seed = {args.seed}")

random.seed(args.seed)
numpy.random.seed(args.seed)
scenic.setSeed(args.seed)

# Load scenario from file
if args.verbosity >= 1:
256 changes: 241 additions & 15 deletions src/scenic/core/scenarios.py
@@ -3,9 +3,12 @@
import dataclasses
import io
import itertools
import multiprocessing
import os
import random
import sys
import time
import warnings

import numpy
import trimesh
@@ -38,6 +41,7 @@
)
from scenic.core.sample_checking import BasicChecker, WeightedAcceptanceChecker
from scenic.core.serialization import Serializer, dumpAsScenicCode
from scenic.core.utils import setSeed
from scenic.core.vectors import Vector

# Global params
@@ -315,6 +319,7 @@ def __init__(
self.dependencies = (
self._instances + paramDeps + tuple(requirementDeps) + tuple(behaviorDeps)
)
self._scenarioCreationData = None

# Setup the default checker
self.defaultRequirements = self.generateDefaultRequirements()
@@ -400,11 +405,26 @@ def generate(self, maxIterations=2000, verbosity=0, feedback=None):
Raises:
`RejectionException`: if no valid sample is found in **maxIterations** iterations.
"""
scenes, iterations = self.generateBatch(1, maxIterations, verbosity, feedback)
return scenes[0], iterations
scenes, totalIterations = self.generateBatch(
numScenes=1,
maxIterations=maxIterations,
verbosity=verbosity,
feedback=feedback,
numWorkers=0,
mute=False,
)
assert len(scenes) == 1
return (scenes[0], totalIterations)

def generateBatch(
self, numScenes, maxIterations=float("inf"), verbosity=0, feedback=None
self,
numScenes,
maxIterations=float("inf"),
verbosity=0,
feedback=None,
numWorkers=0,
mute=True,
serialized=False,
):
"""Sample several `Scene` objects from this scenario.

@@ -416,6 +436,10 @@ def generateBatch(
verbosity (int): Verbosity level.
feedback (float): Feedback to pass to external samplers doing active sampling.
See :mod:`scenic.core.external_params`.
numWorkers (int): The number of worker processes to use when generating scenes. If
``numWorkers`` is 0, scenes are generated in the main process.
mute (bool): Whether or not to mute stdout and stderr in the worker processes.
serialized (bool): Whether or not to return scenes in a serialized format.

Returns:
A pair with a list of the sampled `Scene` objects and the total number
@@ -424,21 +448,193 @@
Raises:
`RejectionException`: if not enough valid samples are found in **maxIterations** iterations.
"""
totalIterations = 0
scenes = []
stream = self.generateStream(
numScenes=numScenes,
maxIterations=maxIterations,
verbosity=verbosity,
feedback=feedback,
numWorkers=numWorkers,
mute=mute,
serialized=serialized,
deterministic=True,
iterationCount=True,
)
results = list(stream)
scenes = [r[0] for r in results]
totalIterations = sum(r[1] for r in results)
return (scenes, totalIterations)
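
A hedged usage sketch of the batched API extended above; the scenario file name, scene count, and
worker count are illustrative, not taken from this PR:

import scenic

if __name__ == "__main__":  # guard needed on platforms that spawn worker processes
    scenario = scenic.scenarioFromFile("example.scenic")

    # Sample 10 scenes across 4 worker processes; worker stdout/stderr are muted by default.
    scenes, totalIterations = scenario.generateBatch(numScenes=10, numWorkers=4)
    print(f"sampled {len(scenes)} scenes in {totalIterations} iterations")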

for _ in range(numScenes):
try:
remainingIts = maxIterations - totalIterations
scene, iterations = self._generateInner(remainingIts, verbosity, feedback)
scenes.append(scene)
totalIterations += iterations
except RejectionException:
raise RejectionException(
f"failed to generate scenario in {maxIterations} iterations"
def generateStream(
self,
numScenes,
maxIterations=float("inf"),
verbosity=0,
feedback=None,
numWorkers=0,
bufferSize=None,
mute=True,
serialized=False,
deterministic=True,
iterationCount=False,
):
"""Sample a stream of `Scene` objects from this scenario.

For a description of how scene generation is done, see `scene generation`. This function can produce
both finite and infinite streams (depending on the value of ``numScenes``).

.. note::
The ``deterministic`` parameter defaults to ``True``, meaning that scenes are returned in a
fixed order for a given Scenic seed. Setting it to ``False`` allows scenes to be returned in a
possibly non-deterministic order, potentially with lower latency. In that case the ordering is
not fully random: scenes that are easier to generate are more likely to appear earlier in the
stream, but the overall distribution of the returned scenes still matches the scenario. When
generating an infinite stream, ``deterministic`` must be set to ``True``.

Args:
numScenes (int): Number of scenes to generate, or ``float('inf')`` to sample an infinite stream
of Scenes.
maxIterations (int): Maximum number of rejection sampling iterations (over all scenes).
verbosity (int): Verbosity level.
feedback (float): Feedback to pass to external samplers doing active sampling.
See :mod:`scenic.core.external_params`.
numWorkers (int): The number of worker processes to use when generating scenes. If
``numWorkers`` is 0, scenes are generated in the main process.
bufferSize (int): The number of scenes to keep available at any given time, or `None` to
use a default value. If set to ``None`` and ``numScenes`` is finite, all scenes are
generated in a greedy fashion. If set to ``None`` and ``numScenes`` is infinite, a default
value of ``2*numWorkers`` is used.
mute (bool): Whether or not to mute stdout and stderr in the worker processes.
serialized (bool): Whether or not to return scenes in a serialized format.
deterministic (bool): Whether or not scenes will be returned in a deterministic order. This
must be set to ``True`` when generating an infinite stream.
iterationCount (bool): Whether or not to return the number of iterations used to generate each
scene. If this is set to ``False``, the return type is simply an iterable of `Scene` objects.

Returns:
An iterable of sampled `Scene` objects or, if ``iterationCount`` is ``True``, an iterable of
pairs containing a sampled `Scene` and the number of iterations used for that scene.
"""
if not isinstance(numScenes, int) and numScenes != float("inf"):
raise ValueError("`numScenes` must be either an `int` or `float('inf')`.")

if numScenes <= 0:
raise ValueError("`numScenes` must be at least 1.")

if numScenes == float("inf") and not deterministic:
raise ValueError(
"`deterministic` must be set to `True` when generating an infinite stream."
)

if bufferSize is None and numScenes == float("inf"):
bufferSize = 2 * numWorkers

if numWorkers == 0:
totalIterations = 0
returnedResultCount = 0

while returnedResultCount < numScenes:
try:
remainingIts = maxIterations - totalIterations
rawScene, iterations = self._generateInner(
remainingIts, verbosity, feedback
)
scene = self.sceneToBytes(rawScene) if serialized else rawScene
totalIterations += iterations
if iterationCount:
yield (scene, iterations)
else:
yield scene
returnedResultCount += 1
except RejectionException:
raise RejectionException(
f"failed to generate scenario in {maxIterations} iterations"
)
else:
if maxIterations != float("inf"):
raise ValueError("`maxIterations` is not supported for parallel sampling.")

if feedback is not None:
raise ValueError("`feedback` is not supported for parallel sampling.")

# Initialize results tracking data
returnedResultCount = 0

# Initialize random generator used to produce worker seeds
randGenerator = numpy.random.default_rng(random.getrandbits(32))

# Initialize queues
seedQueue = multiprocessing.Queue()
seedHistory = []
putSeedCount = 0

def putSeed():
nonlocal putSeedCount
newSeed = int(randGenerator.integers(2**32))
seedQueue.put(newSeed)
if deterministic:
seedHistory.append(newSeed)
putSeedCount += 1

initialSeedCount = numScenes if bufferSize is None else bufferSize
for _ in range(initialSeedCount):
putSeed()

sceneQueue = multiprocessing.Queue()

# Initialize processes
params = (self._scenarioCreationData, seedQueue, sceneQueue, verbosity, mute)
processes = [
multiprocessing.Process(target=generateInnerBatchHelper, args=params)
for _ in range(numWorkers)
]

# Initialized result management functions
resultsDict = {}

def getResult():
sceneBytes, resultIterations, resultSeed = sceneQueue.get()
resultScene = (
sceneBytes
if serialized
else self.sceneFromBytes(sceneBytes, verify=False)
)
return (resultScene, resultIterations), resultSeed

def getNextResult():
if not deterministic:
nextResult = getResult()[0]
return nextResult if iterationCount else nextResult[0]

while True:
if seedHistory[0] in resultsDict:
nextResult = resultsDict[seedHistory.pop(0)]
return nextResult if iterationCount else nextResult[0]

result, resultSeed = getResult()
resultsDict[resultSeed] = result

# Start sampling processes and yield samples
try:
# Start processes
for process in processes:
process.start()

return scenes, totalIterations
# Retrieve results
while returnedResultCount < numScenes:
yield getNextResult()
returnedResultCount += 1

# Replenish the seed queue (one new seed per consumed result) until enough seeds
# for numScenes scenes have been issued
if putSeedCount < numScenes:
putSeed()

finally:
# Close processes and queues
for process in processes:
process.terminate()

seedQueue.close()
sceneQueue.close()
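
A hedged sketch of consuming a stream from generateStream above; the file name and parameters are
illustrative. For an infinite stream the caller decides when to stop and closes the generator,
which runs the finally block above to terminate the workers:

import itertools

import scenic

if __name__ == "__main__":  # guard needed on platforms that spawn worker processes
    scenario = scenic.scenarioFromFile("example.scenic")

    # Infinite stream backed by 4 workers; the default deterministic=True is required here.
    stream = scenario.generateStream(numScenes=float("inf"), numWorkers=4)
    try:
        for scene in itertools.islice(stream, 5):  # take the first five scenes
            print(scene)
    finally:
        stream.close()  # shuts down worker processes and closes the queues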

def _generateInner(self, maxIterations, verbosity, feedback):
# choose which custom requirements will be enforced for this sample
@@ -753,3 +949,33 @@ def simulationFromBytes(
data = io.BytesIO(data)
scene = self.sceneFromBytes(data, verify=verify, allowPickle=allowPickle)
return simulator.simulate(scene, replay=data, **kwargs)


def generateInnerBatchHelper(
scenarioCreationData, seedQueue, sceneQueue, verbosity, mute
):
# Worker-process entry point: recompile the scenario from its creation data, then
# repeatedly pull a seed from seedQueue, sample a scene, and push the serialized
# result (with its iteration count and seed) to sceneQueue.
if mute:
sys.stdout = open(os.devnull, "w")
sys.stderr = open(os.devnull, "w")

from scenic.syntax.translator import _scenarioFromStream

scenario = _scenarioFromStream(
stream=io.BytesIO(scenarioCreationData["streamLines"]),
compileOptions=scenarioCreationData["compileOptions"],
filename=scenarioCreationData["filename"],
scenario=scenarioCreationData["scenario"],
path=scenarioCreationData["path"],
_cacheImports=False,
)

while True:
seed = seedQueue.get()
setSeed(seed)

scene, iterations = scenario._generateInner(
maxIterations=float("inf"), verbosity=verbosity, feedback=None
)
sceneBytes = scenario.sceneToBytes(scene)

sceneQueue.put((sceneBytes, iterations, seed))