Source code for beetroots.sampler.abstract_sampler

import abc
from sys import byteorder
from typing import Optional

import numpy as np

from beetroots.modelling.posterior import Posterior
from beetroots.sampler.saver.abstract_saver import Saver
from beetroots.sampler.utils.mml import EBayesMMLE


[docs] class Sampler(abc.ABC): r"""abstract class designed to sample from a distribution defined with the ``Posterior`` class.""" ESS_OPTIM = 1_000 r"""number of random reproduced observations to draw to evaluate the model checking p-value for optimization procedures""" def __init__( self, D: int, L: int, N: int, rng: np.random.Generator = np.random.default_rng(42), ): r""" Parameters ---------- D : int total number of physical parameters to reconstruct L : int number of observables per component :math:`n` N : int total number of pixels to reconstruct rng : np.random.Generator numpy random generator (for sampling reproducibility), by default np.random.default_rng(42) """ self.D = D r"""int: total number of physical parameters to reconstruct""" self.L = L r"""int: number of observables per component :math:`n`""" self.N = N r"""int: total number of pixels to reconstruct""" self.rng = rng r"""np.random.Generator: numpy random generator (for sampling reproducibility)"""
[docs] def get_rng_state(self): r"""extracts the current state and inc of the random generator Returns ------- rng_state_array : np.array of ints current state of the random generator rng_inc_array : np.array of ints current inc of the random generator Note ---- 1. state and inc are very large integers, and thus need to be converted to hex format (later to an array of ints) to be saved in a .h5 file https://docs.python.org/3/library/stdtypes.html#int.to_bytes 2. need 32 bytes in length: otherwise, the inverse operation int.from_bytes(state_array,byteorder) does not coincide with the original int value """ rng_state_array = np.array( bytearray( self.rng.__getstate__()["state"]["state"].to_bytes(32, byteorder), ) ) rng_inc_array = np.array( bytearray( self.rng.__getstate__()["state"]["inc"].to_bytes(32, byteorder), ) ) return rng_state_array, rng_inc_array
[docs] def set_rng_state(self, state_bytes, inc_bytes): r"""sets the state and inc of the random generator to a specific value. Enables to re-launch a sampling when the rng state is regularly saved (see ``MySaver`` class). Parameters ---------- state_bytes : bytes state to be set inc_bytes : bytes inc to be set """ # step 1 : convert bytes to int loaded_state = int.from_bytes(state_bytes, byteorder) loaded_inc = int.from_bytes(inc_bytes, byteorder) # step 2 : set rng state state = self.rng.__getstate__() state["state"]["state"] = loaded_state state["state"]["inc"] = loaded_inc self.rng.__setstate__(state)
[docs] def generate_random_start_Theta(self, posterior: Posterior): r"""generates a random element of the hypercube defined by the lower and upper bounds with uniform distribution Parameters ---------- posterior : Posterior contains the lower and upper bounds of the hypercube Returns ------- Theta : np.array of shape (N, D) random element of the hypercube defined by the lower and upper bounds with uniform distribution """ if posterior.prior_indicator is not None: Theta = ( self.rng.uniform(0, 1, size=(posterior.N, posterior.D)) * ( posterior.prior_indicator.upper_bounds[None, :] - posterior.prior_indicator.lower_bounds[None, :] ) + posterior.prior_indicator.lower_bounds[None, :] ) else: Theta = self.rng.standard_normal(size=(posterior.N, posterior.D)) return Theta
[docs] @abc.abstractmethod def generate_random_start_Theta_1pix( self, Theta: np.ndarray, posterior: Posterior, idx_pix: np.ndarray ) -> np.ndarray: r"""draws a random vectors for components :math:`n` (e.g., a pixel :math:`\theta_n`). Parameters ---------- Theta : np.ndarray current iterate posterior : Posterior contains the lower and upper bounds of the hypercube idx_pix : np.ndarray indices of the pixels Returns ------- np.array of shape (n_pix, self.k_mtm, D) random element of the hypercube defined by the lower and upper bounds with uniform distribution """ pass
[docs] @abc.abstractmethod def sample( self, posterior: Posterior, saver: Saver, max_iter: int, Theta_0: Optional[np.ndarray], T_BI: Optional[int], **kwargs, ) -> None: """main method of the class, runs the sampler on the input posterior distribution Parameters ---------- posterior : Posterior probability distribution to be sampled saver : Saver object responsible for progressively saving the Markov chain data during the run max_iter : int total duration of a Markov chain Theta_0 : Optional[np.ndarray], optional starting point T_BI : int, optional duration of the `Burn-in` phase """ pass
[docs] def sample_regu_hyperparams( self, posterior: Posterior, regu_weights_optimizer: EBayesMMLE, t: int, current_Theta: np.ndarray, ) -> np.ndarray: r"""updates the spatial regularization weight vector (denoted :math:`\tau` in :cite:t:`paludEfficientSamplingNon2023`) using the marginal likelihood optimizer defined in :cite:t:`vidalMaximumLikelihoodEstimation2020` Parameters ---------- posterior : Posterior posterior distribution regu_weights_optimizer : EBayesMMLE marginal likelihood optimizer t : int time step current_Theta : np.ndarray current iterate Returns ------- np.ndarray updated spatial regularization weight vector """ assert self.N > 1 assert posterior.prior_spatial is not None tau_tm1 = posterior.prior_spatial.weights * 1 neglog_prior = posterior.prior_spatial.neglog_pdf( current_Theta, idx_pix=np.arange(current_Theta.shape[0]), # = [0, ..., N-1] (all pixels) with_weights=False, ) # (D,) new_tau = regu_weights_optimizer.update(tau_tm1.mean(), t, neglog_prior.sum()) return new_tau * np.ones((self.D,))