Source code for beetroots.inversion.results.utils.mmse_ci

import os
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd

from beetroots.inversion.plots.plots_estimator import PlotsEstimator
from beetroots.inversion.results.utils.abstract_util import ResultsUtil
from beetroots.inversion.results.utils.perf_saver import EstimatorPerfSaver
from beetroots.modelling.posterior import Posterior
from beetroots.space_transform.abstract_transform import Scaler


[docs] class ResultsMMSEandCI(ResultsUtil): __slots__ = ( "model_name", "path_img", "path_data_csv_out_mcmc", "N", "D", ) def __init__( self, model_name: str, path_img: str, path_data_csv_out_mcmc: str, N: int, D: int, ): self.model_name = model_name self.path_img = path_img self.path_data_csv_out_mcmc = path_data_csv_out_mcmc self.N = N self.D = D
[docs] def read_data(self) -> pd.DataFrame: path_overall_results = f"{self.path_data_csv_out_mcmc}/" path_overall_results += f"estimation_Theta_{self.model_name}.csv" df_estim = pd.read_csv(path_overall_results, index_col=["n", "d"]) df_estim = df_estim.sort_index().reset_index(drop=False) assert len(df_estim) == self.N * self.D return df_estim
[docs] def create_folders(self, list_CI: List[int]) -> Tuple[str, dict[int, str]]: folder_path = f"{self.path_img}/estimators" folder_path_inter = f"{folder_path}/{self.model_name}" folder_path_MMSE = f"{folder_path_inter}/MMSE" dict_folder_path_CI = {ci: f"{folder_path_inter}/CI{ci}" for ci in list_CI} for path_ in [ folder_path, folder_path_inter, folder_path_MMSE, ] + list(dict_folder_path_CI.values()): if not os.path.isdir(path_): os.mkdir(path_) return folder_path_MMSE, dict_folder_path_CI
[docs] def main( self, posterior: Posterior, scaler: Scaler, estimator_plot: Optional[PlotsEstimator], Theta_true_scaled_full: Optional[np.ndarray], list_idx_sampling: List[int], list_fixed_values: np.ndarray, list_CI: List[int], ) -> pd.DataFrame: estimator_name = "MMSE" df_estim = self.read_data() Theta_mmse_lin = df_estim["Theta_MMSE"].values.reshape((self.N, self.D)) Theta_mmse_scaled_full = scaler.from_lin_to_scaled(Theta_mmse_lin) # save its performance perf_saver = EstimatorPerfSaver() if Theta_true_scaled_full is not None: mse = perf_saver.compute_MSE( Theta_mmse_scaled_full, Theta_true_scaled_full, ) snr = perf_saver.compute_SNR( Theta_mmse_scaled_full, Theta_true_scaled_full, ) mse_component_wise = perf_saver.compute_MSE( Theta_mmse_scaled_full, Theta_true_scaled_full, component_wise=True, ) snr_component_wise = perf_saver.compute_SNR( Theta_mmse_scaled_full, Theta_true_scaled_full, component_wise=True, ) else: mse = None snr = None mse_component_wise = None snr_component_wise = None # evaluate objective Theta_mmse_scaled = Theta_mmse_scaled_full[:, list_idx_sampling] forward_map_evals = posterior.likelihood.evaluate_all_forward_map( Theta_mmse_scaled, compute_derivatives=False, compute_derivatives_2nd_order=False, ) nll_utils = posterior.likelihood.evaluate_all_nll_utils( forward_map_evals, compute_derivatives=False, compute_derivatives_2nd_order=False, ) objective_mmse = posterior.neglog_pdf( Theta_mmse_scaled, idx_pix=np.arange(self.N), forward_map_evals=forward_map_evals, nll_utils=nll_utils, chromatic_gibbs=False, ) perf_saver.save_estimator_performance( self.path_data_csv_out_mcmc, estimator_name, self.model_name, mse, snr, mse_component_wise, snr_component_wise, objective_mmse, ) # plot it N, _ = Theta_mmse_lin.shape if N > 1 and estimator_plot is not None: ( folder_path_MMSE, dict_folder_path_CI, ) = self.create_folders(list_CI) estimator_plot.plot_estimator( Theta_mmse_lin, estimator_name, folder_path_MMSE, self.model_name, ) dict_ci_per = { ci: {"lower": (100 - ci) / 2, "upper": 100 - (100 - ci) / 2} for ci in list_CI } dict_per_ci = {} for ci in list_CI: per_lower = dict_ci_per[ci]["lower"] * 1 per_upper = dict_ci_per[ci]["upper"] * 1 dict_per_ci[per_lower] = ci * 1 dict_per_ci[per_upper] = ci * 1 list_per = list(dict_per_ci.keys()) list_per.sort() for ci in list_CI: for percentile in dict_ci_per[ci].values(): estimator_name = f"percentile {percentile:.1f}%" per_name = f"per_{percentile:.1f}".replace(".", "p") Theta_percentile_lin = df_estim[per_name].values Theta_percentile_lin = Theta_percentile_lin.reshape( (self.N, self.D), ) estimator_plot.plot_estimator( Theta_percentile_lin, estimator_name, dict_folder_path_CI[ci], self.model_name, ) per_low = dict_ci_per[ci]["lower"] * 1 per_low_name = f"per_{per_low:.1f}".replace(".", "p") per_up = dict_ci_per[ci]["upper"] * 1 per_up_name = f"per_{per_up:.1f}".replace(".", "p") df_ratio = df_estim[per_up_name] / df_estim[per_low_name] ratio = df_ratio.values.reshape((self.N, self.D)) estimator_plot.plot_CI_size( ratio, CI_name=f"{ci}% CI size", folder_path=dict_folder_path_CI[ci], ) estimator_plot.plot_CI_size( np.sqrt(ratio), CI_name=f"{ci}% CI uncertainty factor", folder_path=dict_folder_path_CI[ci], ) # self.plot_map_nll_of_estimator( # Theta_MAP_scaled, estimator_name, model_name # ) return df_estim