diff --git a/ghg_process.py b/ghg_process.py index 1faf7cf..a49ef5c 100644 --- a/ghg_process.py +++ b/ghg_process.py @@ -23,11 +23,10 @@ import target_generation import parallel_mf import scale -import apply_glt -from spectral.io import envi import numpy as np -from utils import envi_header, convert_to_cog +from utils import convert_to_cog from files import Filenames +import spec_io metadata = { 'ch4': { @@ -93,15 +92,6 @@ def main(input_args=None): if args.wavelength_range is not None and len(args.wavelength_range) % 2 != 0: raise ValueError('wavelength_range must have an even number of elements') - radiance_file = args.radiance_file - radiance_file_hdr = envi_header(radiance_file) - - obs_file = args.obs_file - obs_file_hdr = envi_header(obs_file) - - loc_file = args.loc_file - loc_file_hdr = envi_header(loc_file) - logging.basicConfig(format='%(levelname)s:%(asctime)s ||| %(message)s', level=args.loglevel, filename=args.logfile, datefmt='%Y-%m-%d,%H:%M:%S') @@ -115,18 +105,26 @@ def main(input_args=None): print(files.target_file) if os.path.isfile(files.target_file) is False or args.overwrite: - sza = envi.open(obs_file_hdr).open_memmap(interleave='bip')[...,4] + _, obs = spec_io.load_data(args.obs_file) + sza = obs[...,4] + mean_sza = np.mean(sza[sza != -9999]) - elevation = envi.open(loc_file_hdr).open_memmap(interleave='bip')[...,2] + _, elevation = spec_io.load_data(args.loc_file, return_loc_from_l1b_rad_nc=True) + elevation = elevation[:,:,2] mean_elevation = np.mean(elevation[elevation != -9999]) / 1000. 
mean_elevation = min(max(0, mean_elevation),3) if args.state_subs is not None: - state_ds = envi.open(envi_header(args.state_subs)) - band_names = state_ds.metadata['band names'] - h2o = state_ds.open_memmap(interleave='bip')[...,band_names.index('H2OSTR')] - mean_h2o = np.mean(h2o[h2o != -9999]) + + m_state, d_state = spec_io.load_data(args.state_subs, mask_type='mask') + if args.state_subs.endswith('.nc'): + ind = m_state.band_names.index('H2O (g cm-2)') + mean_h2o = np.mean(d_state[:,:,ind]) + else: + ind = m_state.band_names.index('H2OSTR') + mean_h2o = np.mean(d_state[:,0,ind]) + else: # Just guess something... exit() @@ -143,7 +141,7 @@ def main(input_args=None): '-g', str(mean_elevation), '-w', str(mean_h2o), '--output', files.target_file, - '--hdr', radiance_file_hdr, + '--hdr', args.radiance_file, '--lut_dataset', args.lut_file] if args.co2: target_params.append('--co2') @@ -176,10 +174,18 @@ def main(input_args=None): if args.ace_filter: subargs.append('--use_ace_filter') parallel_mf.main(subargs) + + def do_ortho_with_spec_io(obs_for_glt_filename, unortho_input_filename, ortho_output_filename): + m_obs, _ = spec_io.load_data(obs_for_glt_filename, load_glt=True) + m, d = spec_io.load_data(unortho_input_filename) + d_ort = spec_io.ortho_data(d, m_obs.glt) + m.geotransform = m_obs.geotransform + m.projection = m_obs.projection + spec_io.write_envi_file(d_ort, m, ortho_output_filename) # ORT MF if (os.path.isfile(files.mf_ort_file) is False or args.overwrite): - apply_glt.main([args.glt_file, files.mf_file, files.mf_ort_file]) + do_ortho_with_spec_io(args.glt_file, files.mf_file, files.mf_ort_file) convert_to_cog(files.mf_ort_file, files.mf_ort_cog, metadata[gas]['mf'], @@ -187,7 +193,7 @@ def main(input_args=None): args.product_version) # ORT Sensitivity if os.path.isfile(files.sens_ort_file) is False or args.overwrite: - apply_glt.main([args.glt_file, files.mf_sens_file, files.sens_ort_file]) + do_ortho_with_spec_io(args.glt_file, files.mf_sens_file, 
files.sens_ort_file) if os.path.isfile(files.sens_ort_cog) is False or args.overwrite: convert_to_cog(files.sens_ort_file, files.sens_ort_cog, @@ -196,7 +202,7 @@ def main(input_args=None): args.product_version) # ORT Uncertainty if os.path.isfile(files.uncert_ort_file) is False or args.overwrite: - apply_glt.main([args.glt_file, files.mf_uncert_file, files.uncert_ort_file]) + do_ortho_with_spec_io(args.glt_file, files.mf_uncert_file, files.uncert_ort_file) if os.path.isfile(files.uncert_ort_cog) is False or args.overwrite: convert_to_cog(files.uncert_ort_file, files.uncert_ort_cog, diff --git a/parallel_mf.py b/parallel_mf.py index 6c771a6..7c67522 100644 --- a/parallel_mf.py +++ b/parallel_mf.py @@ -32,16 +32,11 @@ import json from utils import SerialEncoder +import spec_io + import logging import os -# Borrowed from isofit/isofit/wrappers/ray.py -if os.environ.get("GHG_DEBUG"): - logging.info("Using internal ray") - import rray as ray -else: - import ray - def main(input_args=None): parser = argparse.ArgumentParser(description="Robust MF") @@ -54,7 +49,6 @@ def main(input_args=None): parser.add_argument('--num_cores', type=int, default=-1, help='number of cores (-1 (default))') parser.add_argument('--n_mc', type=int, default=10, help='number of monte carlo runs') parser.add_argument('--mc_bag_fraction',type=float, default=0.7, help='fraction of data to use in each MC instance') - parser.add_argument('--ray_temp_dir', type=str, default=None, help='ray temp directory (None (default))') parser.add_argument('--wavelength_range', nargs='+', type=float, default=None, help='wavelengths to use: None = default for gas, 2x values = min/max pairs of regions') parser.add_argument('--remove_dominant_pcs',action='store_true', help='remove dominant PCs from covariance calculation') parser.add_argument('--subsample_strategy',type=str,choices=['random','spatial_blocks'], help='sampling strategy for mc runs') @@ -92,11 +86,9 @@ def main(input_args=None): filename=args.logfile, 
datefmt='%Y-%m-%d,%H:%M:%S') logging.info('Started processing input file: "%s"'%str(args.radiance_file)) - ds = envi.open(envi_header(args.radiance_file),image=args.radiance_file) - if 'wavelength' not in ds.metadata: - logging.error('wavelength field not found in input header') - sys.exit(0) - wavelengths = np.array([float(x) for x in ds.metadata['wavelength']]) + m_radiance, radiance = spec_io.load_data(args.radiance_file) # nalong x ncross x nbands + radiance = np.array(radiance).transpose([0,2,1]) # nalong x nbands x ncross + wavelengths = m_radiance.wavelengths if args.wavelength_range is None: if 'ch4' in args.library: @@ -119,7 +111,7 @@ def main(input_args=None): la = np.where(np.logical_and(wavelengths > args.wavelength_range[2*n], wavelengths <= args.wavelength_range[2*n+1]))[0] active_wl_idx.extend(la.tolist()) always_exclude_idx = [] - if 'emit' in args.radiance_file: + if 'emit' in args.radiance_file.lower(): always_exclude_idx = np.where(np.logical_and(wavelengths < 1321, wavelengths > 1275))[0].tolist() active_wl_idx = np.array([x for x in active_wl_idx if x not in always_exclude_idx]) @@ -136,7 +128,9 @@ def main(input_args=None): absorption_coefficients = library_reference[active_wl_idx,2] logging.info('Create output file, initialized with nodata') - outmeta = ds.metadata + outmeta = {} + outmeta['samples'] = radiance.shape[2] # ncross + outmeta['lines'] = radiance.shape[0] # nalong outmeta['data type'] = np2envitype(np.float32) outmeta['bands'] = 1 outmeta['description'] = 'Matched Filter Results' @@ -160,7 +154,6 @@ def main(input_args=None): output_ds = envi.create_image(envi_header(args.sens_output_file),outmeta,force=True,ext='') del output_ds write_bil_chunk(np.ones(output_shape)*args.nodata_value, args.sens_output_file, 0, output_shape) - if args.chunksize is None: chunk_edges = [0, output_shape[0]] @@ -168,22 +161,11 @@ def main(input_args=None): chunk_edges = np.arange(0, output_shape[0], args.chunksize).tolist() 
chunk_edges.append(output_shape[0]) - rayargs = {'_temp_dir': args.ray_temp_dir, 'ignore_reinit_error': True, 'include_dashboard': False} - rayargs['num_cpus'] = args.num_cores - if args.num_cores == -1: - import multiprocessing - rayargs['num_cpus'] = multiprocessing.cpu_count() - 1 - ray.init(**rayargs) - rdn_id = None - absorption_coefficients_id = ray.put(absorption_coefficients) - noise_model_parameters_id = ray.put(noise_model_parameters) - del absorption_coefficients, noise_model_parameters for _ce, ce in enumerate(chunk_edges[:-1]): - if rdn_id is not None: - del rdn_id; rdn_id = None logging.info(f"load radiance for chunk {_ce +1} / {len(chunk_edges) - 1}") - radiance = ds.open_memmap(interleave='bil',writeable=False)[ce:chunk_edges[_ce+1],...].copy() + full_radiance = radiance if _ce == 0 else full_radiance; radiance = full_radiance[ce:chunk_edges[_ce+1],...] + rad_for_mf = np.float64(np.ascontiguousarray(radiance[:,active_wl_idx,:].transpose([2,0,1]))) #ncross x nalong x nbands chunk_shape = (chunk_edges[_ce+1] - ce, output_shape[1], output_shape[2]) logging.info("load masks") @@ -205,37 +187,28 @@ def main(input_args=None): logging.debug("adding cloud / water mask") clouds_and_surface_water_mask = None if args.l2a_mask_file is not None: - clouds_and_surface_water_mask = np.sum(envi.open(envi_header(args.l2a_mask_file)).open_memmap(interleave='bip')[ce:chunk_edges[_ce+1],:,:3],axis=-1) > 0 + _, clouds_and_surface_water_mask = spec_io.load_data(args.l2a_mask_file, mask_type='mask') + clouds_and_surface_water_mask = clouds_and_surface_water_mask[ce:chunk_edges[_ce+1],:,:3] + clouds_and_surface_water_mask = np.sum(clouds_and_surface_water_mask, axis=-1) > 0 good_pixel_mask = np.where(clouds_and_surface_water_mask, False, good_pixel_mask) + + good_pixel_mask = np.ascontiguousarray(good_pixel_mask.T) - logging.info('initializing ray, adding data to shared memory') - - rdn_id = ray.put(radiance) - del radiance - - logging.info('Run jobs') - jobs = [mf_one_column.remote(col, rdn_id, absorption_coefficients_id, 
active_wl_idx, good_pixel_mask, noise_model_parameters_id, args) for col in range(output_shape[2])] - rreturn = [ray.get(jid) for jid in jobs] + logging.info("applying matched filter") + output_dat, output_uncert_dat, output_sens_dat = mf_full_scene(rad_for_mf, + absorption_coefficients, + good_pixel_mask, + noise_model_parameters, + args) - logging.info('Collecting and writing output') - output_dat = np.zeros(chunk_shape,dtype=np.float32) - output_uncert_dat = np.ones(chunk_shape,dtype=np.float32) * args.nodata_value - output_sens_dat = np.ones(chunk_shape,dtype=np.float32) * args.nodata_value - for ret in rreturn: - if ret[0] is not None: - output_dat[:, 0, ret[1]] = ret[0][:,0] - if ret[2] is not None: - output_uncert_dat[:, 0, ret[1]] = ret[2][:] - if ret[3] is not None: - output_sens_dat[:, 0, ret[1]] = ret[3][:] + output_dat = output_dat.T + output_uncert_dat = output_uncert_dat.T + output_sens_dat = output_sens_dat.T def apply_badvalue(d, mask, bad_data_value): - d = d.transpose((0,2,1)) - d[mask,:] = bad_data_value - d = d.transpose((0,2,1)) + d[mask] = bad_data_value return d - if args.mask_clouds_water and clouds_and_surface_water_mask is not None: logging.info('Masking clouds and water') output_dat = apply_badvalue(output_dat, clouds_and_surface_water_mask, args.screen_value) @@ -257,6 +230,10 @@ def apply_badvalue(d, mask, bad_data_value): output_uncert_dat = apply_badvalue(output_uncert_dat, dilated_flare_mask, args.screen_value) output_sens_dat = apply_badvalue(output_sens_dat, dilated_flare_mask, args.screen_value) + output_dat = output_dat[:,None,:] + output_uncert_dat = output_uncert_dat[:,None,:] + output_sens_dat = output_sens_dat[:,None,:] + write_bil_chunk(output_dat, args.output_file, ce, chunk_shape) if args.uncert_output_file is not None: write_bil_chunk(output_uncert_dat, args.uncert_output_file, ce, chunk_shape) @@ -414,10 +391,9 @@ def calculate_saturation_mask(bandmask_file: str, radiance: np.array, dilation_i has been otherwise flagged 
with bad values (-9999). The bad9999 mask identifies these and excludes them.''' - if chunk_edges is None: - l1b_bandmask_loaded = envi.open(envi_header(bandmask_file))[:,:,:] - else: - l1b_bandmask_loaded = envi.open(envi_header(bandmask_file))[chunk_edges[0]:chunk_edges[1],:,:] + _, l1b_bandmask_loaded = spec_io.load_data(bandmask_file, mask_type='band_mask') + if chunk_edges is not None: + l1b_bandmask_loaded = l1b_bandmask_loaded[chunk_edges[0]:chunk_edges[1],:,:] bad9999 = np.any(radiance < -1, axis = 1) l1b_bandmask_unpacked = np.unpackbits(l1b_bandmask_loaded, axis= -1) @@ -452,124 +428,88 @@ def get_noise_equivalent_spectral_radiance(noise_model_parameters: np.array, rad nedl = np.abs(noise_model_parameters[:, 0] * np.sqrt(noise_plus_meas) + noise_model_parameters[:, 2]) return nedl - -@ray.remote -def mf_one_column(col: int, rdn_full: np.array, absorption_coefficients: np.array, active_wl_idx: np.array, good_pixel_mask: np.array, noise_model_parameters: np.array, args, nd_buffer=0.1): - """ Run the matched filter on a single column of the input image - +def mf_full_scene(rdn, absorption_coefficients, good_pixel_mask, noise_model_parameters, args, nd_buffer=0.1): + '''Compute the matched filter for the entire scene along with sensitivity and uncertainty + Args: - col (int): column to run on - rdn_full (np.array): full radiance dataset, bil interleave - outimg_mm_shape (tuple): output image shape - absorption_coefficients (np.array): absorption coefficients for target - active_wl_idx (np.array): active wavelength indices - good_pixel_mask (np.array): mask of valid pixels to use for covariance / mean estimates - args (_type_): arguments from input - nd_buffer (float, optional): buffer value to add to values that accidentily land on nodata values. Defaults to 0.1. 
- + rdn (nd.array): radiance [ncross, nalong, nbands] + absorption_coefficient (nd.array): the absorption coefficient vector + good_pixel_mask (nd.array): pixels to use in MF [ncross, nalong] + noise_model_parameters (nd.array): instrument noise coefficients + args: the argparse arguments when this script was called + Returns: - (np.array): matched filter results from the column - """ + mf (nd.array): matched filter values [ncross, nalong] + uncert (nd.array): uncertainty values [ncross, nalong] + sens (nd.array): sensitivity values [ncross, nalong] + ''' + ncross, nalong, nspec = rdn.shape + + mf = np.ones((ncross, nalong)) * args.nodata_value + uncert = np.ones((ncross, nalong)) * args.nodata_value + sens = np.ones((ncross, nalong)) * args.nodata_value + + no_radiance_mask_full = np.all(np.logical_and(np.isfinite(rdn), rdn> -0.05), axis=2) + + for col in range(ncross): + rdn_col = rdn[col,:,:] + no_radiance_mask = no_radiance_mask_full[col,:] + good_pixel_idx = np.where(np.logical_and(good_pixel_mask[col,:], no_radiance_mask))[0] + if len(good_pixel_idx) < 10: + logging.debug(f'Too few good pixels found in col {col}: skipping') + continue - logging.basicConfig(format='%(levelname)s:%(asctime)s ||| %(message)s', level=args.loglevel, - filename=args.logfile, datefmt='%Y-%m-%d,%H:%M:%S') - logging.debug(f'Col: {col}') - - rdn = np.float64(rdn_full[:, active_wl_idx, col].copy()) - no_radiance_mask = np.all(np.logical_and(np.isfinite(rdn), rdn > -0.05), axis=1) - good_pixel_idx = np.where(np.logical_and(good_pixel_mask[:,col], no_radiance_mask))[0] - if len(good_pixel_idx) < 10: - logging.debug('Too few good pixels found in col {col}: skipping') - return None, None - - if args.uncert_output_file is not None: - nedl_variance = (get_noise_equivalent_spectral_radiance(noise_model_parameters, rdn))**2 + if args.uncert_output_file is not None: + nedl_variance = (get_noise_equivalent_spectral_radiance(noise_model_parameters, rdn_col))**2 - # array to hold results in - mf_mc 
= np.ones((rdn.shape[0],args.n_mc)) * args.nodata_value - uncert_mc = np.ones((rdn.shape[0],args.n_mc)) * args.nodata_value - sens_mc = np.ones((rdn.shape[0],args.n_mc)) * args.nodata_value - - np.random.seed(13) - for _mc in range(args.n_mc): - - # get subset of pixels to use for covariance / mean estimates - cov_subset = get_mc_subset(_mc, args, good_pixel_idx) - - # optional radiance adjustment for max radiance pcs...experimental - if args.remove_dominant_pcs: - pca_mean = rdn[cov_subset,:].mean(axis=0) - pcavals, pcavec = scipy.linalg.eigh(cov(rdn - pca_mean)) - loc_rdn = (rdn - pca_mean ) @ pcavec[:,:-5] - target = (absorption_coefficients.copy() * pca_mean) @ pcavec[:,:-5] - else: - loc_rdn = rdn - target = absorption_coefficients.copy() * np.mean(loc_rdn[cov_subset,:],axis=0) - - # calculate covariance and mean try: - C = calculate_mf_covariance(loc_rdn[cov_subset,:], args.covariance_style, args.fixed_alpha) + C = calculate_mf_covariance(rdn_col[good_pixel_idx,:], args.covariance_style, args.fixed_alpha) Cinv = scipy.linalg.inv(C, check_finite=False) except np.linalg.LinAlgError: logging.warn('singular matrix. 
skipping this column') - return None, None - mu = np.mean(loc_rdn[cov_subset,:], axis=0) + continue + mu = np.mean(rdn_col[good_pixel_idx,:], axis=0) + + target = absorption_coefficients * mu - # Matched filter time normalizer = target.dot(Cinv).dot(target.T) - if args.ace_filter: - rx = np.sum((loc_rdn[no_radiance_mask,:] - mu) @ Cinv * (loc_rdn[no_radiance_mask,:] - mu), axis = 1) - normalizer = normalizer * rx - mf = ((loc_rdn[no_radiance_mask,:] - mu).dot(Cinv).dot(target.T)) / normalizer + # Matched filter + mf_col = target.T.dot(Cinv).dot((rdn_col[no_radiance_mask,:] - mu).T) / normalizer + mf[col, no_radiance_mask] = mf_col * args.ppm_scaling if args.uncert_output_file is not None: - #################################################################################################################### - # Uncertainty - # This implements (s^T Cinv Sigma Cinv s) / (s^T Cinv aX) (in linear algebra notation) + # Sensitivity and Uncertainty + # This implements (s^T Cinv Sigma Cinv s) / (s^T Cinv aX) (in linear algebra notation) for the uncertainty # Sigma is diagonal, so we just need a standard numpy multiply, which we can also broadcast along the whole column sC = target.dot(Cinv) numer = (sC * nedl_variance[no_radiance_mask,:]) @ sC - a_times_X = -1 * absorption_coefficients.copy() * loc_rdn[no_radiance_mask, :] - denom = ((a_times_X).dot(Cinv).dot(target.T))**2 - uncert = np.sqrt(numer/denom) - #################################################################################################################### + a_times_X = -1 * absorption_coefficients * rdn_col[no_radiance_mask, :] + denom = ((target).dot(Cinv).dot(a_times_X.T))**2 + uncert_col = np.sqrt(numer/denom) + sens_col = np.sqrt(denom) / normalizer - sens_mc[no_radiance_mask,_mc] = np.sqrt(denom) / normalizer - - - # scale outputs - mf_mc[no_radiance_mask,_mc] = mf * args.ppm_scaling - if args.uncert_output_file is not None: - uncert_mc[no_radiance_mask,_mc] = uncert * args.ppm_scaling + 
sens[col,no_radiance_mask] = sens_col + uncert[col,no_radiance_mask] = uncert_col * args.ppm_scaling - - output = np.vstack([np.mean(mf_mc,axis=-1), np.std(mf_mc,axis=-1)]).T - output[np.logical_and(no_radiance_mask, output[...,0] == args.nodata_value),:] = args.nodata_value + nd_buffer - output[np.logical_not(no_radiance_mask),:] = args.nodata_value + mf[np.logical_and(no_radiance_mask_full, mf == args.nodata_value)] = args.nodata_value + nd_buffer + mf[np.logical_not(no_radiance_mask_full)] = args.nodata_value if args.uncert_output_file is not None: - uncert = uncert_mc[:,0] - uncert[np.logical_and(no_radiance_mask, uncert == args.nodata_value)] = args.nodata_value + nd_buffer - uncert[np.logical_not(no_radiance_mask)] = args.nodata_value + uncert[np.logical_and(no_radiance_mask_full, uncert == args.nodata_value)] = args.nodata_value + nd_buffer + uncert[np.logical_not(no_radiance_mask_full)] = args.nodata_value uncert[np.logical_not(np.isfinite(uncert))] = args.nodata_value - sens = sens_mc[:,0] - sens[np.logical_and(no_radiance_mask, sens == args.nodata_value)] = args.nodata_value + nd_buffer - sens[np.logical_not(no_radiance_mask)] = args.nodata_value + sens[np.logical_and(no_radiance_mask_full, sens == args.nodata_value)] = args.nodata_value + nd_buffer + sens[np.logical_not(no_radiance_mask_full)] = args.nodata_value sens[np.logical_not(np.isfinite(uncert))] = args.nodata_value else: - #uncert = np.ones(rdn.shape[0]) * args.nodata_value - #sens = np.ones(rdn.shape[0]) * args.nodata_value uncert = None sens = None - - logging.debug(f'Column {col} mean: {np.mean(output[good_pixel_idx,0])}') - return output.astype(np.float32), col, uncert, sens + return mf.astype(np.float32), uncert, sens if __name__ == '__main__': main() - ray.shutdown() diff --git a/target_generation.py b/target_generation.py index 3c6fbb3..2129848 100755 --- a/target_generation.py +++ b/target_generation.py @@ -22,9 +22,10 @@ import numpy as np import scipy.ndimage import argparse -import 
spectral import h5py +import spec_io + def check_param(value, min, max, name): if value < min or value > max: @@ -251,9 +252,9 @@ def main(input_args=None): 'water': args.water_vapor, 'order': args.order} if args.hdr and exists(args.hdr): - image = spectral.io.envi.open(args.hdr) - centers = np.array([float(x) for x in image.metadata['wavelength']]) - fwhm = np.array([float(x) for x in image.metadata['fwhm']]) + m, _ = spec_io.load_data(args.hdr) + centers = m.wavelengths + fwhm = m.fwhm elif args.txt and exists(args.txt): data = np.loadtxt(args.txt, usecols=(0, 1),delimiter=',') centers = data[:, 0]