Source code for beetroots.inversion.results.results_mcmc
import copy
import os
from typing import Dict, List, Optional, Union
import numpy as np
from beetroots.inversion.plots.plots_estimator import PlotsEstimator
from beetroots.inversion.results.abstract_results import ResultsExtractor
from beetroots.inversion.results.utils.bayes_pval_plots import ResultsBayesPvalues
from beetroots.inversion.results.utils.clppd import ResultsCLPPD
from beetroots.inversion.results.utils.ess_plots import ResultsESS
from beetroots.inversion.results.utils.kernel import ResultsKernels
from beetroots.inversion.results.utils.lowest_obj_estimator import (
ResultsLowestObjective,
)
from beetroots.inversion.results.utils.mc import ResultsMC
from beetroots.inversion.results.utils.mmse_ci import ResultsMMSEandCI
from beetroots.inversion.results.utils.objective import ResultsObjective
from beetroots.inversion.results.utils.regularization_weights import (
ResultsRegularizationWeights,
)
from beetroots.inversion.results.utils.valid_mc import ResultsValidMC
from beetroots.inversion.results.utils.y_f_Theta import (
ResultsDistributionComparisonYandFTheta,
)
from beetroots.modelling.posterior import Posterior
from beetroots.space_transform.abstract_transform import Scaler
[docs]
class ResultsExtractorMCMC(ResultsExtractor):
r"""extractor of inference results for the Markov chain data that was saved."""
__slots__ = (
"path_data_csv_out_mcmc",
"path_img",
"path_raw",
"N_MCMC",
"T_MC",
"T_BI",
"freq_save",
"max_workers",
)
def __init__(
self,
path_data_csv_out_mcmc: str,
path_img: str,
path_raw: str,
N_MCMC: int,
T_MC: int,
T_BI: int,
freq_save: int,
max_workers: int,
):
r"""
Parameters
----------
path_data_csv_out_mcmc : str
path to the csv file in which the performance of estimators is to be saved
path_img : str
path to the folder in which images are to be saved
path_raw : str
path to the raw ``.hdf5`` files
N_MCMC : int
number of Markov chains to run per posterior distribution
T_MC : int
total size of each Markov chain
T_BI : int
duration of the `Burn-in` phase
freq_save : int
frequency of saved iterates, 1 means that all iterates were saved (used to show correct Markov chain sizes in chain plots)
max_workers : int
maximum number of workers that can be used for results extraction
"""
self.path_data_csv_out_mcmc = path_data_csv_out_mcmc
r"""str: path to the csv file in which the performance of estimators is to be saved"""
self.path_img = path_img
r"""str: path to the folder in which images are to be saved"""
self.path_raw = path_raw
r"""str: path to the raw ``.hdf5`` files"""
self.N_MCMC = N_MCMC
r"""int: number of Markov chains to run per posterior distribution"""
self.T_MC = T_MC
r"""int: total size of each Markov chain"""
self.T_BI = T_BI
r"""int: duration of the `Burn-in` phase"""
self.freq_save = freq_save
r"""int: frequency of saved iterates, 1 means that all iterates were saved (used to show correct Markov chain sizes in chain plots)"""
self.max_workers = max_workers
r"""int: maximum number of workers that can be used for results extraction"""
[docs]
def main(
self,
posterior: Posterior,
model_name: str,
scaler: Scaler,
list_names: List[str],
list_idx_sampling: List[int],
list_fixed_values: Union[List[Optional[float]], np.ndarray],
#
plot_1D_chains: bool,
plot_2D_chains: bool,
plot_ESS: bool,
plot_comparisons_yspace: bool,
#
estimator_plot: Optional[PlotsEstimator] = None,
analyze_regularization_weight: bool = False,
Theta_true_scaled: Optional[np.ndarray] = None,
list_lines_fit: Optional[List[str]] = None,
#
list_lines_valid: List[str] = [],
y_valid: Optional[np.ndarray] = None,
sigma_a_valid: Optional[np.ndarray] = None,
omega_valid: Optional[np.ndarray] = None,
sigma_m_valid: Optional[np.ndarray] = None,
point_challenger: Dict = {},
list_CI: List[int] = [68, 90, 95, 99],
):
r"""performs the data extraction, in this order:
* step 1 : clppd (see ``beetroots.inversion.results.utils.clppd``)
* step 2 : kernel analysis (see ``beetroots.inversion.results.utils.kernel``)
* step 3 : objective evolution (see ``beetroots.inversion.results.utils.objective``) (the objective is the negative log posterior pdf)
* step 4 : MAP estimator from samples (see ``beetroots.inversion.results.utils.lowest_obj_estimator``)
* step 5 : deal with whole Markov chain for MMSE and histograms, in a pixel-wise approach to avoid overloading the memory (see ``beetroots.inversion.results.utils.mc``)
* step 6 : save global MMSE performance (see ``beetroots.inversion.results.utils.mmse_ci``)
* step 7 (if map) : plot maps of ESS (see ``beetroots.inversion.results.utils.ess_plots``)
* step 8 : model checking with Bayesian p-value computation (see ``beetroots.inversion.results.utils.bayes_pval_plots``)
* step 9 (if the true value is known): plot how many components have their true value between min and max of Markov chain (see ``beetroots.inversion.results.utils.valid_mc``)
* step 10 : plot comparison of distributions of :math:`y_n` and :math:`f(\theta_n)` for all :math:`n \in [\![1, N]\!]` (see ``beetroots.inversion.results.utils.y_f_Theta``)
* step 11 (if ``analyze_regularization_weight``) : analysis of the regularization weight :math:`\tau` automatic tuning (see ``beetroots.inversion.results.utils.regularization_weights``)
Parameters
----------
posterior : Posterior
probability distribution used to generate the Markov chain(s)
model_name : str
name of the model, used to identify the posterior distribution
scaler : Scaler
contains the transformation of the Theta values from their natural space to their scaled space (in which the sampling happens) and its inverse
list_names : List[str]
names of the D physical parameters to appear in plots (for instance, $P_{th}$ for thermal pressure)
list_idx_sampling : List[int]
indices of the physical parameters that were sampled (the other ones were fixed)
list_fixed_values : List[float]
list of used values for the parameters fixed during the sampling
plot_1D_chains : bool
wether to plot each of the :math:`N \times D` 1D chains and histograms for each physical parameter :math:`\theta_{nd}`
plot_2D_chains : bool
wether to plot each of the :math:`N \times D \times (D-1)` 2D chains and histograms for pairs of parameters :math:`(\theta_{n d_1}, \theta_{n d_2})` with :math:`1 \leq d_1 < d_2 \leq D`
plot_ESS : bool
wether to plot the Effective sample size maps (only used when :math:`N > 1`)
plot_comparisons_yspace : bool
whether to plot comparisons of the distribution on :math:`y_n` and :math:`\mathcal{A}(f(\theta_n))` (with :math:`\mathcal{A}` the noise model). Offers a visualization to understand the model checking based on the Bayesian p-value :cite:p:`paludProblemesInversesTest2023a`
estimator_plot : Optional[PlotsEstimator], optional
object used to plot the estimator figures, by default None
analyze_regularization_weight : bool, optional
wether to analyze the evaluation of the regularization weight :math:`\tau`, by default False
Theta_true_scaled : Optional[np.ndarray], optional
true value for the inferred physical parameter :math:`\Theta` (only possible for toy cases), by default None
list_lines_fit : Optional[List[str]], optional
names of the observables used for the inversion, by default None
list_lines_valid : List[str], optional
names of the available observables not used for the inversion (can be used for model checking), by default []
y_valid : Optional[np.ndarray], optional
observation values for the observables not used for inversion. If provided, must have shape (N, L_valid). by default None
sigma_a_valid : Optional[np.ndarray], optional
additive noise standard deviation values for the observables not used for inversion. If provided, must have shape (N, L_valid). by default None
omega_valid : Optional[np.ndarray], optional
censor threshold values for the observables not used for inversion. If provided, must have shape (N, L_valid)., by default None
sigma_m_valid : Optional[np.ndarray], optional
multiplicative noise standard deviation values for the observables not used for inversion. If provided, must have shape (N, L_valid)., by default None
point_challenger : Dict, optional
other estimator that can come from the literature, provided to be compared with the inference results, by default {}
list_CI : List[int], optional
list of credibility intervals to evaluate (in percent), by default [68, 90, 95, 99]
"""
list_mcmc_folders = [
f"{x[0]}/mc_chains.hdf5"
for x in os.walk(f"{self.path_raw}/{model_name}")
if "mcmc_" in x[0]
]
list_mcmc_folders.sort()
chain_type = "mcmc"
N = posterior.N * 1
D_sampling = posterior.D * 1
D = len(list_fixed_values)
L = posterior.L * 1 # len(list_lines_fit)
assert D >= D_sampling, f"should have D={D} >= D_sampling={D_sampling}"
print(f"N = {N}, L (fit) = {L}, D_sampling = {D_sampling}, D = {D}")
if estimator_plot is not None:
map_shaper = copy.copy(estimator_plot.map_shaper)
else:
map_shaper = None
list_fixed_values_scaled = list(
posterior.likelihood.forward_map.arr_fixed_values
)
list_fixed_values_scaled = [
v if v != 0 else None for v in list_fixed_values_scaled
]
# clppd
ResultsCLPPD(
model_name=model_name,
chain_type=chain_type,
path_img=self.path_img,
path_data_csv_out=self.path_data_csv_out_mcmc,
N_MCMC=self.N_MCMC,
N=N,
L=L,
).main(list_mcmc_folders, map_shaper)
# kernel analysis
ResultsKernels(
model_name,
chain_type,
self.path_img * 1,
self.N_MCMC * 1,
self.T_MC * 1,
self.freq_save * 1,
).main(list_mcmc_folders)
# objective evolution
if Theta_true_scaled is not None:
forward_map_evals = posterior.likelihood.evaluate_all_forward_map(
Theta_true_scaled,
compute_derivatives=False,
compute_derivatives_2nd_order=False,
)
nll_utils = posterior.likelihood.evaluate_all_nll_utils(
forward_map_evals,
compute_derivatives=False,
compute_derivatives_2nd_order=False,
)
objective_true = posterior.neglog_pdf(
Theta_true_scaled,
idx_pix=np.arange(N),
forward_map_evals=forward_map_evals,
nll_utils=nll_utils,
chromatic_gibbs=False,
)
assert isinstance(objective_true, float)
else:
objective_true = np.nan
idx_lowest_obj, lowest_obj = ResultsObjective(
model_name,
chain_type,
self.path_img * 1,
self.N_MCMC * 1,
self.T_MC * 1,
self.T_BI * 1,
self.freq_save * 1,
N=N,
D=D,
L=L,
).main(list_mcmc_folders, objective_true)
# MAP estimator from samples
if Theta_true_scaled is not None:
Theta_true_scaled_full = np.zeros((N, D))
Theta_true_scaled_full[:, list_idx_sampling] += Theta_true_scaled
for d in range(D):
if list_fixed_values_scaled[d] is not None:
val_d = copy.copy(list_fixed_values_scaled[d])
assert isinstance(val_d, float)
Theta_true_scaled_full[:, d] += val_d
else:
Theta_true_scaled_full = None
ResultsLowestObjective(
model_name,
chain_type,
self.path_img,
self.path_data_csv_out_mcmc,
self.N_MCMC,
self.T_MC,
self.freq_save,
).main(
list_mcmc_folders,
lowest_obj,
idx_lowest_obj,
scaler,
Theta_true_scaled_full, # (N, D_sampling)
list_idx_sampling, # (D_sampling,)
list_fixed_values, # (D,)
estimator_plot,
)
# deal with whole MC for MMSE and histograms
if estimator_plot is not None:
lower_bounds_lin = estimator_plot.lower_bounds_lin * 1
upper_bounds_lin = estimator_plot.upper_bounds_lin * 1
else:
assert posterior.prior_indicator is not None
lower_bounds = posterior.prior_indicator.lower_bounds_full.reshape(
(1, D),
)
upper_bounds = posterior.prior_indicator.upper_bounds_full.reshape(
(1, D),
)
lower_bounds_lin = scaler.from_scaled_to_lin(lower_bounds)
lower_bounds_lin = lower_bounds_lin.flatten()
upper_bounds_lin = scaler.from_scaled_to_lin(upper_bounds)
upper_bounds_lin = upper_bounds_lin.flatten()
ResultsMC(
model_name,
chain_type,
self.path_img,
self.path_data_csv_out_mcmc,
self.max_workers,
self.N_MCMC,
self.T_MC,
self.T_BI,
self.freq_save,
N,
list_idx_sampling, # (D_sampling,)
list_fixed_values_scaled, # (D,)
lower_bounds_lin,
upper_bounds_lin,
list_names,
).main(
scaler,
Theta_true_scaled_full,
list_mcmc_folders,
plot_ESS,
plot_1D_chains,
plot_2D_chains,
plot_comparisons_yspace,
point_challenger=point_challenger,
list_CI=list_CI,
)
# save global MMSE performance
# (to do now, once the MMSE if computed for all pixels)
_ = ResultsMMSEandCI(
model_name,
self.path_img,
self.path_data_csv_out_mcmc,
N,
D,
).main(
posterior,
scaler,
estimator_plot,
Theta_true_scaled_full,
list_idx_sampling,
list_fixed_values,
list_CI,
)
# plot maps of ESS
if plot_ESS and estimator_plot is not None:
ResultsESS(
model_name,
self.path_img,
self.path_data_csv_out_mcmc,
N,
D_sampling,
).main(
map_shaper,
list_names,
list_idx_sampling,
)
ResultsBayesPvalues(
model_name,
chain_type,
self.path_img,
self.path_data_csv_out_mcmc,
self.N_MCMC,
N,
D_sampling,
plot_ESS,
).main(
list_idx_sampling=list_idx_sampling,
map_shaper=map_shaper if N > 1 else None,
)
# plot how many components have their true value
# between min and max of MC
if Theta_true_scaled is not None:
ResultsValidMC(
model_name=model_name,
path_img=self.path_img,
path_data_csv_out_mcmc=self.path_data_csv_out_mcmc,
N_MCMC=self.N_MCMC,
T_MC=self.T_MC,
T_BI=self.T_BI,
freq_save=self.freq_save,
N=N,
D_sampling=D_sampling,
).main(list_names, list_idx_sampling)
# plot comparison of distributions of y and f(\theta)
if plot_comparisons_yspace and list_lines_fit is not None:
comparison_y_f_Theta = ResultsDistributionComparisonYandFTheta(
model_name,
self.path_img,
self.path_data_csv_out_mcmc,
self.N_MCMC,
self.T_MC,
self.T_BI,
self.freq_save,
N,
D,
list_idx_sampling,
self.max_workers,
)
if hasattr(posterior.likelihood, "sigma_a"):
sigma_a = posterior.likelihood.sigma_a
elif hasattr(posterior.likelihood, "sigma"):
sigma_a = posterior.likelihood.sigma
else:
sigma_a = np.zeros_like(posterior.likelihood.y)
if hasattr(posterior.likelihood, "sigma_m"):
sigma_m = posterior.likelihood.sigma_m
else:
sigma_m = np.zeros_like(posterior.likelihood.y)
comparison_y_f_Theta.main(
list_mcmc_folders,
scaler,
posterior.likelihood.forward_map,
posterior.likelihood.y,
posterior.likelihood.omega,
sigma_a,
sigma_m,
list_lines_fit,
"fit",
point_challenger=point_challenger,
)
# if len(list_lines_valid) > 0:
# comparison_y_f_Theta.main(
# list_mcmc_folders,
# scaler,
# posterior.likelihood.forward_map,
# y_valid,
# omega_valid,
# sigma_a_valid,
# sigma_m_valid,
# list_lines_valid,
# "valid",
# point_challenger=point_challenger,
# )
# posterior.likelihood.forward_map.restrict_to_output_subset(
# list_lines_fit,
# )
if analyze_regularization_weight:
ResultsRegularizationWeights(
model_name=model_name,
path_img=self.path_img,
path_data_csv_out_mcmc=self.path_data_csv_out_mcmc,
N_MCMC=self.N_MCMC,
T_MC=self.T_MC,
T_BI=self.T_BI,
freq_save=self.freq_save,
D_sampling=D_sampling,
list_names=list_names,
).main(list_mcmc_folders, list_idx_sampling)
print()
return