Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 252 additions & 0 deletions src/compass/utils/correct_cslc_amplitude.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
#! python

'''
Script to extract amplitude of CSLC,
with thermal noise correction and / or radiometric normalization applied
'''
import argparse

import numpy as np
from osgeo import gdal, osr

# Constants for dataset location in HDF5 file
PATH_CSLC_LAYER_IN_HDF = '/data'
PATH_LOCAL_INCIDENCE_ANGLE = '/data/local_incidence_angle'
PATH_NOISE_LUT = '/metadata/noise_information/thermal_noise_lut'


def load_amplitude(ds_cslc_amp, ds_local_incidence_angle=None, ds_noise_lut=None):
'''
Load the amplitude from CSLC GDAL dataset.
Apply noise correction and/or radiometric normalization when
the data are provided.

Parameters
----------
ds_cslc_amp: osgeo.gdal.Dataset
GDAL Raster dataset for CSLC amplitude
ds_local_incidence_angle: osgeo.gdal.Dataset
GDAL Raster dataset for local incidence angle
ds_noise_lut: osgeo.gdal.Dataset
GDAL Raster dataset noise LUT

Returns
-------
arr_cslc: np.ndarray
CSLC amplitude
'''

# Load CSLC amplutude into array
arr_cslc = ds_cslc_amp.ReadAsArray()

if ds_noise_lut is not None:
print('Applying noise removal')
# Apply noise correction

arr_cslc = arr_cslc ** 2 - ds_noise_lut.ReadAsArray()
arr_cslc[arr_cslc < 0.0] = 0.0
arr_cslc = np.sqrt(arr_cslc)

if ds_local_incidence_angle is not None:
print('Applying radiometric mormalization')
local_incidence_angle_arr_rad = \
np.deg2rad(ds_local_incidence_angle.ReadAsArray())
# Apply radiometric normalization
correction_factor = np.sqrt(np.tan(local_incidence_angle_arr_rad))
arr_cslc *= correction_factor

return arr_cslc


def get_cslc_gdal_dataset(cslc_path, cslc_static_path, pol,
noise_correction=True,
radiometric_normalization=True,
resampling_alg='BILINEAR'):
'''
Get the GDAL dataset for CSLC layer and resample &
reprojecte noise LUT (when user opted in)

Parameters
----------
cslc_path: str
path to the CSLC HDF5 file
cslc_static_path: str
path to the CSLC static layer HDF5 file
pol: str
Polarization
noise_correction: bool
Flag to turn on/off noise correction
radiometric_normalization: bool
Flag to turn on/ott radiometric normalization
resampling_alg: str
Resampling algorithm for gdal.Warp()

Returns
-------
(ds_cslc,
ds_incidence_angle,
ds_noise_resampled): tuple
Respectively, GDAL raster dataset for CSLC,
GDAL raster dataset for local incidence angle, and
GDAL raster dataset for noise LUT
(resampled to the same geogrid as the other two)
'''
# Get the geotransform and dimensions


prefix_netcdf = 'DERIVED_SUBDATASET:AMPLITUDE:NETCDF'

path_cslc = f'{prefix_netcdf}:{cslc_path}:{PATH_CSLC_LAYER_IN_HDF}/{pol}'
path_local_incidence = f'NETCDF:{cslc_static_path}:{PATH_LOCAL_INCIDENCE_ANGLE}'
path_noise_lut = f'NETCDF:{cslc_path}:{PATH_NOISE_LUT}'

ds_in = gdal.Open(path_cslc, gdal.GA_ReadOnly)

ds_noise_lut = (gdal.Open(path_noise_lut, gdal.GA_ReadOnly) if
noise_correction else None)

ds_local_incidence_angle = (gdal.Open(path_local_incidence, gdal.GA_ReadOnly) if
radiometric_normalization else None)

# Prepare for the reprojection
# Reproject the CSLC layer, and
# Define the source and target spatial references
gt_in = ds_in.GetGeoTransform()
xsize = ds_in.RasterXSize
ysize = ds_in.RasterYSize

src_srs = osr.SpatialReference()
src_srs.ImportFromWkt(ds_in.GetProjectionRef())

# Convert the corner points in `epsg_out`
extent_reprojected=[gt_in[0],
gt_in[3] + gt_in[5] * ysize,
gt_in[0] + gt_in[1] * xsize,
gt_in[3]]

# Put together the warp options
warp_options = gdal.WarpOptions(format='MEM',
resampleAlg=resampling_alg,
xRes=gt_in[1],
yRes=abs(gt_in[5]),
outputBounds=extent_reprojected,
dstSRS=src_srs)

# Resample the noise LUT
ds_noise_resampled = (gdal.Warp('', ds_noise_lut, options=warp_options)
if ds_noise_lut else None)

return (ds_in,
ds_local_incidence_angle,
ds_noise_resampled)


def save_amplitude(amplitude_arr, geotransform_cslc, epsg_cslc,
amplitude_filename):
'''
Save mosaicked array into GDAL Raster

Parameters
----------
amplitude_arr: np.ndarray
Amplitude image as numpy array
geotransform_cslc: tuple
Geotransform parameter for mosaic
epsg_cslc: int
EPSG for mosaic as projection parameter
amplitude_filename: str
GEOTIFF file name of the output raster
'''

# Create the projection from the EPSG code
srs_out = osr.SpatialReference()
srs_out.ImportFromEPSG(epsg_cslc)
projection_out = srs_out.ExportToWkt()

length_mosaic, width_mosaic = amplitude_arr.shape
driver = gdal.GetDriverByName('GTiff')
ds_out = driver.Create(amplitude_filename,
width_mosaic, length_mosaic, 1, gdal.GDT_Float32,
options=['COMPRESS=LZW', 'BIGTIFF=YES'])

ds_out.SetGeoTransform(geotransform_cslc)
ds_out.SetProjection(projection_out)

ds_out.GetRasterBand(1).WriteArray(amplitude_arr)

ds_out = None


def get_parser():
'''
Get the parser for CLI
'''
parser = argparse.ArgumentParser(
description=('Extracts CSLC amplitude with thermal noise and / or '
'radiometric correction applied'),
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-c',
dest='cslc_path',
type=str,
default='',
help='cslc file')

parser.add_argument('-s',
dest='cslc_static_path',
type=str,
default=[],
help='CSLC static layer file')

parser.add_argument('-o',
dest='out_path',
type=str,
default=None,
help='path to the output file')

parser.add_argument('-p',
dest='pol',
type=str,
default='VV',
help='Polarization')

parser.add_argument('--noise_correction_off',
dest='apply_noise_correction',
default=True,
action='store_false',
help='Turn off the noise correction')

parser.add_argument('--radiometric_normalization_off',
dest='apply_radiometric_normalization',
default=True,
action='store_false',
help='Turn off the radiometric normalization')

return parser


def main():
'''
Entrypoint of the script
'''
parser = get_parser()
args = parser.parse_args()

cslc_layer, noise_lut, local_incidence_angle = \
get_cslc_gdal_dataset(args.cslc_path,
args.cslc_static_path,
args.pol,
args.apply_noise_correction,
args.apply_radiometric_normalization)
amplitude_arr = load_amplitude(cslc_layer, noise_lut, local_incidence_angle)

geotransform_cslc = cslc_layer.GetGeoTransform()

proj_cslc = cslc_layer.GetProjection()
srs_cslc = osr.SpatialReference(wkt=proj_cslc)
epsg_cslc = int(srs_cslc.GetAuthorityCode(None))

save_amplitude(amplitude_arr, geotransform_cslc, epsg_cslc, args.out_path)


if __name__=='__main__':
main()