Skip to content
Draft
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions scripts/fly-by.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
mics = ac.MicGeom()
grid = ac.RectGrid(x_min=-1, x_max=1, y_min=-1, y_max=1, z=1)

traj = ac.Trajectory(points={0: (-50, 0, 1), T: (50, 0, 1)})
traj = synth.SplineTrajectory(times=[0.0, T], locations=[[-50.0, 0.0, 1.0], [50.0, 0.0, 1.0]])
gen1 = ac.SineGenerator(freq=10, num_samples=ns, sample_freq=sf)
gen2 = ac.SineGenerator(freq=1, num_samples=ns, sample_freq=sf, amplitude=0.5, phase=-np.pi / 2)

mps1 = ac.MovingPointSource(signal=gen1, trajectory=traj, mics=mics)
mps2 = ac.MovingPointSource(signal=gen2, trajectory=traj, mics=mics)
ac_traj = ac.Trajectory(points={0: (-50, 0, 1), T: (50, 0, 1)})
mps1 = ac.MovingPointSource(signal=gen1, trajectory=ac_traj, mics=mics)
mps2 = ac.MovingPointSource(signal=gen2, trajectory=ac_traj, mics=mics)

mix = ac.SourceMixer(sources=[mps1, mps2])

Expand All @@ -43,7 +44,7 @@
mic = synth.Microphone()

scene = synth.Scene()
scene.environment = ac.Environment()
scene.environment = synth.Environment()
scene.microphones = [mic]
scene.sources = [source1, source2]

Expand Down
2 changes: 2 additions & 0 deletions src/scene_synthesis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@
from .microphones import Microphone as Microphone
from .scene import Scene as Scene
from .sources import Source as Source
from .trajectory import SplineTrajectory as SplineTrajectory
from .trajectory import Trajectory as Trajectory
Comment thread
jtodev marked this conversation as resolved.
2 changes: 1 addition & 1 deletion src/scene_synthesis/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _source_state(self, source, sending_time):
"""Return source position and velocity at one sending time."""
if source.trajectory is not None:
source_loc = np.array(source.trajectory.location(sending_time)).T
source_vel = np.array(source.trajectory.location(sending_time, der=1)).T
source_vel = np.array(source.trajectory.velocity(sending_time)).T
else:
source_loc = np.array(source.location)
source_vel = np.array([0, 0, 0])
Expand Down
14 changes: 9 additions & 5 deletions src/scene_synthesis/sources.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
"""Acoustic source definition and properties."""

import numpy as np
from acoular import SignalGenerator, Trajectory
from acoular import SignalGenerator
from traits.api import CArray, HasStrictTraits, Instance

from scene_synthesis.directivities import Directivity
from scene_synthesis.trajectory import Trajectory

Comment thread
jtodev marked this conversation as resolved.

class Source(HasStrictTraits):
"""Class representing an acoustic source.

Examples
--------
Instantiate a simple source with a sine signal and a default trajectory:
Instantiate a simple source with a sine signal and a fixed trajectory:

>>> from acoular import SineGenerator, Trajectory
>>> from scene_synthesis.sources import Source
>>> from acoular import SineGenerator
>>> from scene_synthesis import Source, SplineTrajectory
>>> signal = SineGenerator(freq=1000, sample_freq=44100, num_samples=44100)
>>> trajectory = Trajectory()
>>> trajectory = SplineTrajectory(
... times=[0.0, 1.0],
... locations=[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]],
... )
>>> source = Source(signal=signal, trajectory=trajectory)
Comment thread
jtodev marked this conversation as resolved.
"""

Expand Down
176 changes: 176 additions & 0 deletions src/scene_synthesis/trajectory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Trajectory definitions for scene synthesis."""

import numpy as np
from scipy.interpolate import make_interp_spline
from traits.api import Any, Array, Callable, HasStrictTraits, Property


class Trajectory(HasStrictTraits):
"""Trajectory with independently supplied location and velocity functions.

