Skip to content
9 changes: 5 additions & 4 deletions scripts/fly-by.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
mics = ac.MicGeom()
grid = ac.RectGrid(x_min=-1, x_max=1, y_min=-1, y_max=1, z=1)

traj = ac.Trajectory(points={0: (-50, 0, 1), T: (50, 0, 1)})
traj = synth.FixedTrajectory(points={0: (-50, 0, 1), T: (50, 0, 1)})
gen1 = ac.SineGenerator(freq=10, num_samples=ns, sample_freq=sf)
gen2 = ac.SineGenerator(freq=1, num_samples=ns, sample_freq=sf, amplitude=0.5, phase=-np.pi / 2)

mps1 = ac.MovingPointSource(signal=gen1, trajectory=traj, mics=mics)
mps2 = ac.MovingPointSource(signal=gen2, trajectory=traj, mics=mics)
ac_traj = ac.Trajectory(points={0: (-50, 0, 1), T: (50, 0, 1)})
mps1 = ac.MovingPointSource(signal=gen1, trajectory=ac_traj, mics=mics)
mps2 = ac.MovingPointSource(signal=gen2, trajectory=ac_traj, mics=mics)

mix = ac.SourceMixer(sources=[mps1, mps2])

Expand All @@ -43,7 +44,7 @@
mic = synth.Microphone()

scene = synth.Scene()
scene.environment = ac.Environment()
scene.environment = synth.Environment()
scene.microphones = [mic]
scene.sources = [source1, source2]

Expand Down
2 changes: 2 additions & 0 deletions src/scene_synthesis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@
from .microphones import Microphone as Microphone
from .scene import Scene as Scene
from .sources import Source as Source
from .trajectory import FixedTrajectory as FixedTrajectory
from .trajectory import Trajectory as Trajectory
11 changes: 6 additions & 5 deletions src/scene_synthesis/sources.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
"""Acoustic source definition and properties."""

import numpy as np
from acoular import SignalGenerator, Trajectory
from acoular import SignalGenerator
from traits.api import CArray, HasStrictTraits, Instance

from scene_synthesis.directivities import Directivity
from scene_synthesis.trajectory import Trajectory

Comment thread
jtodev marked this conversation as resolved.

class Source(HasStrictTraits):
"""Class representing an acoustic source.

Examples
--------
Instantiate a simple source with a sine signal and a default trajectory:
Instantiate a simple source with a sine signal and a fixed trajectory:

>>> from acoular import SineGenerator, Trajectory
>>> from scene_synthesis.sources import Source
>>> from acoular import SineGenerator
>>> from scene_synthesis import FixedTrajectory, Source
>>> signal = SineGenerator(freq=1000, sample_freq=44100, num_samples=44100)
>>> trajectory = Trajectory()
>>> trajectory = FixedTrajectory(points={0.0: (0.0, 0.0, 0.0), 1.0: (1.0, 0.0, 0.0)})
>>> source = Source(signal=signal, trajectory=trajectory)
Comment thread
jtodev marked this conversation as resolved.
"""

Expand Down
173 changes: 173 additions & 0 deletions src/scene_synthesis/trajectory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Trajectory definitions for scene synthesis."""

import numpy as np
from scipy.interpolate import splev, splprep
from traits.api import Dict, Float, HasStrictTraits, Property, Tuple, cached_property, property_depends_on


class Trajectory(HasStrictTraits):
"""Abstract trajectory interface.

A trajectory maps time to a 3D position and optionally provides time
derivatives such as velocity. Concrete implementations may represent only a
point moving in the global frame or, in the future, trajectories derived
from richer motion/reference-frame descriptions.
"""

def location(self, t, der=0):
"""Evaluate the trajectory or one of its derivatives at time ``t``."""
raise NotImplementedError

def shift_by_offset(self, x_off):
"""Return a shifted copy of the trajectory."""
raise NotImplementedError


class FixedTrajectory(Trajectory):
"""Represent a point trajectory in a fixed frame of reference.

The trajectory is specified by a mapping from time instants to sampled
``(x, y, z)`` positions in the global frame. A spline is fit through those
samples and can then be evaluated at arbitrary times to obtain positions or
time derivatives such as velocity.

Notes
-----
- The spline order is chosen automatically based on the number of
available points, up to cubic interpolation.
- The frame of reference is fixed, i.e. the sampled positions are
interpreted directly as global coordinates.

