Source code for beetroots.inversion.run.run_mcmc

import copy
import multiprocessing as mp
import os
import time
from concurrent.futures import ProcessPoolExecutor
from typing import Optional, Union

import numpy as np
import pandas as pd

from beetroots.inversion.results.results_optim_map import ResultsExtractorOptimMAP
from beetroots.inversion.run.abstract_run import Run
from beetroots.modelling.posterior import Posterior
from beetroots.sampler.abstract_sampler import Sampler
from beetroots.sampler.saver.abstract_saver import Saver
from beetroots.space_transform.abstract_transform import Scaler


[docs] class RunMCMC(Run): r"""class that runs inversions using a sampling approach""" __slots__ = ("path_data_csv_out", "max_workers") def __init__(self, path_data_csv_out: str, max_workers: int): r""" Parameters ---------- path_data_csv_out : str path to the folder where results are to be saved max_workers : int max number of workers that can be used to run the inversion """ self.path_data_csv_out = path_data_csv_out r"""str: path to the folder where results are to be saved""" self.max_workers = max_workers r"""int: max number of workers that can be used to run the inversion"""
[docs] def prepare_run( self, dict_posteriors: dict[str, Posterior], path_raw: str, N_runs: int, scaler: Scaler, start_from: Optional[str], path_csv_mle: Optional[str], path_csv_map: Optional[str], ) -> Optional[np.ndarray]: r"""prepares the run in two ways : * step 1 : creates empty folders to save the run results * step 2 : reads ``Theta_0`` if specified (as the MLE or MAP) Parameters ---------- dict_posteriors : dict[str, Posterior] dictionary of posterior distributions path_raw : str path to the folders where the ``.hdf5`` files are to be stored N_runs : int number of independent Markov chains to run per posterior distribution scaler : Scaler contains the transformation of the Theta values from their natural space to their scaled space (in which the sampling happens) start_from : Optional[str] point at which the inversion will start, must be in [None, "MAP"]. For None, a random value is drawn uniformly in the scaled hypercube. path_csv_mle : Optional[str] path to the csv file containing the already estimated MLE path_csv_map : Optional[str] path to the csv file containing the already estimated MAP Returns ------- Optional[np.ndarray] starting point of the (in scaled space) inversion, ``Theta_0``, if specified. Otherwise ``None``. """ # step 1 : create empty folders to save the run results for seed in range(N_runs): for model_name in list(dict_posteriors.keys()): folder_path = f"{path_raw}/{model_name}/mcmc_{seed}" if not os.path.isdir(folder_path): os.mkdir(folder_path) # step 2 : read Theta_0 if needed assert start_from in ["MAP", None] model_name = list(dict_posteriors.keys())[0] if start_from == "MAP": assert path_csv_map is not None Theta_0, _ = ResultsExtractorOptimMAP.read_estimator( path_csv_map, model_name, ) Theta_0 = scaler.from_lin_to_scaled(Theta_0) else: Theta_0 = None return Theta_0
[docs] def run( self, dict_posteriors: dict[str, Posterior], sampler_: Sampler, saver_: Saver, N_runs: int, max_iter: int, T_BI: int, path_raw: str, Theta_0: Optional[np.ndarray] = None, # sample_regu_weights: bool = True, # T_BI_reguweights: Optional[int] = None, # regu_spatial_N0: Union[int, float] = np.infty, regu_spatial_scale: Optional[float] = 1.0, regu_spatial_vmin: Optional[float] = 1e-8, regu_spatial_vmax: Optional[float] = 1e8, # freq_save: int = 1, can_run_in_parallel: bool = True, ) -> None: r"""runs the inversion Parameters ---------- dict_posteriors : dict[str, Posterior] dictionary of posterior distributions sampler_ : Sampler sampler to be used to generate the Markov chain(s) saver_ : Saver object responsible for progressively saving the Markov chain data during the run N_runs : int number of independent Markov chains to run per posterior distribution max_iter : int total duration of a Markov chain T_BI : int duration of the `Burn-in` phase path_raw : str path to the folders where the ``.hdf5`` files are to be stored Theta_0 : Optional[np.ndarray], optional starting point, by default None regu_spatial_N0 : Union[int, float], optional number of iterations defining the initial update phase (for spatial regularization weight optimization). np.infty means that the optimization phase never starts, and that the weight optimization is not applied. by default np.infty regu_spatial_scale : Optional[float], optional scale parameter involved in the definition of the projected gradient step size (for spatial regularization weight optimization). by default 1.0 regu_spatial_vmin : Optional[float], optional lower limit of the admissible interval (for spatial regularization weight optimization), by default 1e-8 regu_spatial_vmax : Optional[float], optional upper limit of the admissible interval (for spatial regularization weight optimization), by default 1e8 freq_save : int, optional frequency of saved iterates during the run (1 means that every iteration is saved), by default 1 can_run_in_parallel : bool, optional wether the inversion can be run in parallel (may cause difficulties for forward maps based on neural networks run on GPU), by default True """ global _run_one_simulation_mcmc_all_pixels def _run_one_simulation_mcmc_all_pixels(dict_input: dict) -> dict: model_name = dict_input["model_name"] seed = dict_input["seed"] folder_path = f"{path_raw}/{model_name}/mcmc_{seed}" saver_seed = copy.deepcopy(saver_) saver_seed.set_results_path(folder_path) sampler_seed = copy.deepcopy(sampler_) tps0 = time.time() sampler_seed.sample( dict_posteriors[model_name], saver=saver_seed, max_iter=max_iter, Theta_0=Theta_0, # # sample_regu_weights=sample_regu_weights, # T_BI_reguweights=T_BI_reguweights, regu_spatial_N0=regu_spatial_N0, regu_spatial_scale=regu_spatial_scale, regu_spatial_vmin=regu_spatial_vmin, regu_spatial_vmax=regu_spatial_vmax, # T_BI=T_BI, ) # return input dict with duration information total_duration = time.time() - tps0 dict_output = { "seed": seed, "model_name": model_name, "total_duration": total_duration, } return dict_output # * print("starting sampling") list_params = [ {"seed": seed, "model_name": model_name} for seed in range(N_runs) for model_name in list(dict_posteriors.keys()) ] if can_run_in_parallel: with ProcessPoolExecutor( max_workers=self.max_workers, mp_context=mp.get_context("fork") ) as p: list_simulations_durations = list( p.map(_run_one_simulation_mcmc_all_pixels, list_params) ) else: list_simulations_durations = [ _run_one_simulation_mcmc_all_pixels(params) for params in list_params ] # save run durations df_results_sampling = pd.DataFrame(list_simulations_durations) filename = f"{self.path_data_csv_out}/durations_MCMC.csv" df_results_sampling.to_csv(filename) print("sampling done\n") return
[docs] def main( self, dict_posteriors: dict[str, Posterior], sampler_: Sampler, saver_: Saver, scaler: Scaler, N_runs: int, max_iter: int, T_BI: int, path_raw: str, path_csv_mle: Optional[str], path_csv_map: Optional[str], start_from: Optional[str], # sample_regu_weights: bool = True, # T_BI_reguweights: Optional[int] = None, # regu_spatial_N0: Union[int, float] = np.infty, regu_spatial_scale: Optional[float] = 1.0, regu_spatial_vmin: Optional[float] = 1e-8, regu_spatial_vmax: Optional[float] = 1e8, # freq_save: int = 1, can_run_in_parallel: bool = True, ) -> None: r"""sequentially calls ``prepare_run`` and ``run`` Parameters ---------- dict_posteriors : dict[str, Posterior] dictionary of posterior distributions sampler_ : Sampler sampler to be used to generate the Markov chain(s) saver_ : Saver object responsible for progressively saving the Markov chain data during the run scaler : Scaler contains the transformation of the Theta values from their natural space to their scaled space (in which the sampling happens) N_runs : int number of independent Markov chains to run per posterior distribution max_iter : int total duration of a Markov chain T_BI : int duration of the `Burn-in` phase path_raw : str path to the folders where the ``.hdf5`` files are to be stored path_csv_mle : Optional[str] path to the csv file containing the already estimated MLE path_csv_map : Optional[str] path to the csv file containing the already estimated MAP start_from : Optional[str] point at which the inversion will start, must be in [None, "MAP"]. For None, a random value is drawn uniformly in the scaled hypercube. regu_spatial_N0 : Union[int, float], optional number of iterations defining the initial update phase (for spatial regularization weight optimization). np.infty means that the optimization phase never starts, and that the weight optimization is not applied. By default np.infty regu_spatial_scale : Optional[float], optional scale parameter involved in the definition of the projected gradient step size (for spatial regularization weight optimization). by default 1.0 regu_spatial_vmin : Optional[float], optional lower limit of the admissible interval (for spatial regularization weight optimization), by default 1e-8 regu_spatial_vmax : Optional[float], optional upper limit of the admissible interval (for spatial regularization weight optimization), by default 1e8 freq_save : int, optional frequency of saved iterates during the run (1 means that every iteration is saved), by default 1 can_run_in_parallel : bool, optional wether the inversion can be run in parallel (may cause difficulties for forward maps based on neural networks run on GPU), by default True """ Theta_0 = self.prepare_run( dict_posteriors, path_raw, N_runs, scaler, start_from=start_from, path_csv_mle=path_csv_mle, path_csv_map=path_csv_map, ) self.run( dict_posteriors, sampler_, saver_, N_runs=N_runs, max_iter=max_iter, T_BI=T_BI, path_raw=path_raw, Theta_0=Theta_0, # regu_spatial_N0=regu_spatial_N0, regu_spatial_scale=regu_spatial_scale, regu_spatial_vmin=regu_spatial_vmin, regu_spatial_vmax=regu_spatial_vmax, # freq_save=freq_save, can_run_in_parallel=can_run_in_parallel, ) return