Source code for beetroots.inversion.run.run_optim_map

import copy
import multiprocessing as mp
import os
import time
from concurrent.futures import ProcessPoolExecutor
from typing import Optional

import numpy as np
import pandas as pd

from beetroots.inversion.results.results_optim_map import ResultsExtractorOptimMAP
from beetroots.inversion.run.abstract_run import Run
from beetroots.modelling.posterior import Posterior
from beetroots.sampler.abstract_sampler import Sampler
from beetroots.sampler.saver.abstract_saver import Saver
from beetroots.space_transform.abstract_transform import Scaler


class RunOptimMAP(Run):
    r"""class that runs inversions using an optimization approach, considering all pixels / components at once"""

    __slots__ = ("path_data_csv_out", "max_workers")

    def __init__(self, path_data_csv_out: str, max_workers: int):
        r"""

        Parameters
        ----------
        path_data_csv_out : str
            path to the folder where results are to be saved
        max_workers : int
            max number of workers that can be used to run the inversion
        """
        self.path_data_csv_out = path_data_csv_out
        r"""str: path to the folder where results are to be saved"""

        self.max_workers = max_workers
        r"""int: max number of workers that can be used to run the inversion"""

    def prepare_run(
        self,
        dict_posteriors: dict[str, Posterior],
        path_raw: str,
        N_runs: int,
        scaler: Scaler,
        start_from: Optional[str] = None,
        path_csv_mle: Optional[str] = None,
    ) -> Optional[np.ndarray]:
        r"""prepares the run in two ways :

        * step 1 : creates the folders in which the run results are saved
          (one ``{path_raw}/{model_name}/optim_MAP_{seed}`` folder per model and per run)
        * step 2 : validates ``start_from`` and defines the starting point ``Theta_0``

        .. note::

            Reading ``Theta_0`` from a previous estimation is currently disabled:
            whatever the value of ``start_from``, the returned ``Theta_0`` is ``None``.
            The arguments ``scaler`` and ``path_csv_mle`` are therefore not used yet.

        Parameters
        ----------
        dict_posteriors : dict[str, Posterior]
            dictionary of posterior distributions
        path_raw : str
            path to the folders where the ``.hdf5`` files are to be stored
        N_runs : int
            number of independent optimization runs to run per posterior distribution
        scaler : Scaler
            contains the transformation of the Theta values from their natural space to their scaled space (in which the sampling happens)
        start_from : Optional[str]
            point at which the inversion will start, must be in [None, "MAP"]. For None, a random value is drawn uniformly in the scaled hypercube.
        path_csv_mle : Optional[str]
            path to the csv file containing the already estimated MLE

        Returns
        -------
        Optional[np.ndarray]
            starting point of the (in scaled space) inversion, ``Theta_0``, if specified. Otherwise ``None``. Currently always ``None``.
        """
        # create the folders to save the run results.
        # ``makedirs(..., exist_ok=True)`` also creates a missing
        # ``{path_raw}/{model_name}`` parent and does not fail if the folder
        # already exists (or gets created concurrently).
        for seed in range(N_runs):
            for model_name in dict_posteriors:
                folder_path = f"{path_raw}/{model_name}/optim_MAP_{seed}"
                os.makedirs(folder_path, exist_ok=True)

        # read Theta_0 if needed
        assert start_from in ["MAP", None], (
            f"start_from must be in ['MAP', None], got {start_from!r}"
        )

        # TODO: correct feature? When ``start_from == "MAP"``, the starting
        # point used to be read from a previous optim MAP result with
        # ``ResultsExtractorOptimMAP.read_estimator`` and mapped to the scaled
        # space with ``scaler.from_lin_to_scaled``. This is disabled for now.
        Theta_0 = None
        return Theta_0

    def run(
        self,
        dict_posteriors: dict[str, Posterior],
        sampler_: Sampler,
        saver_: Saver,
        N_runs: int,
        max_iter: int,
        path_raw: str,
        Theta_0: Optional[np.ndarray] = None,
        freq_save: int = 1,
        can_run_in_parallel: bool = True,
    ) -> None:
        r"""runs the inversion: one optimization run per ``(seed, model_name)`` pair, then saves the duration of each run in ``durations_optim_MAP.csv`` in ``path_data_csv_out``

        Parameters
        ----------
        dict_posteriors : dict[str, Posterior]
            dictionary of posterior distributions
        sampler_ : Sampler
            optimizer
        saver_ : Saver
            object responsible for progressively saving the optimization run data during the run
        N_runs : int
            number of independent optimization runs to run per posterior distribution
        max_iter : int
            total duration of an optimization run
        path_raw : str
            path to the folders where the ``.hdf5`` files are to be stored
        Theta_0 : Optional[np.ndarray], optional
            starting point, by default None
        freq_save : int, optional
            frequency of saved iterates during the run (1 means that every iteration is saved), by default 1. Note: this value is currently not used within this method.
        can_run_in_parallel : bool, optional
            whether the inversion can be run in parallel (may cause difficulties for forward maps based on neural networks run on GPU), by default True
        """
        # The worker is declared ``global`` so that it is bound at module
        # level: ``ProcessPoolExecutor`` pickles the callable by reference,
        # which does not work for a plain nested function. The forked
        # workers inherit this module-level function together with the
        # variables it closes over (``path_raw``, ``saver_``, ...).
        global _run_one_simulation_optim_map_all_pixels

        def _run_one_simulation_optim_map_all_pixels(dict_input: dict) -> dict:
            model_name = dict_input["model_name"]
            seed = dict_input["seed"]

            folder_path = f"{path_raw}/{model_name}/optim_MAP_{seed}"

            # each run works on its own copies of the saver and the sampler
            saver_seed = copy.deepcopy(saver_)
            saver_seed.set_results_path(folder_path)

            sampler_seed = copy.deepcopy(sampler_)

            tps0 = time.time()

            sampler_seed.sample(
                dict_posteriors[model_name],
                saver=saver_seed,
                max_iter=max_iter,
                Theta_0=Theta_0,
                T_BI=max_iter // 5,
            )

            # return input dict with duration information
            dict_output = {
                "seed": seed,
                "model_name": model_name,
                "total_duration": time.time() - tps0,
            }
            return dict_output

        print("starting optimization MAP")
        list_params = [
            {"seed": seed, "model_name": model_name}
            for seed in range(N_runs)
            for model_name in dict_posteriors
        ]

        if can_run_in_parallel:
            # the "fork" start method is required for the workers to see the
            # function defined above
            with ProcessPoolExecutor(
                max_workers=self.max_workers, mp_context=mp.get_context("fork")
            ) as p:
                list_simulations_durations = list(
                    p.map(_run_one_simulation_optim_map_all_pixels, list_params)
                )
        else:
            # * non parallel version
            list_simulations_durations = [
                _run_one_simulation_optim_map_all_pixels(params)
                for params in list_params
            ]

        # one row per run, with columns: seed, model_name, total_duration
        df_results_sampling = pd.DataFrame(list_simulations_durations)
        filename = f"{self.path_data_csv_out}/durations_optim_MAP.csv"
        df_results_sampling.to_csv(filename)

        print("optimization MAP done\n")

    def main(
        self,
        dict_posteriors: dict[str, Posterior],
        sampler_: Sampler,
        saver_: Saver,
        scaler: Scaler,
        N_runs: int,
        max_iter: int,
        path_raw: str,
        path_csv_mle: Optional[str] = None,
        start_from: Optional[str] = None,
        freq_save: int = 1,
        can_run_in_parallel: bool = True,
    ) -> None:
        r"""sequentially calls ``prepare_run`` and ``run``

        Parameters
        ----------
        dict_posteriors : dict[str, Posterior]
            dictionary of posterior distributions
        sampler_ : Sampler
            optimizer
        saver_ : Saver
            object responsible for progressively saving the optimization run data during the run
        scaler : Scaler
            contains the transformation of the Theta values from their natural space to their scaled space (in which the sampling happens)
        N_runs : int
            number of independent optimization runs to run per posterior distribution
        max_iter : int
            total duration of an optimization run
        path_raw : str
            path to the folders where the ``.hdf5`` files are to be stored
        path_csv_mle : Optional[str]
            path to the csv file containing the already estimated MLE
        start_from : Optional[str], optional
            name of the starting point, such as "MAP", by default None
        freq_save : int, optional
            frequency of saved iterates during the run (1 means that every iteration is saved), by default 1
        can_run_in_parallel : bool, optional
            whether the inversion can be run in parallel (may cause difficulties for forward maps based on neural networks run on GPU), by default True
        """
        Theta_0 = self.prepare_run(
            dict_posteriors=dict_posteriors,
            path_raw=path_raw,
            N_runs=N_runs,
            scaler=scaler,
            start_from=start_from,
            path_csv_mle=path_csv_mle,
        )
        self.run(
            dict_posteriors,
            sampler_,
            saver_,
            N_runs,
            max_iter,
            path_raw,
            Theta_0,
            freq_save,
            can_run_in_parallel,
        )