Source code for beetroots.sampler.utils.utils

from typing import Tuple

import numba as nb
import numpy as np
from scipy.special import gamma

from beetroots.modelling.priors import smooth_indicator_prior

GAMMA_1_4 = gamma(1 / 4)


[docs] @nb.njit() def sample_generalized_gaussian( alpha: float, a_alpha: float, size: Tuple[int, int], seed: int, ) -> np.ndarray: r"""sample from a generalized Gaussian distribution of pdf: .. math:: p(\theta) = \frac{2}{\delta \Gamma(1/4)} \exp \left\{- \left( \frac{\theta}{\delta}\right)^4\right\} with here :math:`\delta = 1/A(\alpha)`. See :cite:t:`nardonSimulationTechniquesGeneralized2009` for more details. Parameters ---------- alpha : float power used in the distribution. alpha=2 is equivalent to the Gaussian distribution. a_alpha : float scaling parameter size : Tuple[int, int] number of samples to draw seed : int random seed for sampling reproducibility Returns ------- np.ndarray array of samples """ np.random.seed(seed) z = np.random.gamma(shape=1 / alpha, scale=(1 / a_alpha) ** alpha, size=size) x = np.random.binomial(n=1, p=0.5, size=size) * 2 - 1 return x * z ** (1 / alpha)
[docs] @nb.njit() def sample_smooth_indicator( lower_bounds: np.ndarray, upper_bounds: np.ndarray, indicator_margin_scale: float, size: Tuple[int, int], seed: int, ) -> np.ndarray: r"""algorithm to draw iid samples from the smooth indicator prior, detailed in Appendix B in :cite:t:`paludEfficientSamplingNon2023` Parameters ---------- lower_bounds : np.ndarray vector of lower bounds for each of the D physical parameters upper_bounds : np.ndarray vector of upper bounds for each of the D physical parameters indicator_margin_scale : float scaling parameter, quantifies how much values out of the validity intervals are penalized size : Tuple[int, int] number of samples to draw seed : int random seed, for sampling reproducibility Returns ------- np.ndarray iid samples drawn from the smooth uniform distribution """ np.random.seed(seed) k_mtm, D = size Z_unif = 2 * (upper_bounds - lower_bounds) / (indicator_margin_scale * GAMMA_1_4) Z_tot = 1 + Z_unif w_unif = Z_unif / Z_tot # (D,) z1 = np.zeros((k_mtm, D)) x_unif = np.zeros((k_mtm, D)) for d in range(D): z1[:, d] = np.random.binomial(1, w_unif[d], size=k_mtm) x_unif[:, d] = np.random.uniform(lower_bounds[d], upper_bounds[d], size=k_mtm) x_gg = sample_generalized_gaussian(4.0, 1 / indicator_margin_scale, size, seed) x_gg = np.where( x_gg < 0, x_gg + np.expand_dims(lower_bounds, 0), x_gg + np.expand_dims(upper_bounds, 0), ) return np.where(z1 == 1, x_unif, x_gg)
[docs] @nb.njit() def get_neighboring_pixels( current_Theta: np.ndarray, list_edges: np.ndarray, idx_pix: int ) -> np.ndarray: r"""returns the array of current values of the neighboring pixels of a specified pixel Parameters ---------- current_Theta : np.ndarray current iterate list_edges : np.ndarray list of edges, defined as pairs of pixel indices. Generated by the class:`SpatialPrior`. idx_pix : int index of a pixel Returns ------- np.ndarray array of current values of the neighboring pixels """ list_edges_n = list_edges[ (list_edges[:, 0] == idx_pix) | (list_edges[:, 1] == idx_pix) ] list_neighbors_idx = list_edges_n.flatten() list_neighbors_idx = list_neighbors_idx[list_neighbors_idx != idx_pix] return current_Theta[list_neighbors_idx, :] # (n_neighbors, D)
[docs] @nb.njit() def sample_conditional_spatial_and_indicator_prior( current_Theta: np.ndarray, spatial_list_edges: np.ndarray, spatial_weights: np.ndarray, indicator_lower_bounds: np.ndarray, indicator_upper_bounds: np.ndarray, indicator_indicator_margin_scale: float, idx_pix: np.ndarray, k_mtm: int, seed: int, ) -> np.ndarray: r"""draw ``k_mtm`` iid samples for each candidate in the `ìdx_pix`` array. Parameters ---------- current_Theta : np.ndarray current iterate spatial_list_edges : np.ndarray list of edges, defined as pairs of pixel indices. Generated by the class:`SpatialPrior`. spatial_weights : np.ndarray regularization weights of the spatial prior indicator_lower_bounds : np.ndarray vector of lower bounds for the D physical parameters indicator_upper_bounds : np.ndarray vector of upper bounds for the D physical parameters indicator_indicator_margin_scale : float scaling parameter, quantifies how much values out of the validity intervals are penalized idx_pix : np.ndarray array of indices of pixels for which candidates are to be proposed k_mtm : int number of candidates in the MTM kernel seed : int random seed, used for sampling reproducibility Returns ------- np.ndarray samples drawn from the conditional prior """ np.random.seed(seed) (N, D) = current_Theta.shape n_pix = idx_pix.size * 1 samples = np.zeros((n_pix, k_mtm, D)) i = 0 for idx_1_pix in idx_pix: # * sample from around neighbors neighbors = get_neighboring_pixels(current_Theta, spatial_list_edges, idx_1_pix) N_neighbors = neighbors.shape[0] if N_neighbors > 0: Theta = np.zeros((k_mtm, D)) for k in range(k_mtm): # select which combination f neighbors is going to be used arr_use_neighbors = np.random.binomial(n=1, p=0.5, size=N_neighbors) while np.max(arr_use_neighbors) == 0: arr_use_neighbors = np.random.binomial(n=1, p=0.5, size=N_neighbors) used_neighbors = neighbors[arr_use_neighbors == 1] N_used_neighbors = used_neighbors.shape[0] # sigma_mtm_eff = 1 / (2 * np.sqrt(N_neighbors * spatial_weights)) # (D,) sigma_mtm_eff = 1 / ( 2 * np.sqrt(N_used_neighbors * spatial_weights) ) # (D,) # initialize array of candidates mean = np.zeros((1, D)) for d in range(D): mean[0, d] = np.mean(used_neighbors[:, d]) # mean[d] = np.mean(neighbors[:, d]) repeat = True n_repeats = 0 n_repeats_tot = 0 while repeat: # * step 1 : generate candidates from spatial prior only Theta_cand = mean + sigma_mtm_eff * np.random.standard_normal( size=(1, D) ) # * step 2 : accept or reject with combination of spatial # * and indicator priors p_Theta = np.exp( -smooth_indicator_prior.penalty_one_pix( Theta_cand, indicator_lower_bounds, indicator_upper_bounds, indicator_indicator_margin_scale, ) ) # (1,) u = np.random.uniform(0, 1) if u <= p_Theta[0]: Theta[k] += Theta_cand[0] repeat = False n_repeats += 1 n_repeats_tot += 1 if n_repeats >= 10: # select which combination f neighbors is going to be used arr_use_neighbors = np.random.binomial( n=1, p=0.5, size=N_neighbors ) while np.max(arr_use_neighbors) == 0: arr_use_neighbors = np.random.binomial( n=1, p=0.5, size=N_neighbors ) used_neighbors = neighbors[arr_use_neighbors == 1] N_used_neighbors = used_neighbors.shape[0] # sigma_mtm_eff = 1 / (2 * np.sqrt(N_neighbors * spatial_weights)) # (D,) sigma_mtm_eff = 1 / ( 2 * np.sqrt(N_used_neighbors * spatial_weights) ) # (D,) # initialize array of candidates mean = np.zeros((1, D)) for d in range(D): mean[0, d] = np.mean(used_neighbors[:, d]) # mean[d] = np.mean(neighbors[:, d]) n_repeats = 0 assert n_repeats_tot < 1_000 samples[i, :, :] = Theta * 1 # (k_mtm, D) else: samples[i, :, :] = sample_smooth_indicator( indicator_lower_bounds, indicator_upper_bounds, indicator_indicator_margin_scale, size=(k_mtm, D), seed=seed, ) i += 1 return samples
[docs] @nb.njit() def sample_conditional_spatial_prior( current_Theta: np.ndarray, spatial_list_edges: np.ndarray, spatial_weights: np.ndarray, idx_pix: np.ndarray, k_mtm: int, seed: int, ) -> np.ndarray: r"""draw ``k_mtm`` iid samples for each candidate in the `ìdx_pix`` array. Parameters ---------- current_Theta : np.ndarray current iterate spatial_list_edges : np.ndarray list of edges, defined as pairs of pixel indices. Generated by the class:`SpatialPrior`. spatial_weights : np.ndarray regularization weights of the spatial prior indicator_lower_bounds : np.ndarray vector of lower bounds for the D physical parameters indicator_upper_bounds : np.ndarray vector of upper bounds for the D physical parameters indicator_indicator_margin_scale : float scaling parameter, quantifies how much values out of the validity intervals are penalized idx_pix : np.ndarray array of indices of pixels for which candidates are to be proposed k_mtm : int number of candidates in the MTM kernel seed : int random seed, used for sampling reproducibility Returns ------- np.ndarray samples drawn from the conditional prior """ np.random.seed(seed) (N, D) = current_Theta.shape n_pix = idx_pix.size * 1 samples = np.zeros((n_pix, k_mtm, D)) for i, idx_1_pix in enumerate(idx_pix): # * sample from around neighbors neighbors = get_neighboring_pixels(current_Theta, spatial_list_edges, idx_1_pix) N_neighbors = neighbors.shape[0] if N_neighbors == 0: continue Theta = np.zeros((k_mtm, D)) for k in range(k_mtm): # select which combination f neighbors is going to be used arr_use_neighbors = np.random.binomial(n=1, p=0.5, size=N_neighbors) while np.max(arr_use_neighbors) == 0: arr_use_neighbors = np.random.binomial(n=1, p=0.5, size=N_neighbors) used_neighbors = neighbors[arr_use_neighbors == 1] N_used_neighbors = used_neighbors.shape[0] # sigma_mtm_eff = 1 / (2 * np.sqrt(N_neighbors * spatial_weights)) # (D,) sigma_mtm_eff = 1 / (N_used_neighbors * np.sqrt(spatial_weights)) # (D,) # initialize array of candidates mean = np.zeros((D,)) for d in range(D): mean[d] = np.mean(used_neighbors[:, d]) # mean[d] = np.mean(neighbors[:, d]) Theta[k] = mean + sigma_mtm_eff * np.random.standard_normal(size=(D,)) samples[i, :, :] = Theta * 1 # (k_mtm, D) return samples
[docs] @nb.njit() def compute_sum_subsets_mean(dists: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: r"""Computes the sums of the elements of each subset of neighbors, including the empty set, ordered such that the last element is the complement of the first, and so on. It uses bitwise operation and take advantage of the tree structure of the problem. Parameters ---------- dists : np.ndarray of shape (N_neighbors, k_mtm, D) Squared distances from a point to each of its neighbors. Returns ------- np.ndarray of shape: (2**N_neighbors, k_mtm, D) Array containing the sum of distances for all subsets of neighbors, including the empty set. np.ndarray Array containing the weight of each subset of neighbors """ N_neighbors, k_mtm, D = dists.shape num_subsets = 2**N_neighbors # Initialize the output array sums_ = np.zeros((num_subsets, k_mtm, D)) neighbors_ = np.zeros((num_subsets,)) # One loop to compute the sums using the tree structure for subset_idx in range(1, num_subsets): # Determine the index of the last added neighbor, e.g. 1010 -> 0010 last_neighbor_idx = int(np.log2(subset_idx & -subset_idx)) # Add the contribution of the corresponding neighbor to the subset sum, i.e. we retrieve the index in sums for 1000 and add 0010 (the 2 second distance.) sums_[subset_idx] = ( sums_[subset_idx ^ (1 << last_neighbor_idx)] + dists[last_neighbor_idx] ) neighbors_[subset_idx] += neighbors_[subset_idx ^ (1 << last_neighbor_idx)] + 1 return sums_, neighbors_[:, None, None]
[docs] @nb.njit(fastmath=True) def numba_logsumexp_stable(x: np.ndarray, weights: np.ndarray) -> float: r"""stable numba implementation of the ``logsumexp`` function. Used in step 4 of ``compute_nlratio_prior_proposal``. Parameters ---------- x : np.ndarray array of real values Returns ------- float result of the ``logsumexp`` function """ if weights is None: weights = np.ones(x.shape) elif weights.size != x.size: raise ValueError( f"weights must have the same size as x. Got {weights.size} and {x.size}" ) x_max = np.max(x) res = 0.0 for j in range(x.size): res += weights[j] * np.exp(x[j] - x_max) res = np.log(res) + x_max return res
[docs] @nb.njit() def compute_nlpdf_spatial_proposal( candidates: np.ndarray, spatial_list_edges: np.ndarray, spatial_weights: np.ndarray, idx_pix: np.ndarray, ) -> np.ndarray: """evaluates the negative log ratio of prior proposal in the MTM-chromatic Gibbs kernel in ``MySampler`` Note ---- The function works in four steps : * step 1: get the values of the neighbors of each pixel in ``idx_pix`` using ``get_neighboring_pixels`` * step 2: compute the vector distances between the candidate and the pixel neighbors * step 3: compute list of sums of subsets using ``compute_sum_subsets_norms`` * step 4: evaluate ``LogSumExp`` to obtain the log pdf of the Gaussian mixture, using the stable ``numba`` implementation. Parameters ---------- candidates : np.ndarray iterate spatial_list_edges : np.ndarray list of edges, defined as pairs of pixel indices. Generated by the class:`SpatialPrior`. spatial_weights : np.ndarray regularization weights of the spatial prior idx_pix : np.ndarray of shape (n_pix,) array of indices of pixels for which candidates are to be proposed Returns ------- np.ndarray of shape (n_pix, k_mtm) negative log ratio of prior proposal """ N, k_mtm, D = candidates.shape n_pix = idx_pix.size nl_ratio = np.zeros((n_pix, k_mtm)) for i in range(n_pix): idx_1_pix = idx_pix[i] # * step 1: get neighbors neighbors = get_neighboring_pixels( candidates[:, -1], spatial_list_edges, idx_1_pix, # Each candidate has the same value the pixels not belonging idx_pix so we can pick any candidates for the neighbors of pixels in idx_pix. ) # (N_neighbors, D) N_neighbors = neighbors.shape[0] # * step 2: compute list of means of subsets sum_neighbors, weight_neighbors = compute_sum_subsets_mean( np.expand_dims(neighbors, 1) ) mean_neighbors = ( sum_neighbors[1:] / weight_neighbors[1:] ) # (2 ** N_neighbors - 1, 1, D) Remove the empty set. # * step 3: compute distances between candidate and the means of subsets of neighbors dists_with_mean = ( np.expand_dims(candidates[idx_1_pix], 0) - mean_neighbors ) ** 2 assert dists_with_mean.shape == (2**N_neighbors - 1, k_mtm, D) # * step 4: compute weights of each mode weights = np.sqrt(spatial_weights) * weight_neighbors[1:, :, :] # * step 5: LogSumExp inter_ = -dists_with_mean * weights**2 # (2 ** N_neighbors - 1, k_mtm, D) for k in range(k_mtm): for d in range(D): nl_ratio[i, k] += numba_logsumexp_stable( inter_[:, k, d], weights=weights[:, 0, d] ) return nl_ratio