Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add decimator and gaussian filter #32

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 227 additions & 1 deletion src/nifreeze/data/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@

from __future__ import annotations

from numbers import Number

import numpy as np
from scipy.ndimage import median_filter
from nibabel import Nifti1Image, load
from nibabel.affines import apply_affine, voxel_sizes
from scipy.ndimage import gaussian_filter as _gs
from scipy.ndimage import map_coordinates, median_filter
from skimage.morphology import ball

DEFAULT_DTYPE = "int16"
Expand Down Expand Up @@ -92,3 +97,224 @@
data = np.round(255 * data).astype(dtype)

return data


def gaussian_filter(
data: np.ndarray,
vox_width: float | tuple[float, float, float],
) -> np.ndarray:
"""
Applies a Gaussian smoothing filter to a n-dimensional array.

This function smooths the input data using a Gaussian filter with a specified
width (sigma) in voxels along each relevant dimension. It automatically
handles different data dimensionalities (2D, 3D, or 4D) and ensures that
smoothing is not applied along the time or orientation dimension (if present
in 4D data).

Parameters
----------
data : :obj:`~numpy.ndarray`
The input data array.
vox_width : :obj:`float` or :obj:`tuple` of three :obj:`float`
The smoothing kernel width (sigma) in voxels. If a single :obj:`float` is provided,
it is applied uniformly across all spatial dimensions. Alternatively, a
tuple of three floats can be provided to specify different sigma values
for each spatial dimension (x, y, z).

Returns
-------
:obj:`~numpy.ndarray`
The smoothed data array.

"""

data = np.squeeze(data) # Drop unused dimensions
ndim = data.ndim

Check warning on line 133 in src/nifreeze/data/filtering.py

View check run for this annotation

Codecov / codecov/patch

src/nifreeze/data/filtering.py#L132-L133

Added lines #L132 - L133 were not covered by tests

if isinstance(vox_width, Number):
vox_width = tuple([vox_width] * min(3, ndim))

Check warning on line 136 in src/nifreeze/data/filtering.py

View check run for this annotation

Codecov / codecov/patch

src/nifreeze/data/filtering.py#L136

Added line #L136 was not covered by tests

# Do not smooth across time/orientation (if present in 4D data)
if ndim == 4 and len(vox_width) == 3:
vox_width = (*vox_width, 0)

Check warning on line 140 in src/nifreeze/data/filtering.py

View check run for this annotation

Codecov / codecov/patch

src/nifreeze/data/filtering.py#L140

Added line #L140 was not covered by tests

return _gs(data, vox_width)

Check warning on line 142 in src/nifreeze/data/filtering.py

View check run for this annotation

Codecov / codecov/patch

src/nifreeze/data/filtering.py#L142

Added line #L142 was not covered by tests


def downsample(
in_file: str,
shape: tuple[int, int, int],
smooth: bool | tuple[int, int, int] = True,
order: int = 3,
nonnegative: bool = True,
) -> Nifti1Image:
"""
Downsamples a 3D or 4D Nifti image by a specified downsampling factor.

This function downsamples a Nifti image by averaging voxels within a user-defined
factor in each spatial dimension. It optionally applies Gaussian smoothing
before downsampling to reduce aliasing artifacts. The function also handles
updating the affine transformation matrix to reflect the change in voxel size.

Parameters
----------
in_file : :obj:`str`
Path to the input NIfTI image file.
factor : :obj:`int` or :obj:`tuple`
The downsampling factor. If a single integer is provided, it is applied
uniformly across all spatial dimensions. Alternatively, a tuple of three
integers can be provided to specify different downsampling factors for each
spatial dimension (x, y, z). Values must be greater than 0.
smooth : :obj:`bool` or :obj:`tuple`, optional (default=``True``)
Controls application of Gaussian smoothing before downsampling. If True,
a smoothing kernel size equal to the downsampling factor is applied.
Alternatively, a tuple of three integers can be provided to specify
different smoothing kernel sizes for each spatial dimension. Setting to
False disables smoothing.
order : :obj:`int`, optional (default=3)
The order of the spline interpolation used for downsampling. Higher
orders provide smoother results but are computationally more expensive.
nonnegative : :obj:`bool`, optional (default=``True``)
If True, negative values in the downsampled data are set to zero.

Returns
-------
:obj:`~nibabel.Nifti1Image`
The downsampled NIfTI image object.

"""

imnii = load(in_file)
data = np.squeeze(imnii.get_fdata()) # Remove unused dimensions
datashape = np.array(data.shape)
shape = np.array(shape)
ndim = data.ndim

