Source code for tensiometer.mcmc_tension.flow

"""

"""

###############################################################################
# initial imports and set-up:

import os
import time
import gc
from numba import jit
import numpy as np
import getdist.chains as gchains
gchains.print_load_details = False
from getdist import MCSamples, WeightedSamples
import scipy
from scipy.linalg import sqrtm
from scipy.integrate import simps
from scipy.spatial import cKDTree
import scipy.stats
import pickle
from collections.abc import Iterable
from matplotlib import pyplot as plt

from .. import utilities as utils
from .. import gaussian_tension

try:
    import tensorflow as tf
    import tensorflow_probability as tfp
    tfb = tfp.bijectors
    tfd = tfp.distributions
    from tensorflow.keras.models import Model
    from tensorflow.keras.layers import Input
    from tensorflow.keras.callbacks import Callback
    HAS_FLOW = True
except Exception as e:
    print("Could not import tensorflow or tensorflow_probability: ", e)
    Callback = object
    HAS_FLOW = False

try:
    from IPython.display import clear_output, set_matplotlib_formats
except ModuleNotFoundError:
    pass

###############################################################################
# helper class to build a masked-autoregressive flow:


class SimpleMAF(object):
    """
    A class to implement a simple Masked AutoRegressive Flow (MAF) using the
    implementation :class:`tfp.bijectors.AutoregressiveNetwork` from
    `Tensorflow Probability <https://www.tensorflow.org/probability/>`_.
    Additionally, this class provides utilities to load/save models, including
    random permutations.

    :param num_params: number of parameters, i.e. the dimension of the space on
        which the bijector is defined.
    :type num_params: int
    :param n_maf: number of MAFs to stack. Defaults to None, in which case it is
        set to `2*num_params`.
    :type n_maf: int, optional
    :param hidden_units: a list of the number of nodes per hidden layer.
        Defaults to None, in which case it is set to `[num_params*2]*2`.
    :type hidden_units: list, optional
    :param permutations: whether to shuffle dimensions between stacked MAFs,
        defaults to True.
    :type permutations: bool, optional
    :param activation: activation function to use in all layers, defaults to
        :func:`tf.math.asinh`.
    :type activation: optional
    :param kernel_initializer: kernel initializer, defaults to 'glorot_uniform'.
    :type kernel_initializer: str, optional
    :param feedback: print the model architecture, defaults to 0.
    :type feedback: int, optional

    :reference: George Papamakarios, Theo Pavlakou, Iain Murray (2017).
        Masked Autoregressive Flow for Density Estimation.
        `arXiv:1705.07057 <https://arxiv.org/abs/1705.07057>`_
    """

    def __init__(self, num_params, n_maf=None, hidden_units=None, permutations=True,
                 activation=tf.math.asinh, kernel_initializer='glorot_uniform',
                 feedback=0, **kwargs):
        if n_maf is None:
            n_maf = 2*num_params
        event_shape = (num_params,)

        if hidden_units is None:
            hidden_units = [num_params*2]*2

        if permutations is None:
            _permutations = False
        elif isinstance(permutations, Iterable):
            assert len(permutations) == n_maf
            _permutations = permutations
        elif isinstance(permutations, bool):
            if permutations:
                _permutations = [np.random.permutation(num_params) for _ in range(n_maf)]
            else:
                _permutations = False

        self.permutations = _permutations

        # Build transformed distribution
        bijectors = []
        for i in range(n_maf):
            if _permutations:
                bijectors.append(tfb.Permute(_permutations[i].astype(np.int32)))
            made = tfb.AutoregressiveNetwork(params=2, event_shape=event_shape,
                                             hidden_units=hidden_units, activation=activation,
                                             kernel_initializer=kernel_initializer, **kwargs)
            bijectors.append(tfb.MaskedAutoregressiveFlow(shift_and_log_scale_fn=made))

        self.bijector = tfb.Chain(bijectors)

        if feedback > 0:
            print("Building MAF")
            print(" - number of MAFs:", n_maf)
            print(" - activation:", activation)
            print(" - hidden_units:", hidden_units)

    def save(self, path):
        """
        Save a `SimpleMAF` object.

        :param path: path of the directory where to save.
        :type path: str
        """
        checkpoint = tf.train.Checkpoint(bijector=self.bijector)
        checkpoint.write(path)
        pickle.dump(self.permutations, open(path+'_permutations.pickle', 'wb'))

    @classmethod
    def load(cls, num_params, path, **kwargs):
        """
        Load a saved `SimpleMAF` object. The number of parameters and all other
        keyword arguments (except for `permutations`) must be included as the
        MAF is first created with random weights and then these weights are
        restored.

        :param num_params: number of parameters, i.e. the dimension of the space
            on which the bijector is defined.
        :type num_params: int
        :param path: path of the directory from which to load.
        :type path: str
        :return: a :class:`~.SimpleMAF`.
        """
        permutations = pickle.load(open(path+'_permutations.pickle', 'rb'))
        maf = SimpleMAF(num_params=num_params, permutations=permutations, **kwargs)
        checkpoint = tf.train.Checkpoint(bijector=maf.bijector)
        checkpoint.read(path)
        return maf
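
# A minimal save/load sketch for SimpleMAF, kept as comments so the module is
# not executed on import; the dimension and the path 'maf_checkpoint' are
# illustrative assumptions:
#
#   maf = SimpleMAF(num_params=4, feedback=1)
#   maf.save('maf_checkpoint')
#   maf_restored = SimpleMAF.load(num_params=4, path='maf_checkpoint')
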
###############################################################################
# main class to compute NF-based tension:

class DiffFlowCallback(Callback):
    """
    A class to compute the normalizing flow estimate of the probability of a
    parameter shift given an input parameter difference chain.

    A normalizing flow is trained to approximate the difference distribution
    and then used to numerically evaluate the probability of a parameter shift
    (see REF). To do so, it defines a bijective mapping that is optimized to
    gaussianize the difference chain samples. This mapping is performed in two
    steps, using the gaussian approximation as pre-whitening. The notations
    used in the code are:

    * `X` designates samples in the original parameter difference space;
    * `Y` designates samples in the gaussian approximation space, `Y` is
      obtained by shifting and scaling `X` by its mean and covariance
      (like a PCA);
    * `Z` designates samples in the gaussianized space, connected to `Y` with
      a normalizing flow denoted `Z2Y_bijector`.

    The user may provide the `Z2Y_bijector` as a :class:`~tfp.bijectors.Bijector`
    object from `Tensorflow Probability <https://www.tensorflow.org/probability/>`_
    or make use of the utility class :class:`~.SimpleMAF` to instantiate a
    Masked Autoregressive Flow (with `Z2Y_bijector='MAF'`).

    This class derives from :class:`~tf.keras.callbacks.Callback` from Keras,
    which allows for visualization during training. The normalizing flows
    (X->Y->Z) are implemented as :class:`~tfp.bijectors.Bijector` objects and
    encapsulated in a Keras :class:`~tf.keras.Model`.

    Here is an example:

    .. code-block:: python

        # Initialize the flow and model
        diff_flow_callback = DiffFlowCallback(diff_chain, Z2Y_bijector='MAF')
        # Train the model
        diff_flow_callback.train()
        # Compute the shift probability and confidence interval
        p, p_low, p_high = diff_flow_callback.estimate_shift()

    :param diff_chain: input parameter difference chain.
    :type diff_chain: :class:`~getdist.mcsamples.MCSamples`
    :param param_names: parameter names of the parameters to be used
        in the calculation. By default all running parameters.
    :type param_names: list, optional
    :param Z2Y_bijector: either a :class:`~tfp.bijectors.Bijector` object
        representing the mapping from `Z` to `Y`, or 'MAF' to instantiate a
        :class:`~.SimpleMAF`, defaults to 'MAF'.
    :type Z2Y_bijector: optional
    :param pregauss_bijector: not implemented yet, defaults to None.
    :type pregauss_bijector: optional
    :param learning_rate: initial learning rate, defaults to 1e-3.
    :type learning_rate: float, optional
    :param feedback: feedback level, defaults to 1.
    :type feedback: int, optional
    :param validation_split: fraction of samples to use for the validation
        sample, defaults to 0.1.
    :type validation_split: float, optional
    :param early_stop_nsigma: absolute error on the tension at the zero-shift
        point to be used as an approximate convergence criterion for early
        stopping, defaults to 0.
    :type early_stop_nsigma: float, optional
    :param early_stop_patience: minimum number of epochs to use when
        `early_stop_nsigma` is non-zero, defaults to 10.
    :type early_stop_patience: int, optional

    :raises NotImplementedError: if `pregauss_bijector` is not None.

    :reference: George Papamakarios, Theo Pavlakou, Iain Murray (2017).
        Masked Autoregressive Flow for Density Estimation.
        `arXiv:1705.07057 <https://arxiv.org/abs/1705.07057>`_
    """

    def __init__(self, diff_chain, param_names=None, Z2Y_bijector='MAF',
                 pregauss_bijector=None, learning_rate=1e-3, feedback=1,
                 validation_split=0.1, early_stop_nsigma=0., early_stop_patience=10,
                 **kwargs):
        self.feedback = feedback

        # Chain
        self._init_diff_chain(diff_chain, param_names=param_names, validation_split=validation_split)

        # Transformed distribution
        self._init_transf_dist(Z2Y_bijector, learning_rate=learning_rate, **kwargs)
        if feedback > 0:
            print("Building flow")
            print(" - trainable parameters:", self.model.count_params())

        # Metrics
        keys = ["loss", "val_loss", "shift0_chi2", "shift0_pval", "shift0_nsigma",
                "chi2Z_ks", "chi2Z_ks_p"]
        self.log = {_k: [] for _k in keys}

        self.chi2Y = np.sum(self.Y_test**2, axis=1)
        self.chi2Y_ks, self.chi2Y_ks_p = scipy.stats.kstest(self.chi2Y, 'chi2', args=(self.num_params,))

        # Options
        self.early_stop_nsigma = early_stop_nsigma
        self.early_stop_patience = early_stop_patience

        # Pre-gaussianization
        if pregauss_bijector is not None:
            # The idea is to introduce yet another step of deterministic gaussianization,
            # e.g. using the prior CDF or double prior (convolved with itself,
            # e.g. a triangular distribution)
            raise NotImplementedError

    def _init_diff_chain(self, diff_chain, param_names=None, validation_split=0.1):
        # initialize param names:
        if param_names is None:
            param_names = diff_chain.getParamNames().getRunningNames()
        else:
            chain_params = diff_chain.getParamNames().list()
            if not np.all([name in chain_params for name in param_names]):
                raise ValueError('Input parameter is not in the diff chain.\n',
                                 'Input parameters ', param_names, '\n'
                                 'Possible parameters', chain_params)
        # indexes:
        ind = [diff_chain.index[name] for name in param_names]
        self.num_params = len(ind)

        # Gaussian approximation (full chain)
        mcsamples_gaussian_approx = gaussian_tension.gaussian_approximation(diff_chain, param_names=param_names)
        self.dist_gaussian_approx = tfd.MultivariateNormalTriL(
            loc=mcsamples_gaussian_approx.means[0].astype(np.float32),
            scale_tril=tf.linalg.cholesky(mcsamples_gaussian_approx.covs[0].astype(np.float32)))
        self.Y2X_bijector = self.dist_gaussian_approx.bijector

        # Samples
        # Split training/test
        n = diff_chain.samples.shape[0]
        indices = np.random.permutation(n)
        n_split = int(validation_split*n)
        test_idx, training_idx = indices[:n_split], indices[n_split:]

        # Training
        self.X = diff_chain.samples[training_idx, :][:, ind]
        self.weights = diff_chain.weights[training_idx]
        self.weights *= len(self.weights) / np.sum(self.weights)  # weights normalized to number of samples
        self.has_weights = np.any(self.weights != self.weights[0])
        self.Y = np.array(self.Y2X_bijector.inverse(self.X))
        assert not np.any(np.isnan(self.Y))
        self.num_samples = len(self.X)

        # Test
        self.X_test = diff_chain.samples[test_idx, :][:, ind]
        self.Y_test = np.array(self.Y2X_bijector.inverse(self.X_test))
        self.weights_test = diff_chain.weights[test_idx]
        self.weights_test *= len(self.weights_test) / np.sum(self.weights_test)  # weights normalized to number of samples

        # Training sample generator
        Y_ds = tf.data.Dataset.from_tensor_slices((self.Y.astype(np.float32),                     # input
                                                   np.zeros(self.num_samples, dtype=np.float32),  # output (dummy zero)
                                                   self.weights.astype(np.float32),))             # weights
        Y_ds = Y_ds.prefetch(tf.data.experimental.AUTOTUNE).cache()
        self.Y_ds = Y_ds.shuffle(self.num_samples, reshuffle_each_iteration=True).repeat()

        if self.feedback:
            print("Building training/test samples")
            if self.has_weights:
                print(" - {}/{} training/test samples and non-uniform weights.".format(self.num_samples, self.X_test.shape[0]))
            else:
                print(" - {}/{} training/test samples and uniform weights.".format(self.num_samples, self.X_test.shape[0]))

    def _init_transf_dist(self, Z2Y_bijector, learning_rate=1e-4, **kwargs):
        # Model
        if Z2Y_bijector == 'MAF':
            self.MAF = SimpleMAF(self.num_params, feedback=self.feedback, **kwargs)
            Z2Y_bijector = self.MAF.bijector
        assert isinstance(Z2Y_bijector, tfp.bijectors.Bijector)

        # Bijector and transformed distribution
        self.Z2Y_bijector = Z2Y_bijector
        self.dist_transformed = tfd.TransformedDistribution(
            distribution=tfd.MultivariateNormalDiag(np.zeros(self.num_params, dtype=np.float32),
                                                    np.ones(self.num_params, dtype=np.float32)),
            bijector=Z2Y_bijector)

        # Full bijector
        self.Z2X_bijector = tfb.Chain([self.Y2X_bijector, self.Z2Y_bijector])

        # Full distribution
        self.dist_learned = tfd.TransformedDistribution(
            distribution=tfd.MultivariateNormalDiag(np.zeros(self.num_params, dtype=np.float32),
                                                    np.ones(self.num_params, dtype=np.float32)),
            bijector=self.Z2X_bijector)  # samples from std gaussian mapped to X

        # Construct model
        x_ = Input(shape=(self.num_params,), dtype=tf.float32)
        log_prob_ = self.dist_transformed.log_prob(x_)
        self.model = Model(x_, log_prob_)

        loss = lambda _, log_prob: -log_prob
        self.model.compile(optimizer=tf.optimizers.Adam(learning_rate=learning_rate), loss=loss)

    def train(self, epochs=100, batch_size=None, steps_per_epoch=None, callbacks=[], verbose=1, **kwargs):
        """
        Train the normalizing flow model. Internally, this runs the fit method
        of the Keras :class:`~tf.keras.Model`, to which `**kwargs` are passed.

        :param epochs: number of training epochs, defaults to 100.
        :type epochs: int, optional
        :param batch_size: number of samples per batch, defaults to None. If
            None, the training sample is divided into `steps_per_epoch` batches.
        :type batch_size: int, optional
        :param steps_per_epoch: number of steps per epoch, defaults to None.
            If None and `batch_size` is also None, then `steps_per_epoch` is
            set to 100.
        :type steps_per_epoch: int, optional
        :param callbacks: a list of additional Keras callbacks, such as
            :class:`~tf.keras.callbacks.ReduceLROnPlateau`, defaults to [].
        :type callbacks: list, optional
        :param verbose: verbosity level, defaults to 1.
        :type verbose: int, optional
        :return: A :class:`~tf.keras.callbacks.History` object. Its `history`
            attribute is a dictionary of training and validation loss values
            and metrics values at successive epochs:
            `"shift0_chi2"` is the squared norm of the zero-shift point in the
            gaussianized space, with the probability-to-exceed and corresponding
            tension in `"shift0_pval"` and `"shift0_nsigma"`;
            `"chi2Z_ks"` and `"chi2Z_ks_p"` contain the :math:`D_n` statistic
            and probability-to-exceed of the Kolmogorov-Smirnov test that the
            squared norms of the transformed samples `Z` are :math:`\\chi^2`
            distributed (with a number of degrees of freedom equal to the
            number of parameters).
        """
        # We're trying to loop through the full sample each epoch
        if batch_size is None:
            if steps_per_epoch is None:
                steps_per_epoch = 100
            batch_size = int(self.num_samples/steps_per_epoch)
        else:
            if steps_per_epoch is None:
                steps_per_epoch = int(self.num_samples/batch_size)

        # Run !
        hist = self.model.fit(x=self.Y_ds.batch(batch_size),
                              batch_size=batch_size,
                              epochs=epochs,
                              steps_per_epoch=steps_per_epoch,
                              validation_data=(self.Y_test,
                                               np.zeros(len(self.Y_test), dtype=np.float32),
                                               self.weights_test),
                              verbose=verbose,
                              callbacks=[tf.keras.callbacks.TerminateOnNaN(), self]+callbacks,
                              **kwargs)

        return hist
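
    # A short usage sketch for train() with an extra Keras callback, as
    # mentioned in the docstring above. Kept as comments; the values and the
    # `flow_callback` instance name are illustrative assumptions:
    #
    #   lr_schedule = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss',
    #                                                      factor=0.5, patience=5)
    #   history = flow_callback.train(epochs=200, callbacks=[lr_schedule])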

    def estimate_shift(self, tol=0.05, max_iter=1000, step=100000):
        """
        Compute the normalizing flow estimate of the probability of a parameter
        shift given the input parameter difference chain. This is done with a
        Monte Carlo estimate by comparing the probability density at the
        zero-shift point to that at samples drawn from the normalizing flow
        approximation of the distribution.

        :param tol: absolute tolerance on the shift significance, defaults to 0.05.
        :type tol: float, optional
        :param max_iter: maximum number of sampling steps, defaults to 1000.
        :type max_iter: int, optional
        :param step: number of samples per step, defaults to 100000.
        :type step: int, optional
        :return: probability value and the lower/upper bounds of its confidence interval.
        """
        err = np.inf
        counter = max_iter

        _thres = self.dist_learned.log_prob(np.zeros(self.num_params, dtype=np.float32))

        _num_filtered = 0
        _num_samples = 0
        while err > tol and counter >= 0:
            counter -= 1
            _s = self.dist_learned.sample(step)
            _s_prob = self.dist_learned.log_prob(_s)
            _t = np.array(_s_prob > _thres)

            _num_filtered += np.sum(_t)
            _num_samples += step
            _P = float(_num_filtered)/float(_num_samples)
            _low, _upper = utils.clopper_pearson_binomial_trial(float(_num_filtered),
                                                                float(_num_samples),
                                                                alpha=0.32)
            err = np.abs(utils.from_confidence_to_sigma(_upper)-utils.from_confidence_to_sigma(_low))

        return _P, _low, _upper
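
    # The probability returned by estimate_shift() can be converted into an
    # effective number of sigmas with the utilities imported at the top of this
    # module. A sketch (comments only; `flow_callback` is a hypothetical
    # trained instance):
    #
    #   p, p_low, p_high = flow_callback.estimate_shift()
    #   nsigma = utils.from_confidence_to_sigma(p)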

    def _compute_shift_proba(self):
        zero = np.array(self.Z2X_bijector.inverse(np.zeros(self.num_params, dtype=np.float32)))
        chi2Z0 = np.sum(zero**2)
        pval = scipy.stats.chi2.cdf(chi2Z0, df=self.num_params)
        nsigma = utils.from_confidence_to_sigma(pval)
        return zero, chi2Z0, pval, nsigma

    def _plot_loss(self, ax, logs={}):
        self.log["loss"].append(logs.get('loss'))
        self.log["val_loss"].append(logs.get('val_loss'))
        if ax is not None:
            ax.plot(self.log["loss"], label='Training')
            ax.plot(self.log["val_loss"], label='Testing')
            ax.set_title("Training Loss")
            ax.set_xlabel("Epoch #")
            ax.set_ylabel("Loss")
            ax.legend()

    def _plot_shift_proba(self, ax, logs={}):
        # Compute chi2 at zero shift
        zero, chi2Z0, pval, nsigma = self._compute_shift_proba()
        self.log["shift0_chi2"].append(chi2Z0)
        self.log["shift0_pval"].append(pval)
        self.log["shift0_nsigma"].append(nsigma)
        # Plot
        if ax is not None:
            ax.plot(self.log["shift0_chi2"])
            ax.set_title(r"$\chi^2$ at zero-shift")
            ax.set_xlabel("Epoch #")
            ax.set_ylabel(r"$\chi^2$")

    def _plot_chi2_dist(self, ax, logs={}):
        # Compute chi2 and make sure some are finite
        chi2Z = np.sum(np.array(self.Z2Y_bijector.inverse(self.Y_test))**2, axis=1)
        _s = np.isfinite(chi2Z)
        assert np.any(_s)
        chi2Z = chi2Z[_s]
        # Run KS test
        try:
            # Note that scipy.stats.kstest does not handle weights yet so we need to resample.
            if self.has_weights:
                chi2Z = np.random.choice(chi2Z, size=len(chi2Z), replace=True,
                                         p=self.weights_test[_s]/np.sum(self.weights_test[_s]))
            chi2Z_ks, chi2Z_ks_p = scipy.stats.kstest(chi2Z, 'chi2', args=(self.num_params,))
        except Exception:
            chi2Z_ks, chi2Z_ks_p = 0., 0.
        self.log["chi2Z_ks"].append(chi2Z_ks)
        self.log["chi2Z_ks_p"].append(chi2Z_ks_p)

        xx = np.linspace(0, self.num_params*4, 1000)
        bins = np.linspace(0, self.num_params*4, 100)

        # Plot
        if ax is not None:
            ax.plot(xx, scipy.stats.chi2.pdf(xx, df=self.num_params),
                    label='$\\chi^2_{{{}}}$ PDF'.format(self.num_params), c='k', lw=1)
            ax.hist(self.chi2Y, bins=bins, density=True, histtype='step',
                    weights=self.weights_test,
                    label='Pre-gauss ($D_n$={:.3f})'.format(self.chi2Y_ks))
            ax.hist(chi2Z, bins=bins, density=True, histtype='step',
                    weights=self.weights_test[_s],
                    label='Post-gauss ($D_n$={:.3f})'.format(chi2Z_ks))
            ax.set_title(r'$\chi^2_{{{}}}$ PDF'.format(self.num_params))
            ax.set_xlabel(r'$\chi^2$')
            ax.legend(fontsize=8)

    def _plot_chi2_ks_p(self, ax, logs={}):
        # Plot
        if ax is not None:
            ln1 = ax.plot(self.log["chi2Z_ks_p"], label='$p$')
            ax.set_title(r"KS test ($\chi^2$)")
            ax.set_xlabel("Epoch #")
            ax.set_ylabel(r"$p$-value")

            ax2 = ax.twinx()
            ln2 = ax2.plot(self.log["chi2Z_ks"], ls='--', label='$D_n$')
            ax2.set_ylabel(r'$D_n$')

            lns = ln1+ln2
            labs = [l.get_label() for l in lns]
            ax2.legend(lns, labs, loc=1)

    def on_epoch_end(self, epoch, logs={}):
        """
        This method is used by Keras to show progress during training if
        `feedback` is True.
        """
        if self.feedback:
            if isinstance(self.feedback, int):
                if epoch % self.feedback:
                    return
            clear_output(wait=True)
            fig, axes = plt.subplots(1, 4, figsize=(16, 3))
        else:
            fig = None  # no figure is produced when feedback is disabled
            axes = [None]*4
        self._plot_loss(axes[0], logs=logs)
        self._plot_shift_proba(axes[1], logs=logs)
        self._plot_chi2_dist(axes[2], logs=logs)
        self._plot_chi2_ks_p(axes[3], logs=logs)

        for k in self.log.keys():
            logs[k] = self.log[k][-1]

        if self.early_stop_nsigma > 0.:
            if len(self.log["shift0_nsigma"]) > self.early_stop_patience and \
               np.std(self.log["shift0_nsigma"][-self.early_stop_patience:]) < self.early_stop_nsigma and \
               self.log["chi2Z_ks_p"][-1] > 1e-6:
                self.model.stop_training = True

        if self.feedback:
            plt.tight_layout()
            plt.show()

        return fig

###############################################################################
# helper function to compute tension with default MAF:


def flow_parameter_shift(diff_chain, param_names=None, epochs=100, batch_size=None,
                         steps_per_epoch=None, callbacks=[], verbose=1, tol=0.05,
                         max_iter=1000, step=100000, **kwargs):
    """
    Helper function that builds a :class:`~.DiffFlowCallback` with the default
    MAF bijector, trains it and returns the shift probability estimate
    (see :meth:`~.DiffFlowCallback.estimate_shift` for the meaning of the
    returned values).
    """
    # Callback/model handler
    diff_flow_callback = DiffFlowCallback(diff_chain, param_names=param_names, **kwargs)
    # Train model
    diff_flow_callback.train(epochs=epochs, batch_size=batch_size,
                             steps_per_epoch=steps_per_epoch, callbacks=callbacks,
                             verbose=verbose)
    # Compute tension
    return diff_flow_callback.estimate_shift(tol=tol, max_iter=max_iter, step=step)
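
###############################################################################
# example usage (a sketch, kept as comments; `diff_chain` is assumed to be a
# getdist MCSamples parameter difference chain produced elsewhere, e.g. with
# the parameter difference utilities of tensiometer.mcmc_tension):
#
#   p, p_low, p_high = flow_parameter_shift(diff_chain, epochs=200)
#   print('shift probability:', p)
#   print('shift significance:', utils.from_confidence_to_sigma(p), 'sigma')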