Parameters
----------
location : callable
Function mapping time ``t`` to a 3D position.
velocity : callable
Function mapping time ``t`` to a 3D velocity. This does not need to be
the time derivative of ``location``.

Notes
-----
The return values are normalized to the component-wise convention used by
the rest of scene-synthesis: ``[x, y, z]`` for scalar times and three
arrays ``[x(t), y(t), z(t)]`` for array-valued times.

Examples
--------
>>> import scene_synthesis as ss
>>> location = lambda t: np.stack(
... [np.asarray(t), np.zeros_like(t), np.ones_like(t)],
... axis=-1,
... )
>>> velocity = lambda t: np.stack(
... [np.ones_like(t), np.zeros_like(t), np.zeros_like(t)],
... axis=-1,
... )
>>> traj = ss.Trajectory(location=location, velocity=velocity)
>>> traj.location(0.5)
[array(0.5), array(0.), array(1.)]
>>> traj.velocity(0.5)
[array(1.), array(0.), array(0.)]
"""

#: Time-dependent position function.
location = Property(desc='time-dependent position function')

#: Backing trait for :attr:`location`.
_location = Callable

#: Time-dependent velocity function.
velocity = Property(desc='time-dependent velocity function')

#: Backing trait for :attr:`velocity`.
_velocity = Callable

@staticmethod
def _normalize_output(value):
"""Normalize trajectory outputs to three component arrays."""
if isinstance(value, (list, tuple)) and len(value) == 3:
return [np.asarray(component, dtype=float) for component in value]

array = np.asarray(value, dtype=float)
if array.shape == (3,):
return [np.asarray(array[0]), np.asarray(array[1]), np.asarray(array[2])]
if array.ndim >= 2 and array.shape[-1] == 3:
return [np.asarray(array[..., 0]), np.asarray(array[..., 1]), np.asarray(array[..., 2])]
if array.ndim >= 1 and array.shape[0] == 3:
return [np.asarray(array[0, ...]), np.asarray(array[1, ...]), np.asarray(array[2, ...])]

msg = f'Trajectory output must describe 3D coordinates, got shape {array.shape}.'
raise ValueError(msg)

def _get_location(self):
return lambda t: self._normalize_output(self._location(t))

def _set_location(self, value):
if not callable(value):
msg = 'location must be callable.'
raise ValueError(msg)
self._location = value

def _get_velocity(self):
return lambda t: self._normalize_output(self._velocity(t))

def _set_velocity(self, value):
if not callable(value):
msg = 'velocity must be callable.'
raise ValueError(msg)
self._velocity = value


class SplineTrajectory(Trajectory):
"""Spline-based trajectory built from sampled times and locations.

Parameters
----------
times : array-like of float
Sample times.
locations : array-like of float
Sample positions with shape ``(N, 3)`` matching ``times``.

Notes
-----
The location spline order is chosen automatically up to cubic, which keeps
the interpolated trajectory at least :math:`C^1` whenever the available
number of samples permits it.