if smooth:
if smooth is True:
smooth = datashape[:3] / shape[:3]
data = gaussian_filter(data, smooth)

Check warning on line 197 in src/nifreeze/data/filtering.py

View check run for this annotation

Codecov / codecov/patch

src/nifreeze/data/filtering.py#L196-L197

Added lines #L196 - L197 were not covered by tests

extents = np.abs(
apply_affine(imnii.affine, datashape - 1)
- apply_affine(imnii.affine, (0.0, 0.0, 0.0))
)
newzooms = extents / shape

# Update affine transformation
newaffine = np.eye(4)
oldzooms = voxel_sizes(imnii.affine)
newaffine[:3, :3] = np.diag(newzooms / oldzooms) @ imnii.affine[:3, :3]

# Update offset so new array is aligned with original
newaffine[:3, 3] = (
apply_affine(imnii.affine, 0.5 * datashape)
- apply_affine(newaffine, 0.5 * shape)
)

xfm = np.linalg.inv(imnii.affine) @ newaffine

# Create downsampled grid
down_grid = np.array(
np.meshgrid(
*[np.arange(_s, step=1) for _s in shape],
indexing="ij",
)
)

# Locations is an Nx3 array of index coordinates of the original image where we sample
locations = apply_affine(xfm, down_grid.reshape((ndim, np.prod(shape))).T)

# Resample data on the new grid
resampled = map_coordinates(
data,
locations.T,
order=order,
mode="mirror",
prefilter=True,
).reshape(shape)

# Set negative values to zero (optional)
if order > 2 and nonnegative:
resampled[resampled < 0] = 0

Check warning on line 240 in src/nifreeze/data/filtering.py

View check run for this annotation

Codecov / codecov/patch

src/nifreeze/data/filtering.py#L240

Added line #L240 was not covered by tests

# Create new Nifti image with updated information
newnii = Nifti1Image(resampled, newaffine, imnii.header)
newnii.set_sform(newaffine, code=1)
newnii.set_qform(newaffine, code=1)

return newnii


def decimate(
in_file: str,
factor: int | tuple[int, int, int],
smooth: bool | tuple[int, int, int] = True,
nonnegative: bool = True,
) -> Nifti1Image:
"""
Decimates a 3D or 4D Nifti image by a specified downsampling factor.

This function downsamples a Nifti image by averaging voxels within a user-defined
factor in each spatial dimension. It optionally applies Gaussian smoothing
before downsampling to reduce aliasing artifacts. The function also handles
updating the affine transformation matrix to reflect the change in voxel size.

Parameters
----------
in_file : :obj:`str`
Path to the input NIfTI image file.
factor : :obj:`int` or :obj:`tuple`
The downsampling factor. If a single integer is provided, it is applied
uniformly across all spatial dimensions. Alternatively, a tuple of three
integers can be provided to specify different downsampling factors for each
spatial dimension (x, y, z). Values must be greater than 0.
smooth : :obj:`bool` or :obj:`tuple`, optional (default=``True``)
Controls application of Gaussian smoothing before downsampling. If True,
a smoothing kernel size equal to the downsampling factor is applied.
Alternatively, a tuple of three integers can be provided to specify
different smoothing kernel sizes for each spatial dimension. Setting to
False disables smoothing.
nonnegative : :obj:`bool`, optional (default=``True``)
If True, negative values in the downsampled data are set to zero.

Returns
-------
:obj:`~nibabel.Nifti1Image`
The downsampled NIfTI image object.

"""

imnii = load(in_file)
data = np.squeeze(imnii.get_fdata()) # Remove unused dimensions
ndim = data.ndim

if isinstance(factor, Number):
factor = tuple([factor] * min(3, ndim))

if any(f <= 0 for f in factor[:3]):
raise ValueError("All spatial downsampling factors must be positive.")

Check warning on line 297 in src/nifreeze/data/filtering.py

View check run for this annotation

Codecov / codecov/patch

src/nifreeze/data/filtering.py#L297

Added line #L297 was not covered by tests

if ndim == 4 and len(factor) == 3:
factor = (*factor, 0)

Check warning on line 300 in src/nifreeze/data/filtering.py

View check run for this annotation

Codecov / codecov/patch

src/nifreeze/data/filtering.py#L300

Added line #L300 was not covered by tests

if smooth:
if smooth is True:
smooth = factor
data = gaussian_filter(data, smooth)

