diff --git a/bin/run_vega_mpi.py b/bin/run_vega_mpi.py index f3650f6b..3b062bcb 100644 --- a/bin/run_vega_mpi.py +++ b/bin/run_vega_mpi.py @@ -12,6 +12,13 @@ description='Run Vega in parallel.') pars.add_argument('config', type=str, help='Config file') + pars.add_argument( + '--init-limit', type=int, default=None, + help=( + 'Maximum number of cuncurrent initializations of Vega. ' + 'Use this to avoid memory issues when running with many MPI threads.' + ) + ) args = pars.parse_args() mpi_comm = MPI.COMM_WORLD @@ -25,8 +32,25 @@ def print_func(message): print_func('Initializing Vega') - # Initialize Vega and get the sampling parameters - vega = VegaInterface(args.config) + if args.init_limit is not None: + mpi_comm.barrier() + + node_comm = mpi_comm.Split_type(MPI.COMM_TYPE_SHARED, 0) + local_rank = node_comm.Get_rank() + local_size = node_comm.Get_size() + + node_comm.Barrier() + for i in range(local_size // args.init_limit + 1): + if local_rank // args.init_limit == i: + # Initialize Vega in low memory mode + vega = VegaInterface(args.config) + node_comm.Barrier() + + mpi_comm.barrier() + else: + # Initialize Vega and get the sampling parameters + vega = VegaInterface(args.config) + sampling_params = vega.sample_params['limits'] # run compute_model once to initialize all the caches @@ -52,7 +76,8 @@ def print_func(message): from vega.samplers.polychord import Polychord print_func('Running Polychord') - sampler = Polychord(vega.main_config['Polychord'], sampling_params, vega.log_lik) + sampler = Polychord( + vega.main_config['Polychord'], sampling_params, vega.log_lik, vega.corr_num_marg_modes) sampler.run() elif vega.sampler == 'PocoMC': diff --git a/vega/build_config.py b/vega/build_config.py index 11c1525d..9bcd5f70 100644 --- a/vega/build_config.py +++ b/vega/build_config.py @@ -72,6 +72,8 @@ def __init__(self, options={}, overwrite=False): self.options['marginalize-below-rpmax'] = options.get('marginalize-below-rpmax', 0) self.options['marginalize-above-rpmin'] = options.get('marginalize-above-rpmin', 0) self.options['marginalize-all-rmin-cuts'] = options.get('marginalize-all-rmin-cuts', False) + self.options['marginalize-prior-sigma'] = options.get('marginalize-prior-sigma', 10.0) + self.options['fit-marginalized-scales'] = options.get('fit-marginalized-scales', False) self.options['hcd_model'] = options.get('hcd_model', None) self.options['fvoigt_model'] = options.get('fvoigt_model', 'exp') @@ -340,6 +342,8 @@ def _build_corr_config(self, name, corr_info, git_hash): config['model']['marginalize-above-rpmin'] = str(self.options['marginalize-above-rpmin']) config['model']['marginalize-all-rmin-cuts'] = str( self.options['marginalize-all-rmin-cuts']) + config['model']['marginalize-prior-sigma'] = str(self.options['marginalize-prior-sigma']) + config['model']['fit-marginalized-scales'] = str(self.options['fit-marginalized-scales']) if 'skip-nl-model-in-peak' in self.options: config['model']['skip-nl-model-in-peak'] = str(self.options['skip-nl-model-in-peak']) diff --git a/vega/coordinates.py b/vega/coordinates.py index 2307b77e..001a1490 100644 --- a/vega/coordinates.py +++ b/vega/coordinates.py @@ -180,3 +180,38 @@ def get_mask_scale_cuts(self, cuts_config, small_scale_mask=False): mask &= (self.mu_regular_grid > mu_min_cut) & (self.mu_regular_grid < mu_max_cut) return mask + + def get_mask_marginalization_scales(self, cuts_config, marginalization_cuts): + """Build mask to for bins that are marginalized + + Parameters + ---------- + marginalization_cuts : dict + Dictionary with the small-scale marginalization cuts + + Returns + ------- + Array + Mask + """ + mask = np.ones_like(self.rp_regular_grid, dtype=bool) + + if 'rtmax' in marginalization_cuts: + rtmax = marginalization_cuts['rtmax'] + mask &= self.rt_regular_grid < rtmax + if 'rtmin' in marginalization_cuts: + rtmin = marginalization_cuts['rtmin'] + mask &= self.rt_regular_grid > rtmin + if 'rpmax' in marginalization_cuts: + rpmax = marginalization_cuts['rpmax'] + mask &= np.abs(self.rp_regular_grid) < rpmax + if 'rpmin' in marginalization_cuts: + rpmin = marginalization_cuts['rpmin'] + mask &= np.abs(self.rp_regular_grid) > rpmin + + if 'all-rmin' in marginalization_cuts: + mask = ~self.get_mask_scale_cuts( + cuts_config, small_scale_mask=True + ) + + return mask diff --git a/vega/correlation_item.py b/vega/correlation_item.py index 8141afbf..40087a20 100644 --- a/vega/correlation_item.py +++ b/vega/correlation_item.py @@ -66,6 +66,8 @@ def __init__(self, config, model_pk=False): if marginalize_all: self.marginalize_small_scales['all-rmin'] = True + self.fit_marg_scales = config['model'].getboolean("fit-marginalized-scales", False) + self.has_metals = False self.has_bb = False diff --git a/vega/data.py b/vega/data.py index 0e32b8db..02f81432 100644 --- a/vega/data.py +++ b/vega/data.py @@ -94,29 +94,29 @@ def __init__(self, corr_item): if corr_item.marginalize_small_scales: self.marg_templates, self.cov_marg_update = self.get_dist_xi_marg_templates() - if not self.corr_item.low_mem_mode: - print('Updating covariance with marginalization templates.') - ntemps = self.marg_templates.shape[1] + # if not self.corr_item.low_mem_mode: + print('Updating covariance with marginalization templates.') + ntemps = self.marg_templates.shape[1] - # Invert the matrix but do not save it - _inv_masked_cov = self.inv_masked_cov - self._inv_masked_cov = None + # Invert the matrix but do not save it + _inv_masked_cov = self.inv_masked_cov + self._inv_masked_cov = None - self._cov_mat[np.ix_(self.data_mask, self.data_mask)] += self.cov_marg_update + self._cov_mat[np.ix_(self.data_mask, self.data_mask)] += self.cov_marg_update - # Construct solution matrix, G becomes an ndarray - templates_masked = self.marg_templates[self.model_mask, :] - G = templates_masked.T.dot(_inv_masked_cov) + # Construct solution matrix, G becomes an ndarray + templates_masked = self.marg_templates[self.model_mask, :] + G = templates_masked.T.dot(_inv_masked_cov) - S = np.diag(np.full( - ntemps, self.corr_item.marginalize_small_scales_prior_sigma**-2 - )) - Ainv = np.linalg.inv(templates_masked.T.dot(G.T).T + S) + S = np.diag(np.full( + ntemps, self.corr_item.marginalize_small_scales_prior_sigma**-2 + )) + Ainv = np.linalg.inv(templates_masked.T.dot(G.T).T + S) - # When multiplied by data - bestfit model, the below matrix will - # give the coefficients for each template. Total marginalized model - # is given by marg_templates.dot(marg_diff2coeff_matrix.dot(diff)) - self.marg_diff2coeff_matrix = Ainv.dot(G) + # When multiplied by data - bestfit model, the below matrix will + # give the coefficients for each template. Total marginalized model + # is given by marg_templates.dot(marg_diff2coeff_matrix.dot(diff)) + self.marg_diff2coeff_matrix = Ainv.dot(G) self._cholesky = None self._scale = 1. @@ -167,10 +167,20 @@ def masked_data_vec(self): Masked data vector (xi[mask]) """ if self._masked_data_vec is None: - self._masked_data_vec = np.zeros(self.data_mask.sum()) - self._masked_data_vec[:] = self.data_vec[self.data_mask] + self._masked_data_vec = self.data_vec[self.data_mask] return self._masked_data_vec + @property + def data_size(self): + """Data size property + + Returns + ------- + int + Data size (number of bins after masking) + """ + return self.masked_data_vec.size + @property def cov_mat(self): """Covariance matrix property @@ -321,16 +331,16 @@ def _read_data(self, data_path, cuts_config, dmat_path=None, cov_path=None, cov_ self._distortion_mat = csr_array(hdul[1].data['DM'].astype(float)) # Read the covariance matrix - if not self.corr_item.low_mem_mode: - if cov_path is not None: - print(f'Reading covariance matrix file {cov_path}\n') - with fits.open(find_file(cov_path)) as cov_hdul: - self._cov_mat = cov_hdul[1].data['CO'] - elif 'CO' in hdul[1].columns.names: - self._cov_mat = hdul[1].data['CO'] + # if not self.corr_item.low_mem_mode: + if cov_path is not None: + print(f'Reading covariance matrix file {cov_path}\n') + with fits.open(find_file(cov_path)) as cov_hdul: + self._cov_mat = cov_hdul[1].data['CO'] + elif 'CO' in hdul[1].columns.names: + self._cov_mat = hdul[1].data['CO'] - if cov_rescale is not None: - self._cov_mat *= cov_rescale + if cov_rescale is not None: + self._cov_mat *= cov_rescale # Get the cosmological parameters if "OMEGAM" in header: @@ -385,7 +395,6 @@ def _read_data(self, data_path, cuts_config, dmat_path=None, cov_path=None, cov_ self.model_mask = self.dist_model_coordinates.get_mask_scale_cuts(cuts_config) # Compute data size - self.data_size = len(self.masked_data_vec) self.full_data_size = len(self.data_vec) # Read the cuts we need to save for plotting @@ -717,6 +726,18 @@ def get_dist_xi_marg_templates(self, factor=1e-8, return_AAT=True): templates = self.corr_item.get_undist_xi_marg_templates() templates = self.distortion_mat.dot(templates) + if self.corr_item.fit_marg_scales: + # Update masks + self.data_mask |= self.data_coordinates.get_mask_marginalization_scales( + self.corr_item.config['cuts'], self.corr_item.marginalize_small_scales) + + self.model_mask |= self.dist_model_coordinates.get_mask_marginalization_scales( + self.corr_item.config['cuts'], self.corr_item.marginalize_small_scales) + + # Recompute masked data vector and size + self._masked_data_vec = None + _ = self.masked_data_vec + if not return_AAT: return templates diff --git a/vega/samplers/polychord.py b/vega/samplers/polychord.py index 85ab5206..20a14e8d 100644 --- a/vega/samplers/polychord.py +++ b/vega/samplers/polychord.py @@ -8,8 +8,8 @@ class Polychord(Sampler): ''' Interface between Vega and the nested sampler PolyChord ''' - def __init__(self, sampler_config, limits, log_lik_func): - super().__init__(sampler_config, limits, log_lik_func) + def __init__(self, sampler_config, limits, log_lik_func, derived_dict=None): + super().__init__(sampler_config, limits, log_lik_func, derived_dict=derived_dict) def get_sampler_settings(self, sampler_config, num_params, num_derived): """Extract polychord settings and create the settings object. @@ -96,8 +96,8 @@ def log_lik(theta): for i, name in enumerate(self.names): params[name] = theta[i] - log_lik = self.log_lik(params) - return log_lik, [] + log_lik, marg_coeff = self.log_lik(params, return_marg_coeff=True) + return log_lik, marg_coeff def prior(hypercube): """ Uniform prior """ diff --git a/vega/samplers/sampler_interface.py b/vega/samplers/sampler_interface.py index 2ce60fd5..a8231be8 100644 --- a/vega/samplers/sampler_interface.py +++ b/vega/samplers/sampler_interface.py @@ -1,5 +1,6 @@ import os.path import sys +import numpy as np from pathlib import Path from mpi4py import MPI @@ -11,11 +12,17 @@ class Sampler: ''' Interface between Vega and the nested sampler PolyChord ''' - def __init__(self, sampler_config, limits, log_lik_func): + def __init__(self, sampler_config, limits, log_lik_func, derived_dict=None): self.limits = limits self.names = list(limits.keys()) self.num_params = len(limits) - self.num_derived = 0 + self.derived_dict = None + if derived_dict is not None: + self.derived_dict = derived_dict + self.num_derived = np.sum([num_marg for num_marg in derived_dict.values()]) + else: + self.num_derived = 0 + self.log_lik = log_lik_func self.getdist_latex = sampler_config.getboolean('getdist_latex', True) @@ -51,6 +58,16 @@ def write_parnames(self, parnames_path): print('Writing parameter names') sys.stdout.flush() latex_names = build_names(list(self.names)) + + if self.derived_dict is not None: + corr_names = sorted(self.derived_dict.keys()) + for corr in corr_names: + num_marg = self.derived_dict[corr] + for i in range(num_marg): + name = f'{corr}_marg_{i}' + latex_name = r'M_{\rm ' + f'{corr}' + '}^{' + f'{i}' + '}' + latex_names[name] = latex_name + with open(parnames_path, 'w') as f: for name, latex in latex_names.items(): if self.getdist_latex: diff --git a/vega/vega_interface.py b/vega/vega_interface.py index 640e8017..e58789dd 100644 --- a/vega/vega_interface.py +++ b/vega/vega_interface.py @@ -160,6 +160,12 @@ def __init__(self, main_path): self.corr_items, self.data, self.mc_config, self.global_cov ) + # Check for analytic marginalization configuration + self.corr_num_marg_modes = {} + if self._has_data: + for name in self.corr_items: + self.corr_num_marg_modes[name] = self.data[name].num_marg_modes + # Check for sampler self.run_sampler = False if 'control' in self.main_config: @@ -224,7 +230,7 @@ def compute_model(self, params=None, run_init=True, direct_pk=None, marg_coeff=N return model_cf - def chi2(self, params=None, direct_pk=None): + def chi2(self, params=None, direct_pk=None, return_marg_coeff=False): """Compute full chi2 for all components. Parameters @@ -277,9 +283,13 @@ def chi2(self, params=None, direct_pk=None): chi2 += self.compute_prior_chi2(params) assert isinstance(chi2, float) - return chi2 + if not return_marg_coeff: + return chi2 + + marg_coeff = self.compute_marg_coeff(model_cf) + return chi2, marg_coeff - def log_lik(self, params=None, direct_pk=None): + def log_lik(self, params=None, direct_pk=None, return_marg_coeff=False): """Compute full log likelihood for all components. Parameters @@ -297,7 +307,10 @@ def log_lik(self, params=None, direct_pk=None): assert self._has_data # Get the full chi2 - chi2 = self.chi2(params, direct_pk) + if return_marg_coeff: + chi2, marg_coeff = self.chi2(params, direct_pk, return_marg_coeff) + else: + chi2 = self.chi2(params, direct_pk) # Compute the normalization for each component log_norm = 0 @@ -320,6 +333,12 @@ def log_lik(self, params=None, direct_pk=None): for prior in self.priors.values(): log_lik += self._gaussian_lik_prior(prior[1]) + if return_marg_coeff: + corr_names = sorted(self.corr_items.keys()) + marg_coeff_list = np.hstack([ + marg_coeff[corr] for corr in corr_names + ]) + return log_lik, marg_coeff_list return log_lik def _get_lcl_prms(self, params=None): @@ -421,6 +440,26 @@ def initialize_monte_carlo(self, scale=None, print_func=print): return mocks + def compute_marg_coeff(self, model_cf): + bestfit_marg_coeff = {} + for name in self.corr_items: + corr_data = self.data[name] + + if self.monte_carlo: + diff = corr_data.masked_mc_mock \ + - model_cf[name][corr_data.model_mask] + else: + diff = corr_data.masked_data_vec \ + - model_cf[name][corr_data.model_mask] + + # Calculate best-fitting values for the marginalized templates. + # This approximation ignores global_cov, hence correlations between + # CFs. Bestfit_model is updated in-place. + if corr_data.marg_diff2coeff_matrix is not None: + bestfit_marg_coeff[name] = corr_data.marg_diff2coeff_matrix.dot(diff) + + return bestfit_marg_coeff + def minimize(self): """Minimize the chi2 over the sampled parameters. """