"""
File with tools to interface Cosmosis chains with GetDist.
"""
"""
For testing purposes:
chain = loadMCSamples('./../test_chains/1p2_SN1_zcut0p3_abs')
chain_root = './test_chains/DES_multinest_cosmosis'
chain_root = './chains_lcdm/chain_1x2pt_lcdm'
chain_min_root = './chains_lcdm/chain_1x2pt_lcdm_MAP.maxlike'
param_label_dict=None
param_name_dict=None
settings = None
# test that everything is working:
test = MCSamplesFromCosmosis(chain_root, chain_min_root)
print(test.bestfit)
"""
import os
import numpy as np
import functools
import types
from getdist.chains import loadNumpyTxt
from getdist.mcsamples import MCSamples
from getdist.types import BestFit
from getdist.paramnames import ParamInfo
[docs]def MCSamplesFromCosmosis(chain_root, chain_min_root=None,
param_name_dict=None, param_label_dict=None,
name_tag=None, settings=None):
"""
Function to import Cosmosis chains in GetDist.
:param chain_root: the name and path to the chain or the path to the
folder that contains it.
:param chain_min_root: (optional) name of the file containing the
explicit best fit.
:param param_name_dict: (optional) a dictionary with the mapping between
cosmosis names and reasonable parameter names.
:param param_label_dict: (optional) dictionary with the mapping between
parameter names and parameter labels, since Cosmosis does not save
the labels in the chain.
:param name_tag: (optional) a string with the name tag for the chain.
:param settings: (optional) dictionary of analysis settings to override
getdist defaults
:return: The :class:`~getdist.mcsamples.MCSamples` instance
"""
# decide if the user passed a folder or a chain:
if os.path.isfile(chain_root+'.txt'):
chain_file = chain_root+'.txt'
elif os.path.isdir(chain_root):
# look for the chain file:
temp = list(filter(lambda x: 'chain.txt' in x, os.listdir(chain_root)))
if len(temp) == 0:
raise ValueError('No chain file found in folder', chain_root)
chain_file = chain_root+'/'+temp[0]
else:
raise ValueError('Input chain root is not a folder nor a file.')
# get all the commented lines in the chain file:
info = get_cosmosis_info(chain_file)
# get the parameter names:
param_names = get_param_names(info)
# get the parameter labels from the user provided dictionary:
param_labels = get_param_labels(info, param_names, param_label_dict)
# get the sampler:
sampler = get_sampler_type(info)
# get the name tag:
if name_tag is None:
name_tag = get_name_tag(info)
# get the samples weights and likelihood:
chain = loadNumpyTxt(chain_file, skiprows=0)
# parse the chain depending on the sampler that produced it:
if sampler == 'nested':
# get number of samples to use:
nsamples = int(list(filter(lambda x: 'nsample=' in x, info))
[0].replace(' ', '').split('=')[1])
# get the chain:
chain = chain[-nsamples:]
# get all quantities:
indexes = [i for i in range(len(param_names))
if i != param_names.index('weight')
and i != param_names.index('post')]
samples = chain[:, indexes]
weights = chain[:, param_names.index('weight')]
loglike = chain[:, param_names.index('post')]
# delete the weights and likelihood from names:
if param_labels is not None:
param_labels.pop(param_names.index('weight'))
param_labels.pop(param_names.index('post'))
param_names.pop(param_names.index('weight'))
param_names.pop(param_names.index('post'))
elif sampler == 'mcmc':
# get all quantities:
indexes = [i for i in range(len(param_names))
if i != param_names.index('post')]
samples = chain[:, indexes]
loglike = chain[:, param_names.index('post')]
# Cosmosis does not weight samples:
samples, idx, weights = np.unique(samples, return_index=True, return_counts=True, axis=0)
loglike = loglike[idx]
# delete the weights and likelihood from names:
if param_labels is not None:
param_labels.pop(param_names.index('post'))
param_names.pop(param_names.index('post'))
elif sampler == 'uncorrelated':
# get all quantities:
indexes = [i for i in range(len(param_names))
if i != param_names.index('post')]
samples = chain[:, indexes]
loglike = chain[:, param_names.index('post')]
weights = None
# delete the weights and likelihood from names:
if param_labels is not None:
param_labels.pop(param_names.index('post'))
param_names.pop(param_names.index('post'))
else:
raise ValueError('Unknown sampler')
# get the ranges:
ranges = get_ranges(info, param_names)
# transform param names:
if param_name_dict is not None:
for i, name in enumerate(param_names):
if name in param_name_dict.keys():
param_names[i] = param_name_dict[name]
if name in ranges.keys():
ranges[param_name_dict[name]] = ranges.pop(name)
#for i, name in enumerate(param_names):
# if name in param_name_dict.keys():
# initialize the samples:
mc_samples = MCSamples(samples=samples, weights=weights,
loglikes=-2.*loglike,
sampler=sampler, names=param_names,
labels=param_labels, ranges=ranges,
ignore_rows=0, name_tag=name_tag,
settings=settings)
# set running parameters:
for name in mc_samples.getParamNames().parsWithNames(
mc_samples.getParamNames().list()):
if name.name in ranges.keys():
name.isDerived = False
else:
name.isDerived = True
# polish the samples removing nans:
mc_samples = polish_samples(mc_samples)
# get the best fit:
if chain_min_root is not None:
# since getdist does not cache the best fit we have to override the
# method in this brute way:
funcType = types.MethodType
mc_samples.getBestFit = funcType(functools.partial(get_maximum_likelihood,
chain_min_root=chain_min_root,
param_name_dict=param_name_dict,
param_label_dict=param_label_dict),
mc_samples)
# update statistics:
mc_samples.updateBaseStatistics()
#
return mc_samples
[docs]def get_cosmosis_info(file):
"""
Parse a file to get all the information about a Cosmosis run.
:param file: path and name of the file to parse.
:return: a list of strings with the cosmosis parameters for the run.
"""
with open(file, 'r') as _file:
info = []
for ln in _file:
if ln.startswith('#'):
info.append(ln[1:])
# polish the info strings:
info = list(filter(None, [line.replace('#', '').strip() for line in info]))
#
return info
[docs]def get_param_names(info):
"""
Get the parameter names for a Cosmosis run.
:param info: a list of strings with the cosmosis parameters for the run.
:return: a list of strings with the parameter names.
"""
return info[0].split('\t')
[docs]def get_param_labels(info, param_names, param_label_dict):
"""
Get the labels for the parameter names of a Cosmosis run.
:param info: a list of strings with the cosmosis parameters for the run.
:param param_names: a list of strings with the parameter names.
:param param_label_dict: a dictionary with the mapping between names and
labels.
:return: a list of strings with the parameter labels.
"""
if param_label_dict is not None:
param_labels = []
for name in param_names:
if name in param_label_dict.keys():
param_labels.append(param_label_dict[name])
else:
param_labels.append(name)
else:
param_labels = None
#
return param_labels
[docs]def get_sampler_type(info):
"""
Get the sampler type for a chain given the a list of strings containing the
cosmosis run parameter informations.
To process the sampler type the function defines internally a dictionary
with the mapping from sampler name to sampler type.
:param info: a list of strings with the cosmosis parameters for the run.
:return: a string with the sampler type if any, otherwise returns none.
"""
# define the dictionary with the mapping:
sampler_dict = {
'polychord': 'nested',
'multinest': 'nested',
'apriori': 'uncorrelated',
'emcee': 'mcmc',
'pmaxlike': 'max_like',
'maxlike': 'max_like'
}
# find the sampler in the parameters:
temp = list(filter(lambda x: 'sampler' in x, info))
if len(temp) > 0:
sampler = temp[0].replace(' ', '').split('=')[1].lower()
if sampler in sampler_dict.keys():
sampler = sampler_dict[sampler]
else:
raise ValueError('Unknown input sampler')
else:
sampler = None
#
return sampler
[docs]def get_name_tag(info):
"""
Get the name tag for a chain given the a list of strings containing the
cosmosis run parameter informations.
:param info: a list of strings with the cosmosis parameters for the run.
:return: a string with the name tag if any, otherwise returns none.
"""
temp = list(filter(lambda x: 'run_name' in x, info))
if len(temp) > 0:
name_tag = temp[0].replace(' ', '').split('=')[1].lower()
name_tag = name_tag.rstrip().lstrip()
else:
name_tag = None
#
return name_tag
[docs]def get_ranges(info, param_names):
"""
Get the ranges for the parameters from the info file.
:param info: a list of strings with the cosmosis parameters for the run.
:param param_names: a list with the parameter names.
:return: a dictionary with the parameter ranges.
"""
ranges = {}
for name in param_names:
try:
# look for the section:
section = '['+name.split('--')[0]+']'
pname = name.split('--')[1]
section_index = info.index(section)
temp = list(filter(lambda x:
pname in x[1] and x[0] > section_index,
enumerate(info)))[0][1]
_min = float(list(filter(None, temp.split()))[2])
_max = float(list(filter(None, temp.split()))[4])
ranges[name] = [_min, _max]
except:
pass
#
return ranges
[docs]def polish_samples(chain):
"""
Remove fixed parameters and samples with some parameter that is Nan
from the input chain.
:param chain: :class:`~getdist.mcsamples.MCSamples` the input chain.
:return: :class:`~getdist.mcsamples.MCSamples` the polished chain.
"""
# delete fixed parameters:
chain.deleteFixedParams()
# we need to delete nan columns:
fixed = []
values = []
for i in range(chain.samples.shape[1]):
if np.all(np.isnan(chain.samples[:, i])):
fixed.append(i)
values.append(chain.samples[0, i])
chain.changeSamples(np.delete(chain.samples, fixed, 1))
if hasattr(chain, 'ranges'):
for ix, value in zip(fixed, values):
chain.ranges.setFixed(chain.paramNames.names[ix].name,
value)
chain.paramNames.deleteIndices(fixed)
chain._getParamIndices()
# now we need to delete nan samples:
nan_filter = np.all(np.logical_not(np.isnan(chain.samples)), axis=1)
chain.filter(where=nan_filter)
#
return chain
[docs]def get_maximum_likelihood(dummy, max_posterior, chain_min_root,
param_name_dict, param_label_dict):
"""
Import the maximum likelihood file for a Cosmosis run, if present.
:param dummy: dummy argument for interfacing, not used in practice
:param chain_min_root: name of the minimum file or the folder that contains
it.
:param param_name_dict: a dictionary with the mapping between
cosmosis names and reasonable names.
:param param_label_dict: dictionary with the mapping between the parameter
names and the labels.
:return: :class:`~getdist.types.BestFit` the best fit object.
"""
# decide if the user passed a folder or a chain:
if os.path.isfile(chain_min_root+'.txt'):
minimum_file = chain_min_root+'.txt'
elif os.path.isdir(chain_min_root):
# look for the chain file:
temp = list(filter(lambda x: 'chain_pmaxlike.txt' in x,
os.listdir(chain_min_root)))
if len(temp) == 0:
raise ValueError('No minimum file found in folder', chain_min_root)
minimum_file = chain_min_root+'/'+temp[0]
else:
raise ValueError('Input chain root is not a folder nor a file.')
# get the info:
info = get_cosmosis_info(minimum_file)
# check the sampler:
sampler = get_sampler_type(info)
if sampler != 'max_like':
raise ValueError('Minimum file appears to have a strange sampler:',
sampler)
# get the parameter names:
param_names = get_param_names(info)
# get the parameter labels from the user provided dictionary:
param_labels = get_param_labels(info, param_names, param_label_dict)
# best fit:
best_fit = BestFit()
# set parameter names:
if param_name_dict is not None:
best_fit.names = []
for name in param_names:
if name in param_name_dict.keys():
best_fit.names.append(ParamInfo(param_name_dict[name]))
else:
best_fit.names.append(ParamInfo(name))
else:
best_fit.names = [ParamInfo(name) for name in param_names]
if param_labels is not None:
for name, label in zip(best_fit.names, param_labels):
name.label = label
# get the numbers:
best_fit_params = loadNumpyTxt(minimum_file, skiprows=0)[0]
# look for weight:
if 'weigth' in param_names:
best_fit.weight = best_fit_params[param_names.index('weigth')]
else:
best_fit.weight = 1.0
# look for log like:
if 'post' in param_names:
best_fit.logLike = -best_fit_params[param_names.index('post')]
else:
raise ValueError('Best fit file does not contain the best fit like')
# correct log likes in getdist format:
if 'post' in param_names:
best_fit_params[param_names.index('post')] = \
-best_fit_params[param_names.index('post')]
if 'prior' in param_names:
best_fit_params[param_names.index('prior')] = \
-best_fit_params[param_names.index('prior')]
if 'like' in param_names:
best_fit_params[param_names.index('like')] = \
-best_fit_params[param_names.index('like')]
# get the data chi squareds:
best_fit.chiSquareds = []
# get the ranges to get the running parameters:
ranges = get_ranges(info, param_names)
# now initialize all parameters:
for ind, param in enumerate(best_fit.names):
param.isDerived = param.name not in ranges.keys()
param.number = ind
param.best_fit = best_fit_params[ind]
#
return best_fit