diff --git a/examples/driving/car.scenic b/examples/driving/car.scenic index 49c421125..f1506d06c 100644 --- a/examples/driving/car.scenic +++ b/examples/driving/car.scenic @@ -10,4 +10,4 @@ param map = localPath('../../assets/maps/CARLA/Town01.xodr') model scenic.domains.driving.model -ego = new Car +ego = new Car \ No newline at end of file diff --git a/src/scenic/__main__.py b/src/scenic/__main__.py index 05f527fba..9ca17bc5a 100644 --- a/src/scenic/__main__.py +++ b/src/scenic/__main__.py @@ -279,7 +279,10 @@ def runSimulation(scene, sceneCount): successCount += 1 else: successCount += 1 - if mode2D: + use2DMap = getattr( + getattr(scene.workspace, "network", None), "use2DMap", False + ) + if use2DMap: if delay is None: scene.show2D(zoom=args.zoom) else: diff --git a/src/scenic/core/regions.py b/src/scenic/core/regions.py index 38e876d01..f69bc11a0 100644 --- a/src/scenic/core/regions.py +++ b/src/scenic/core/regions.py @@ -2041,6 +2041,33 @@ def intersects(self, other, triedReversed=False): return super().intersects(other, triedReversed) + def union(self, other, triedReversed=False): + """Get a `Region` representing the union of this region with another. + + This function handles union computation for `MeshSurfaceRegion` with: + - `MeshSurfaceRegion` + """ + # If one of the regions isn't fixed, fall back on default behavior + if isLazy(self) or isLazy(other): + return super().union(other, triedReversed) + + # If other region is represented by a mesh, we can extract the mesh to + # perform boolean operations on it + if isinstance(other, MeshSurfaceRegion): + other_mesh = other.mesh + + # Compute union using Trimesh + new_mesh = trimesh.util.concatenate(self.mesh, other_mesh) + + return MeshSurfaceRegion( + new_mesh, + tolerance=min(self.tolerance, other.tolerance), + centerMesh=False, + ) + + # Don't know how to compute this union, fall back to default behavior. + return super().union(other, triedReversed) + @distributionFunction def containsPoint(self, point): """Check if this region's surface contains a point.""" @@ -2052,18 +2079,18 @@ def containsPoint(self, point): def containsObject(self, obj): # A surface cannot contain an object, which must have a volume. - return False + # Use footprint + return self.boundingPolygon.containsObject(obj) def containsRegionInner(self, reg, tolerance): - if tolerance != 0: - warnings.warn( - "Nonzero tolerances are ignored for containsRegionInner on MeshSurfaceRegion" - ) - if isinstance(reg, MeshSurfaceRegion): - diff_region = reg.difference(self) - - return isinstance(diff_region, EmptyRegion) + if self.mesh.is_empty: + return False + elif reg.mesh.is_empty: + return True + return self.boundingPolygon.polygons.buffer(tolerance).contains( + reg.boundingPolygon.polygons + ) raise NotImplementedError @@ -2076,9 +2103,7 @@ def distanceTo(self, point): point = toVector(point, f"Could not convert {point} to vector.") pq = trimesh.proximity.ProximityQuery(self.mesh) - dist = abs(pq.signed_distance([point.coordinates])[0]) - return dist @property @@ -2767,11 +2792,22 @@ def __init__( numpy.linalg.norm(b - a, axis=1), (len(a), 1) ) + def intersects(self, other, triedReversed=False): + if isinstance(other, PathRegion): + self_polyline = PolylineRegion(points=[(v.x, v.y) for v in self.vertices]) + other_polyline = PolylineRegion(points=[(v.x, v.y) for v in other.vertices]) + poly = toPolygon(other_polyline) + if poly is not None: + return self_polyline.lineString.intersects(poly) + return self_polyline.intersects(other, triedReversed) + else: + return super().intersects(other, triedReversed) + def containsPoint(self, point): return self.distanceTo(point) < self.tolerance def containsObject(self, obj): - return False + raise NotImplementedError def containsRegionInner(self, reg, tolerance): raise NotImplementedError @@ -2820,6 +2856,27 @@ def AABB(self): tuple(numpy.amax(self.vertices, axis=0)), ) + @distributionMethod + def signedDistanceTo(self, point) -> float: + """Compute the signed distance of the PathRegion to a point. + + The distance is positive if the point is left of the nearest segment, + and negative otherwise. + """ + dist = self.distanceTo(point) + start, end = self.nearestSegmentTo(point) + rp = point - start + tangent = end - start + return dist if tangent.angleWith(rp) >= 0 else -dist + + def __getitem__(self, i) -> Vector: + """Get the ith point along this path. + + If the region consists of multiple polylines, this order is linear + along each polyline but arbitrary across different polylines. + """ + return self.vertices[i] + def uniformPointInner(self): # Pick an edge, weighted by length, and extract its two points edge = random.choices(population=self.edges, weights=self.edge_lengths, k=1)[0] @@ -3080,6 +3137,16 @@ def unionAll(regions, buf=0): z = 0 if z is None else z return PolygonalRegion(polygon=union, orientation=orientation, z=z) + @cached_property + def _boundingPolygon(self): + return self._polygon + + @cached_property + @distributionFunction + def boundingPolygon(self): + """A PolygonalRegion returning self""" + return self + @property @distributionFunction def points(self): diff --git a/src/scenic/core/requirements.py b/src/scenic/core/requirements.py index 53633057e..946c08ffc 100644 --- a/src/scenic/core/requirements.py +++ b/src/scenic/core/requirements.py @@ -15,6 +15,7 @@ from scenic.core.errors import InvalidScenarioError from scenic.core.lazy_eval import needsLazyEvaluation from scenic.core.propositions import Atomic, PropositionNode +from scenic.core.regions import MeshSurfaceRegion, PolygonalRegion import scenic.syntax.relations as relations diff --git a/src/scenic/domains/driving/__init__.py b/src/scenic/domains/driving/__init__.py index b89c1c54c..783435475 100644 --- a/src/scenic/domains/driving/__init__.py +++ b/src/scenic/domains/driving/__init__.py @@ -1,6 +1,6 @@ """Domain for driving scenarios. -This domain must currently be used in `2D compatibility mode`. +This domain supports both 2D and 3D maps. This can be toggled by setting the `2D Compatibility mode` option. The :doc:`world model ` defines Scenic classes for cars, pedestrians, etc., actions for dynamic agents which walk or drive, as well as simple diff --git a/src/scenic/domains/driving/behaviors.scenic b/src/scenic/domains/driving/behaviors.scenic index 173500190..3c8847d4e 100644 --- a/src/scenic/domains/driving/behaviors.scenic +++ b/src/scenic/domains/driving/behaviors.scenic @@ -10,7 +10,10 @@ import scenic.domains.driving.model as _model from scenic.domains.driving.roads import ManeuverType def concatenateCenterlines(centerlines=[]): - return PolylineRegion.unionAll(centerlines) + if isinstance(centerlines[0], PathRegion): + return PathRegion(polylines=centerlines) + else: + return PolylineRegion.unionAll(centerlines) behavior ConstantThrottleBehavior(x): while True: @@ -131,9 +134,7 @@ behavior FollowLaneBehavior(target_speed = 10, laneToFollow=None, is_oppositeTra target_speed = original_target_speed _lon_controller, _lat_controller = simulation().getLaneFollowingControllers(self) - nearest_line_points = current_centerline.nearestSegmentTo(self.position) - nearest_line_segment = PolylineRegion(nearest_line_points) - cte = nearest_line_segment.signedDistanceTo(self.position) + cte = current_centerline.signedDistanceTo(self.position) if is_oppositeTraffic: cte = -cte @@ -213,7 +214,7 @@ behavior TurnBehavior(trajectory, target_speed=6): it will terminate if the vehicle is outside of an intersection. """ - if isinstance(trajectory, PolylineRegion): + if isinstance(trajectory, PolylineRegion) or isinstance(trajectory, PathRegion): trajectory_centerline = trajectory else: trajectory_centerline = concatenateCenterlines([traj.centerline for traj in trajectory]) diff --git a/src/scenic/domains/driving/model.scenic b/src/scenic/domains/driving/model.scenic index b6d0754af..468c31579 100644 --- a/src/scenic/domains/driving/model.scenic +++ b/src/scenic/domains/driving/model.scenic @@ -17,12 +17,18 @@ If you are writing a generic scenario that supports multiple maps, you may leave ``map`` parameter undefined; then running the scenario will produce an error unless the user uses the :option:`--param` command-line option to specify the map. -The ``use2DMap`` global parameter determines whether or not maps are generated in 2D. Currently -3D maps are not supported, but are under development. By default, this parameter is `False` -(so that future versions of Scenic will automatically use 3D maps), unless -:ref:`2D compatibility mode` is enabled, in which case the default is `True`. The parameter -can be manually set to `True` to ensure 2D maps are used even if the scenario is not compiled -in 2D compatibility mode. +Both 2D and 3D road networkd are now fully supported. The ``use2DMap`` global parameter +determines whether or not maps are generated in 2D or 3D. By default, this parameter is +`False` (so that 3D maps are used by default), unless :ref:`2D compatibility mode` is enabled, +in which case the default is `True`. The parameter can be manually set to `True` to ensure 2D +maps are used even if the scenario is not compiled in 2D compatibility mode. + +.. note:: + + 3D road network support is now fully implemented. When ``use2DMap`` is `False`, road + geometry, object placement, and region queries will use full 3D coordinates, including + elevation (Z) values. This allows for more realistic scenarios with varying terrain + heights and complex road structures. .. note:: @@ -57,21 +63,16 @@ param use2DMap = True if is2DMode() else False if is2DMode() and not globalParameters.use2DMap: raise RuntimeError('in 2D mode, global parameter "use2DMap" must be True') -# Note: The following should be removed when 3D maps are supported -if not globalParameters.use2DMap: - raise RuntimeError('3D maps not supported at this time.' - '(to use 2D maps set global parameter "use2DMap" to True)') - ## Load map and set up workspace if 'map' not in globalParameters: raise RuntimeError('need to specify map before importing driving model ' '(set the global parameter "map")') param map_options = {} - +options = {**globalParameters.map_options, "use2DMap": globalParameters.use2DMap} #: The road network being used for the scenario, as a `Network` object. -network : Network = Network.fromFile(globalParameters.map, **globalParameters.map_options) - +#network : Network = Network.fromFile(globalParameters.map, **globalParameters.map_options) +network : Network = Network.fromFile(globalParameters.map, **options) workspace = DrivingWorkspace(network) ## Various useful objects and regions @@ -282,13 +283,14 @@ class Vehicle(DrivingObject): distribution derived from car color popularity statistics; see :obj:`Color.defaultCarColor`. """ - regionContainedIn: roadOrShoulder + regionContainedIn: roadOrShoulder.boundingPolygon # Change region to bounding polygon region position: new Point on road parentOrientation: (roadDirection at self.position) + self.roadDeviation roadDeviation: 0 viewAngle: 90 deg width: 2 length: 4.5 + height: 2.5 # default height; color: Color.defaultCarColor() @property diff --git a/src/scenic/domains/driving/roads.py b/src/scenic/domains/driving/roads.py index 48289f0c8..5b396e9ac 100644 --- a/src/scenic/domains/driving/roads.py +++ b/src/scenic/domains/driving/roads.py @@ -28,6 +28,7 @@ import attr import shapely from shapely.geometry import MultiPolygon, Polygon +import trimesh from scenic.core.distributions import ( RejectionException, @@ -36,9 +37,18 @@ ) import scenic.core.geometry as geometry from scenic.core.object_types import Point -from scenic.core.regions import PolygonalRegion, PolylineRegion +from scenic.core.regions import ( + EmptyRegion, + MeshRegion, + MeshSurfaceRegion, + PathRegion, + PolygonalRegion, + PolylineRegion, + Region, +) import scenic.core.type_support as type_support import scenic.core.utils as utils +from scenic.core.utils import cached_property from scenic.core.vectors import Orientation, Vector, VectorField import scenic.syntax.veneer as veneer from scenic.syntax.veneer import verbosePrint @@ -135,8 +145,18 @@ def guessTypeFromLanes( return ManeuverType.U_TURN # Identify turns based on relative heading of start and end of connecting lane - startDir = connecting.centerline[1] - connecting.centerline[0] - endDir = connecting.centerline[-1] - connecting.centerline[-2] + startDir = None + endDir = None + if isinstance(connecting.centerline, PathRegion): + startDir = ( + connecting.centerline.vertices[0] - connecting.centerline.vertices[1] + ) + endDir = ( + connecting.centerline.vertices[-2] - connecting.centerline.vertices[-1] + ) + else: + startDir = connecting.centerline[1] - connecting.centerline[0] + endDir = connecting.centerline[-1] - connecting.centerline[-2] turnAngle = startDir.angleWith(endDir) if turnAngle >= turnThreshold: return ManeuverType.LEFT_TURN @@ -208,7 +228,7 @@ def reverseManeuvers(self) -> Tuple[Maneuver]: @attr.s(auto_attribs=True, kw_only=True, repr=False, eq=False) -class NetworkElement(_ElementReferencer, PolygonalRegion): +class NetworkElement(_ElementReferencer, Region): ### Was part of: PolygonalRegion class """NetworkElement() Abstract class for part of a road network. @@ -221,9 +241,12 @@ class NetworkElement(_ElementReferencer, PolygonalRegion): distances to an element, etc. """ - # from PolygonalRegion - polygon: Union[Polygon, MultiPolygon] + def __init__(self, kwargs): + super().__init__(kwargs) + + polygon: Union[Polygon, MultiPolygon, trimesh.Trimesh] orientation: Optional[VectorField] = None + region: Union[PolygonalRegion, MeshRegion] = None #: The region of the element. name: str = "" #: Human-readable name, if any. #: Unique identifier; from underlying format, if possible. @@ -245,10 +268,18 @@ def __attrs_post_init__(self): assert self.uid is not None or self.id is not None if self.uid is None: self.uid = self.id - - super().__init__( - polygon=self.polygon, orientation=self.orientation, name=self.name - ) + if isinstance(self.region, MeshSurfaceRegion): + self.region.__init__( + mesh=self.polygon, + orientation=self.orientation, + centerMesh=False, + name=self.name, + position=None, + ) + else: + self.region.__init__( + polygon=self.polygon, orientation=self.orientation, name=self.name + ) @distributionFunction def nominalDirectionsAt(self, point: Vectorlike) -> Tuple[Orientation]: @@ -283,6 +314,54 @@ def __repr__(self): s += f'uid="{self.uid}">' return s + def intersect(self, other): + return self.region.intersect(other) + + def intersects(self, other, triedReversed=False): + return self.region.boundingPolygon.intersects( + other.boundingPolygon, triedReversed=triedReversed + ) + + def containsPoint(self, point): + return self.region.containsPoint(point) + + def containsObject(self, obj): + return self.region.containsObject(obj) + + def AABB(self): + return self.region.AABB() + + def distanceTo(self, point): + return self.region.distanceTo(point) + + def containsRegion(self, reg, tolerance): + return self.region.containsRegion(reg, tolerance) + + def containsRegionInner(self, reg, tolerance): + return self.region.containsRegionInner(reg, tolerance) + + def projectVector(self, point, onDirection): + return self.region.projectVector(point, onDirection) + + def uniformPointIn(self, region, tag=None): + return self.region.uniformPointIn(region, tag) + + def uniformPointInner(self): + return self.region.uniformPointInner() + + def show(self, plt, style="r-", **kwargs): + return self.region.show(plt, style="r-", **kwargs) + + def buffer(self, amount): + return self.region.buffer(amount) + + def uniformPointInner(self): + return self.region.uniformPointInner() + + @cached_property + def boundingPolygon(self): + return self.region.boundingPolygon + @attr.s(auto_attribs=True, kw_only=True, repr=False, eq=False) class LinearElement(NetworkElement): @@ -302,9 +381,9 @@ class LinearElement(NetworkElement): """ # Geometric info (on top of the overall polygon from PolygonalRegion) - centerline: PolylineRegion - leftEdge: PolylineRegion - rightEdge: PolylineRegion + centerline: Union[PolylineRegion, PathRegion] + leftEdge: Union[PolylineRegion, PathRegion] + rightEdge: Union[PolylineRegion, PathRegion] # Links to next/previous element _successor: Union[NetworkElement, None] = None # going forward @@ -323,8 +402,23 @@ def __attrs_post_init__(self): # Check that left and right edges lie inside the element. # (don't check centerline here since it can lie inside a median, for example) # (TODO reconsider the decision to have polygon only include drivable areas?) - assert self.containsRegion(self.leftEdge, tolerance=0.5) - assert self.containsRegion(self.rightEdge, tolerance=0.5) + poly_conversion = self.region.boundingPolygon + assert poly_conversion.containsRegion( + ( + PolylineRegion(points=([v.x, v.y] for v in self.leftEdge.vertices)) + if isinstance(self.leftEdge, PathRegion) + else self.leftEdge + ), + tolerance=0.5, + ) + assert poly_conversion.containsRegion( + ( + PolylineRegion(points=([v.x, v.y] for v in self.rightEdge.vertices)) + if isinstance(self.rightEdge, PathRegion) + else self.rightEdge + ), + tolerance=0.5, + ) if self.orientation is None: self.orientation = VectorField(self.name, self._defaultHeadingAt) @@ -339,7 +433,9 @@ def _defaultHeadingAt(self, point): """ point = _toVector(point) start, end = self.centerline.nearestSegmentTo(point) - return start.angleTo(end) + direction = end - start + sphericalCoords = direction.sphericalCoordinates() + return Orientation.fromEuler(sphericalCoords[1], sphericalCoords[2], 0) @distributionFunction def flowFrom( @@ -378,7 +474,14 @@ class _ContainsCenterline: def __attrs_post_init__(self): super().__attrs_post_init__() - assert self.containsRegion(self.centerline, tolerance=0.5) + assert self.region.boundingPolygon.containsRegion( + ( + PolylineRegion(points=([v.x, v.y] for v in self.centerline.vertices)) + if isinstance(self.centerline, PathRegion) + else self.centerline + ), + tolerance=0.5, + ) @attr.s(auto_attribs=True, kw_only=True, repr=False, eq=False) @@ -425,7 +528,7 @@ class Road(LinearElement): #: All sidewalks of this road, with the one adjacent to `forwardLanes` being first. sidewalks: Tuple[Sidewalk] = None #: Possibly-empty region consisting of all sidewalks of this road. - sidewalkRegion: PolygonalRegion = None + sidewalkRegion: Union[PolygonalRegion, MeshSurfaceRegion] = None def __attrs_post_init__(self): super().__attrs_post_init__() @@ -441,7 +544,6 @@ def __attrs_post_init__(self): sidewalks.append(self.backwardLanes._sidewalk) self.laneGroups = tuple(lgs) self.sidewalks = tuple(sidewalks) - self.sidewalkRegion = PolygonalRegion.unionAll(sidewalks) def _defaultHeadingAt(self, point): point = _toVector(point) @@ -514,8 +616,10 @@ def __attrs_post_init__(self): super().__attrs_post_init__() # Ensure lanes do not overlap - for i in range(len(self.lanes) - 1): - assert not self.lanes[i].polygon.overlaps(self.lanes[i + 1].polygon) + if type(self.region) is PolygonalRegion: + for i in range(len(self.lanes) - 1): + assert not self.lanes[i].polygon.overlaps(self.lanes[i + 1].polygon) + # TODO need a way to check for overlapping meshes in 3D @property def sidewalk(self) -> Sidewalk: @@ -614,10 +718,6 @@ def __attrs_post_init__(self): ids[i] = lane self.lanesByOpenDriveID = ids - # Ensure lanes do not overlap - for i in range(len(self.lanes) - 1): - assert not self.lanes[i].polygon.overlaps(self.lanes[i + 1].polygon) - def _defaultHeadingAt(self, point): point = _toVector(point) lane = self.laneAt(point) @@ -649,6 +749,7 @@ class LaneSection(_ContainsCenterline, LinearElement): group: LaneGroup #: Grandparent lane group. road: Road #: Great-grandparent road. + polygon: Union[Polygon, MultiPolygon, PathRegion, MeshSurfaceRegion] = None #: ID number as in OpenDRIVE (number of lanes to left of center, with 1 being the # first lane left of the centerline and -1 being the first lane to the right). openDriveID: int @@ -756,7 +857,10 @@ def __attrs_post_init__(self): super().__attrs_post_init__() for maneuver in self.maneuvers: assert maneuver.connectingLane, maneuver - assert self.containsRegion(maneuver.connectingLane, tolerance=0.5) + assert self.region.boundingPolygon.containsRegion( + maneuver.connectingLane.region.boundingPolygon, tolerance=0.5 + ) + if self.orientation is None: self.orientation = VectorField(self.name, self._defaultHeadingAt) @@ -877,16 +981,20 @@ class Network: #: Distance tolerance for testing inclusion in network elements. tolerance: float = 0 + use2DMap: float = 0 + # convenience regions aggregated from various types of network elements - drivableRegion: PolygonalRegion = None - walkableRegion: PolygonalRegion = None - roadRegion: PolygonalRegion = None - laneRegion: PolygonalRegion = None - intersectionRegion: PolygonalRegion = None - crossingRegion: PolygonalRegion = None - sidewalkRegion: PolygonalRegion = None - curbRegion: PolylineRegion = None - shoulderRegion: PolygonalRegion = None + drivableRegion: Union[PolygonalRegion, MeshSurfaceRegion] = None + walkableRegion: Union[PolygonalRegion, MeshSurfaceRegion] = None + roadRegion: Union[PolygonalRegion, MeshSurfaceRegion] = None + laneRegion: Union[PolygonalRegion, MeshSurfaceRegion] = None + intersectionRegion: Union[PolygonalRegion, MeshSurfaceRegion] = None + crossingRegion: Union[PolygonalRegion, MeshSurfaceRegion] = None + sidewalkRegion: Union[PolygonalRegion, MeshSurfaceRegion] = None + curbRegion: Union[PolylineRegion, PathRegion, PolygonalRegion, MeshSurfaceRegion] = ( + None + ) + shoulderRegion: Union[PolygonalRegion, MeshSurfaceRegion] = None #: Traffic flow vector field aggregated over all roads (0 elsewhere). roadDirection: VectorField = None @@ -901,44 +1009,134 @@ def __attrs_post_init__(self): self.roadSections = tuple(sec for road in self.roads for sec in road.sections) self.laneSections = tuple(sec for lane in self.lanes for sec in lane.sections) - if self.roadRegion is None: - self.roadRegion = PolygonalRegion.unionAll(self.roads) - if self.laneRegion is None: - self.laneRegion = PolygonalRegion.unionAll(self.lanes) - if self.intersectionRegion is None: - self.intersectionRegion = PolygonalRegion.unionAll(self.intersections) - if self.crossingRegion is None: - self.crossingRegion = PolygonalRegion.unionAll(self.crossings) - if self.sidewalkRegion is None: - self.sidewalkRegion = PolygonalRegion.unionAll(self.sidewalks) - if self.shoulderRegion is None: - self.shoulderRegion = PolygonalRegion.unionAll(self.shoulders) + if self.use2DMap == 0: + if self.roadRegion is None: + meshes = [m.polygon for m in self.roads] + regions = [r.region for r in self.roads] + combined = trimesh.util.concatenate(meshes) + orientation = VectorField.forUnionOf(regions, tolerance=self.tolerance) + self.roadRegion = MeshSurfaceRegion( + combined, centerMesh=False, position=None, orientation=orientation + ) + if self.laneRegion is None: + meshes = [m.polygon for m in self.lanes] + regions = [r.region for r in self.lanes] + combined = trimesh.util.concatenate(meshes) + orientation = VectorField.forUnionOf(regions, tolerance=self.tolerance) + self.laneRegion = MeshSurfaceRegion( + combined, centerMesh=False, position=None, orientation=orientation + ) + if self.intersectionRegion is None: + meshes = [m.polygon for m in self.intersections] + regions = [r.region for r in self.intersections] + combined = trimesh.util.concatenate(meshes) + orientation = VectorField.forUnionOf(regions, tolerance=self.tolerance) + self.intersectionRegion = MeshSurfaceRegion( + combined, centerMesh=False, position=None, orientation=orientation + ) + if self.crossingRegion is None: + meshes = [m.polygon for m in self.crossings] + regions = [r.region for r in self.crossings] + combined = trimesh.util.concatenate(meshes) + orientation = VectorField.forUnionOf(regions, tolerance=self.tolerance) + self.crossingRegion = MeshSurfaceRegion( + combined, centerMesh=False, position=None, orientation=orientation + ) + if self.sidewalkRegion is None: + meshes = [m.polygon for m in self.sidewalks] + regions = [r.region for r in self.sidewalks] + combined = trimesh.util.concatenate(meshes) + orientation = VectorField.forUnionOf(regions, tolerance=self.tolerance) + self.sidewalkRegion = MeshSurfaceRegion( + combined, centerMesh=False, position=None, orientation=orientation + ) + if self.shoulderRegion is None: + meshes = [m.polygon for m in self.shoulders] + regions = [r.region for r in self.shoulders] + combined = trimesh.util.concatenate(meshes) + orientation = VectorField.forUnionOf(regions, tolerance=self.tolerance) + self.shoulderRegion = MeshSurfaceRegion( + combined, centerMesh=False, position=None, orientation=orientation + ) + else: + if self.roadRegion is None: + self.roadRegion = PolygonalRegion.unionAll(self.roads) + if self.laneRegion is None: + self.laneRegion = PolygonalRegion.unionAll(self.lanes) + if self.intersectionRegion is None: + self.intersectionRegion = PolygonalRegion.unionAll(self.intersections) + if self.crossingRegion is None: + self.crossingRegion = PolygonalRegion.unionAll(self.crossings) + if self.sidewalkRegion is None: + self.sidewalkRegion = PolygonalRegion.unionAll(self.sidewalks) + if self.shoulderRegion is None: + self.shoulderRegion = PolygonalRegion.unionAll(self.shoulders) if self.drivableRegion is None: - self.drivableRegion = PolygonalRegion.unionAll( - ( - self.laneRegion, - self.roadRegion, # can contain points slightly outside laneRegion - self.intersectionRegion, + if not self.use2DMap: + combined = trimesh.util.concatenate( + ( + self.laneRegion.mesh, + self.roadRegion.mesh, + self.intersectionRegion.mesh, + ) + ) + regs = [self.laneRegion, self.roadRegion, self.intersectionRegion] + orientation = VectorField.forUnionOf(regs, tolerance=self.tolerance) + self.drivableRegion = MeshSurfaceRegion( + combined, + centerMesh=False, + position=None, + orientation=orientation, + ) + else: + self.drivableRegion = PolygonalRegion.unionAll( + ( + self.laneRegion, + self.roadRegion, # can contain points slightly outside laneRegion + self.intersectionRegion, + ) ) + assert self.drivableRegion.containsRegion( + self.laneRegion, tolerance=self.tolerance ) - assert self.drivableRegion.containsRegion( - self.laneRegion, tolerance=self.tolerance - ) - assert self.drivableRegion.containsRegion( - self.roadRegion, tolerance=self.tolerance - ) - assert self.drivableRegion.containsRegion( - self.intersectionRegion, tolerance=self.tolerance - ) + assert self.drivableRegion.containsRegion( + self.roadRegion, tolerance=self.tolerance + ) + assert self.drivableRegion.containsRegion( + self.intersectionRegion, tolerance=self.tolerance + ) + if self.walkableRegion is None: - self.walkableRegion = self.sidewalkRegion.union(self.crossingRegion) - assert self.walkableRegion.containsRegion( - self.sidewalkRegion, tolerance=self.tolerance - ) - assert self.walkableRegion.containsRegion( - self.crossingRegion, tolerance=self.tolerance - ) + if not self.use2DMap: + combined = trimesh.util.concatenate( + ( + self.sidewalkRegion.mesh, + self.crossingRegion.mesh, + ) + ) + regs = [self.sidewalkRegion, self.crossingRegion] + orientation = VectorField.forUnionOf(regs, tolerance=self.tolerance) + self.walkableRegion = MeshSurfaceRegion( + combined, centerMesh=False, position=None, orientation=orientation + ) + if not self.walkableRegion.mesh.is_empty: + # if there are no sidewalks or crossings, the combined mesh will be empty + # in which case we skip these assertions + assert self.walkableRegion.containsRegion( + self.sidewalkRegion, tolerance=self.tolerance + ) + assert self.walkableRegion.containsRegion( + self.crossingRegion, tolerance=self.tolerance + ) + else: + self.walkableRegion = self.sidewalkRegion.union(self.crossingRegion) + assert self.walkableRegion.containsRegion( + self.sidewalkRegion, tolerance=self.tolerance + ) + assert self.walkableRegion.containsRegion( + self.crossingRegion, tolerance=self.tolerance + ) if self.curbRegion is None: edges = [] @@ -947,7 +1145,11 @@ def __attrs_post_init__(self): edges.append(road.forwardLanes.curb) if road.backwardLanes: edges.append(road.backwardLanes.curb) - self.curbRegion = PolylineRegion.unionAll(edges) + if not self.use2DMap: + vertex_lists = [edge.vertices for edge in edges] + self.curbRegion = PathRegion(polylines=vertex_lists) + else: + self.curbRegion = PolylineRegion.unionAll(edges) if self.roadDirection is None: # TODO replace with a PolygonalVectorField for better pruning @@ -955,7 +1157,14 @@ def __attrs_post_init__(self): # Build R-tree for faster lookup of roads, etc. at given points self._uidForIndex = tuple(self.elements) - self._rtree = shapely.STRtree([elem.polygons for elem in self.elements.values()]) + if not self.use2DMap: + self._rtree = shapely.STRtree( + [elem.region._boundingPolygon for elem in self.elements.values()] + ) + else: + self._rtree = shapely.STRtree( + [elem.polygon for elem in self.elements.values()] + ) def _defaultRoadDirection(self, point): """Default value for the `roadDirection` vector field. @@ -1071,10 +1280,11 @@ def fromOpenDrive( cls, path, ref_points: int = 20, - tolerance: float = 0.05, + tolerance: float = 0.115, fill_gaps: bool = True, fill_intersections: bool = True, elide_short_roads: bool = False, + use2DMap: bool = False, ): """Create a `Network` from an OpenDRIVE file. @@ -1100,8 +1310,10 @@ def fromOpenDrive( verbosePrint("Parsing OpenDRIVE file...") road_map.parse(path) verbosePrint("Computing road geometry... (this may take a while)") - road_map.calculate_geometry(ref_points, calc_gap=fill_gaps, calc_intersect=True) - network = road_map.toScenicNetwork() + road_map.calculate_geometry( + ref_points, calc_gap=fill_gaps, calc_intersect=True, use2DMap=use2DMap + ) + network = road_map.toScenicNetwork(use2DMap=use2DMap) totalTime = time.time() - startTime verbosePrint(f"Finished loading OpenDRIVE map in {totalTime:.2f} seconds.") return network @@ -1185,16 +1397,25 @@ def findPointIn( are still no matches, we return None, unless **reject** is true, in which case we reject the current sample. """ - point = shapely.geometry.Point(_toVector(point)) + p = _toVector(point) # convert to Scenic Vector + point = shapely.geometry.Point(p) # convert to Shapely Point def findElementWithin(distance): + distance = distance target = point if distance == 0 else point.buffer(distance) indices = self._rtree.query(target, predicate="intersects") candidates = {self._uidForIndex[index] for index in indices} if candidates: + closest = None for elem in elems: if elem.uid in candidates: - return elem + if closest == None: + closest = elem + elif elem.distanceTo(p) < closest.distanceTo( + p + ): # Tie goes to first element + closest = elem + return closest return None # First pass: check for elements containing the point. @@ -1218,11 +1439,11 @@ def _findPointInAll(self, point, things, key=lambda e: e): point = _toVector(point) found = [] for thing in things: - if key(thing).containsPoint(point): + if key(thing).boundingPolygon.containsPoint(point): found.append(thing) if not found and self.tolerance > 0: for thing in things: - if key(thing).distanceTo(point) <= self.tolerance: + if key(thing).boundingPolygon.distanceTo(point) <= self.tolerance: found.append(thing) return found @@ -1344,3 +1565,11 @@ def show(self, labelIncomingLanes=False): x, y, _ = lane.centerline[-1] plt.plot([x], [y], "*b") plt.annotate(str(i), (x, y)) + + def show3D(self, viewer): + self.drivableRegion.mesh.visual.face_colors = [200, 200, 200, 255] + viewer.add_geometry(self.drivableRegion.mesh) + self.shoulderRegion.mesh.visual.face_colors = [0, 0, 255, 255] + viewer.add_geometry(self.shoulderRegion.mesh) + self.walkableRegion.mesh.visual.face_colors = [255, 0, 0, 255] + viewer.add_geometry(self.walkableRegion.mesh) diff --git a/src/scenic/domains/driving/workspace.py b/src/scenic/domains/driving/workspace.py index 2cdcb8143..bba3ab4d7 100644 --- a/src/scenic/domains/driving/workspace.py +++ b/src/scenic/domains/driving/workspace.py @@ -13,6 +13,9 @@ def __init__(self, network): def show2D(self, plt): self.network.show() + def show3D(self, viewer): + self.network.show3D(viewer) + @property def minimumZoomSize(self): return 20 diff --git a/src/scenic/formats/opendrive/xodr_parser.py b/src/scenic/formats/opendrive/xodr_parser.py index e01d9c49d..60763c60c 100644 --- a/src/scenic/formats/opendrive/xodr_parser.py +++ b/src/scenic/formats/opendrive/xodr_parser.py @@ -2,6 +2,7 @@ import abc from collections import defaultdict +import importlib import itertools import math import warnings @@ -10,8 +11,10 @@ import numpy as np from scipy.integrate import quad, solve_ivp from scipy.optimize import brentq +from scipy.spatial import Delaunay from shapely.geometry import GeometryCollection, MultiPoint, MultiPolygon, Point, Polygon from shapely.ops import snap, unary_union +import trimesh from scenic.core.geometry import ( averageVectors, @@ -21,8 +24,13 @@ polygonUnion, removeHoles, ) -from scenic.core.regions import PolygonalRegion, PolylineRegion -from scenic.core.vectors import Vector +from scenic.core.regions import ( + MeshSurfaceRegion, + PathRegion, + PolygonalRegion, + PolylineRegion, +) +from scenic.core.vectors import Vector, VectorField from scenic.domains.driving import roads as roadDomain @@ -223,6 +231,7 @@ def __init__(self, id_, type_, pred=None, succ=None): self.right_bounds = [] self.centerline = [] self.parent_lane_poly = None + self.parent_lane_mesh = None def width_at(self, s): # S here is relative to start of LaneSection this lane is in. @@ -336,6 +345,11 @@ def __init__(self, name, id_, length, junction, drive_on_right=True): self.sec_lane_polys = [] # List of lane polygons. Not a dict b/c lane id is not unique along road. self.lane_polys = [] + # Added for 3D support + self.sec_meshes = [] + self.sec_lane_meshes = [] + self.lane_meshes = [] # List of meshes for each lane section + # Each polygon in lane_polys is the union of connected lane section polygons. # lane_polys is currently not used. # Reference line offset: @@ -346,6 +360,9 @@ def __init__(self, name, id_, length, junction, drive_on_right=True): self.start_bounds_right = {} self.end_bounds_left = {} self.end_bounds_right = {} + # Modified: + self.elevation_poly = [] # List of polys for elevation data + self.lateral_poly = [] # List to polys for lateral data self.remappedStartLanes = None # hack for handling spurious initial lane sections @@ -359,6 +376,37 @@ def get_ref_line_offset(self, s): assert s >= s0 return poly.eval_at(s - s0) + def get_elevation_at(self, s): + """Evaluate the elevation at a given s using the elevation profile.""" + if not self.elevation_poly: + return 0 + for i in range(len(self.elevation_poly) - 1): + s_start = self.elevation_poly[i][1] + s_end = self.elevation_poly[i + 1][1] + if s_start <= s < s_end: + break + else: + i = len(self.elevation_poly) - 1 + s_start = self.elevation_poly[i][1] + ds = s - s_start + return self.elevation_poly[i][0].eval_at(ds) + + def get_super_elevation_at(self, s): + """Evaluate the super-elevation at a given s using the lateral profile.""" + if not self.lateral_poly: + return 0 + for i in range(len(self.lateral_poly) - 1): + s_start, _, _, _, _ = self.lateral_poly[i] + s_end, _, _, _, _ = self.lateral_poly[i + 1] + if s_start <= s < s_end: + break + else: + i = len(self.lateral_poly) - 1 + s_start, a, b, c, d = self.lateral_poly[i] + ds = s - s_start + super_elevation = a + b * ds + c * ds**2 + d * ds**3 + return super_elevation + def get_ref_points(self, num): """Returns list of list of points for each piece of ref_line. List of list structure necessary because each piece needs to be @@ -370,10 +418,18 @@ def get_ref_points(self, num): for piece in self.ref_line: piece_points = piece.to_points(num, extra_points=transition_points) assert piece_points, "Failed to get piece points" + elevated_points = [] if ref_points: - last_s = ref_points[-1][-1][2] - piece_points = [(p[0], p[1], p[2] + last_s) for p in piece_points] - ref_points.append(piece_points) + last_s = ref_points[-1][-1][3] + elevated_points = [ + (p[0], p[1], self.get_elevation_at(p[2] + last_s), p[2] + last_s) + for p in piece_points + ] + else: + elevated_points = [ + (p[0], p[1], self.get_elevation_at(p[2]), p[2]) for p in piece_points + ] + ref_points.append(elevated_points) transition_points = [s - last_s for s in transition_points if s > last_s] return ref_points @@ -410,7 +466,9 @@ def heading_at(self, point): raise RuntimeError("Point not found in piece_polys") - def calc_geometry_for_type(self, lane_types, num, tolerance, calc_gap=False): + def calc_geometry_for_type( + self, lane_types, num, tolerance, calc_gap=False, use2DMap=0 + ): """Given a list of lane types, returns a tuple of: - List of lists of points along the reference line, with same indexing as self.lane_secs - List of region polygons, with same indexing as self.lane_secs @@ -423,14 +481,17 @@ def calc_geometry_for_type(self, lane_types, num, tolerance, calc_gap=False): ref_points = self.get_ref_points(num) self.ref_line_points = list(itertools.chain.from_iterable(ref_points)) cur_lane_polys = {} - sec_points = [] + cur_lane_meshes = {} + sec_points = [] # Same across both 2D and 3D sec_polys = [] sec_lane_polys = [] lane_polys = [] + sec_meshes = [] + sec_lane_meshes = [] + lane_meshes = [] last_lefts = None last_rights = None cur_p = None - for i in range(len(self.lane_secs)): cur_sec = self.lane_secs[i] cur_sec_points = [] @@ -442,13 +503,15 @@ def calc_geometry_for_type(self, lane_types, num, tolerance, calc_gap=False): left_bounds = defaultdict(list) right_bounds = defaultdict(list) cur_sec_lane_polys = defaultdict(list) + cur_sec_lane_meshes = defaultdict(list) cur_sec_polys = [] + cur_sec_meshes = [] end_of_sec = False while ref_points and not end_of_sec: if not ref_points[0]: ref_points.pop(0) - if not ref_points or (cur_p and cur_p[2] >= s_stop): + if not ref_points or (cur_p and cur_p[3] >= s_stop): # Case 1: We have processed the entire reference line. # Case 2: The s-coordinate has exceeded s_stop, so we should move # onto the next LaneSection. @@ -461,23 +524,52 @@ def calc_geometry_for_type(self, lane_types, num, tolerance, calc_gap=False): left = left_bounds[id_] right = right_bounds[id_][::-1] bounds = left + right + if not use2DMap: + right = right_bounds[id_] + bounds = left + right if len(bounds) < 3: continue - poly = cleanPolygon(Polygon(bounds), tolerance) - if not poly.is_empty: - if poly.geom_type == "MultiPolygon": - poly = MultiPolygon( + if not use2DMap: + faces = [] + # Create a 3D mesh from the bounds + for j in range(len(left) - 1): + faces.append( [ - p - for p in poly.geoms - if not p.is_empty and p.exterior + j + 1, + j, + len(left) + j, ] ) - cur_sec_polys.extend(poly.geoms) - else: - cur_sec_polys.append(poly) - cur_sec_lane_polys[id_].append(poly) + faces.append( + [ + j + 1, + len(left) + j, + len(left) + j + 1, + ] + ) + mesh = trimesh.Trimesh( + vertices=bounds, + faces=faces, + ) + if not mesh.is_empty: + cur_sec_meshes.append(mesh) + cur_sec_lane_meshes[id_].append(mesh) + else: + poly = cleanPolygon(Polygon(bounds), tolerance) + if not poly.is_empty: + if poly.geom_type == "MultiPolygon": + poly = MultiPolygon( + [ + p + for p in poly.geoms + if not p.is_empty and p.exterior + ] + ) + cur_sec_polys.extend(poly.geoms) + else: + cur_sec_polys.append(poly) + cur_sec_lane_polys[id_].append(poly) cur_last_lefts[id_] = left_bounds[id_][-1] cur_last_rights[id_] = right_bounds[id_][-1] if i == 0 or not self.start_bounds_left: @@ -492,14 +584,18 @@ def calc_geometry_for_type(self, lane_types, num, tolerance, calc_gap=False): else: cur_p = ref_points[0][0] cur_sec_points.append(cur_p) - s = min(max(cur_p[2], cur_sec.s0), s_stop - 1e-6) + s = min(max(cur_p[3], cur_sec.s0), s_stop - 1e-6) offsets = cur_sec.get_offsets(s) offsets[0] = 0 for id_ in offsets: offsets[id_] += self.get_ref_line_offset(s) if len(ref_points[0]) > 1: next_p = ref_points[0][1] - tan_vec = (next_p[0] - cur_p[0], next_p[1] - cur_p[1]) + tan_vec = ( + next_p[0] - cur_p[0], + next_p[1] - cur_p[1], + next_p[2] - cur_p[2], + ) else: if len(cur_sec_points) >= 2: prev_p = cur_sec_points[-2] @@ -511,17 +607,26 @@ def calc_geometry_for_type(self, lane_types, num, tolerance, calc_gap=False): else: prev_p = sec_points[-2][-2] - tan_vec = (cur_p[0] - prev_p[0], cur_p[1] - prev_p[1]) - tan_norm = math.hypot(tan_vec[0], tan_vec[1]) - assert tan_norm > 1e-10 - normal_vec = (-tan_vec[1] / tan_norm, tan_vec[0] / tan_norm) - if cur_p[2] < s_stop: + tan_vec = ( + cur_p[0] - prev_p[0], + cur_p[1] - prev_p[1], + cur_p[2] - prev_p[2], + ) + tan_vec = np.array([tan_vec[0], tan_vec[1], tan_vec[2]]) + ref_vec = ( + np.array([0, 0, 1]) + if not np.allclose(tan_vec[:2], 0) + else np.array([1, 0, 0]) + ) + normal_vec = np.cross(ref_vec, tan_vec) + normal_vec /= np.linalg.norm(normal_vec) + if cur_p[3] < s_stop: # if at end of section, keep current point to be included in # the next section as well; otherwise remove it ref_points[0].pop(0) elif len(ref_points[0]) == 1 and len(ref_points) > 1: # also get rid of point if this is the last point of the current geometry and - # and there is another geometry following + # there is another geometry following ref_points[0].pop(0) for id_ in offsets: lane = cur_sec.get_lane(id_) @@ -533,10 +638,12 @@ def calc_geometry_for_type(self, lane_types, num, tolerance, calc_gap=False): left_bound = [ cur_p[0] + normal_vec[0] * offsets[id_], cur_p[1] + normal_vec[1] * offsets[id_], + cur_p[2] + normal_vec[2] * offsets[id_], ] right_bound = [ cur_p[0] + normal_vec[0] * offsets[prev_id], cur_p[1] + normal_vec[1] * offsets[prev_id], + cur_p[2] + normal_vec[2] * offsets[prev_id], ] if id_ < 0: left_bound, right_bound = right_bound, left_bound @@ -544,70 +651,118 @@ def calc_geometry_for_type(self, lane_types, num, tolerance, calc_gap=False): centerline = [ cur_p[0] + normal_vec[0] * halfway, cur_p[1] + normal_vec[1] * halfway, + cur_p[2] + normal_vec[2] * halfway, ] + if use2DMap: + left_bound = (left_bound[0], left_bound[1]) + right_bound = (right_bound[0], right_bound[1]) + centerline = (centerline[0], centerline[1]) left_bounds[id_].append(left_bound) right_bounds[id_].append(right_bound) lane.left_bounds.append(left_bound) lane.right_bounds.append(right_bound) lane.centerline.append(centerline) - assert len(cur_sec_points) >= 2, i - sec_points.append(cur_sec_points) - sec_polys.append(buffer_union(cur_sec_polys, tolerance=tolerance)) - for id_ in cur_sec_lane_polys: - poly = buffer_union(cur_sec_lane_polys[id_], tolerance=tolerance) - cur_sec_lane_polys[id_] = poly - cur_sec.get_lane(id_).poly = poly - sec_lane_polys.append(dict(cur_sec_lane_polys)) - next_lane_polys = {} - for id_ in cur_sec_lane_polys: - pred_id = cur_sec.get_lane(id_).pred - if pred_id and pred_id in cur_lane_polys: - next_lane_polys[id_] = cur_lane_polys.pop(pred_id) + [ - cur_sec_lane_polys[id_] - ] - else: - next_lane_polys[id_] = [cur_sec_lane_polys[id_]] + if not use2DMap: + assert len(cur_sec_points) >= 2, i + sec_points.append(cur_sec_points) + sec_meshes.append(trimesh.util.concatenate(cur_sec_meshes)) + for id_ in cur_sec_lane_meshes: + mesh = trimesh.util.concatenate(cur_sec_lane_meshes[id_]) + cur_sec_lane_meshes[id_] = mesh + cur_sec.get_lane(id_).mesh = mesh + sec_lane_meshes.append(dict(cur_sec_lane_meshes)) + next_lane_meshes = {} + for id_ in cur_sec_lane_meshes: + pred_id = cur_sec.get_lane(id_).pred + if pred_id and pred_id in cur_lane_meshes: + next_lane_meshes[id_] = cur_lane_meshes.pop(pred_id) + [ + cur_sec_lane_meshes[id_] + ] + else: + next_lane_meshes[id_] = [cur_sec_lane_meshes[id_]] + for id_ in cur_lane_meshes: + mesh = trimesh.util.concatenate(cur_lane_meshes[id_]) + self.lane_secs[i - 1].get_lane(id_).parent_lane_mesh = len( + lane_meshes + ) + lane_meshes.append(mesh) + cur_lane_meshes = next_lane_meshes + else: + assert len(cur_sec_points) >= 2, i + sec_points.append(cur_sec_points) + sec_polys.append(buffer_union(cur_sec_polys, tolerance=tolerance)) + for id_ in cur_sec_lane_polys: + poly = buffer_union(cur_sec_lane_polys[id_], tolerance=tolerance) + cur_sec_lane_polys[id_] = poly + cur_sec.get_lane(id_).poly = poly + sec_lane_polys.append(dict(cur_sec_lane_polys)) + next_lane_polys = {} + for id_ in cur_sec_lane_polys: + pred_id = cur_sec.get_lane(id_).pred + if pred_id and pred_id in cur_lane_polys: + next_lane_polys[id_] = cur_lane_polys.pop(pred_id) + [ + cur_sec_lane_polys[id_] + ] + else: + next_lane_polys[id_] = [cur_sec_lane_polys[id_]] + for id_ in cur_lane_polys: + poly = buffer_union(cur_lane_polys[id_], tolerance=tolerance) + self.lane_secs[i - 1].get_lane(id_).parent_lane_poly = len(lane_polys) + lane_polys.append(poly) + cur_lane_polys = next_lane_polys + if not use2DMap: + for id_ in cur_lane_meshes: + mesh = trimesh.util.concatenate(cur_lane_meshes[id_]) + cur_sec.get_lane(id_).parent_lane_mesh = len(lane_meshes) + lane_meshes.append(mesh) + union_mesh = trimesh.util.concatenate(sec_meshes) + if last_lefts and last_rights: + self.end_bounds_left.update(last_lefts) + self.end_bounds_right.update(last_rights) + for sec in self.lane_secs: + for lane in sec.lanes.values(): + parentIndex = lane.parent_lane_mesh + if isinstance(parentIndex, int): + lane.parent_lane_mesh = lane_meshes[parentIndex] + return (sec_points, sec_meshes, sec_lane_meshes, lane_meshes, union_mesh) + else: for id_ in cur_lane_polys: poly = buffer_union(cur_lane_polys[id_], tolerance=tolerance) - self.lane_secs[i - 1].get_lane(id_).parent_lane_poly = len(lane_polys) + cur_sec.get_lane(id_).parent_lane_poly = len(lane_polys) lane_polys.append(poly) - cur_lane_polys = next_lane_polys - for id_ in cur_lane_polys: - poly = buffer_union(cur_lane_polys[id_], tolerance=tolerance) - cur_sec.get_lane(id_).parent_lane_poly = len(lane_polys) - lane_polys.append(poly) - union_poly = buffer_union(sec_polys, tolerance=tolerance) - if last_lefts and last_rights: - self.end_bounds_left.update(last_lefts) - self.end_bounds_right.update(last_rights) - - # Difference and slightly erode all overlapping polygons - for i in range(len(sec_polys) - 1): - if sec_polys[i].overlaps(sec_polys[i + 1]): - sec_polys[i] = sec_polys[i].difference(sec_polys[i + 1]).buffer(-1e-6) - assert not sec_polys[i].overlaps(sec_polys[i + 1]) - - for polys in sec_lane_polys: - ids = sorted(polys) # order adjacent lanes consecutively - for i in range(len(ids) - 1): - polyA, polyB = polys[ids[i]], polys[ids[i + 1]] - if polyA.overlaps(polyB): - polys[ids[i]] = polyA.difference(polyB).buffer(-1e-6) - assert not polys[ids[i]].overlaps(polyB) - - for i in range(len(lane_polys) - 1): - if lane_polys[i].overlaps(lane_polys[i + 1]): - lane_polys[i] = lane_polys[i].difference(lane_polys[i + 1]).buffer(-1e-6) - assert not lane_polys[i].overlaps(lane_polys[i + 1]) - - # Set parent lane polygon references to corrected polygons - for sec in self.lane_secs: - for lane in sec.lanes.values(): - parentIndex = lane.parent_lane_poly - if isinstance(parentIndex, int): - lane.parent_lane_poly = lane_polys[parentIndex] - - return (sec_points, sec_polys, sec_lane_polys, lane_polys, union_poly) + union_poly = buffer_union(sec_polys, tolerance=tolerance) + if last_lefts and last_rights: + self.end_bounds_left.update(last_lefts) + self.end_bounds_right.update(last_rights) + + # Difference and slightly erode all overlapping polygons + for i in range(len(sec_polys) - 1): + if sec_polys[i].overlaps(sec_polys[i + 1]): + sec_polys[i] = sec_polys[i].difference(sec_polys[i + 1]).buffer(-1e-6) + assert not sec_polys[i].overlaps(sec_polys[i + 1]) + + for polys in sec_lane_polys: + ids = sorted(polys) # order adjacent lanes consecutively + for i in range(len(ids) - 1): + polyA, polyB = polys[ids[i]], polys[ids[i + 1]] + if polyA.overlaps(polyB): + polys[ids[i]] = polyA.difference(polyB).buffer(-1e-6) + assert not polys[ids[i]].overlaps(polyB) + + for i in range(len(lane_polys) - 1): + if lane_polys[i].overlaps(lane_polys[i + 1]): + lane_polys[i] = ( + lane_polys[i].difference(lane_polys[i + 1]).buffer(-1e-6) + ) + assert not lane_polys[i].overlaps(lane_polys[i + 1]) + + # Set parent lane polygon references to corrected polygons + for sec in self.lane_secs: + for lane in sec.lanes.values(): + parentIndex = lane.parent_lane_poly + if isinstance(parentIndex, int): + lane.parent_lane_poly = lane_polys[parentIndex] + return (sec_points, sec_polys, sec_lane_polys, lane_polys, union_poly) def calculate_geometry( self, @@ -617,18 +772,38 @@ def calculate_geometry( drivable_lane_types, sidewalk_lane_types, shoulder_lane_types, + use2DMap, ): # Note: this also calculates self.start_bounds_left, self.start_bounds_right, # self.end_bounds_left, self.end_bounds_right - ( - self.sec_points, - self.sec_polys, - self.sec_lane_polys, - self.lane_polys, - self.drivable_region, - ) = self.calc_geometry_for_type( - drivable_lane_types, num, tolerance, calc_gap=calc_gap - ) + if not use2DMap: + ( + self.sec_points, + self.sec_meshes, + self.sec_lane_meshes, + self.lane_meshes, + self.drivable_region, + ) = self.calc_geometry_for_type( + drivable_lane_types, + num, + tolerance, + calc_gap=calc_gap, + use2DMap=use2DMap, + ) + else: + ( + self.sec_points, + self.sec_polys, + self.sec_lane_polys, + self.lane_polys, + self.drivable_region, + ) = self.calc_geometry_for_type( + drivable_lane_types, + num, + tolerance, + calc_gap=calc_gap, + use2DMap=use2DMap, + ) for i, sec in enumerate(self.lane_secs): sec.drivable_lanes = {} @@ -663,14 +838,14 @@ def calculate_geometry( assert len(sec.right_edge) >= 2 _, _, _, _, self.sidewalk_region = self.calc_geometry_for_type( - sidewalk_lane_types, num, tolerance, calc_gap=calc_gap + sidewalk_lane_types, num, tolerance, calc_gap=calc_gap, use2DMap=use2DMap ) _, _, _, _, self.shoulder_region = self.calc_geometry_for_type( - shoulder_lane_types, num, tolerance, calc_gap=calc_gap + shoulder_lane_types, num, tolerance, calc_gap=calc_gap, use2DMap=use2DMap ) - def toScenicRoad(self, tolerance): + def toScenicRoad(self, tolerance, use2DMap=0): assert self.sec_points allElements = [] # Create lane and road sections @@ -678,10 +853,18 @@ def toScenicRoad(self, tolerance): last_section = None sidewalkSections = defaultdict(list) shoulderSections = defaultdict(list) - for sec, pts, sec_poly, lane_polys in zip( + sec_zip = zip( self.lane_secs, self.sec_points, self.sec_polys, self.sec_lane_polys - ): - pts = [pt[:2] for pt in pts] # drop s coordinate + ) + if not use2DMap: + sec_zip = zip( + self.lane_secs, self.sec_points, self.sec_meshes, self.sec_lane_meshes + ) + for sec, pts, sec_shape, lane_shape in sec_zip: + if not use2DMap: + pts = [pt[:3] for pt in pts] # drop s coordinate + else: + pts = [pt[:2] for pt in pts] # drop s coordinate assert sec.drivable_lanes laneSections = {} for id_, lane in sec.drivable_lanes.items(): @@ -703,10 +886,22 @@ def toScenicRoad(self, tolerance): succ, pred = pred, succ section = roadDomain.LaneSection( id=f"road{self.id_}_sec{len(roadSections)}_lane{id_}", - polygon=lane_polys[id_], - centerline=PolylineRegion(cleanChain(center)), - leftEdge=PolylineRegion(cleanChain(left)), - rightEdge=PolylineRegion(cleanChain(right)), + polygon=(lane_shape[id_]), + centerline=( + PathRegion(points=cleanChain(center)) + if not use2DMap + else PolylineRegion(cleanChain(center)) + ), + leftEdge=( + PathRegion(points=cleanChain(left)) + if not use2DMap + else PolylineRegion(cleanChain(left)) + ), + rightEdge=( + PathRegion(points=cleanChain(right)) + if not use2DMap + else PolylineRegion(cleanChain(right)) + ), successor=succ, predecessor=pred, lane=None, # will set these later @@ -714,20 +909,54 @@ def toScenicRoad(self, tolerance): road=None, openDriveID=id_, isForward=id_ < 0, + region=( + MeshSurfaceRegion( + lane_shape[id_], + centerMesh=False, + position=None, + orientation=VectorField( + f"lane_{id_}", roadDomain.LaneSection._defaultHeadingAt + ), + ) + if not use2DMap + else PolygonalRegion(polygon=lane_shape[id_]) + ), ) section._original_lane = lane laneSections[id_] = section allElements.append(section) section = roadDomain.RoadSection( id=f"road{self.id_}_sec{len(roadSections)}", - polygon=sec_poly, - centerline=PolylineRegion(cleanChain(pts)), - leftEdge=PolylineRegion(cleanChain(sec.left_edge)), - rightEdge=PolylineRegion(cleanChain(sec.right_edge)), + polygon=sec_shape, + centerline=( + PathRegion(points=cleanChain(pts)) + if not use2DMap + else PolylineRegion(cleanChain(pts)) + ), + leftEdge=( + PathRegion(points=cleanChain(sec.left_edge)) + if not use2DMap + else PolylineRegion(cleanChain(sec.left_edge)) + ), + rightEdge=( + PathRegion(points=cleanChain(sec.right_edge)) + if not use2DMap + else PolylineRegion(cleanChain(sec.right_edge)) + ), successor=None, predecessor=last_section, road=None, # will set later lanesByOpenDriveID=laneSections, + region=( + MeshSurfaceRegion( + sec_shape, + centerMesh=None, + position=None, + orientation=roadDomain.RoadSection._defaultHeadingAt, + ) + if not use2DMap + else PolygonalRegion(polygon=sec_shape) + ), ) roadSections.append(section) allElements.append(section) @@ -769,8 +998,16 @@ def combineSections(laneIDs, sections, name): for leftSec, rightSec in zip(leftSecs, rightSecs): leftPoints.extend(reversed(rightSec.right_bounds)) rightPoints.extend(reversed(leftSec.left_bounds)) - leftEdge = PolylineRegion(cleanChain(leftPoints)) - rightEdge = PolylineRegion(cleanChain(rightPoints)) + leftEdge = ( + PathRegion(points=cleanChain(leftPoints)) + if not use2DMap + else PolylineRegion(cleanChain(leftPoints)) + ) + rightEdge = ( + PathRegion(points=cleanChain(rightPoints)) + if not use2DMap + else PolylineRegion(cleanChain(rightPoints)) + ) # Heuristically create some kind of reasonable centerline if len(leftPoints) == len(rightPoints): @@ -781,16 +1018,40 @@ def combineSections(laneIDs, sections, name): num = max(len(leftPoints), len(rightPoints)) centerPoints = [] for d in np.linspace(0, 1, num): - l = leftEdge.lineString.interpolate(d, normalized=True) - r = rightEdge.lineString.interpolate(d, normalized=True) - centerPoints.append(averageVectors(l.coords[0], r.coords[0])) - centerline = PolylineRegion(cleanChain(centerPoints)) - allPolys = ( - sec.poly - for id_ in range(rightmost, leftmost + 1) - for sec in sections[id_] + if use2DMap: + l = leftEdge.lineString.interpolate(d, normalized=True) + r = rightEdge.lineString.interpolate(d, normalized=True) + centerPoints.append(averageVectors(l.coords[0], r.coords[0])) + else: + l = PolylineRegion( + points=leftEdge.vertices + ).lineString.interpolate(d, normalized=True) + r = PolylineRegion( + points=rightEdge.vertices + ).lineString.interpolate(d, normalized=True) + centerPoints.append(averageVectors(l.coords[0], r.coords[0])) + centerline = ( + PathRegion(points=cleanChain(centerPoints)) + if not use2DMap + else PolylineRegion(cleanChain(centerPoints)) ) - union = buffer_union(allPolys, tolerance=tolerance) + allShape = None + union = None + if not use2DMap: + allShape = ( + sec.mesh + for id_ in range(rightmost, leftmost + 1) + for sec in sections[id_] + ) + union = trimesh.util.concatenate(allShape) + else: + allShape = ( + sec.poly + for id_ in range(rightmost, leftmost + 1) + for sec in sections[id_] + ) + union = buffer_union(allShape, tolerance=tolerance) + id_ = f"road{self.id_}_{name}({leftmost},{rightmost})" return id_, union, centerline, leftEdge, rightEdge @@ -808,6 +1069,16 @@ def makeSidewalk(laneIDs): rightEdge=rightEdge, road=None, crossings=(), # TODO add crosswalks + region=( + MeshSurfaceRegion( + union, + centerMesh=False, + position=None, + orientation=roadDomain.Sidewalk._defaultHeadingAt, + ) + if not use2DMap + else PolygonalRegion(polygon=union) + ), ) allElements.append(sidewalk) return sidewalk @@ -828,6 +1099,16 @@ def makeShoulder(laneIDs): leftEdge=leftEdge, rightEdge=rightEdge, road=None, + region=( + MeshSurfaceRegion( + union, + centerMesh=False, + position=None, + orientation=roadDomain.Shoulder._defaultHeadingAt, + ) + if not use2DMap + else PolygonalRegion(polygon=union) + ), ) allElements.append(shoulder) return shoulder @@ -918,21 +1199,47 @@ def makeShoulder(laneIDs): break laneSection = nextSection ls = laneSection._original_lane - assert ls.parent_lane_poly + assert ls.parent_lane_poly or ls.parent_lane_mesh if not forward: sections = tuple(reversed(sections)) leftPoints, rightPoints, centerPoints = [], [], [] for section in sections: - leftPoints.extend(section.leftEdge.points) - rightPoints.extend(section.rightEdge.points) - centerPoints.extend(section.centerline.points) - leftEdge = PolylineRegion(cleanChain(leftPoints)) - rightEdge = PolylineRegion(cleanChain(rightPoints)) - centerline = PolylineRegion(cleanChain(centerPoints)) + leftPoints.extend( + section.leftEdge.vertices + if not use2DMap + else section.leftEdge.points + ) + rightPoints.extend( + section.rightEdge.vertices + if not use2DMap + else section.rightEdge.points + ) + centerPoints.extend( + section.centerline.vertices + if not use2DMap + else section.centerline.points + ) + leftEdge = ( + PathRegion(points=cleanChain(leftPoints)) + if not use2DMap + else PolylineRegion(cleanChain(leftPoints)) + ) + rightEdge = ( + PathRegion(points=cleanChain(rightPoints)) + if not use2DMap + else PolylineRegion(cleanChain(rightPoints)) + ) + centerline = ( + PathRegion(points=cleanChain(centerPoints)) + if not use2DMap + else PolylineRegion(cleanChain(centerPoints)) + ) lane = roadDomain.Lane( id=f"road{self.id_}_lane{nextID}", - polygon=ls.parent_lane_poly, + polygon=( + ls.parent_lane_mesh if not use2DMap else ls.parent_lane_poly + ), centerline=centerline, leftEdge=leftEdge, rightEdge=rightEdge, @@ -940,6 +1247,16 @@ def makeShoulder(laneIDs): road=None, sections=tuple(sections), successor=successorLane, # will correct inter-road links later + region=( + MeshSurfaceRegion( + ls.parent_lane_mesh, + centerMesh=False, + position=None, + orientation=roadDomain.Lane._defaultHeadingAt, + ) + if not use2DMap + else PolygonalRegion(polygon=ls.parent_lane_poly) + ), ) nextID += 1 for section in sections: @@ -969,27 +1286,47 @@ def getEdges(forward): while current and isinstance(current, roadDomain.LaneSection): if current._laneToLeft and current._laneToLeft.isForward == forward: current = current._laneToLeft - leftPoints.extend(current.leftEdge.points) + leftPoints.extend( + current.leftEdge.vertices if not use2DMap else current.leftEdge.points + ) current = current._successor - leftEdge = PolylineRegion(cleanChain(leftPoints)) + leftEdge = ( + PathRegion(points=cleanChain(leftPoints)) + if not use2DMap + else PolylineRegion(cleanChain(leftPoints)) + ) rightPoints = [] current = startLanes[0] # get rightmost lane of the first section while current and isinstance(current, roadDomain.LaneSection): if current._laneToRight and current._laneToRight.isForward == forward: current = current._laneToRight - rightPoints.extend(current.rightEdge.points) + rightPoints.extend( + current.rightEdge.vertices + if not use2DMap + else current.rightEdge.points + ) current = current._successor - rightEdge = PolylineRegion(cleanChain(rightPoints)) + rightEdge = ( + PathRegion(points=cleanChain(rightPoints)) + if not use2DMap + else PolylineRegion(cleanChain(rightPoints)) + ) middleLane = startLanes[len(startLanes) // 2].lane # rather arbitrary return leftEdge, middleLane.centerline, rightEdge if forwardLanes: leftEdge, centerline, rightEdge = getEdges(forward=True) + shape = ( + trimesh.util.concatenate(lane.polygon for lane in forwardLanes) + if not use2DMap + else buffer_union( + (lane.polygon for lane in forwardLanes), + tolerance=tolerance, + ) + ) forwardGroup = roadDomain.LaneGroup( id=f"road{self.id_}_forward", - polygon=buffer_union( - (lane.polygon for lane in forwardLanes), tolerance=tolerance - ), + polygon=(shape), centerline=centerline, leftEdge=leftEdge, rightEdge=rightEdge, @@ -1000,17 +1337,35 @@ def getEdges(forward): bikeLane=None, shoulder=forwardShoulder, opposite=None, + region=( + MeshSurfaceRegion( + shape, + centerMesh=False, + position=None, + orientation=roadDomain.LaneGroup._defaultHeadingAt, + ) + if not use2DMap + else PolygonalRegion(polygon=shape) + ), ) allElements.append(forwardGroup) else: forwardGroup = None if backwardLanes: leftEdge, centerline, rightEdge = getEdges(forward=False) + shape = ( + trimesh.util.concatenate( + lane.polygon for lane in backwardLanes if lane.polygon + ) + if not use2DMap + else buffer_union( + (lane.polygon for lane in backwardLanes if lane.polygon), + tolerance=tolerance, + ) + ) backwardGroup = roadDomain.LaneGroup( id=f"road{self.id_}_backward", - polygon=buffer_union( - (lane.polygon for lane in backwardLanes), tolerance=tolerance - ), + polygon=(shape), centerline=centerline, leftEdge=leftEdge, rightEdge=rightEdge, @@ -1021,6 +1376,16 @@ def getEdges(forward): bikeLane=None, shoulder=backwardShoulder, opposite=forwardGroup, + region=( + MeshSurfaceRegion( + shape, + centerMesh=False, + position=None, + orientation=roadDomain.LaneGroup._defaultHeadingAt, + ) + if not use2DMap + else PolygonalRegion(polygon=shape) + ), ) allElements.append(backwardGroup) if forwardGroup: @@ -1049,7 +1414,11 @@ def getEdges(forward): leftEdge = backwardGroup.rightEdge else: leftEdge = forwardGroup.leftEdge - centerline = PolylineRegion(tuple(pt[:2] for pt in self.ref_line_points)) + centerline = ( + PathRegion(points=tuple(pt[:3] for pt in self.ref_line_points)) + if not use2DMap + else PolylineRegion(tuple(pt[:2] for pt in self.ref_line_points)) + ) # Changed from pt[:2] to pt[:3] (Might need to change this) road = roadDomain.Road( name=self.name, uid=f"road{self.id_}", # need prefix to prevent collisions with intersections @@ -1064,6 +1433,16 @@ def getEdges(forward): sections=roadSections, signals=tuple(roadSignals), crossings=(), # TODO add these! + region=( + MeshSurfaceRegion( + self.drivable_region, + centerMesh=False, + position=None, + orientation=roadDomain.Road._defaultHeadingAt, + ) + if not use2DMap + else PolygonalRegion(polygon=self.drivable_region) + ), ) allElements.append(road) @@ -1152,6 +1531,8 @@ def __init__( self.junctions = {} self.sec_lane_polys = [] self.lane_polys = [] + self.sec_lane_meshes = [] + self.lane_meshes = [] self.intersection_region = None self.fill_intersections = fill_intersections self.drivable_lane_types = drivable_lane_types @@ -1159,7 +1540,7 @@ def __init__( self.shoulder_lane_types = shoulder_lane_types self.elide_short_roads = elide_short_roads - def calculate_geometry(self, num, calc_gap=False, calc_intersect=True): + def calculate_geometry(self, num, calc_gap=False, calc_intersect=True, use2DMap=0): # If calc_gap=True, fills in gaps between connected roads. # If calc_intersect=True, calculates intersection regions. # These are fairly expensive. @@ -1171,24 +1552,44 @@ def calculate_geometry(self, num, calc_gap=False, calc_intersect=True): drivable_lane_types=self.drivable_lane_types, sidewalk_lane_types=self.sidewalk_lane_types, shoulder_lane_types=self.shoulder_lane_types, + use2DMap=use2DMap, ) - self.sec_lane_polys.extend(road.sec_lane_polys) - self.lane_polys.extend(road.lane_polys) + if not use2DMap: + self.sec_lane_meshes.extend(road.sec_lane_meshes) + self.lane_meshes.extend(road.lane_meshes) + else: + self.sec_lane_polys.extend(road.sec_lane_polys) + self.lane_polys.extend(road.lane_polys) if calc_gap: drivable_polys = [] sidewalk_polys = [] shoulder_polys = [] - for road in self.roads.values(): - drivable_poly = road.drivable_region - sidewalk_poly = road.sidewalk_region - shoulder_poly = road.shoulder_region - if not (drivable_poly is None or drivable_poly.is_empty): - drivable_polys.append(drivable_poly) - if not (sidewalk_poly is None or sidewalk_poly.is_empty): - sidewalk_polys.append(sidewalk_poly) - if not (shoulder_poly is None or shoulder_poly.is_empty): - shoulder_polys.append(shoulder_poly) + drivable_meshes = [] + sidewalk_meshes = [] + shoulder_meshes = [] + if not use2DMap: + for road in self.roads.values(): + drivable_mesh = road.drivable_region + sidewalk_mesh = road.sidewalk_region + shoulder_mesh = road.shoulder_region + if not (drivable_mesh is None or drivable_mesh.is_empty): + drivable_meshes.append(drivable_mesh) + if not (sidewalk_mesh is None or sidewalk_mesh.is_empty): + sidewalk_meshes.append(sidewalk_mesh) + if not (shoulder_mesh is None or shoulder_mesh.is_empty): + shoulder_meshes.append(shoulder_mesh) + else: + for road in self.roads.values(): + drivable_poly = road.drivable_region + sidewalk_poly = road.sidewalk_region + shoulder_poly = road.shoulder_region + if not (drivable_poly is None or drivable_poly.is_empty): + drivable_polys.append(drivable_poly) + if not (sidewalk_poly is None or sidewalk_poly.is_empty): + sidewalk_polys.append(sidewalk_poly) + if not (shoulder_poly is None or shoulder_poly.is_empty): + shoulder_polys.append(shoulder_poly) for link in self.road_links: road_a = self.roads[link.id_a] @@ -1219,48 +1620,88 @@ def calculate_geometry(self, num, calc_gap=False, calc_intersect=True): continue if id_ not in a_bounds_left or id_ not in a_bounds_right: continue - - gap_poly = MultiPoint( - [ + if not use2DMap: + vertices = [ a_bounds_left[id_], a_bounds_right[id_], + b_bounds_right[ + other_id + ], # Note the order change for proper face orientation b_bounds_left[other_id], - b_bounds_right[other_id], ] - ).convex_hull - if not gap_poly.is_valid: - continue - if gap_poly.geom_type == "Polygon" and not gap_poly.is_empty: - if lane.type_ in self.drivable_lane_types: - drivable_polys.append(gap_poly) - elif lane.type_ in self.sidewalk_lane_types: - sidewalk_polys.append(gap_poly) - elif lane.type_ in self.shoulder_lane_types: - shoulder_polys.append(gap_poly) + # Create a quad mesh + faces = [[0, 1, 2], [0, 2, 3]] + gap_mesh = trimesh.Trimesh(vertices=vertices, faces=faces) + if not gap_mesh.is_empty: + if lane.type_ in self.drivable_lane_types: + drivable_meshes.append(gap_mesh) + elif lane.type_ in self.sidewalk_lane_types: + sidewalk_meshes.append(gap_mesh) + elif lane.type_ in self.shoulder_lane_types: + shoulder_meshes.append(gap_mesh) + else: + gap_poly = MultiPoint( + [ + a_bounds_left[id_], + a_bounds_right[id_], + b_bounds_left[other_id], + b_bounds_right[other_id], + ] + ).convex_hull + if not gap_poly.is_valid: + continue + if gap_poly.geom_type == "Polygon" and not gap_poly.is_empty: + if lane.type_ in self.drivable_lane_types: + drivable_polys.append(gap_poly) + elif lane.type_ in self.sidewalk_lane_types: + sidewalk_polys.append(gap_poly) + elif lane.type_ in self.shoulder_lane_types: + shoulder_polys.append(gap_poly) else: - drivable_polys = [road.drivable_region for road in self.roads.values()] - sidewalk_polys = [road.sidewalk_region for road in self.roads.values()] - shoulder_polys = [road.shoulder_region for road in self.roads.values()] - - self.drivable_region = buffer_union(drivable_polys, tolerance=self.tolerance) - self.sidewalk_region = buffer_union(sidewalk_polys, tolerance=self.tolerance) - self.shoulder_region = buffer_union(shoulder_polys, tolerance=self.tolerance) - + if not use2DMap: + drivable_meshes = [road.drivable_region for road in self.roads.values()] + sidewalk_meshes = [road.sidewalk_region for road in self.roads.values()] + shoulder_meshes = [road.shoulder_region for road in self.roads.values()] + else: + drivable_polys = [road.drivable_region for road in self.roads.values()] + sidewalk_polys = [road.sidewalk_region for road in self.roads.values()] + shoulder_polys = [road.shoulder_region for road in self.roads.values()] + + if not use2DMap: + self.drivable_region = trimesh.util.concatenate(drivable_meshes) + self.sidewalk_region = trimesh.util.concatenate(sidewalk_meshes) + self.shoulder_region = trimesh.util.concatenate(shoulder_meshes) + else: + self.drivable_region = buffer_union(drivable_polys, tolerance=self.tolerance) + self.sidewalk_region = buffer_union(sidewalk_polys, tolerance=self.tolerance) + self.shoulder_region = buffer_union(shoulder_polys, tolerance=self.tolerance) if calc_intersect: - self.calculate_intersections() - - def calculate_intersections(self): - intersect_polys = [] - for junc in self.junctions.values(): - junc_polys = [self.roads[i].drivable_region for i in junc.paths] - assert junc_polys, junc - union = buffer_union(junc_polys, tolerance=self.tolerance) - if self.fill_intersections: - union = removeHoles(union) - assert union.is_valid - junc.poly = union - intersect_polys.append(union) - self.intersection_region = buffer_union(intersect_polys, tolerance=self.tolerance) + self.calculate_intersections(use2DMap=use2DMap) + + def calculate_intersections(self, use2DMap): + if not use2DMap: + intersect_meshes = [] + for junc in self.junctions.values(): + junc_meshes = [self.roads[i].drivable_region for i in junc.paths] + assert junc_meshes, junc + union = trimesh.util.concatenate(junc_meshes) + junc.poly = union + intersect_meshes.append(union) + self.intersection_region = trimesh.util.concatenate(intersect_meshes) + else: + intersect_polys = [] + for junc in self.junctions.values(): + junc_polys = [self.roads[i].drivable_region for i in junc.paths] + assert junc_polys, junc + union = buffer_union(junc_polys, tolerance=self.tolerance) + if self.fill_intersections: + union = removeHoles(union) + assert union.is_valid + junc.poly = union + intersect_polys.append(union) + self.intersection_region = buffer_union( + intersect_polys, tolerance=self.tolerance + ) def heading_at(self, point): """Return the road heading at point.""" @@ -1269,7 +1710,7 @@ def heading_at(self, point): for road in self.roads.values(): if point.within(road.drivable_region.buffer(1)): return road.heading_at(point) - # raise RuntimeError('Point not in RoadMap: ', point) + raise RuntimeError("Point not in RoadMap: ", point) return 0 def __parse_lanes(self, lanes_elem): @@ -1513,6 +1954,26 @@ def parse(self, path): assert refLine road.ref_line = refLine + # Parse elevation: + elevation_profile = r.find("elevationProfile") + for elevation in elevation_profile.iter("elevation"): + s = float(elevation.get("s")) + a = float(elevation.get("a")) + b = float(elevation.get("b")) + c = float(elevation.get("c")) + d = float(elevation.get("d")) + road.elevation_poly.append((Poly3(a, b, c, d), s)) + + # Parse super elevation: + lateral_profile = r.find("lateralProfile") + for super_elevation in lateral_profile.iter("superElevation"): + s = float(super_elevation.get("s")) + a = float(super_elevation.get("a")) + b = float(super_elevation.get("b")) + c = float(super_elevation.get("c")) + d = float(super_elevation.get("d")) + road.lateral_poly.append((Poly3(a, b, c, d), s)) + # Parse lanes: lanes = r.find("lanes") for offset in lanes.iter("laneOffset"): @@ -1621,7 +2082,7 @@ def popLastSectionIfShort(l): new_links.append(link) self.road_links = new_links - def toScenicNetwork(self): + def toScenicNetwork(self, use2DMap=0): assert self.intersection_region is not None # Prepare registry of network elements @@ -1641,7 +2102,9 @@ def registerAll(elements): for id_, road in self.roads.items(): if road.drivable_region.is_empty: continue # not actually a road you can drive on - newRoad, elts = road.toScenicRoad(tolerance=self.tolerance) + else: + pass + newRoad, elts = road.toScenicRoad(tolerance=self.tolerance, use2DMap=use2DMap) registerAll(elts) (connectingRoads if road.junction else mainRoads)[id_] = newRoad roads[id_] = newRoad @@ -1802,6 +2265,7 @@ def registerAll(elements): endLane=outgoingLane, intersection=None, # will be patched once the Intersection is created ) + # maneuver = None # TODO: add maneuver type maneuversForLane[fromLane.lane].append(maneuver) # Gather maneuvers @@ -1828,6 +2292,16 @@ def cyclicOrder(elements, contactStart=None): return tuple(elem for elem, pt in pairs) # Create intersection + region = ( + MeshSurfaceRegion( + junction.poly, + centerMesh=False, + position=None, + orientation=roadDomain.Intersection._defaultHeadingAt, + ) + if not use2DMap + else PolygonalRegion(polygon=junction.poly) + ) intersection = roadDomain.Intersection( polygon=junction.poly, name=junction.name, @@ -1839,6 +2313,7 @@ def cyclicOrder(elements, contactStart=None): maneuvers=tuple(allManeuvers), signals=tuple(allSignals), crossings=(), # TODO add these + region=region, ) register(intersection) intersections[jid] = intersection @@ -1896,7 +2371,16 @@ def cyclicOrder(elements, contactStart=None): lane.maneuvers = (maneuver,) def combine(regions): - return PolygonalRegion.unionAll(regions, buf=self.tolerance) + if not use2DMap: + meshes = [m.polygon for m in regions] + regions = [r.region for r in regions] + combined = trimesh.util.concatenate(meshes) + orientation = VectorField.forUnionOf(regions, tolerance=self.tolerance) + return MeshSurfaceRegion( + combined, centerMesh=False, position=None, orientation=orientation + ) + else: + return PolygonalRegion.unionAll(regions, buf=self.tolerance) return roadDomain.Network( elements=allElements, @@ -1914,4 +2398,5 @@ def combine(regions): intersectionRegion=combine(intersections), crossingRegion=combine(crossings), sidewalkRegion=combine(sidewalks), + use2DMap=use2DMap, ) diff --git a/tests/domains/driving/conftest.py b/tests/domains/driving/conftest.py index 8877f6742..d228573ee 100644 --- a/tests/domains/driving/conftest.py +++ b/tests/domains/driving/conftest.py @@ -38,6 +38,18 @@ def cached_maps(tmpdir_factory): return paths +@pytest.fixture(scope="session") +def cached_maps3D(tmpdir_factory): + folder = tmpdir_factory.mktemp("maps3D") + paths = {} + for localMap in maps: + newPath = folder.join(localMap) + os.makedirs(newPath.dirname, exist_ok=True) + shutil.copyfile(localMap, newPath) + paths[localMap] = newPath + return paths + + @pytest.fixture(scope="session") def network(cached_maps, pytestconfig): if pytestconfig.getoption("--fast", False): @@ -45,4 +57,14 @@ def network(cached_maps, pytestconfig): else: path = mapFolder / "CARLA" / "Town03.xodr" path = cached_maps[str(path)] - return Network.fromFile(path) + return Network.fromFile(path, use2DMap=True) + + +@pytest.fixture(scope="session") +def network3D(cached_maps3D, pytestconfig): + if pytestconfig.getoption("--fast", False): + path = mapFolder / "CARLA" / "Town01.xodr" + else: + path = mapFolder / "CARLA" / "Town03.xodr" + path = cached_maps3D[str(path)] + return Network.fromFile(path, use2DMap=False) diff --git a/tests/domains/driving/test_driving.py b/tests/domains/driving/test_driving.py index 50d09266d..7b2c6371c 100644 --- a/tests/domains/driving/test_driving.py +++ b/tests/domains/driving/test_driving.py @@ -35,7 +35,7 @@ def compileDrivingScenario( - cached_maps, code="", useCache=True, path=None, mode2D=True, params={} + cached_maps, code="", useCache=True, path=None, mode2D=False, params={} ): if not path: path = mapFolder / "CARLA" / "Town01.xodr" @@ -55,34 +55,43 @@ def test_driving_2D_map(cached_maps): ) -def test_driving_3D(cached_maps): - with pytest.raises(RuntimeError): - compileDrivingScenario( - cached_maps, code=basicScenario, useCache=False, mode2D=False - ) +def test_driving_3D(cached_maps3D): + compileDrivingScenario( + cached_maps3D, code=basicScenario, useCache=False, mode2D=False + ) @pytest.mark.slow @pytest.mark.parametrize("path", map_params) -def test_opendrive(path, cached_maps): +@pytest.mark.parametrize("use2DMap", [False, True]) +def test_opendrive(path, cached_maps, cached_maps3D, use2DMap): try: # First, try the original .xodr file scenario = compileDrivingScenario( - cached_maps, path=path, code=basicScenario, useCache=False + cached_maps if use2DMap else cached_maps3D, + path=path, + code=basicScenario, + useCache=False, + mode2D=use2DMap, ) sampleScene(scenario, maxIterations=1000) # Second, try the cached version of the network scenario = compileDrivingScenario( - cached_maps, path=path, code=basicScenario, useCache=True + cached_maps if use2DMap else cached_maps3D, + path=path, + code=basicScenario, + useCache=True, + mode2D=use2DMap, ) sampleScene(scenario, maxIterations=1000) except TriangulationError: pytest.skip("need better triangulation library to run this test") -def test_elements_at(cached_maps): +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_elements_at(cached_maps, cached_maps3D, use2DMap): scenario = compileDrivingScenario( - cached_maps, + cached_maps if use2DMap else cached_maps3D, """ ego = new Car posTuple = (ego.position.x, ego.position.y) @@ -96,6 +105,7 @@ def test_elements_at(cached_maps): param crossing = network.crossingAt(spot) param intersection = network.intersectionAt(spot) """, + mode2D=use2DMap, ) scene = sampleScene(scenario, maxIterations=1000) ego = scene.egoObject @@ -116,15 +126,17 @@ def test_elements_at(cached_maps): assert val is getattr(ego, param), param -def test_intersection(cached_maps): +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_intersection(cached_maps, cached_maps3D, use2DMap): scenario = compileDrivingScenario( - cached_maps, + cached_maps if use2DMap else cached_maps3D, """ intersection = Uniform(*network.intersections) lane = Uniform(*intersection.incomingLanes) maneuver = Uniform(*lane.maneuvers) ego = new Car on maneuver.connectingLane.centerline """, + mode2D=use2DMap, ) for i in range(20): ego = sampleEgo(scenario, maxIterations=1000) @@ -136,23 +148,28 @@ def test_intersection(cached_maps): network = intersection.network assert network.elementAt(ego) is intersection directions = intersection.nominalDirectionsAt(ego) + maneuvers = intersection.maneuversAt(ego) + lane = ego.lane assert directions == network.nominalDirectionsAt(ego) assert any( ego.heading == pytest.approx(direction.yaw) for direction in directions ) - maneuvers = intersection.maneuversAt(ego) - lane = ego.lane + assert any(man.connectingLane is lane for man in maneuvers) -def test_curb(cached_maps): +# Tests end here +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_curb(cached_maps, cached_maps3D, use2DMap): + scenario = compileDrivingScenario( - cached_maps, + cached_maps if use2DMap else cached_maps3D, """ ego = new Car spot = new OrientedPoint on visible curb new Car left of spot by 0.25 """, + mode2D=use2DMap, ) ego = sampleEgo(scenario, maxIterations=1000) directions = ego.element.network.nominalDirectionsAt(ego) @@ -160,7 +177,8 @@ def test_curb(cached_maps): @pytest.mark.slow -def test_caching(tmpdir): +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_caching(tmpdir, use2DMap): """Test caching of road networks. In particular, make sure that links between network elements and maneuvers @@ -189,43 +207,120 @@ def test_caching(tmpdir): new Car on ego.lane.successor.centerline, with requireVisible False new Car on ego.lane.maneuvers[0].endLane.centerline, with requireVisible False """, - mode2D=True, + mode2D=use2DMap, ) sampleScene(scenario, maxIterations=1000) @pickle_test @pytest.mark.slow -def test_pickle(cached_maps): +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_pickle(cached_maps, cached_maps3D, use2DMap): scenario = compileDrivingScenario( - cached_maps, + cached_maps if use2DMap else cached_maps3D, """ ego = new Car with behavior FollowLaneBehavior(target_speed=Range(10, 15)) new Pedestrian on visible sidewalk """, + mode2D=use2DMap, ) unpickled = tryPickling(scenario) scene = sampleScene(unpickled, maxIterations=1000) tryPickling(scene) -def test_invalid_road_scenario(cached_maps): +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_invalid_road_scenario(cached_maps, cached_maps3D, use2DMap): with pytest.raises(InvalidScenarioError): scenario = compileDrivingScenario( - cached_maps, + cached_maps if use2DMap else cached_maps3D, """ ego = new Car at 80.6354425964952@-327.5431187869811 param foo = ego.oppositeLaneGroup.sidewalk """, + mode2D=use2DMap, ) with pytest.raises(InvalidScenarioError): # Set regionContainedIn to everywhere to hit driving domain specific code # instead of high level not contained in workspace rejection. scenario = compileDrivingScenario( - cached_maps, + cached_maps if use2DMap else cached_maps3D, """ ego = new Car at 10000@10000, with regionContainedIn everywhere param foo = ego.lane """, + mode2D=use2DMap, ) + + +def test_cars_at_underpass(cached_maps3D): + scenario = compileDrivingScenario( + cached_maps3D, + """ + ego = new Car on road, at (10, -12, 0), with regionContainedIn everywhere + """, + path=mapFolder / "CARLA" / "Town04.xodr", + ) + ego = sampleEgo(scenario, maxIterations=1000) + directions = ego.element.network.nominalDirectionsAt(ego) + # Check the car is under the overpass + assert ego.position.x > 8 and ego.position.x < 12 + assert ego.position.y < -10 and ego.position.y > -14 + assert ego.position.z < 5 + # Checks that road orientation is same as ego orientation + assert any(ego.heading == pytest.approx(direction.yaw) for direction in directions) + assert any( + ego.orientation.pitch == pytest.approx(direction.pitch) + for direction in directions + ) + assert any( + ego.orientation.roll == pytest.approx(direction.roll) for direction in directions + ) + + +def test_cars_at_overpass(cached_maps3D): + scenario = compileDrivingScenario( + # (10, -12, 100) projects car down to map + cached_maps3D, + """ + ego = new Car on road, at (10, -12, 100), with regionContainedIn everywhere + """, + path=mapFolder / "CARLA" / "Town04.xodr", + ) + ego = sampleEgo(scenario, maxIterations=1000) + directions = ego.element.network.nominalDirectionsAt(ego) + # Check the car is over the overpass + assert ego.position.x > 8 and ego.position.x < 12 + assert ego.position.y < -10 and ego.position.y > -14 + assert ego.position.z > 10 + # Checks that road orientation is same as ego orientation + assert any(ego.heading == pytest.approx(direction.yaw) for direction in directions) + assert any( + ego.orientation.pitch == pytest.approx(direction.pitch) + for direction in directions + ) + assert any( + ego.orientation.roll == pytest.approx(direction.roll) for direction in directions + ) + + +def test_car_on_slope(cached_maps3D): + scenario = compileDrivingScenario( + cached_maps3D, + """ + ego = new Car on road, at (200, -12, 0), with regionContainedIn everywhere + """, + path=mapFolder / "CARLA" / "Town04.xodr", + ) + ego = sampleEgo(scenario, maxIterations=1000) + directions = ego.element.network.nominalDirectionsAt(ego) + # Checks that road orientation is same as ego orientation + assert any(ego.heading == pytest.approx(direction.yaw) for direction in directions) + assert any( + ego.orientation.pitch == pytest.approx(direction.pitch) + for direction in directions + ) + assert any( + ego.orientation.roll == pytest.approx(direction.roll) for direction in directions + ) diff --git a/tests/domains/driving/test_network.py b/tests/domains/driving/test_network.py index 4f1394198..ca212fccd 100644 --- a/tests/domains/driving/test_network.py +++ b/tests/domains/driving/test_network.py @@ -28,14 +28,19 @@ def test_show2D(network): plt.close() -def test_element_tolerance(cached_maps, pytestconfig): - path = cached_maps[str(mapFolder / "CARLA" / "Town01.xodr")] - tol = 0.05 - network = Network.fromFile(path, tolerance=tol) - drivable = network.drivableRegion +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_element_tolerance(cached_maps, cached_maps3D, pytestconfig, use2DMap): + path = ( + cached_maps[str(mapFolder / "CARLA" / "Town01.xodr")] + if use2DMap + else cached_maps3D[str(mapFolder / "CARLA" / "Town01.xodr")] + ) + tol = 0.10 + network = Network.fromFile(path, tolerance=tol, use2DMap=use2DMap) + drivable = network.drivableRegion.boundingPolygon toofar = drivable.buffer(2 * tol).difference(drivable.buffer(1.5 * tol)) - toofar_noint = toofar.difference(network.intersectionRegion) - road = network.roads[0] + toofar_noint = toofar.difference(network.intersectionRegion.boundingPolygon) + road = network.roads[0].boundingPolygon nearby = road.buffer(tol).difference(road) rounds = 30 if pytestconfig.getoption("--fast") else 300 for i in range(rounds): @@ -53,7 +58,9 @@ def test_element_tolerance(cached_maps, pytestconfig): assert not network.nominalDirectionsAt(pt) -def test_orientation_consistency(network): +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_orientation_consistency(network, network3D, use2DMap): + network = network if use2DMap else network3D for i in range(30): pt = network.drivableRegion.uniformPointInner() dirs = network.nominalDirectionsAt(pt) @@ -95,7 +102,9 @@ def test_orientation_consistency(network): assert laneSec.orientation[pt] == pytest.approx(d) -def test_linkage(network): +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_linkage(network, network3D, use2DMap): + network = network if use2DMap else network3D for road in network.roads: assert road.forwardLanes or road.backwardLanes assert road.is1Way == (not (road.forwardLanes and road.backwardLanes)) diff --git a/tests/simulators/carla/test_actions.py b/tests/simulators/carla/test_actions.py index 7914ad04a..929213128 100644 --- a/tests/simulators/carla/test_actions.py +++ b/tests/simulators/carla/test_actions.py @@ -122,3 +122,53 @@ def test_brake(getCarlaSimulator): simulation = simulator.simulate(scene) finalSpeed = simulation.result.records["CarSpeed"] assert finalSpeed == pytest.approx(0.0, abs=1e-1) + + +def test_cars_at_underpass(getCarlaSimulator): + simulator, town, mapPath = getCarlaSimulator("Town04") + code = f""" + param map = r'{mapPath}' + param carla_map = '{town}' + param time_step = 1.0/10 + + model scenic.simulators.carla.model + + ego = new Car on road, + at (10, 25, 0), + with regionContainedIn everywhere, + with behavior FollowLaneBehavior(target_speed = 20) + + record final ego.z as finalZ + terminate after 100 steps + """ + + scenario = compileScenic(code, mode2D=False) + scene = sampleScene(scenario) + simulation = simulator.simulate(scene) + finalZ = simulation.result.records["finalZ"] + assert finalZ < 5 + + +def test_cars_at_overpass(getCarlaSimulator): + simulator, town, mapPath = getCarlaSimulator("Town04") + code = f""" + param map = r'{mapPath}' + param carla_map = '{town}' + param time_step = 1.0/10 + + model scenic.simulators.carla.model + + ego = new Car on road, + at (-60, -6, 1), + with regionContainedIn everywhere, + with behavior FollowLaneBehavior(target_speed = 20) + + record final ego.z as finalZ + terminate after 100 steps + """ + + scenario = compileScenic(code, mode2D=False) + scene = sampleScene(scenario) + simulation = simulator.simulate(scene) + finalZ = simulation.result.records["finalZ"] + assert finalZ > 5 diff --git a/tests/simulators/carla/test_basic.py b/tests/simulators/carla/test_basic.py index 205865fc1..bdaa14893 100644 --- a/tests/simulators/carla/test_basic.py +++ b/tests/simulators/carla/test_basic.py @@ -1,5 +1,6 @@ import pytest +from scenic.core.distributions import InvalidScenarioError from tests.utils import compileScenic, pickle_test, sampleScene, tryPickling # Suppress potential warning about missing the carla package @@ -19,12 +20,38 @@ def test_map_param_parse(getAssetPath): assert scenario.params["carla_map"] == "Town01" -def test_basic(loadLocalScenario): - scenario = loadLocalScenario("basic.scenic", mode2D=True) +# Note: This will used the current cached version of the map, during testing this does not work with a cached 3D map +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_basic(loadLocalScenario, use2DMap): + scenario = loadLocalScenario("basic.scenic", mode2D=use2DMap) scenario.generate(maxIterations=1000) +@pytest.mark.parametrize("use2DMap", [True, False]) +def test_car_created(loadLocalScenario, use2DMap): + scenario = loadLocalScenario("basic.scenic", mode2D=use2DMap) + scene = sampleScene(scenario, maxIterations=1000) + car = scene.egoObject + assert car + + +# A test to ensure that cars cannot clip through each other without causing errors +def test_car_clipping_3D(getAssetPath): + pytest.importorskip("carla") + mapPath = getAssetPath("maps/CARLA/Town01.xodr") + code = f""" + param map = r'{mapPath}' + param carla_map = 'Town01' + model scenic.simulators.carla.model + ego = new Car at (100,0,1), with regionContainedIn everywhere + car2 = new Car at (100,0,1), with regionContainedIn everywhere + """ + with pytest.raises(InvalidScenarioError): + compileScenic(code, mode2D=False) + + def test_simulator_import(): + pytest.importorskip("carla") from scenic.simulators.carla import CarlaSimulator diff --git a/tests/simulators/newtonian/test_newtonian.py b/tests/simulators/newtonian/test_newtonian.py index 98561c527..31220cf02 100644 --- a/tests/simulators/newtonian/test_newtonian.py +++ b/tests/simulators/newtonian/test_newtonian.py @@ -43,7 +43,7 @@ def test_gif_creation(loadLocalScenario): scenario = loadLocalScenario("driving.scenic", mode2D=True) scene, _ = scenario.generate(maxIterations=1000) path = Path("assets") / "maps" / "CARLA" / "Town01.xodr" - network = Network.fromFile(path) + network = Network.fromFile(path, use2DMap=True) simulator = NewtonianSimulator(render=True, network=network, export_gif=True) simulation = simulator.simulate(scene, maxSteps=100) gif_path = Path("") / "simulation.gif"