Skip to content
31 changes: 28 additions & 3 deletions bin/run_vega_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
description='Run Vega in parallel.')

pars.add_argument('config', type=str, help='Config file')
pars.add_argument(
'--init-limit', type=int, default=None,
help=(
'Maximum number of cuncurrent initializations of Vega. '
'Use this to avoid memory issues when running with many MPI threads.'
)
)
args = pars.parse_args()

mpi_comm = MPI.COMM_WORLD
Expand All @@ -25,8 +32,25 @@ def print_func(message):

print_func('Initializing Vega')

# Initialize Vega and get the sampling parameters
vega = VegaInterface(args.config)
if args.init_limit is not None:
mpi_comm.barrier()

node_comm = mpi_comm.Split_type(MPI.COMM_TYPE_SHARED, 0)
local_rank = node_comm.Get_rank()
local_size = node_comm.Get_size()

node_comm.Barrier()
for i in range(local_size // args.init_limit + 1):
if local_rank // args.init_limit == i:
# Initialize Vega in low memory mode
vega = VegaInterface(args.config)
node_comm.Barrier()

mpi_comm.barrier()
else:
# Initialize Vega and get the sampling parameters
vega = VegaInterface(args.config)

sampling_params = vega.sample_params['limits']

# run compute_model once to initialize all the caches
Expand All @@ -52,7 +76,8 @@ def print_func(message):
from vega.samplers.polychord import Polychord

print_func('Running Polychord')
sampler = Polychord(vega.main_config['Polychord'], sampling_params, vega.log_lik)
sampler = Polychord(
vega.main_config['Polychord'], sampling_params, vega.log_lik, vega.corr_num_marg_modes)
sampler.run()

elif vega.sampler == 'PocoMC':
Expand Down
4 changes: 4 additions & 0 deletions vega/build_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def __init__(self, options={}, overwrite=False):
self.options['marginalize-below-rpmax'] = options.get('marginalize-below-rpmax', 0)
self.options['marginalize-above-rpmin'] = options.get('marginalize-above-rpmin', 0)
self.options['marginalize-all-rmin-cuts'] = options.get('marginalize-all-rmin-cuts', False)
self.options['marginalize-prior-sigma'] = options.get('marginalize-prior-sigma', 10.0)
self.options['fit-marginalized-scales'] = options.get('fit-marginalized-scales', False)

self.options['hcd_model'] = options.get('hcd_model', None)
self.options['fvoigt_model'] = options.get('fvoigt_model', 'exp')
Expand Down Expand Up @@ -340,6 +342,8 @@ def _build_corr_config(self, name, corr_info, git_hash):
config['model']['marginalize-above-rpmin'] = str(self.options['marginalize-above-rpmin'])
config['model']['marginalize-all-rmin-cuts'] = str(
self.options['marginalize-all-rmin-cuts'])
config['model']['marginalize-prior-sigma'] = str(self.options['marginalize-prior-sigma'])
config['model']['fit-marginalized-scales'] = str(self.options['fit-marginalized-scales'])

if 'skip-nl-model-in-peak' in self.options:
config['model']['skip-nl-model-in-peak'] = str(self.options['skip-nl-model-in-peak'])
Expand Down
35 changes: 35 additions & 0 deletions vega/coordinates.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,38 @@ def get_mask_scale_cuts(self, cuts_config, small_scale_mask=False):
mask &= (self.mu_regular_grid > mu_min_cut) & (self.mu_regular_grid < mu_max_cut)

return mask

def get_mask_marginalization_scales(self, cuts_config, marginalization_cuts):
"""Build mask to for bins that are marginalized

Parameters
----------
marginalization_cuts : dict
Dictionary with the small-scale marginalization cuts

Returns
-------
Array
Mask
"""
mask = np.ones_like(self.rp_regular_grid, dtype=bool)

if 'rtmax' in marginalization_cuts:
rtmax = marginalization_cuts['rtmax']
mask &= self.rt_regular_grid < rtmax
if 'rtmin' in marginalization_cuts:
rtmin = marginalization_cuts['rtmin']
mask &= self.rt_regular_grid > rtmin
if 'rpmax' in marginalization_cuts:
rpmax = marginalization_cuts['rpmax']
mask &= np.abs(self.rp_regular_grid) < rpmax
if 'rpmin' in marginalization_cuts:
rpmin = marginalization_cuts['rpmin']
mask &= np.abs(self.rp_regular_grid) > rpmin

if 'all-rmin' in marginalization_cuts:
mask = ~self.get_mask_scale_cuts(
cuts_config, small_scale_mask=True
)

return mask
2 changes: 2 additions & 0 deletions vega/correlation_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def __init__(self, config, model_pk=False):
if marginalize_all:
self.marginalize_small_scales['all-rmin'] = True

self.fit_marg_scales = config['model'].getboolean("fit-marginalized-scales", False)

self.has_metals = False
self.has_bb = False

Expand Down
81 changes: 51 additions & 30 deletions vega/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,29 +94,29 @@ def __init__(self, corr_item):
if corr_item.marginalize_small_scales:
self.marg_templates, self.cov_marg_update = self.get_dist_xi_marg_templates()

if not self.corr_item.low_mem_mode:
print('Updating covariance with marginalization templates.')
ntemps = self.marg_templates.shape[1]
# if not self.corr_item.low_mem_mode:
print('Updating covariance with marginalization templates.')
ntemps = self.marg_templates.shape[1]

# Invert the matrix but do not save it
_inv_masked_cov = self.inv_masked_cov
self._inv_masked_cov = None
# Invert the matrix but do not save it
_inv_masked_cov = self.inv_masked_cov
self._inv_masked_cov = None

self._cov_mat[np.ix_(self.data_mask, self.data_mask)] += self.cov_marg_update
self._cov_mat[np.ix_(self.data_mask, self.data_mask)] += self.cov_marg_update

# Construct solution matrix, G becomes an ndarray
templates_masked = self.marg_templates[self.model_mask, :]
G = templates_masked.T.dot(_inv_masked_cov)
# Construct solution matrix, G becomes an ndarray
templates_masked = self.marg_templates[self.model_mask, :]
G = templates_masked.T.dot(_inv_masked_cov)

S = np.diag(np.full(
ntemps, self.corr_item.marginalize_small_scales_prior_sigma**-2
))
Ainv = np.linalg.inv(templates_masked.T.dot(G.T).T + S)
S = np.diag(np.full(
ntemps, self.corr_item.marginalize_small_scales_prior_sigma**-2
))
Ainv = np.linalg.inv(templates_masked.T.dot(G.T).T + S)

# When multiplied by data - bestfit model, the below matrix will
# give the coefficients for each template. Total marginalized model
# is given by marg_templates.dot(marg_diff2coeff_matrix.dot(diff))
self.marg_diff2coeff_matrix = Ainv.dot(G)
# When multiplied by data - bestfit model, the below matrix will
# give the coefficients for each template. Total marginalized model
# is given by marg_templates.dot(marg_diff2coeff_matrix.dot(diff))
self.marg_diff2coeff_matrix = Ainv.dot(G)

self._cholesky = None
self._scale = 1.
Expand Down Expand Up @@ -167,10 +167,20 @@ def masked_data_vec(self):
Masked data vector (xi[mask])
"""
if self._masked_data_vec is None:
self._masked_data_vec = np.zeros(self.data_mask.sum())
self._masked_data_vec[:] = self.data_vec[self.data_mask]
self._masked_data_vec = self.data_vec[self.data_mask]
return self._masked_data_vec

@property
def data_size(self):
"""Data size property

Returns
-------
int
Data size (number of bins after masking)
"""
return self.masked_data_vec.size

@property
def cov_mat(self):
"""Covariance matrix property
Expand Down Expand Up @@ -321,16 +331,16 @@ def _read_data(self, data_path, cuts_config, dmat_path=None, cov_path=None, cov_
self._distortion_mat = csr_array(hdul[1].data['DM'].astype(float))

# Read the covariance matrix
if not self.corr_item.low_mem_mode:
if cov_path is not None:
print(f'Reading covariance matrix file {cov_path}\n')
with fits.open(find_file(cov_path)) as cov_hdul:
self._cov_mat = cov_hdul[1].data['CO']
elif 'CO' in hdul[1].columns.names:
self._cov_mat = hdul[1].data['CO']
# if not self.corr_item.low_mem_mode:
if cov_path is not None:
print(f'Reading covariance matrix file {cov_path}\n')
with fits.open(find_file(cov_path)) as cov_hdul:
self._cov_mat = cov_hdul[1].data['CO']
elif 'CO' in hdul[1].columns.names:
self._cov_mat = hdul[1].data['CO']

if cov_rescale is not None:
self._cov_mat *= cov_rescale
if cov_rescale is not None:
self._cov_mat *= cov_rescale

# Get the cosmological parameters
if "OMEGAM" in header:
Expand Down Expand Up @@ -385,7 +395,6 @@ def _read_data(self, data_path, cuts_config, dmat_path=None, cov_path=None, cov_
self.model_mask = self.dist_model_coordinates.get_mask_scale_cuts(cuts_config)

# Compute data size
self.data_size = len(self.masked_data_vec)
self.full_data_size = len(self.data_vec)

# Read the cuts we need to save for plotting
Expand Down Expand Up @@ -717,6 +726,18 @@ def get_dist_xi_marg_templates(self, factor=1e-8, return_AAT=True):
templates = self.corr_item.get_undist_xi_marg_templates()
templates = self.distortion_mat.dot(templates)

if self.corr_item.fit_marg_scales:
# Update masks
self.data_mask |= self.data_coordinates.get_mask_marginalization_scales(
self.corr_item.config['cuts'], self.corr_item.marginalize_small_scales)

self.model_mask |= self.dist_model_coordinates.get_mask_marginalization_scales(
self.corr_item.config['cuts'], self.corr_item.marginalize_small_scales)

# Recompute masked data vector and size
self._masked_data_vec = None
_ = self.masked_data_vec

if not return_AAT:
return templates

Expand Down
8 changes: 4 additions & 4 deletions vega/samplers/polychord.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
class Polychord(Sampler):
''' Interface between Vega and the nested sampler PolyChord '''

def __init__(self, sampler_config, limits, log_lik_func):
super().__init__(sampler_config, limits, log_lik_func)
def __init__(self, sampler_config, limits, log_lik_func, derived_dict=None):
super().__init__(sampler_config, limits, log_lik_func, derived_dict=derived_dict)

def get_sampler_settings(self, sampler_config, num_params, num_derived):
"""Extract polychord settings and create the settings object.
Expand Down Expand Up @@ -96,8 +96,8 @@ def log_lik(theta):
for i, name in enumerate(self.names):
params[name] = theta[i]

log_lik = self.log_lik(params)
return log_lik, []
log_lik, marg_coeff = self.log_lik(params, return_marg_coeff=True)
return log_lik, marg_coeff

def prior(hypercube):
""" Uniform prior """
Expand Down
21 changes: 19 additions & 2 deletions vega/samplers/sampler_interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os.path
import sys
import numpy as np
from pathlib import Path

from mpi4py import MPI
Expand All @@ -11,11 +12,17 @@
class Sampler:
''' Interface between Vega and the nested sampler PolyChord '''

def __init__(self, sampler_config, limits, log_lik_func):
def __init__(self, sampler_config, limits, log_lik_func, derived_dict=None):
self.limits = limits
self.names = list(limits.keys())
self.num_params = len(limits)
self.num_derived = 0
self.derived_dict = None
if derived_dict is not None:
self.derived_dict = derived_dict
self.num_derived = np.sum([num_marg for num_marg in derived_dict.values()])
else:
self.num_derived = 0

self.log_lik = log_lik_func

self.getdist_latex = sampler_config.getboolean('getdist_latex', True)
Expand Down Expand Up @@ -51,6 +58,16 @@ def write_parnames(self, parnames_path):
print('Writing parameter names')
sys.stdout.flush()
latex_names = build_names(list(self.names))

if self.derived_dict is not None:
corr_names = sorted(self.derived_dict.keys())
for corr in corr_names:
num_marg = self.derived_dict[corr]
for i in range(num_marg):
name = f'{corr}_marg_{i}'
latex_name = r'M_{\rm ' + f'{corr}' + '}^{' + f'{i}' + '}'
latex_names[name] = latex_name

with open(parnames_path, 'w') as f:
for name, latex in latex_names.items():
if self.getdist_latex:
Expand Down
47 changes: 43 additions & 4 deletions vega/vega_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ def __init__(self, main_path):
self.corr_items, self.data, self.mc_config, self.global_cov
)

# Check for analytic marginalization configuration
self.corr_num_marg_modes = {}
if self._has_data:
for name in self.corr_items:
self.corr_num_marg_modes[name] = self.data[name].num_marg_modes

# Check for sampler
self.run_sampler = False
if 'control' in self.main_config:
Expand Down Expand Up @@ -224,7 +230,7 @@ def compute_model(self, params=None, run_init=True, direct_pk=None, marg_coeff=N

return model_cf

def chi2(self, params=None, direct_pk=None):
def chi2(self, params=None, direct_pk=None, return_marg_coeff=False):
"""Compute full chi2 for all components.

Parameters
Expand Down Expand Up @@ -277,9 +283,13 @@ def chi2(self, params=None, direct_pk=None):
chi2 += self.compute_prior_chi2(params)

assert isinstance(chi2, float)
return chi2
if not return_marg_coeff:
return chi2

marg_coeff = self.compute_marg_coeff(model_cf)
return chi2, marg_coeff

def log_lik(self, params=None, direct_pk=None):
def log_lik(self, params=None, direct_pk=None, return_marg_coeff=False):
"""Compute full log likelihood for all components.

Parameters
Expand All @@ -297,7 +307,10 @@ def log_lik(self, params=None, direct_pk=None):
assert self._has_data

# Get the full chi2
chi2 = self.chi2(params, direct_pk)
if return_marg_coeff:
chi2, marg_coeff = self.chi2(params, direct_pk, return_marg_coeff)
else:
chi2 = self.chi2(params, direct_pk)

# Compute the normalization for each component
log_norm = 0
Expand All @@ -320,6 +333,12 @@ def log_lik(self, params=None, direct_pk=None):
for prior in self.priors.values():
log_lik += self._gaussian_lik_prior(prior[1])

if return_marg_coeff:
corr_names = sorted(self.corr_items.keys())
marg_coeff_list = np.hstack([
marg_coeff[corr] for corr in corr_names
])
return log_lik, marg_coeff_list
return log_lik

def _get_lcl_prms(self, params=None):
Expand Down Expand Up @@ -421,6 +440,26 @@ def initialize_monte_carlo(self, scale=None, print_func=print):

return mocks

def compute_marg_coeff(self, model_cf):
bestfit_marg_coeff = {}
for name in self.corr_items:
corr_data = self.data[name]

if self.monte_carlo:
diff = corr_data.masked_mc_mock \
- model_cf[name][corr_data.model_mask]
else:
diff = corr_data.masked_data_vec \
- model_cf[name][corr_data.model_mask]

# Calculate best-fitting values for the marginalized templates.
# This approximation ignores global_cov, hence correlations between
# CFs. Bestfit_model is updated in-place.
if corr_data.marg_diff2coeff_matrix is not None:
bestfit_marg_coeff[name] = corr_data.marg_diff2coeff_matrix.dot(diff)

return bestfit_marg_coeff

def minimize(self):
"""Minimize the chi2 over the sampled parameters.
"""
Expand Down