Check warning on line 305 in src/nifreeze/data/filtering.py

View check run for this annotation

Codecov / codecov/patch

src/nifreeze/data/filtering.py#L304-L305

Added lines #L304 - L305 were not covered by tests

# Update affine transformation
newaffine = imnii.affine.copy()
newaffine[:3, :3] = np.array(factor[:3]) * newaffine[:3, :3]

# Create new Nifti image with updated information
newnii = Nifti1Image(
data[::factor[0], ::factor[1], ::factor[2]],
newaffine,
imnii.header,
)
newnii.set_sform(newaffine, code=1)
newnii.set_qform(newaffine, code=1)

return newnii
146 changes: 146 additions & 0 deletions test/test_filtering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2024 The NiPreps Developers <[email protected]>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
# https://www.nipreps.org/community/licensing/
#
"""Unit tests exercising data filtering utilities."""
import nibabel as nb
import numpy as np

import pytest

from nifreeze.data.filtering import decimate, downsample


@pytest.mark.parametrize(
("size", "block_size"),
[
((20, 20, 20), (5, 5, 5),),
((21, 21, 21), (5, 5, 5),),
],
)
@pytest.mark.parametrize(
("zoom_x", ),
[(1.0, ), (-1.0, ), (2.0, ), (-2.0, )],
)
@pytest.mark.parametrize(
("zoom_y", ),
[(1.0, ), (-1.0, ), (2.0, ), (-2.0, )],
)
@pytest.mark.parametrize(
("zoom_z", ),
[(1.0, ), (-1.0, ), (2.0, ), (-2.0, )],
)
@pytest.mark.parametrize(
("angle_x", ),
[(0.0, ), (0.2, ), (-0.05, )],
)
@pytest.mark.parametrize(
("angle_y", ),
[(0.0, ), (0.2, ), (-0.05, )],
)
@pytest.mark.parametrize(
("angle_z", ),
[(0.0, ), (0.2, ), (-0.05, )],
)
@pytest.mark.parametrize(
("offsets", ),
[
(None, ),
((0.0, 0.0, 0.0),),
],
)
def test_decimation(
tmp_path,
size,
block_size,
zoom_x,
zoom_y,
zoom_z,
angle_x,
angle_y,
angle_z,
offsets,
outdir,
):
"""Exercise decimation."""

# Calculate the number of sub-blocks in each dimension
num_blocks = [s // b for s, b in zip(size, block_size)]

# Create the empty array
voxel_array = np.zeros(size, dtype=np.uint16)

# Fill the array with increasing values based on sub-block position
current_block = 0
for k in range(num_blocks[2]):
for j in range(num_blocks[1]):
for i in range(num_blocks[0]):
voxel_array[
i * block_size[0]:(i + 1) * block_size[0],
j * block_size[1]:(j + 1) * block_size[1],
k * block_size[2]:(k + 1) * block_size[2]
] = current_block
current_block += 1

fname = tmp_path / "test_img.nii.gz"

affine = np.eye(4)
affine[:3, :3] = (
nb.eulerangles.euler2mat(x=angle_x, y=angle_y, z=angle_z)
@ np.diag((zoom_x, zoom_y, zoom_z))
@ affine[:3, :3]
)

if offsets is None:
affine[:3, 3] = -0.5 * nb.affines.apply_affine(affine, np.array(size) - 1)

test_image = nb.Nifti1Image(voxel_array.astype(np.uint16), affine, None)
test_image.header.set_data_dtype(np.uint16)
test_image.to_filename(fname)

# Need to define test oracle. For now, just see if it doesn't smoke.
out = decimate(fname, factor=2, smooth=False)

out = downsample(fname, shape=(10, 10, 10), smooth=False, order=0)

if outdir:
from niworkflows.interfaces.reportlets.registration import (
SimpleBeforeAfterRPT as SimpleBeforeAfter,
)

out.to_filename(tmp_path / "decimated.nii.gz")

SimpleBeforeAfter(
after_label="Decimated",
before_label="Original",
after=str(tmp_path / "decimated.nii.gz"),
before=str(fname),
out_report=str(outdir / f'decimated-{tmp_path.name}.svg'),
).run()

out.to_filename(tmp_path / "downsampled.nii.gz")
SimpleBeforeAfter(
after_label="Downsampled",
before_label="Original",
after=str(tmp_path / "downsampled.nii.gz"),
before=str(fname),
out_report=str(outdir / f'downsampled-{tmp_path.name}.svg'),
).run()
Loading