diff --git a/.codespellrc b/.codespellrc index 8f3c597..226875b 100644 --- a/.codespellrc +++ b/.codespellrc @@ -13,4 +13,5 @@ ignore-words-list = process, technik, thirdparty, - lamba + lamba, + sav diff --git a/docs/conf.py b/docs/conf.py index 28d1c4e..50a9e48 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -41,6 +41,7 @@ "sphinx_changelog", "sphinx_design", "sphinx_gallery.gen_gallery", + "sphinxcontrib.bibtex", ] # Add any paths that contain templates here, relative to this directory. @@ -77,6 +78,8 @@ autodoc_typehints = "description" autoclass_content = "init" +bibtex_bibfiles = ["references.bib"] + # -- Options for intersphinx extension --------------------------------------- # Example configuration for intersphinx: refer to the Python standard library. diff --git a/docs/reference/bibliography.rst b/docs/reference/bibliography.rst new file mode 100644 index 0000000..0272476 --- /dev/null +++ b/docs/reference/bibliography.rst @@ -0,0 +1,6 @@ +.. _bibliography_xrayvision: + +Bibliography +------------ + +.. bibliography:: diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 389d57b..0da2a1c 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -11,5 +11,7 @@ Reference transform utils visibility + uv-smooth + bibliography ../whatsnew/index diff --git a/docs/reference/uv-smooth.rst b/docs/reference/uv-smooth.rst new file mode 100644 index 0000000..9bed2ce --- /dev/null +++ b/docs/reference/uv-smooth.rst @@ -0,0 +1,8 @@ +.. _uv-smooth: + +UV-SMOOTH ('xrayvision.uv_smooth') +********************************** + +The ``uv_smooth`` submodule contains the uv_smooth image reconstruction technique. + +.. automodapi:: xrayvision.uv_smooth diff --git a/docs/references.bib b/docs/references.bib new file mode 100644 index 0000000..c750ab3 --- /dev/null +++ b/docs/references.bib @@ -0,0 +1,11 @@ + @article{Massone2009_uv_smooth, + title={Hard X-ray Imaging of Solar Flares Using Interpolated Visibilities}, + volume={703}, + url={http://adsabs.harvard.edu/cgi-bin/nph-data_query?bibcode=2009ApJ...703.2004M&link_type=EJOURNAL}, + DOI={10.1088/0004-637x/703/2/2004}, + abstractNote={RHESSI produces solar flare images with the finest angular and spectral resolutions ever achieved at hard X-ray energies. Because this instrument uses indirect, collimator-based imaging techniques, the “native” output of which is in the form of “visibilities” (two-dimensional spatial Fourier components of the image), the development and application of robust, accurate, visibility-based image reconstruction techniques is required. Recognizing that the density of spatial-frequency (u, v) coverage by RHESSI is much sparser than that normally encountered in radio astronomy, we therefore introduce a method for image reconstruction from a relatively sparse distribution of sampled visibilities. The method involves spline interpolation at spatial frequencies less than the largest sampled frequency and the imposition of a positivity constraint on the image to reduce the ringing effects resulting from an unconstrained Fourier transform inversion procedure. Using simulated images consisting both of assumed mathematical forms and of the type of structure typically associated with solar flares, we validate the fidelity, accuracy, and robustness with which the new procedure recovers input images. The method faithfully recovers both single and multiple sources, both compact and extended, over a dynamic range of ~10:1. The performance of the method, which we term as uv_smooth, is compared with other RHESSI image reconstruction algorithms currently in use and its advantages summarized. We also illustrate the application of the method using RHESSI observations of four solar flares.}, + number={2}, + journal={The Astrophysical Journal}, + author={Massone, Anna Maria and Emslie, A Gordon and Hurford, G J and Prato, Marco and Kontar, Eduard P and Piana, Michele}, + year={2009}, + pages={2004–2016} } diff --git a/pyproject.toml b/pyproject.toml index 5da520d..ee9e9a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,14 +10,14 @@ build-backend = 'setuptools.build_meta' [project] name = "xrayvisim" description = "An open-source Python library for Fourier or synthesis X-Ray imaging instruments." -requires-python = ">=3.9" +requires-python = ">=3.11" readme = {file = "README.rst", content-type="text/x-rst"} license = "BSD-3-Clause" license-files = ["LICENSE.rst"] authors = [{name="Shane Maloney", email = "shane.maloney@dias.ie"}] dependencies = [ "astropy>=6.0.0", - "numpy>=1.24.0", + "numpy>=1.25.0", "packaging>=23.0", "scipy>=1.13", "xarray>=2023.5.0" @@ -59,7 +59,8 @@ docs = [ "sphinx-gallery>=0.13.0", "sphinxext-opengraph>=0.6.0", "xrayvisim[all]", - "pydata-sphinx-theme>=0.16.0" + "pydata-sphinx-theme>=0.16.0", + "sphinxcontrib-bibtex" ] dev = ["xrayvisim[tests,docs]"] @@ -91,7 +92,7 @@ version_file = "xrayvision/_version.py" [tool.mypy] disable_error_code = "import-untyped" -python_version = "py39" +python_version = "3.11" [ tool.gilesbot ] diff --git a/ruff.toml b/ruff.toml index 6b03df6..7d0f5c6 100644 --- a/ruff.toml +++ b/ruff.toml @@ -1,4 +1,4 @@ -target-version = "py39" +target-version = "py311" line-length = 120 exclude = [ ".git,", diff --git a/xrayvision/clean.py b/xrayvision/clean.py index 08ff541..59a8e22 100644 --- a/xrayvision/clean.py +++ b/xrayvision/clean.py @@ -7,7 +7,6 @@ """ -from typing import Union, Optional from collections.abc import Iterable import astropy.units as u @@ -71,11 +70,11 @@ def clean( dirty_map: Quantity, dirty_beam: Quantity, pixel_size: Quantity[u.arcsec / u.pix] = None, - clean_beam_width: Optional[Quantity[u.arcsec]] = 4.0 * u.arcsec, - gain: Optional[float] = 0.1, - thres: Optional[float] = None, + clean_beam_width: Quantity[u.arcsec] | None = 4.0 * u.arcsec, + gain: float | None = 0.1, + thres: float | None = None, niter: int = 5000, -) -> Union[Quantity, NDArray[np.float64]]: +) -> Quantity | NDArray[np.float64]: r""" Clean the image using Hogbom's original method. @@ -176,10 +175,10 @@ def vis_clean( vis: Visibilities, shape: Quantity[u.pix], pixel_size: Quantity[u.arcsec / u.pix], - clean_beam_width: Optional[Quantity[u.arcsec]] = 4.0, - niter: Optional[int] = 5000, - map: Optional[bool] = True, - gain: Optional[float] = 0.1, + clean_beam_width: Quantity[u.arcsec] | None = 4.0, + niter: int | None = 5000, + map: bool | None = True, + gain: float | None = 0.1, **kwargs, ): r""" @@ -258,12 +257,12 @@ def ms_clean( dirty_map: Quantity, dirty_beam: Quantity, pixel_size: Quantity[u.arcsec / u.pix], - scales: Union[Iterable, NDArray, None] = None, + scales: Iterable | NDArray | None = None, clean_beam_width: Quantity = 4.0 * u.arcsec, gain: float = 0.1, thres: float = 0.01, niter: int = 5000, -) -> Union[Quantity, NDArray[np.float64]]: +) -> Quantity | NDArray[np.float64]: r""" Clean the map using a multiscale clean algorithm. @@ -406,13 +405,13 @@ def vis_ms_clean( vis: Visibilities, shape: Quantity[u.pix], pixel_size: Quantity[u.arcsec / u.pix], - scales: Optional[Iterable], - clean_beam_width: Optional[Quantity[u.arcsec]] = 4.0, - niter: Optional[int] = 5000, - map: Optional[bool] = True, - gain: Optional[float] = 0.1, - thres: Optional[float] = 0.01, -) -> Union[Quantity, NDArray[np.float64]]: + scales: Iterable | None, + clean_beam_width: Quantity[u.arcsec] | None = 4.0, + niter: int | None = 5000, + map: bool | None = True, + gain: float | None = 0.1, + thres: float | None = 0.01, +) -> Quantity | NDArray[np.float64]: r""" Clean the visibilities using a multiscale clean method. diff --git a/xrayvision/conftest.py b/xrayvision/conftest.py index 25f4943..ff1f974 100644 --- a/xrayvision/conftest.py +++ b/xrayvision/conftest.py @@ -1,8 +1,6 @@ # Force MPL to use non-gui backends for testing. -import matplotlib - try: - pass + import matplotlib except ImportError: pass else: diff --git a/xrayvision/imaging.py b/xrayvision/imaging.py index a185d52..84ab0f9 100644 --- a/xrayvision/imaging.py +++ b/xrayvision/imaging.py @@ -1,5 +1,3 @@ -from typing import Optional - import astropy.units as apu import numpy as np from astropy.units import Quantity @@ -56,7 +54,7 @@ def get_weights(vis: Visibilities, scheme: str = "natural", norm: bool = True) - @apu.quantity_input() -def validate_and_expand_kwarg(q: Quantity, name: Optional[str] = "") -> Quantity: +def validate_and_expand_kwarg(q: Quantity, name: str | None = "") -> Quantity: r""" Expand a scalar or array of size one to size two by repeating. @@ -97,8 +95,8 @@ def image_to_vis( *, u: Quantity[apu.arcsec**-1], v: Quantity[apu.arcsec**-1], - phase_center: Optional[Quantity[apu.arcsec]] = (0.0, 0.0) * apu.arcsec, - pixel_size: Optional[Quantity[apu.arcsec / apu.pix]] = 1.0 * apu.arcsec / apu.pix, + phase_center: Quantity[apu.arcsec] | None = (0.0, 0.0) * apu.arcsec, + pixel_size: Quantity[apu.arcsec / apu.pix] | None = 1.0 * apu.arcsec / apu.pix, ) -> Visibilities: r""" Return a Visibilities object created from the image and u, v sampling. @@ -135,7 +133,7 @@ def image_to_vis( def vis_to_image( vis: Visibilities, shape: Quantity[apu.pix] = (65, 65) * apu.pixel, - pixel_size: Optional[Quantity[apu.arcsec / apu.pix]] = 1 * apu.arcsec / apu.pix, + pixel_size: Quantity[apu.arcsec / apu.pix] | None = 1 * apu.arcsec / apu.pix, scheme: str = "natural", ) -> Quantity: """ @@ -180,8 +178,8 @@ def vis_psf_map( vis: Visibilities, *, shape: Quantity[apu.pix] = (65, 65) * apu.pixel, - pixel_size: Optional[Quantity[apu.arcsec / apu.pix]] = 1 * apu.arcsec / apu.pix, - scheme: Optional[str] = "natural", + pixel_size: Quantity[apu.arcsec / apu.pix] | None = 1 * apu.arcsec / apu.pix, + scheme: str | None = "natural", ) -> GenericMap: r""" Create a map of the point spread function for given the visibilities. @@ -259,8 +257,8 @@ def vis_psf_image( def vis_to_map( vis: Visibilities, shape: Quantity[apu.pix] = (65, 65) * apu.pixel, - pixel_size: Optional[Quantity[apu.arcsec / apu.pix]] = 1 * apu.arcsec / apu.pixel, - scheme: Optional[str] = "natural", + pixel_size: Quantity[apu.arcsec / apu.pix] | None = 1 * apu.arcsec / apu.pixel, + scheme: str | None = "natural", ) -> GenericMap: r""" Create a map by performing a back projection of inverse transform on the visibilities. diff --git a/xrayvision/mem.py b/xrayvision/mem.py index d0353a1..b8636e6 100644 --- a/xrayvision/mem.py +++ b/xrayvision/mem.py @@ -3,7 +3,6 @@ """ from types import SimpleNamespace -from typing import Union, Optional import astropy.units as apu import numpy as np @@ -580,12 +579,12 @@ def mem( shape: Quantity[apu.pix], pixel_size: Quantity[apu.arcsec / apu.pix], *, - percent_lambda: Optional[Quantity[apu.percent]] = 0.02 * apu.percent, + percent_lambda: Quantity[apu.percent] | None = 0.02 * apu.percent, maxiter: int = 1000, tol: float = 1e-3, map: bool = True, - total_flux: Optional[Quantity] = None, -) -> Union[Quantity, NDArray[np.float64]]: + total_flux: Quantity | None = None, +) -> Quantity | NDArray[np.float64]: r""" Maximum Entropy Method visibility based image reconstruction diff --git a/xrayvision/simulation/__init__.py b/xrayvision/simulation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/xrayvision/simulation/instruments.py b/xrayvision/simulation/instruments.py new file mode 100644 index 0000000..3fa67aa --- /dev/null +++ b/xrayvision/simulation/instruments.py @@ -0,0 +1,22 @@ +import astropy.units as u +import numpy as np + + +def rhessi_like_uv_coverage(): + """ + Creates RHESSI-like u, v coverage coverage distribution. + """ + # Approximate RHESSI spatial frequencies (arcsec^-1) for subcollimators 1-9, + # based on typical RHESSI grid pitches of 2.26, 3.92, 6.79, 11.76, 20.36, 35.27, 61.08, 105.8, 183.2 arcsec. + isc = np.arange(0, 9) # index subcollimator (isc) detectors 1-9 + resolutions = [2.26, 3.92, 6.79, 11.76, 20.36, 35.27, 61.08, 105.8, 183.2] * u.arcsec + radii = 1 / resolutions + + # 32 evenly-spaced rotation angles per detector (simulates spacecraft rotation) + n_angles = 32 + angles = np.linspace(0, 2 * np.pi, n_angles, endpoint=False) + + u_comp = np.outer(radii, np.cos(angles)) + v_comp = np.outer(radii, np.sin(angles)) + + return {"u": u_comp, "v": v_comp, "det": isc[:, None] * np.ones(u_comp.shape)} diff --git a/xrayvision/simulation/tests/__init__.py b/xrayvision/simulation/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/xrayvision/simulation/tests/test_instruments.py b/xrayvision/simulation/tests/test_instruments.py new file mode 100644 index 0000000..5208724 --- /dev/null +++ b/xrayvision/simulation/tests/test_instruments.py @@ -0,0 +1,6 @@ +from xrayvision.simulation.instruments import rhessi_like_uv_coverage + + +def test_rhessi_like_uv_coverage(): + out = rhessi_like_uv_coverage() + assert out["u"].shape == out["v"].shape == out["det"].shape == (9, 32) diff --git a/xrayvision/tests/test_uv_smooth.py b/xrayvision/tests/test_uv_smooth.py new file mode 100644 index 0000000..95cd5b1 --- /dev/null +++ b/xrayvision/tests/test_uv_smooth.py @@ -0,0 +1,138 @@ +import astropy.units as apu +import hissw +import numpy as np +import pytest +from astropy.io import fits +from numpy.testing import assert_allclose + +from xrayvision.simulation.instruments import rhessi_like_uv_coverage +from xrayvision.uv_smooth import uv_smooth +from xrayvision.visibility import Visibilities, VisMeta + + +@pytest.mark.skip("needs local files") +def test_uv_smooth(): + hdul = fits.open("~/Downloads/hsi_vis_20020221_2357_0054_46tx3e.fits") + times = np.unique(hdul[-1].data["TRANGE"], axis=0) + + index = np.argwhere( + (np.all(hdul[3].data["TRANGE"] == times[7], axis=1)) & (np.all(hdul[3].data["ERANGE"] == [12.0, 25.0], axis=1)) + ) + vis_data = hdul[3].data[index.squeeze()] + + ############################################################################### + # Now lets filter by ISC or detector to remove possibly bad data in this case + # need to remove ISC 0 and 1. + vis_data = vis_data[vis_data["isc"] > 1] + vis_data = vis_data[vis_data["v"] > 0] + vis_data = vis_data[vis_data["obsvis"] != 0 + 0j] + + # vis_sav = readsav("/Users/sm/hsi_hsi_20020221_0006-0007_12-25.sav") + # vis_data = vis_sav["vis"] + ############################################################################### + # Now we can create the visibility object from the filtered visibilities. + meta = VisMeta({"isc": vis_data["isc"]}) + + vunit = apu.Unit("photon/(cm**2 s)") + vis = Visibilities( + visibilities=vis_data["obsvis"] * vunit, + u=vis_data["u"] / apu.arcsec, + v=vis_data["v"] / apu.arcsec, + phase_center=vis_data["xyoffset"][0] * apu.arcsec, + meta=meta, + amplitude_uncertainty=vis_data["sigamp"] * vunit, + ) + + image, vis = uv_smooth(vis) + print("here") + + +@pytest.fixture +def rhessi_like_gaussian_vis(): + """ + Synthetic RHESSI-like visibilities of a circular Gaussian source at the origin. + + Samples a circular Gaussian (flux=100 ph/cm²/s, sigma=5 arcsec) at u,v points + arranged on annuli at spatial frequencies corresponding to RHESSI subcollimators + 2-6, with 12 rotation samples per annulus. + + Returns + ------- + vis : Visibilities + Synthetic visibilities ready for uv_smooth. + flux : float + True source flux (ph/cm^2/s). + sigma : float + True source sigma (arcsec). + """ + uv = rhessi_like_uv_coverage() + # detectors 3-7 or isc 2-6 + u, v, isc = uv["u"][2:7].flatten(), uv["v"][2:7].flatten(), uv["det"][2:7].flatten() + + flux = 100.0 # ph/cm^2/s + sigma = 5.0 * apu.arcsec + + # V(u,v) = flux * exp(-2π²σ²(u²+v²)) [source at origin, so no phase term] + vis_vals = flux * np.exp(-2 * np.pi**2 * sigma**2 * (u**2 + v**2)) + + vis = Visibilities( + visibilities=vis_vals * apu.Unit("ph cm-2 s-1"), + u=u, + v=v, + meta=VisMeta({"isc": isc}), + ) + return vis, flux, sigma + + +def test_uv_smooth_peak_at_origin(rhessi_like_gaussian_vis): + vis, flux, sigma = rhessi_like_gaussian_vis + + image, _ = uv_smooth(vis, niter=50) + + # For a source centered at (0, 0) the peak should lie at the image center + peak_idx = np.unravel_index(np.argmax(image), image.shape) + center = np.array(image.shape) // 2 + + assert abs(peak_idx[0] - center[0]) <= 3, f"Peak row {peak_idx[0]} too far from image center {center[0]}" + assert abs(peak_idx[1] - center[1]) <= 3, f"Peak col {peak_idx[1]} too far from image center {center[1]}" + + +def test_uv_smooth_matches_gaussian(rhessi_like_gaussian_vis): + vis, flux, sigma = rhessi_like_gaussian_vis + sigma = sigma.value + image, _ = uv_smooth(vis, niter=50) + im_new = image.shape[0] + + # Build the reference Gaussian on the same pixel grid as the uv_smooth output + pixel_size = 1.0 # _uv_smooth_pixel_scale() # arcsec/pixel + coords = (np.arange(im_new) - im_new // 2) * pixel_size # arcsec + xx, yy = np.meshgrid(coords, coords) + ref_image = (100.0 / (2 * np.pi * sigma**2)) * np.exp(-0.5 * (xx**2 + yy**2) / sigma**2) + + np.testing.assert_allclose(image, ref_image, atol=0.05) + + +def test_uv_smooth_idl(rhessi_like_gaussian_vis): + vis, flux, sigma = rhessi_like_gaussian_vis + # vis.u + # vis.v + # vis.obsvis + # vis.isc + + ssw = hissw.Environment(ssw_packages=["hessi"]) + script = """ +visin = {u:{{u | list }}, v:{{v | list }}, obsvis:{{obsvis | list}}, isc:{{isc|list}}, xyoffset:[0,0], trange:[0,0]} +uv_smooth, visin, map, reconstructed_map_visibilities=visout + """ + out = ssw.run( + script=script, + args={ + "u": vis.u.value.tolist(), + "v": vis.v.value.tolist(), + "obsvis": vis.visibilities.value.tolist(), + "isc": vis.meta["isc"].tolist(), + }, + ) + image, _ = uv_smooth(vis, niter=50) + image_idl = out["map"]["data"][0] + assert_allclose(image, image_idl, atol=5e-5) diff --git a/xrayvision/tests/test_visibility.py b/xrayvision/tests/test_visibility.py index 037388b..6e9b104 100644 --- a/xrayvision/tests/test_visibility.py +++ b/xrayvision/tests/test_visibility.py @@ -5,6 +5,7 @@ from astropy.tests.helper import assert_quantity_allclose from astropy.time import Time from numpy.testing import assert_array_equal +from sunpy.util import SunpyDeprecationWarning import xrayvision.visibility as vm from xrayvision.visibility import Visibilities, Visibility @@ -22,7 +23,8 @@ def test_visibilities(): def test_visibility(): - vis = Visibility(vis=1 * apu.ct, u=1 / apu.deg, v=1 / apu.deg) + with pytest.warns(SunpyDeprecationWarning): + vis = Visibility(vis=1 * apu.ct, u=1 / apu.deg, v=1 / apu.deg) assert vis.vis == 1 * apu.ct assert vis.u == 1 / apu.deg assert vis.v == 1 / apu.deg diff --git a/xrayvision/transform.py b/xrayvision/transform.py index 749067b..57081c8 100644 --- a/xrayvision/transform.py +++ b/xrayvision/transform.py @@ -7,8 +7,6 @@ """ -from typing import Union, Optional - import astropy.units as apu import numpy as np import numpy.typing as npt @@ -22,8 +20,8 @@ def generate_xy( number_pixels: Quantity[apu.pix], *, - phase_center: Optional[Quantity[apu.arcsec]] = 0.0 * apu.arcsec, - pixel_size: Optional[Quantity[apu.arcsec / apu.pix]] = 1.0 * apu.arcsec / apu.pix, + phase_center: Quantity[apu.arcsec] | None = 0.0 * apu.arcsec, + pixel_size: Quantity[apu.arcsec / apu.pix] | None = 1.0 * apu.arcsec / apu.pix, ) -> Quantity[apu.arcsec]: """ Generate the x or y coordinates given the number of pixels, phase_center and pixel size. @@ -70,8 +68,8 @@ def generate_xy( def generate_uv( number_pixels: Quantity[apu.pix], *, - phase_center: Optional[Quantity[apu.arcsec]] = 0.0 * apu.arcsec, - pixel_size: Optional[Quantity[apu.arcsec / apu.pix]] = 1.0 * apu.arcsec / apu.pix, + phase_center: Quantity[apu.arcsec] | None = 0.0 * apu.arcsec, + pixel_size: Quantity[apu.arcsec / apu.pix] | None = 1.0 * apu.arcsec / apu.pix, ) -> Quantity[1 / apu.arcsec]: """ Generate the u or v coordinates given the number of pixels, phase_center and pixel size. @@ -124,13 +122,13 @@ def generate_uv( @apu.quantity_input() def dft_map( - input_array: Union[Quantity, npt.NDArray], + input_array: Quantity | npt.NDArray, *, u: Quantity[1 / apu.arcsec], v: Quantity[1 / apu.arcsec], phase_center: Quantity[apu.arcsec] = (0.0, 0.0) * apu.arcsec, pixel_size: Quantity[apu.arcsec / apu.pix] = (1.0, 1.0) * apu.arcsec / apu.pix, -) -> Union[Quantity, npt.NDArray]: +) -> Quantity | npt.NDArray: r""" Discrete Fourier transform in terms of coordinates returning 1-D array complex visibilities. @@ -183,15 +181,15 @@ def dft_map( @apu.quantity_input def idft_map( - input_vis: Union[Quantity, npt.NDArray], + input_vis: Quantity | npt.NDArray, *, u: Quantity[1 / apu.arcsec], v: Quantity[1 / apu.arcsec], shape: Quantity[apu.pix], - weights: Optional[npt.NDArray] = None, + weights: npt.NDArray | None = None, phase_center: Quantity[apu.arcsec] = (0.0, 0.0) * apu.arcsec, pixel_size: Quantity[apu.arcsec / apu.pix] = (1.0, 1.0) * apu.arcsec / apu.pix, -) -> Union[Quantity, npt.NDArray]: +) -> Quantity | npt.NDArray: r""" Inverse discrete Fourier transform in terms of coordinates returning a 2D real array or image. diff --git a/xrayvision/utils.py b/xrayvision/utils.py index bb9789f..a249b6d 100644 --- a/xrayvision/utils.py +++ b/xrayvision/utils.py @@ -1,8 +1,7 @@ import logging -from typing import Union, Optional -def get_logger(name: str, level: Optional[Union[int, str]] = logging.WARNING) -> logging.Logger: +def get_logger(name: str, level: int | str | None = logging.WARNING) -> logging.Logger: """ Return a configured logger instance. diff --git a/xrayvision/uv_smooth.py b/xrayvision/uv_smooth.py new file mode 100644 index 0000000..2593372 --- /dev/null +++ b/xrayvision/uv_smooth.py @@ -0,0 +1,484 @@ +import logging + +import numpy as np +from numpy.fft import fft2, fftshift, ifft2, ifftshift +from scipy.interpolate import RBFInterpolator + +from xrayvision.visibility import Visibilities + +__all__ = ["uv_smooth"] + +logger = logging.getLogger(__name__) + + +def uv_smooth(vis: Visibilities, niter: int = 50): + r""" + uv_smooth image reconstruction method. + + This method reconstructs images from sparse Fourier-domain visibilities using an iterative Landweber scheme. It + interpolates visibilities onto a regular uv grid, applies smoothing to stabilize the reconstruction, and + iteratively refines the image while enforcing positivity. + + Parameters + ---------- + vis : + Input visibilities. + niter : + Maximum number of iterations. + + References + ---------- + * :cite:t:`Massone2009_uv_smooth` + + Returns + ------- + Reconstructed 2D image of the X-ray source. + + Notes + ----- + UV_smooth solves the ill-posed image reconstruction problem in the uv-plane + through the following steps: + + 1. **Visibility Gridding**: Sparse visibilities `V(u_i, v_i)` are interpolated onto a regular uv grid `V_grid(u,v)` + using a smoothing kernel `K(u,v)`: `V_grid(u,v) = sum_i V(u_i,v_i) * K(u-u_i, v-v_i)` + + 2. **Smoothing / Regularization**: The kernel enforces smoothness in the uv-plane to mitigate noise and compensate + for sparse coverage. + + 3. **Iterative Landweber Update**: The image `I_n(x,y)` at iteration n is updated using the Landweber scheme: + `I_{n+1} = I_n + λ * F^{-1}[ V_grid - F[I_n] ]` + where `F` and `F^{-1}` are the Fourier and inverse Fourier transforms, and λ is a relaxation parameter. After + each iteration, positivity is enforced: `I_{n+1} = max(0, I_{n+1})` + + 4. **Stopping Criterion**: Iteration continues until either: + `|| F[I_{n+1}] - V_grid || < tolerance` + or the maximum number of iterations `max_iter` is reached. This residual norm ensures that updates stop when the + reconstruction is consistent with the measured visibilities. + """ + + pixel = 0.0005 + detmin = min(vis.meta["isc"]) + if detmin == 0: + # fov = 0.45 + pixel = pixel * 2.0 + N = 450 + + if detmin == 1: + # fov = 0.26 + pixel = pixel * 2.0 + N = 260 + + if detmin >= 2: + # fov = 0.16 + N = 320 + + r = np.sqrt(vis.u**2 + vis.v**2).value + + # Construct new u`, v` grid to interpolate + Ulimit = (N / 2 - 1) * pixel + pixel / 2 + usampl = -Ulimit + np.arange(N) * pixel + vsampl = usampl + uu, vv = np.meshgrid(usampl, vsampl) + + # Interpolate real and imag components onto new grid + uv_obs = np.vstack([vis.u, vis.v]).T + uv_samp = np.vstack([uu.flatten(), vv.flatten()]).T + interpolator = RBFInterpolator(uv_obs, vis.visibilities.real / (4 * np.pi**2)) + real_interp = interpolator(uv_samp) + interpolator = RBFInterpolator(uv_obs, vis.visibilities.imag / (4 * np.pi**2)) + imag_interp = interpolator(uv_samp) + vis_interp = (real_interp + 1j * imag_interp).reshape((N, N)) + + # Set any component outside the original uv sampling to 0 + vis_interp[np.sqrt(uu**2 + vv**2) > r.max()] = 0j + + # fov = 0.96 + # Define new grid to zero pad the visibilities + Nnew = 1920 + Ulimit = (Nnew / 2 - 1) * pixel + pixel / 2 + xpix = -Ulimit + np.arange(Nnew) * pixel + ypix = xpix + + # use np.pad ? + intzpadd = np.zeros((Nnew, Nnew), dtype=np.complex128) + intzpadd[(Nnew - N) // 2 : (Nnew - N) // 2 + N, (Nnew - N) // 2 : (Nnew - N) // 2 + N] = vis_interp + + im_new = Nnew // 15 + intznew = np.zeros((im_new, im_new), dtype=np.complex128) + xpixnew = np.zeros(im_new) + ypixnew = np.zeros(im_new) + + for i in range(im_new): + xpixnew[i] = xpix[15 * i + 7] + ypixnew[i] = ypix[15 * i + 7] + for j in range(im_new): + intznew[i, j] = intzpadd[15 * i + 7, 15 * j + 7] + + # xx, yy = np.meshgrid(np.arange(im_new), np.arange(im_new), indexing="xy") + # intznew = intzpadd[(15 * xx) + 7, (15 * yy) + 7] + # xpixnew = xpix[(15 * xx[0,:]) + 7] + # ypixnew = ypix[(15 * yy[:,0]) + 7] + + # OMEGA = (xpixnew[im_new - 1] - xpixnew[0]) / 2 + # X = im_new // (4 * OMEGA) + deltaomega = (xpixnew[im_new - 1] - xpixnew[0]) / im_new + # deltax = 2 * X / im_new + + g = intznew[:, :] + # fftInverse = 4 * np.pi**2 * deltaomega * deltaomega * fft2(ifftshift(intznew)).real + # fftInverse = fftshift(fftInverse) + + # Characteristic function of disk + chi = np.zeros((im_new, im_new), dtype=np.complex128) + chi[np.sqrt(xpixnew.reshape(-1, 1) ** 2 + ypixnew.reshape(1, -1) ** 2) < r.max()] = 1 + 0j + + # Landweber iterations + map_actual = np.zeros((im_new, im_new), dtype=np.complex128) + map_iteration = np.zeros((niter, im_new, im_new)) + map_solution = np.zeros((im_new, im_new)) + + # relaxation parameter + tau = 0.2 + + descent = np.zeros(niter - 1) + normAf_g = np.zeros(niter) + + # iteration 0: Inverse Fourier Transform of the initial solution + map_shifted = ifftshift(map_actual) + F_Trasf_shifted = ifft2(map_shifted) / (4 * np.pi**2 * deltaomega * deltaomega) + F_Trasf = fftshift(F_Trasf_shifted) + + for iter in range(niter): + # Update rule + F_Trasf_up = F_Trasf + tau * (g - chi * F_Trasf) + F_Trasf = F_Trasf_up + + # FFT of the updated soln + F_Trasf_shifted = ifftshift(F_Trasf) + map_shifted = fft2(F_Trasf_shifted) * 4 * np.pi**2 * deltaomega * deltaomega + map_actual = fftshift(map_shifted) + + # Project solution onto subset of positive solutions (positivity constraint) + map_actual[map_actual.real < 0] = 0 + 0j + map_iteration[iter, :, :] = map_actual.real + + map_shifted = ifftshift(map_actual) + F_Trasf_shifted = ifft2(map_shifted) / (4 * np.pi**2 * deltaomega * deltaomega) + F_Trasf = fftshift(F_Trasf_shifted) + + # Stopping criterion based on the descent of ||Af - g|| + Af_g = chi * F_Trasf - g + normAf_g[iter] = np.sqrt(np.sum(np.abs(Af_g) * np.abs(Af_g))) + if iter >= 1: + with np.errstate(divide="ignore", invalid="ignore"): + descent[iter - 1] = (normAf_g[iter - 1] - normAf_g[iter]) / normAf_g[iter - 1] + if descent[iter - 1] < 0.02: + logger.info("Converge at iteration %d", iter) + break + else: + logger.info("Max iterations reached %d", iter) + + # output + F_Trasf = 4 * np.pi**2 * F_Trasf + + if iter == niter - 1: + map_solution[:, :] = map_iteration[14, :, :].real + else: + map_solution = map_actual.real + + return map_solution, F_Trasf + + +def uv_smooth_flexible(vis: Visibilities, niter: int = 50, pixel_size: float = 1, image_dim: int = 128): + """ + UV smooth with user-specified image dimensions and pixel size. + + Parameters + ---------- + vis : Visibilities + Input visibilities + niter : int + Number of Landweber iterations + pixel_size : float, optional + Desired pixel size in arcseconds for the output image. + If None, automatically determined. + image_dim : int + Desired output image dimension in pixels (default: 128) + + Returns + ------- + map_solution : ndarray + Reconstructed image of size (image_dim, image_dim) + pixel_size_arcsec : float + Actual pixel size in arcseconds + """ + + # Calculate maximum UV radius + # r = np.sqrt(vis.u**2 + vis.v**2).value + # r_max = r.max() + + # Determine UV pixel size + if pixel_size is not None: + # Convert arcsec to arcsec^-1 for UV space + # For an image of size L arcsec, the UV sampling is 1/L arcsec^-1 + fov_arcsec = pixel_size * image_dim + uv_pixel = 1.0 / fov_arcsec + else: + # Use default based on detector + uv_pixel = 0.0005 + detmin = min(vis.meta["isc"]) + if detmin <= 1: + uv_pixel = uv_pixel * 2.0 + + # Call the main function with appropriate parameters + map_solution, F_Trasf, xpix, ypix = uv_smooth_new(vis, niter=niter, pixel_size=uv_pixel, shape=image_dim) + + # Calculate actual pixel size + fov_uv = xpix[-1] - xpix[0] + pixel_size_arcsec = 1.0 / fov_uv + + return map_solution, pixel_size_arcsec + + +def uv_smooth_new( + vis: Visibilities, niter: int = 50, pixel_size: float = 1, shape: int = 128, natural_weighting: bool = True +): + """ + UV smooth algorithm with flexible image dimensions. + + Parameters + ---------- + vis : Visibilities + Input visibilities + niter : int + Number of Landweber iterations (default: 50) + pixel_size : float, optional + Pixel size in UV space (arcsec^-1). If None, automatically determined + from detector configuration. + shape : int, optional + Final image dimension in pixels. If None, automatically determined + from detector configuration. + natural_weighting : bool + Apply natural weighting based on UV coverage (default: True) + + Returns + ------- + map_solution : ndarray + Reconstructed image + F_Trasf : ndarray + Final Fourier transform + + """ + + # Determine pixel size if not provided + if pixel_size is None: + pixel_size = 0.0005 + detmin = min(vis.meta["isc"]) + if detmin <= 1: + pixel_size = pixel_size * 2.0 + + # Calculate maximum UV radius from the data + r = np.sqrt(vis.u**2 + vis.v**2).value + r_max = r.max() + + # Determine appropriate grid size for interpolation + # This should be large enough to capture the UV coverage + # Rule of thumb: make sure pixel * N/2 > r_max + if shape is None: + # Auto-determine based on detector configuration + detmin = min(vis.meta["isc"]) + if detmin == 0: + N = 450 + elif detmin == 1: + N = 260 + else: + N = 320 + else: + # Calculate N to ensure we cover the UV space properly + # N should be at least 2 * r_max / pixel + N = int(np.ceil(2 * r_max / pixel_size)) + # Make it even for FFT efficiency + if N % 2 == 1: + N += 1 + # Ensure minimum size + N = max(N, 64) + + # Construct new u, v grid to interpolate + # CRITICAL: Ulimit must accommodate r_max + Ulimit = (N / 2 - 1) * pixel_size + pixel_size / 2 + + # Check if our grid is large enough + if Ulimit < r_max: + # Increase N to accommodate the data + N = int(np.ceil(2 * r_max / pixel_size)) + 2 + if N % 2 == 1: + N += 1 + Ulimit = (N / 2 - 1) * pixel_size + pixel_size / 2 + print(f"Warning: Grid size increased to N={N} to accommodate r_max={r_max:.6f}") + + usampl = -Ulimit + np.arange(N) * pixel_size + vsampl = usampl + uu, vv = np.meshgrid(usampl, vsampl) + + # Interpolate real and imag components onto new grid + uv_obs = np.vstack([vis.u, vis.v]).T + uv_samp = np.vstack([uu.flatten(), vv.flatten()]).T + + # Normalize by 4π² for consistency with IDL code + interpolator = RBFInterpolator(uv_obs, vis.visibilities.real / (4 * np.pi**2)) + real_interp = interpolator(uv_samp) + interpolator = RBFInterpolator(uv_obs, vis.visibilities.imag / (4 * np.pi**2)) + imag_interp = interpolator(uv_samp) + vis_interp = (real_interp + 1j * imag_interp).reshape((N, N)) + + # CRITICAL: Set any component outside the original uv sampling to 0 + # This prevents sampling regions where we have no data + uv_radius = np.sqrt(uu**2 + vv**2) + vis_interp[uv_radius > r_max] = 0j + + # Define new grid to zero pad the visibilities + # The zero-padding increases the sampling in image space + # Choose Nnew based on desired oversampling + oversample_factor = 6 # This gives ~15x downsampling later + Nnew = N * oversample_factor + + # Make sure Nnew is even + if Nnew % 2 == 1: + Nnew += 1 + + Ulimit_new = (Nnew / 2 - 1) * pixel_size + pixel_size / 2 + xpix = -Ulimit_new + np.arange(Nnew) * pixel_size + ypix = xpix + + # Zero pad the visibilities + intzpadd = np.zeros((Nnew, Nnew), dtype=np.complex128) + start_idx = (Nnew - N) // 2 + intzpadd[start_idx : start_idx + N, start_idx : start_idx + N] = vis_interp + + # Downsample to get final image size + # This reduces the pixel size in image space + downsample_factor = 15 + im_new = Nnew // downsample_factor + + # Ensure im_new is reasonable + if im_new < 32: + im_new = 32 + downsample_factor = Nnew // im_new + + intznew = np.zeros((im_new, im_new), dtype=np.complex128) + xpixnew = np.zeros(im_new) + ypixnew = np.zeros(im_new) + + for i in range(im_new): + idx = downsample_factor * i + downsample_factor // 2 + if idx >= Nnew: + idx = Nnew - 1 + xpixnew[i] = xpix[idx] + ypixnew[i] = ypix[idx] + for j in range(im_new): + jdx = downsample_factor * j + downsample_factor // 2 + if jdx >= Nnew: + jdx = Nnew - 1 + intznew[i, j] = intzpadd[idx, jdx] + + deltaomega = (xpixnew[im_new - 1] - xpixnew[0]) / im_new + + g = intznew[:, :] + + # Characteristic function of disk + # CRITICAL: chi should only be 1 where we have UV data coverage + # This constraint ensures we don't extrapolate beyond measured frequencies + chi = np.zeros((im_new, im_new), dtype=np.complex128) + uv_grid_radius = np.sqrt(xpixnew.reshape(-1, 1) ** 2 + ypixnew.reshape(1, -1) ** 2) + + # IMPORTANT: Only set chi=1 where we actually have data + # Use a slightly smaller radius to avoid edge effects + chi[uv_grid_radius < (r_max * 0.95)] = 1 + 0j + + if natural_weighting: + # Apply natural weighting based on density of UV coverage + # This down-weights regions with sparse sampling + from scipy.ndimage import gaussian_filter + + # Create a density map of UV coverage + uv_density = np.zeros((im_new, im_new)) + for u_obs, v_obs in zip(vis.u.value, vis.v.value): + # Find nearest grid point + i = np.argmin(np.abs(xpixnew - u_obs)) + j = np.argmin(np.abs(ypixnew - v_obs)) + if i < im_new and j < im_new: + uv_density[i, j] += 1 + + # Smooth the density map + uv_density_smooth = gaussian_filter(uv_density, sigma=1.0) + + # Normalize and use as weight + if uv_density_smooth.max() > 0: + weights = uv_density_smooth / uv_density_smooth.max() + # Avoid complete zero weighting + weights = np.clip(weights, 0.1, 1.0) + chi = chi * weights + + # Landweber iterations + map_actual = np.zeros((im_new, im_new), dtype=np.complex128) + map_iteration = np.zeros((niter, im_new, im_new)) + + # Relaxation parameter (can be tuned) + tau = 0.2 + + descent = np.zeros(niter - 1) + normAf_g = np.zeros(niter) + + # Iteration 0: Inverse Fourier Transform of the initial solution + map_shifted = ifftshift(map_actual) + F_Trasf_shifted = ifft2(map_shifted) / (4 * np.pi**2 * deltaomega * deltaomega) + F_Trasf = fftshift(F_Trasf_shifted) + + for iter in range(niter): + # Update rule + F_Trasf_up = F_Trasf + tau * (g - chi * F_Trasf) + F_Trasf = F_Trasf_up + + # FFT of the updated solution + F_Trasf_shifted = ifftshift(F_Trasf) + map_shifted = fft2(F_Trasf_shifted) / (im_new**2) * 4 * np.pi**2 * deltaomega * deltaomega + map_actual = fftshift(map_shifted) + + # Project solution onto subset of positive solutions (positivity constraint) + map_actual[map_actual.real < 0] = 0 + 0j + map_iteration[iter, :, :] = map_actual.real + + # Back to Fourier space + map_shifted = ifftshift(map_actual) + F_Trasf_shifted = ifft2(map_shifted) * (im_new**2) / (4 * np.pi**2 * deltaomega * deltaomega) + F_Trasf = fftshift(F_Trasf_shifted) + + # Stopping criterion based on the descent of ||Af - g|| + Af_g = chi * F_Trasf - g + normAf_g[iter] = np.sqrt(np.sum(np.abs(Af_g) * np.abs(Af_g))) + + if iter >= 1: + with np.errstate(divide="ignore", invalid="ignore"): + descent[iter - 1] = (normAf_g[iter - 1] - normAf_g[iter]) / normAf_g[iter - 1] + + logger.debug("Iteration %d: %f", iter, descent[iter - 1]) + + if descent[iter - 1] < 0.02: + print(f"Converged at iteration {iter}") + break + + # Output + F_Trasf = 4 * np.pi**2 * F_Trasf + + if iter == niter - 1: + # If didn't converge, use iteration 14 (arbitrary choice from original) + if niter > 14: + map_solution = map_iteration[14, :, :].real + else: + map_solution = map_iteration[niter - 1, :, :].real + else: + map_solution = map_actual.real + + map_solution = map_solution * im_new**2 + + return map_solution, F_Trasf, xpixnew, ypixnew diff --git a/xrayvision/visibility.py b/xrayvision/visibility.py index ca9698f..545f767 100644 --- a/xrayvision/visibility.py +++ b/xrayvision/visibility.py @@ -8,7 +8,7 @@ import abc import copy import numbers -from typing import Any, Union, Optional +from typing import Any from collections.abc import Iterable, Sequence import astropy.units as apu @@ -20,6 +20,7 @@ __all__ = ["Visibility", "Visibilities", "VisMeta", "VisibilitiesABC", "VisMetaABC"] +from sunpy.util import deprecated _E_RANGE_KEY = "spectral_range" _T_RANGE_KEY = "time_range" @@ -31,21 +32,21 @@ class VisMetaABC(abc.ABC): @property @abc.abstractmethod - def observer_coordinate(self) -> Union[SkyCoord, None]: + def observer_coordinate(self) -> SkyCoord | None: """ Location of the observer. """ @property @abc.abstractmethod - def spectral_range(self) -> Optional[Iterable[apu.Quantity]]: + def spectral_range(self) -> Iterable[apu.Quantity] | None: """ Spectral range over which the visibilities are computed. """ @property @abc.abstractmethod - def time_range(self) -> Optional[Iterable[Time]]: + def time_range(self) -> Iterable[Time] | None: """ Time range over which the visibilities are computed. """ @@ -59,7 +60,7 @@ def vis_labels(self) -> Sequence[Iterable[str]]: @property @abc.abstractmethod - def instrument(self) -> Union[str, None]: + def instrument(self) -> str | None: """ The name of the instrument or observer that measured the visibilities. """ @@ -103,35 +104,35 @@ def meta(self) -> VisMetaABC: @property @abc.abstractmethod - def uncertainty(self) -> Union[Iterable[apu.Quantity], None]: + def uncertainty(self) -> Iterable[apu.Quantity] | None: """ Uncertainties on visibilities values. """ @property @abc.abstractmethod - def amplitude(self) -> Union[Iterable[apu.Quantity], None]: + def amplitude(self) -> Iterable[apu.Quantity] | None: """ Amplitudes of the visibilities. """ @property @abc.abstractmethod - def amplitude_uncertainty(self) -> Union[Iterable[apu.Quantity], None]: + def amplitude_uncertainty(self) -> Iterable[apu.Quantity] | None: """ Amplitude uncertainty of the visibilities. """ @property @abc.abstractmethod - def phase(self) -> Union[Iterable[apu.Quantity[apu.deg]], None]: + def phase(self) -> Iterable[apu.Quantity[apu.deg]] | None: """ Phases of the visibilities. """ @property @abc.abstractmethod - def phase_uncertainty(self) -> Union[Iterable[apu.Quantity[apu.deg]], None]: + def phase_uncertainty(self) -> Iterable[apu.Quantity[apu.deg]] | None: """ Phase uncertainty of the visibilities. """ @@ -160,7 +161,7 @@ def __init__(self, *args, **kwargs): def _check_input_type_and_unit(self, key, key_type, unit=None, equivalencies=None): value = self.get(key, None) - if not isinstance(value, (key_type, type(None))): + if not isinstance(value, (key_type | type(None))): raise KeyError(f"Inputs must include a key, '{key}', that gives a {key_type}.") if unit is not None and value is not None and not value.unit.is_equivalent(unit, equivalencies=equivalencies): raise ValueError(f"'{key}' must have angular units.") @@ -199,12 +200,12 @@ def __init__( u: apu.Quantity[1 / apu.deg], v: apu.Quantity[1 / apu.deg], phase_center: apu.Quantity[apu.arcsec] = [0, 0] * apu.arcsec, - meta: Optional[VisMetaABC] = None, - uncertainty: Optional[apu.Quantity] = None, - amplitude: Optional[apu.Quantity] = None, - amplitude_uncertainty: Optional[apu.Quantity] = None, - phase: Optional[apu.Quantity[apu.deg]] = None, - phase_uncertainty: Optional[apu.Quantity[apu.deg]] = None, + meta: VisMetaABC | None = None, + uncertainty: apu.Quantity | None = None, + amplitude: apu.Quantity | None = None, + amplitude_uncertainty: apu.Quantity | None = None, + phase: apu.Quantity[apu.deg] | None = None, + phase_uncertainty: apu.Quantity[apu.deg] | None = None, ): r""" A class for holding visibilities. @@ -492,6 +493,7 @@ def _attrs_both_none_or_neither(attr1, attr2): return True +@deprecated("0.2.0", alternative="Visibilities") class Visibility: r""" Hold a set of related visibilities and information. @@ -506,8 +508,8 @@ def __init__( *, u: Quantity[1 / apu.arcsec], v: Quantity[1 / apu.arcsec], - offset: Optional[Quantity[apu.arcsec]] = (0.0, 0.0) * apu.arcsec, - phase_centre: Optional[Quantity[apu.arcsec]] = (0.0, 0.0) * apu.arcsec, + offset: Quantity[apu.arcsec] | None = (0.0, 0.0) * apu.arcsec, + phase_centre: Quantity[apu.arcsec] | None = (0.0, 0.0) * apu.arcsec, ) -> None: r""" Generic Visibility object.