Examples
--------
>>> import scene_synthesis as ss
>>> trajectory = ss.FixedTrajectory(points={0.0: (0.0, 0.0, 0.0), 1.0: (1.0, 0.0, 0.0)})
>>> trajectory.location(0.5)
[array(0.5), array(0.), array(0.)]
"""

#: Dictionary mapping time instants to sampled ``(x, y, z)`` positions.
points = Dict(
key_trait=Float,
value_trait=Tuple(Float, Float, Float),
)

#: Start and end time of the trajectory as ``(t_min, t_max)``.
interval = Property()

#: Internal spline representation returned by :func:`scipy.interpolate.splprep`.
tck = Property()

@property_depends_on(['points[]'])
def _get_interval(self):
Comment thread
jtodev marked this conversation as resolved.
if not self.points:
msg = 'Trajectory.points must contain at least one sampled position to compute an interval.'
raise ValueError(msg)
return np.sort(list(self.points.keys()))[np.r_[0, -1]]

@cached_property
@property_depends_on(['points[]'])
def _get_tck(self):
if len(self.points) < 2:
msg = 'Trajectory.points must contain at least two sampled positions to build a spline.'
raise ValueError(msg)
t = np.sort(list(self.points.keys()))
xp = np.array([self.points[i] for i in t]).T
k = min(3, len(self.points) - 1)
tcku = splprep(xp, u=t, s=0, k=k)
return tcku[0]
Comment thread
jtodev marked this conversation as resolved.

def location(self, t, der=0):
"""Evaluate the trajectory or one of its derivatives.

Parameters
----------
t : float or array-like of float
Time instant or time instants at which to evaluate the trajectory.
der : int, optional
Derivative order. Use ``0`` for position, ``1`` for velocity,
``2`` for acceleration, and so on. Defaults to ``0``.

Returns
-------
list[numpy.ndarray]
Three arrays representing the ``x``, ``y``, and ``z`` components
at the requested times.

Examples
--------
>>> import scene_synthesis as ss
>>> trajectory = ss.FixedTrajectory(points={0.0: (0.0, 0.0, 0.0), 1.0: (1.0, 0.0, 0.0)})
>>> trajectory.location(0.5)
[array(0.5), array(0.), array(0.)]
"""
return splev(t, self.tck, der)

def shift_by_offset(self, x_off):
"""Return a copy of the trajectory shifted by a constant 3D offset.

Parameters
----------
x_off : array-like of float
Offset added to every sampled point.

Returns
-------
FixedTrajectory
Shifted trajectory with the same time samples.
"""
offset = np.asarray(x_off, dtype=float)
# Validate that the offset is a 3D vector to avoid ambiguous NumPy broadcasting.
if offset.shape != (3,):
msg = f'x_off must be an array-like of shape (3,), got {offset.shape} instead.'
raise ValueError(msg)
shifted_points = {time: tuple(np.asarray(point, dtype=float) + offset) for time, point in self.points.items()}
return FixedTrajectory(points=shifted_points)

def traj(self, t_start, t_end=None, delta_t=None, der=0):
"""Iterate through trajectory samples over a time range.

Parameters
----------
t_start : float
Start time of the iteration. If ``delta_t`` is omitted, this value
is interpreted as the step size and the full trajectory interval is
used.
t_end : float, optional
End time of the iteration. Defaults to the end of
:attr:`interval`.
delta_t : float, optional
Time step between yielded samples. If omitted, ``t_start`` is used
as the step size for traversing the full trajectory interval.
der : int, optional
Derivative order to evaluate. Defaults to ``0``.

Yields
------
tuple[numpy.float64, numpy.float64, numpy.float64]
The interpolated ``(x, y, z)`` values at each sampled time.

Examples
--------
>>> import scene_synthesis as ss
>>> trajectory = ss.FixedTrajectory(points={0.0: (0.0, 0.0, 0.0), 1.0: (1.0, 0.0, 0.0)})
>>> samples = list(trajectory.traj(0.5))
>>> samples[0]
(np.float64(0.0), np.float64(0.0), np.float64(0.0))
>>> samples[1]
(np.float64(0.5), np.float64(0.0), np.float64(0.0))
>>> samples = list(trajectory.traj(0.0, 1.0, 0.5))
>>> samples[0]
(np.float64(0.0), np.float64(0.0), np.float64(0.0))
>>> samples[1]
(np.float64(0.5), np.float64(0.0), np.float64(0.0))
"""
if delta_t is None:
delta_t = t_start
t_start, t_end = self.interval
if delta_t <= 0:
msg = 'delta_t must be a positive time step.'
raise ValueError(msg)
if t_end is None:
t_end = self.interval[1]
yield from zip(*self.location(np.arange(t_start, t_end, delta_t), der), strict=True)
17 changes: 9 additions & 8 deletions tests/cases_trajectory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Test cases for source trajectories."""

import acoular as ac
import numpy as np
import scene_synthesis as ss


class Trajectories:
"""Test cases for all :class:`acoular.trajectories.Trajectory`-derived classes.
"""Test cases for all :class:`scene_synthesis.trajectory.FixedTrajectory` objects.

