diff --git a/src/nifreeze/data/filtering.py b/src/nifreeze/data/filtering.py index 0f14604a..5108e71e 100644 --- a/src/nifreeze/data/filtering.py +++ b/src/nifreeze/data/filtering.py @@ -24,8 +24,13 @@ from __future__ import annotations +from numbers import Number + import numpy as np -from scipy.ndimage import median_filter +from nibabel import Nifti1Image, load +from nibabel.affines import apply_affine, voxel_sizes +from scipy.ndimage import gaussian_filter as _gs +from scipy.ndimage import map_coordinates, median_filter from skimage.morphology import ball DEFAULT_DTYPE = "int16" @@ -92,3 +97,224 @@ def advanced_clip( data = np.round(255 * data).astype(dtype) return data + + +def gaussian_filter( + data: np.ndarray, + vox_width: float | tuple[float, float, float], +) -> np.ndarray: + """ + Applies a Gaussian smoothing filter to a n-dimensional array. + + This function smooths the input data using a Gaussian filter with a specified + width (sigma) in voxels along each relevant dimension. It automatically + handles different data dimensionalities (2D, 3D, or 4D) and ensures that + smoothing is not applied along the time or orientation dimension (if present + in 4D data). + + Parameters + ---------- + data : :obj:`~numpy.ndarray` + The input data array. + vox_width : :obj:`float` or :obj:`tuple` of three :obj:`float` + The smoothing kernel width (sigma) in voxels. If a single :obj:`float` is provided, + it is applied uniformly across all spatial dimensions. Alternatively, a + tuple of three floats can be provided to specify different sigma values + for each spatial dimension (x, y, z). + + Returns + ------- + :obj:`~numpy.ndarray` + The smoothed data array. + + """ + + data = np.squeeze(data) # Drop unused dimensions + ndim = data.ndim + + if isinstance(vox_width, Number): + vox_width = tuple([vox_width] * min(3, ndim)) + + # Do not smooth across time/orientation (if present in 4D data) + if ndim == 4 and len(vox_width) == 3: + vox_width = (*vox_width, 0) + + return _gs(data, vox_width) + + +def downsample( + in_file: str, + shape: tuple[int, int, int], + smooth: bool | tuple[int, int, int] = True, + order: int = 3, + nonnegative: bool = True, +) -> Nifti1Image: + """ + Downsamples a 3D or 4D Nifti image by a specified downsampling factor. + + This function downsamples a Nifti image by averaging voxels within a user-defined + factor in each spatial dimension. It optionally applies Gaussian smoothing + before downsampling to reduce aliasing artifacts. The function also handles + updating the affine transformation matrix to reflect the change in voxel size. + + Parameters + ---------- + in_file : :obj:`str` + Path to the input NIfTI image file. + factor : :obj:`int` or :obj:`tuple` + The downsampling factor. If a single integer is provided, it is applied + uniformly across all spatial dimensions. Alternatively, a tuple of three + integers can be provided to specify different downsampling factors for each + spatial dimension (x, y, z). Values must be greater than 0. + smooth : :obj:`bool` or :obj:`tuple`, optional (default=``True``) + Controls application of Gaussian smoothing before downsampling. If True, + a smoothing kernel size equal to the downsampling factor is applied. + Alternatively, a tuple of three integers can be provided to specify + different smoothing kernel sizes for each spatial dimension. Setting to + False disables smoothing. + order : :obj:`int`, optional (default=3) + The order of the spline interpolation used for downsampling. Higher + orders provide smoother results but are computationally more expensive. + nonnegative : :obj:`bool`, optional (default=``True``) + If True, negative values in the downsampled data are set to zero. + + Returns + ------- + :obj:`~nibabel.Nifti1Image` + The downsampled NIfTI image object. + + """ + + imnii = load(in_file) + data = np.squeeze(imnii.get_fdata()) # Remove unused dimensions + datashape = np.array(data.shape) + shape = np.array(shape) + ndim = data.ndim + + if smooth: + if smooth is True: + smooth = datashape[:3] / shape[:3] + data = gaussian_filter(data, smooth) + + extents = np.abs( + apply_affine(imnii.affine, datashape - 1) + - apply_affine(imnii.affine, (0.0, 0.0, 0.0)) + ) + newzooms = extents / shape + + # Update affine transformation + newaffine = np.eye(4) + oldzooms = voxel_sizes(imnii.affine) + newaffine[:3, :3] = np.diag(newzooms / oldzooms) @ imnii.affine[:3, :3] + + # Update offset so new array is aligned with original + newaffine[:3, 3] = ( + apply_affine(imnii.affine, 0.5 * datashape) + - apply_affine(newaffine, 0.5 * shape) + ) + + xfm = np.linalg.inv(imnii.affine) @ newaffine + + # Create downsampled grid + down_grid = np.array( + np.meshgrid( + *[np.arange(_s, step=1) for _s in shape], + indexing="ij", + ) + ) + + # Locations is an Nx3 array of index coordinates of the original image where we sample + locations = apply_affine(xfm, down_grid.reshape((ndim, np.prod(shape))).T) + + # Resample data on the new grid + resampled = map_coordinates( + data, + locations.T, + order=order, + mode="mirror", + prefilter=True, + ).reshape(shape) + + # Set negative values to zero (optional) + if order > 2 and nonnegative: + resampled[resampled < 0] = 0 + + # Create new Nifti image with updated information + newnii = Nifti1Image(resampled, newaffine, imnii.header) + newnii.set_sform(newaffine, code=1) + newnii.set_qform(newaffine, code=1) + + return newnii + + +def decimate( + in_file: str, + factor: int | tuple[int, int, int], + smooth: bool | tuple[int, int, int] = True, + nonnegative: bool = True, +) -> Nifti1Image: + """ + Decimates a 3D or 4D Nifti image by a specified downsampling factor. + + This function downsamples a Nifti image by averaging voxels within a user-defined + factor in each spatial dimension. It optionally applies Gaussian smoothing + before downsampling to reduce aliasing artifacts. The function also handles + updating the affine transformation matrix to reflect the change in voxel size. + + Parameters + ---------- + in_file : :obj:`str` + Path to the input NIfTI image file. + factor : :obj:`int` or :obj:`tuple` + The downsampling factor. If a single integer is provided, it is applied + uniformly across all spatial dimensions. Alternatively, a tuple of three + integers can be provided to specify different downsampling factors for each + spatial dimension (x, y, z). Values must be greater than 0. + smooth : :obj:`bool` or :obj:`tuple`, optional (default=``True``) + Controls application of Gaussian smoothing before downsampling. If True, + a smoothing kernel size equal to the downsampling factor is applied. + Alternatively, a tuple of three integers can be provided to specify + different smoothing kernel sizes for each spatial dimension. Setting to + False disables smoothing. + nonnegative : :obj:`bool`, optional (default=``True``) + If True, negative values in the downsampled data are set to zero. + + Returns + ------- + :obj:`~nibabel.Nifti1Image` + The downsampled NIfTI image object. + + """ + + imnii = load(in_file) + data = np.squeeze(imnii.get_fdata()) # Remove unused dimensions + ndim = data.ndim + + if isinstance(factor, Number): + factor = tuple([factor] * min(3, ndim)) + + if any(f <= 0 for f in factor[:3]): + raise ValueError("All spatial downsampling factors must be positive.") + + if ndim == 4 and len(factor) == 3: + factor = (*factor, 0) + + if smooth: + if smooth is True: + smooth = factor + data = gaussian_filter(data, smooth) + + # Update affine transformation + newaffine = imnii.affine.copy() + newaffine[:3, :3] = np.array(factor[:3]) * newaffine[:3, :3] + + # Create new Nifti image with updated information + newnii = Nifti1Image( + data[::factor[0], ::factor[1], ::factor[2]], + newaffine, + imnii.header, + ) + newnii.set_sform(newaffine, code=1) + newnii.set_qform(newaffine, code=1) + + return newnii diff --git a/test/test_filtering.py b/test/test_filtering.py new file mode 100644 index 00000000..54c3aaab --- /dev/null +++ b/test/test_filtering.py @@ -0,0 +1,146 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2024 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""Unit tests exercising data filtering utilities.""" +import nibabel as nb +import numpy as np + +import pytest + +from nifreeze.data.filtering import decimate, downsample + + +@pytest.mark.parametrize( + ("size", "block_size"), + [ + ((20, 20, 20), (5, 5, 5),), + ((21, 21, 21), (5, 5, 5),), + ], +) +@pytest.mark.parametrize( + ("zoom_x", ), + [(1.0, ), (-1.0, ), (2.0, ), (-2.0, )], +) +@pytest.mark.parametrize( + ("zoom_y", ), + [(1.0, ), (-1.0, ), (2.0, ), (-2.0, )], +) +@pytest.mark.parametrize( + ("zoom_z", ), + [(1.0, ), (-1.0, ), (2.0, ), (-2.0, )], +) +@pytest.mark.parametrize( + ("angle_x", ), + [(0.0, ), (0.2, ), (-0.05, )], +) +@pytest.mark.parametrize( + ("angle_y", ), + [(0.0, ), (0.2, ), (-0.05, )], +) +@pytest.mark.parametrize( + ("angle_z", ), + [(0.0, ), (0.2, ), (-0.05, )], +) +@pytest.mark.parametrize( + ("offsets", ), + [ + (None, ), + ((0.0, 0.0, 0.0),), + ], +) +def test_decimation( + tmp_path, + size, + block_size, + zoom_x, + zoom_y, + zoom_z, + angle_x, + angle_y, + angle_z, + offsets, + outdir, +): + """Exercise decimation.""" + + # Calculate the number of sub-blocks in each dimension + num_blocks = [s // b for s, b in zip(size, block_size)] + + # Create the empty array + voxel_array = np.zeros(size, dtype=np.uint16) + + # Fill the array with increasing values based on sub-block position + current_block = 0 + for k in range(num_blocks[2]): + for j in range(num_blocks[1]): + for i in range(num_blocks[0]): + voxel_array[ + i * block_size[0]:(i + 1) * block_size[0], + j * block_size[1]:(j + 1) * block_size[1], + k * block_size[2]:(k + 1) * block_size[2] + ] = current_block + current_block += 1 + + fname = tmp_path / "test_img.nii.gz" + + affine = np.eye(4) + affine[:3, :3] = ( + nb.eulerangles.euler2mat(x=angle_x, y=angle_y, z=angle_z) + @ np.diag((zoom_x, zoom_y, zoom_z)) + @ affine[:3, :3] + ) + + if offsets is None: + affine[:3, 3] = -0.5 * nb.affines.apply_affine(affine, np.array(size) - 1) + + test_image = nb.Nifti1Image(voxel_array.astype(np.uint16), affine, None) + test_image.header.set_data_dtype(np.uint16) + test_image.to_filename(fname) + + # Need to define test oracle. For now, just see if it doesn't smoke. + out = decimate(fname, factor=2, smooth=False) + + out = downsample(fname, shape=(10, 10, 10), smooth=False, order=0) + + if outdir: + from niworkflows.interfaces.reportlets.registration import ( + SimpleBeforeAfterRPT as SimpleBeforeAfter, + ) + + out.to_filename(tmp_path / "decimated.nii.gz") + + SimpleBeforeAfter( + after_label="Decimated", + before_label="Original", + after=str(tmp_path / "decimated.nii.gz"), + before=str(fname), + out_report=str(outdir / f'decimated-{tmp_path.name}.svg'), + ).run() + + out.to_filename(tmp_path / "downsampled.nii.gz") + SimpleBeforeAfter( + after_label="Downsampled", + before_label="Original", + after=str(tmp_path / "downsampled.nii.gz"), + before=str(fname), + out_report=str(outdir / f'downsampled-{tmp_path.name}.svg'), + ).run()