Skip to content
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ dependencies = [
]

[dependency-groups]
dev = [
tests = [
"diff-cover>=10.2.0",
"pytest>=9.0.2",
"pytest-cases>=3.9.1",
"pytest-coverage>=0.0",
"pytest-env>=1.2.0",
"pyyaml>=6.0.3",
"ruff>=0.14.8",
]

lint = ["ruff>=0.14.8"]

dev = [
{include-group = "lint"},
{include-group = "tests"},
]

[build-system]
Expand Down
9 changes: 5 additions & 4 deletions scripts/fly-by.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
mics = ac.MicGeom()
grid = ac.RectGrid(x_min=-1, x_max=1, y_min=-1, y_max=1, z=1)

traj = ac.Trajectory(points={0: (-50, 0, 1), T: (50, 0, 1)})
traj = synth.SplineTrajectory(times=[0.0, T], locations=[[-50.0, 0.0, 1.0], [50.0, 0.0, 1.0]])
gen1 = ac.SineGenerator(freq=10, num_samples=ns, sample_freq=sf)
gen2 = ac.SineGenerator(freq=1, num_samples=ns, sample_freq=sf, amplitude=0.5, phase=-np.pi / 2)

mps1 = ac.MovingPointSource(signal=gen1, trajectory=traj, mics=mics)
mps2 = ac.MovingPointSource(signal=gen2, trajectory=traj, mics=mics)
ac_traj = ac.Trajectory(points={0: (-50, 0, 1), T: (50, 0, 1)})
mps1 = ac.MovingPointSource(signal=gen1, trajectory=ac_traj, mics=mics)
mps2 = ac.MovingPointSource(signal=gen2, trajectory=ac_traj, mics=mics)

mix = ac.SourceMixer(sources=[mps1, mps2])

Expand All @@ -43,7 +44,7 @@
mic = synth.Microphone()

scene = synth.Scene()
scene.environment = ac.Environment()
scene.environment = synth.Environment()
scene.microphones = [mic]
scene.sources = [source1, source2]

Expand Down
2 changes: 2 additions & 0 deletions src/scene_synthesis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@
from .microphones import Microphone as Microphone
from .scene import Scene as Scene
from .sources import Source as Source
from .trajectory import SplineTrajectory as SplineTrajectory
from .trajectory import Trajectory as Trajectory
Comment thread
jtodev marked this conversation as resolved.
2 changes: 1 addition & 1 deletion src/scene_synthesis/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _source_state(self, source, sending_time):
"""Return source position and velocity at one sending time."""
if source.trajectory is not None:
source_loc = np.array(source.trajectory.location(sending_time)).T
source_vel = np.array(source.trajectory.location(sending_time, der=1)).T
source_vel = np.array(source.trajectory.velocity(sending_time)).T
else:
source_loc = np.array(source.location)
source_vel = np.array([0, 0, 0])
Expand Down
14 changes: 9 additions & 5 deletions src/scene_synthesis/sources.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
"""Acoustic source definition and properties."""

import numpy as np
from acoular import SignalGenerator, Trajectory
from acoular import SignalGenerator
from traits.api import CArray, HasStrictTraits, Instance

from scene_synthesis.directivities import Directivity
from scene_synthesis.trajectory import Trajectory

Comment thread
jtodev marked this conversation as resolved.

class Source(HasStrictTraits):
"""Class representing an acoustic source.

Examples
--------
Instantiate a simple source with a sine signal and a default trajectory:
Instantiate a simple source with a sine signal and a fixed trajectory:

>>> from acoular import SineGenerator, Trajectory
>>> from scene_synthesis.sources import Source
>>> from acoular import SineGenerator
>>> from scene_synthesis import Source, SplineTrajectory
>>> signal = SineGenerator(freq=1000, sample_freq=44100, num_samples=44100)
>>> trajectory = Trajectory()
>>> trajectory = SplineTrajectory(
... times=[0.0, 1.0],
... locations=[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]],
... )
>>> source = Source(signal=signal, trajectory=trajectory)
Comment thread
jtodev marked this conversation as resolved.
"""

Expand Down
176 changes: 176 additions & 0 deletions src/scene_synthesis/trajectory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Trajectory definitions for scene synthesis."""

import numpy as np
from scipy.interpolate import make_interp_spline
from traits.api import Any, Array, Callable, HasStrictTraits, Property


class Trajectory(HasStrictTraits):
"""Trajectory with independently supplied location and velocity functions.

Parameters
----------
location : callable
Function mapping time ``t`` to a 3D position.
velocity : callable
Function mapping time ``t`` to a 3D velocity. This does not need to be
the time derivative of ``location``.

Notes
-----
The return values are normalized to the component-wise convention used by
the rest of scene-synthesis: ``[x, y, z]`` for scalar times and three
arrays ``[x(t), y(t), z(t)]`` for array-valued times.

Examples
--------
>>> import scene_synthesis as ss
>>> location = lambda t: np.stack(
... [np.asarray(t), np.zeros_like(t), np.ones_like(t)],
... axis=-1,
... )
>>> velocity = lambda t: np.stack(
... [np.ones_like(t), np.zeros_like(t), np.zeros_like(t)],
... axis=-1,
... )
>>> traj = ss.Trajectory(location=location, velocity=velocity)
>>> traj.location(0.5)
[array(0.5), array(0.), array(1.)]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the return type should not be a list but an array

>>> traj.velocity(0.5)
[array(1.), array(0.), array(0.)]
"""

#: Time-dependent position function.
location = Property(desc='time-dependent position function')
Comment thread
jtodev marked this conversation as resolved.
Outdated

#: Backing trait for :attr:`location`.
_location = Callable

#: Time-dependent velocity function.
velocity = Property(desc='time-dependent velocity function')

#: Backing trait for :attr:`velocity`.
_velocity = Callable

@staticmethod
def _normalize_output(value):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont understand why we need this function? what does it do?

"""Normalize trajectory outputs to three component arrays."""
if isinstance(value, (list, tuple)) and len(value) == 3:
return [np.asarray(component, dtype=float) for component in value]

array = np.asarray(value, dtype=float)
if array.shape == (3,):
return [np.asarray(array[0]), np.asarray(array[1]), np.asarray(array[2])]
if array.ndim >= 2 and array.shape[-1] == 3:
return [np.asarray(array[..., 0]), np.asarray(array[..., 1]), np.asarray(array[..., 2])]
if array.ndim >= 1 and array.shape[0] == 3:
return [np.asarray(array[0, ...]), np.asarray(array[1, ...]), np.asarray(array[2, ...])]

msg = f'Trajectory output must describe 3D coordinates, got shape {array.shape}.'
raise ValueError(msg)

def _get_location(self):
return lambda t: self._normalize_output(self._location(t))

def _set_location(self, value):
if not callable(value):
msg = 'location must be callable.'
raise ValueError(msg)
self._location = value

def _get_velocity(self):
return lambda t: self._normalize_output(self._velocity(t))

def _set_velocity(self, value):
if not callable(value):
msg = 'velocity must be callable.'
raise ValueError(msg)
self._velocity = value


class SplineTrajectory(Trajectory):
"""Spline-based trajectory built from sampled times and locations.

Parameters
----------
times : array-like of float
Sample times.
locations : array-like of float
Sample positions with shape ``(N, 3)`` matching ``times``.

Notes
-----
The location spline order is chosen automatically up to cubic, which keeps
the interpolated trajectory at least :math:`C^1` whenever the available
number of samples permits it.

Examples
--------
>>> import scene_synthesis as ss
>>> trajectory = ss.SplineTrajectory(
... times=[0.0, 1.0],
... locations=[[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]],
... )
>>> trajectory.location(0.5)
[array(0.5), array(0.), array(0.)]
>>> trajectory.velocity(0.5)
[array(1.), array(0.), array(0.)]
"""

#: Sample times.
times = Array(dtype=float)

#: Sample locations with shape ``(N, 3)``.
locations = Array(dtype=float)

#: Internal spline objects.
_location_spline = Any
_velocity_spline = Any

def __init__(self, times, locations):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should avoid init definitions with the traits package

self.times = np.asarray(times, dtype=float)
self.locations = np.asarray(locations, dtype=float)
self._validate_inputs()
self._prepare_samples()

order = min(3, self.times.size - 1)
self._location_spline = make_interp_spline(self.times, self.locations, k=order, axis=0)
self._velocity_spline = self._location_spline.derivative()

super().__init__(location=self._location_spline.__call__, velocity=self._velocity_spline.__call__)

def _validate_inputs(self):
"""Validate spline trajectory inputs."""
if self.times.ndim != 1:
msg = f'times must be a one-dimensional array, got shape {self.times.shape}.'
raise ValueError(msg)
if self.times.size < 2:
msg = 'times must contain at least two samples.'
raise ValueError(msg)
if self.locations.shape != (self.times.size, 3):
msg = f'locations must have shape ({self.times.size}, 3), got {self.locations.shape}.'
raise ValueError(msg)

def _prepare_samples(self):
"""Sort sample times and merge exact duplicate times with identical locations."""
order = np.argsort(self.times)
sorted_times = self.times[order]
sorted_locations = self.locations[order]

unique_times = [sorted_times[0]]
unique_locations = [sorted_locations[0]]
for time, location in zip(sorted_times[1:], sorted_locations[1:], strict=True):
if time == unique_times[-1]:
if not np.allclose(location, unique_locations[-1]):
msg = 'duplicate times must map to identical locations.'
raise ValueError(msg)
Comment thread
jtodev marked this conversation as resolved.
continue
unique_times.append(time)
unique_locations.append(location)

self.times = np.asarray(unique_times, dtype=float)
self.locations = np.asarray(unique_locations, dtype=float)

if self.times.size < 2:
msg = 'times must contain at least two distinct samples.'
raise ValueError(msg)
37 changes: 25 additions & 12 deletions tests/cases_trajectory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Test cases for source trajectories."""

import acoular as ac
import numpy as np
import scene_synthesis as ss


class Trajectories:
"""Test cases for all :class:`acoular.trajectories.Trajectory`-derived classes.
"""Test cases for :class:`scene_synthesis.trajectory.SplineTrajectory` objects.

Also goes over ``None`` (no trajectory).

New trajectories should be added here.
"""
Expand All @@ -16,26 +18,37 @@ def case_none(self):

def case_static(self):
"""Static trajectory test case."""
points = {0.0: (1.0, 0.0, 0.0), 1.0: (1.0, 0.0, 0.0)}
return [ac.Trajectory(points=points)]
times = [0.0, 1.0]
locations = [[1.0, 0.0, 0.0], [1.0, 0.0, 0.0]]
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_linear_pass(self):
"""Linear pass trajectory (fly by) test case."""
points = {0.0: (-1.0, -1.0, 1.0), 1.0: (1.0, 1.0, 1.0)}
return [ac.Trajectory(points=points)]
times = [0.0, 1.0]
locations = [[-1.0, -1.0, 1.0], [1.0, 1.0, 1.0]]
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_linear_approach(self):
"""Linear approach trajectory (fly at) test case."""
points = {0.0: (5.0, 0.0, 0.0), 1.0: (0.5, 0.0, 0.0)}
return [ac.Trajectory(points=points)]
times = [0.0, 1.0]
locations = [[5.0, 0.0, 0.0], [0.5, 0.0, 0.0]]
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_circular(self):
"""Circular trajectory (fly around) test case."""
n = 3600
points = {i / n: (1.0 * np.cos(2 * np.pi * i / n), 1.0 * np.sin(2 * np.pi * i / n), 0.0) for i in range(n + 1)}
return [ac.Trajectory(points=points)]
times = np.linspace(0.0, 1.0, n + 1)
locations = np.column_stack(
[
np.cos(2 * np.pi * times),
np.sin(2 * np.pi * times),
np.zeros_like(times),
]
)
return [ss.SplineTrajectory(times=times, locations=locations)]

def case_static_array(self):
"""Static array trajectory test case."""
points_array = [{0.0: (x, y, 1.0), 1.0: (x, y, 1.0)} for x, y in [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0)]]
return [ac.Trajectory(points=points) for points in points_array]
times = [0.0, 1.0]
static_points = [(1.0, 1.0, 1.0), (1.0, 0.0, 1.0), (0.0, 0.0, 1.0)]
return [ss.SplineTrajectory(times=times, locations=[point, point]) for point in static_points]
Loading
Loading