New trajectories should be added here.
"""
Expand All @@ -17,25 +17,26 @@ def case_none(self):
def case_static(self):
"""Static trajectory test case."""
points = {0.0: (1.0, 0.0, 0.0), 1.0: (1.0, 0.0, 0.0)}
return [ac.Trajectory(points=points)]
return [ss.FixedTrajectory(points=points)]

def case_linear_pass(self):
"""Linear pass trajectory (fly by) test case."""
points = {0.0: (-1.0, -1.0, 1.0), 1.0: (1.0, 1.0, 1.0)}
return [ac.Trajectory(points=points)]
return [ss.FixedTrajectory(points=points)]

def case_linear_approach(self):
"""Linear approach trajectory (fly at) test case."""
points = {0.0: (5.0, 0.0, 0.0), 1.0: (0.5, 0.0, 0.0)}
return [ac.Trajectory(points=points)]
return [ss.FixedTrajectory(points=points)]

def case_circular(self):
"""Circular trajectory (fly around) test case."""
n = 3600
points = {i / n: (1.0 * np.cos(2 * np.pi * i / n), 1.0 * np.sin(2 * np.pi * i / n), 0.0) for i in range(n + 1)}
return [ac.Trajectory(points=points)]
return [ss.FixedTrajectory(points=points)]

def case_static_array(self):
"""Static array trajectory test case."""
points_array = [{0.0: (x, y, 1.0), 1.0: (x, y, 1.0)} for x, y in [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0)]]
return [ac.Trajectory(points=points) for points in points_array]
base_trajectory = ss.FixedTrajectory(points={0.0: (0.0, 0.0, 1.0), 1.0: (0.0, 0.0, 1.0)})
offsets = [(1.0, 1.0, 0.0), (1.0, 0.0, 0.0), (0.0, 0.0, 0.0)]
return [base_trajectory.shift_by_offset(offset) for offset in offsets]
58 changes: 58 additions & 0 deletions tests/test_trajectory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Unit tests for the scene-synthesis trajectory class."""

import numpy as np
import pytest
import scene_synthesis as ss


def test_trajectory_interval_uses_point_bounds():
"""``FixedTrajectory.interval`` should expose the first and last point times."""
trajectory = ss.FixedTrajectory(points={0.5: (0.0, 0.0, 0.0), 2.0: (1.0, 0.0, 0.0), 1.0: (0.5, 0.0, 0.0)})

np.testing.assert_allclose(trajectory.interval, np.array([0.5, 2.0]))


def test_trajectory_location_matches_sampled_points():
"""``FixedTrajectory.location`` should pass through the sampled points."""
trajectory = ss.FixedTrajectory(points={0.0: (0.0, 0.0, 0.0), 1.0: (1.0, 2.0, 0.0), 2.0: (2.0, 4.0, 0.0)})

location = np.array(trajectory.location(1.0))

np.testing.assert_allclose(location, np.array([1.0, 2.0, 0.0]))


def test_trajectory_traj_iterates_over_requested_range():
"""``FixedTrajectory.traj`` should iterate over positions with the requested step size."""
trajectory = ss.FixedTrajectory(points={0.0: (0.0, 0.0, 0.0), 1.0: (1.0, 0.0, 0.0)})

samples = list(trajectory.traj(0.0, 1.0, 0.25))

assert len(samples) == 4
np.testing.assert_allclose(samples[0], (0.0, 0.0, 0.0))
np.testing.assert_allclose(samples[-1], (0.75, 0.0, 0.0))


def test_trajectory_location_requires_at_least_two_points():
"""``FixedTrajectory.location`` should raise a clear error for underspecified splines."""
trajectory = ss.FixedTrajectory(points={0.0: (0.0, 0.0, 0.0)})

with pytest.raises(ValueError, match='at least two sampled positions'):
trajectory.location(0.0)


def test_trajectory_shift_by_offset_moves_all_sampled_points():
"""``FixedTrajectory.shift_by_offset`` should shift all sampled points."""
trajectory = ss.FixedTrajectory(points={0.0: (0.0, 1.0, 2.0), 1.0: (1.0, 2.0, 3.0)})

shifted = trajectory.shift_by_offset((1.0, -1.0, 0.5))

np.testing.assert_allclose(np.array(shifted.location(0.0)), np.array([1.0, 0.0, 2.5]))
np.testing.assert_allclose(np.array(shifted.location(1.0)), np.array([2.0, 1.0, 3.5]))


def test_trajectory_shift_by_offset_requires_three_coordinates():
"""``FixedTrajectory.shift_by_offset`` should reject offsets with invalid shapes."""
trajectory = ss.FixedTrajectory(points={0.0: (0.0, 1.0, 2.0), 1.0: (1.0, 2.0, 3.0)})

with pytest.raises(ValueError, match=r'shape \(3,\)'):
trajectory.shift_by_offset(1.0)
Loading