Examples
--------
>>> import scene_synthesis as ss
>>> trajectory = ss.SplineTrajectory(
... times=[0.0, 1.0],
... locations=[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]],
... )
>>> trajectory.location(0.5)
[array(0.5), array(0.), array(0.)]
>>> trajectory.velocity(0.5)
[array(1.), array(0.), array(0.)]
"""

#: Sample times.
times = Array(dtype=float)

#: Sample locations with shape ``(N, 3)``.
locations = Array(dtype=float)

#: Internal spline objects.
_location_spline = Any
_velocity_spline = Any

def __init__(self, times, locations):
self.times = np.asarray(times, dtype=float)
self.locations = np.asarray(locations, dtype=float)
self._validate_inputs()
self._prepare_samples()

order = min(3, self.times.size - 1)
self._location_spline = make_interp_spline(self.times, self.locations, k=order, axis=0)
self._velocity_spline = self._location_spline.derivative()

super().__init__(location=self._location_spline.__call__, velocity=self._velocity_spline.__call__)

def _validate_inputs(self):
"""Validate spline trajectory inputs."""
if self.times.ndim != 1:
msg = f'times must be a one-dimensional array, got shape {self.times.shape}.'
raise ValueError(msg)
if self.times.size < 2:
msg = 'times must contain at least two samples.'
raise ValueError(msg)
if self.locations.shape != (self.times.size, 3):
msg = f'locations must have shape ({self.times.size}, 3), got {self.locations.shape}.'
raise ValueError(msg)

def _prepare_samples(self):
"""Sort sample times and merge duplicate times with identical locations."""
order = np.argsort(self.times)
sorted_times = self.times[order]
sorted_locations = self.locations[order]

unique_times = [sorted_times[0]]
unique_locations = [sorted_locations[0]]
for time, location in zip(sorted_times[1:], sorted_locations[1:], strict=True):
if np.isclose(time, unique_times[-1]):
if not np.allclose(location, unique_locations[-1]):
msg = 'duplicate times must map to identical locations.'
raise ValueError(msg)
Comment thread
jtodev marked this conversation as resolved.
continue
unique_times.append(time)
unique_locations.append(location)

self.times = np.asarray(unique_times, dtype=float)
self.locations = np.asarray(unique_locations, dtype=float)

if self.times.size < 2:
msg = 'times must contain at least two distinct samples.'
raise ValueError(msg)
35 changes: 23 additions & 12 deletions tests/cases_trajectory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Test cases for source trajectories."""

import acoular as ac
import numpy as np
import scene_synthesis as ss


class Trajectories:
"""Test cases for all :class:`acoular.trajectories.Trajectory`-derived classes.
"""Test cases for all :class:`scene_synthesis.trajectory.SplineTrajectory` objects.
Comment thread
jtodev marked this conversation as resolved.
Outdated

New trajectories should be added here.
"""
Expand All @@ -16,26 +16,37 @@ def case_none(self):

def case_static(self):
"""Static trajectory test case."""
points = {0.0: (1.0, 0.0, 0.0), 1.0: (1.0, 0.0, 0.0)}
return [ac.Trajectory(points=points)]
times = [0.0, 1.0]
locations = [[1.0, 0.0, 0.0], [1.0, 0.0, 0.0]]
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_linear_pass(self):
"""Linear pass trajectory (fly by) test case."""
points = {0.0: (-1.0, -1.0, 1.0), 1.0: (1.0, 1.0, 1.0)}
return [ac.Trajectory(points=points)]
times = [0.0, 1.0]
locations = [[-1.0, -1.0, 1.0], [1.0, 1.0, 1.0]]
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_linear_approach(self):
"""Linear approach trajectory (fly at) test case."""
points = {0.0: (5.0, 0.0, 0.0), 1.0: (0.5, 0.0, 0.0)}
return [ac.Trajectory(points=points)]
times = [0.0, 1.0]
locations = [[5.0, 0.0, 0.0], [0.5, 0.0, 0.0]]
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_circular(self):
"""Circular trajectory (fly around) test case."""
n = 3600
points = {i / n: (1.0 * np.cos(2 * np.pi * i / n), 1.0 * np.sin(2 * np.pi * i / n), 0.0) for i in range(n + 1)}
return [ac.Trajectory(points=points)]
times = np.linspace(0.0, 1.0, n + 1)
locations = np.column_stack(
[
np.cos(2 * np.pi * times),
np.sin(2 * np.pi * times),
np.zeros_like(times),
]
)
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_static_array(self):
"""Static array trajectory test case."""
points_array = [{0.0: (x, y, 1.0), 1.0: (x, y, 1.0)} for x, y in [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0)]]
return [ac.Trajectory(points=points) for points in points_array]
times = [0.0, 1.0]
static_points = [(1.0, 1.0, 1.0), (1.0, 0.0, 1.0), (0.0, 0.0, 1.0)]
return [ss.SplineTrajectory(times=times, locations=[point, point]) for point in static_points]
Loading
Loading