from typing import Tuple
import numba as nb
import numpy as np
from scipy.special import gamma
from beetroots.modelling.priors import smooth_indicator_prior
GAMMA_1_4 = gamma(1 / 4)
[docs]
@nb.njit()
def sample_generalized_gaussian(
alpha: float,
a_alpha: float,
size: Tuple[int, int],
seed: int,
) -> np.ndarray:
r"""sample from a generalized Gaussian distribution of pdf:
.. math::
p(\theta) = \frac{2}{\delta \Gamma(1/4)} \exp \left\{- \left( \frac{\theta}{\delta}\right)^4\right\}
with here :math:`\delta = 1/A(\alpha)`.
See :cite:t:`nardonSimulationTechniquesGeneralized2009` for more details.
Parameters
----------
alpha : float
power used in the distribution. alpha=2 is equivalent to the Gaussian distribution.
a_alpha : float
scaling parameter
size : Tuple[int, int]
number of samples to draw
seed : int
random seed for sampling reproducibility
Returns
-------
np.ndarray
array of samples
"""
np.random.seed(seed)
z = np.random.gamma(shape=1 / alpha, scale=(1 / a_alpha) ** alpha, size=size)
x = np.random.binomial(n=1, p=0.5, size=size) * 2 - 1
return x * z ** (1 / alpha)
[docs]
@nb.njit()
def sample_smooth_indicator(
lower_bounds: np.ndarray,
upper_bounds: np.ndarray,
indicator_margin_scale: float,
size: Tuple[int, int],
seed: int,
) -> np.ndarray:
r"""algorithm to draw iid samples from the smooth indicator prior, detailed in Appendix B in :cite:t:`paludEfficientSamplingNon2023`
Parameters
----------
lower_bounds : np.ndarray
vector of lower bounds for each of the D physical parameters
upper_bounds : np.ndarray
vector of upper bounds for each of the D physical parameters
indicator_margin_scale : float
scaling parameter, quantifies how much values out of the validity intervals are penalized
size : Tuple[int, int]
number of samples to draw
seed : int
random seed, for sampling reproducibility
Returns
-------
np.ndarray
iid samples drawn from the smooth uniform distribution
"""
np.random.seed(seed)
k_mtm, D = size
Z_unif = 2 * (upper_bounds - lower_bounds) / (indicator_margin_scale * GAMMA_1_4)
Z_tot = 1 + Z_unif
w_unif = Z_unif / Z_tot # (D,)
z1 = np.zeros((k_mtm, D))
x_unif = np.zeros((k_mtm, D))
for d in range(D):
z1[:, d] = np.random.binomial(1, w_unif[d], size=k_mtm)
x_unif[:, d] = np.random.uniform(lower_bounds[d], upper_bounds[d], size=k_mtm)
x_gg = sample_generalized_gaussian(4.0, 1 / indicator_margin_scale, size, seed)
x_gg = np.where(
x_gg < 0,
x_gg + np.expand_dims(lower_bounds, 0),
x_gg + np.expand_dims(upper_bounds, 0),
)
return np.where(z1 == 1, x_unif, x_gg)
[docs]
@nb.njit()
def get_neighboring_pixels(
current_Theta: np.ndarray, list_edges: np.ndarray, idx_pix: int
) -> np.ndarray:
r"""returns the array of current values of the neighboring pixels of a specified pixel
Parameters
----------
current_Theta : np.ndarray
current iterate
list_edges : np.ndarray
list of edges, defined as pairs of pixel indices. Generated by the class:`SpatialPrior`.
idx_pix : int
index of a pixel
Returns
-------
np.ndarray
array of current values of the neighboring pixels
"""
list_edges_n = list_edges[
(list_edges[:, 0] == idx_pix) | (list_edges[:, 1] == idx_pix)
]
list_neighbors_idx = list_edges_n.flatten()
list_neighbors_idx = list_neighbors_idx[list_neighbors_idx != idx_pix]
return current_Theta[list_neighbors_idx, :] # (n_neighbors, D)
[docs]
@nb.njit()
def sample_conditional_spatial_and_indicator_prior(
current_Theta: np.ndarray,
spatial_list_edges: np.ndarray,
spatial_weights: np.ndarray,
indicator_lower_bounds: np.ndarray,
indicator_upper_bounds: np.ndarray,
indicator_indicator_margin_scale: float,
idx_pix: np.ndarray,
k_mtm: int,
seed: int,
) -> np.ndarray:
r"""draw ``k_mtm`` iid samples for each candidate in the `ìdx_pix`` array.
Parameters
----------
current_Theta : np.ndarray
current iterate
spatial_list_edges : np.ndarray
list of edges, defined as pairs of pixel indices. Generated by the class:`SpatialPrior`.
spatial_weights : np.ndarray
regularization weights of the spatial prior
indicator_lower_bounds : np.ndarray
vector of lower bounds for the D physical parameters
indicator_upper_bounds : np.ndarray
vector of upper bounds for the D physical parameters
indicator_indicator_margin_scale : float
scaling parameter, quantifies how much values out of the validity intervals are penalized
idx_pix : np.ndarray
array of indices of pixels for which candidates are to be proposed
k_mtm : int
number of candidates in the MTM kernel
seed : int
random seed, used for sampling reproducibility
Returns
-------
np.ndarray
samples drawn from the conditional prior
"""
np.random.seed(seed)
(N, D) = current_Theta.shape
n_pix = idx_pix.size * 1
samples = np.zeros((n_pix, k_mtm, D))
i = 0
for idx_1_pix in idx_pix:
# * sample from around neighbors
neighbors = get_neighboring_pixels(current_Theta, spatial_list_edges, idx_1_pix)
N_neighbors = neighbors.shape[0]
if N_neighbors > 0:
Theta = np.zeros((k_mtm, D))
for k in range(k_mtm):
# select which combination f neighbors is going to be used
arr_use_neighbors = np.random.binomial(n=1, p=0.5, size=N_neighbors)
while np.max(arr_use_neighbors) == 0:
arr_use_neighbors = np.random.binomial(n=1, p=0.5, size=N_neighbors)
used_neighbors = neighbors[arr_use_neighbors == 1]
N_used_neighbors = used_neighbors.shape[0]
# sigma_mtm_eff = 1 / (2 * np.sqrt(N_neighbors * spatial_weights)) # (D,)
sigma_mtm_eff = 1 / (
2 * np.sqrt(N_used_neighbors * spatial_weights)
) # (D,)
# initialize array of candidates
mean = np.zeros((1, D))
for d in range(D):
mean[0, d] = np.mean(used_neighbors[:, d])
# mean[d] = np.mean(neighbors[:, d])
repeat = True
n_repeats = 0
n_repeats_tot = 0
while repeat:
# * step 1 : generate candidates from spatial prior only
Theta_cand = mean + sigma_mtm_eff * np.random.standard_normal(
size=(1, D)
)
# * step 2 : accept or reject with combination of spatial
# * and indicator priors
p_Theta = np.exp(
-smooth_indicator_prior.penalty_one_pix(
Theta_cand,
indicator_lower_bounds,
indicator_upper_bounds,
indicator_indicator_margin_scale,
)
) # (1,)
u = np.random.uniform(0, 1)
if u <= p_Theta[0]:
Theta[k] += Theta_cand[0]
repeat = False
n_repeats += 1
n_repeats_tot += 1
if n_repeats >= 10:
# select which combination f neighbors is going to be used
arr_use_neighbors = np.random.binomial(
n=1, p=0.5, size=N_neighbors
)
while np.max(arr_use_neighbors) == 0:
arr_use_neighbors = np.random.binomial(
n=1, p=0.5, size=N_neighbors
)
used_neighbors = neighbors[arr_use_neighbors == 1]
N_used_neighbors = used_neighbors.shape[0]
# sigma_mtm_eff = 1 / (2 * np.sqrt(N_neighbors * spatial_weights)) # (D,)
sigma_mtm_eff = 1 / (
2 * np.sqrt(N_used_neighbors * spatial_weights)
) # (D,)
# initialize array of candidates
mean = np.zeros((1, D))
for d in range(D):
mean[0, d] = np.mean(used_neighbors[:, d])
# mean[d] = np.mean(neighbors[:, d])
n_repeats = 0
assert n_repeats_tot < 1_000
samples[i, :, :] = Theta * 1 # (k_mtm, D)
else:
samples[i, :, :] = sample_smooth_indicator(
indicator_lower_bounds,
indicator_upper_bounds,
indicator_indicator_margin_scale,
size=(k_mtm, D),
seed=seed,
)
i += 1
return samples
[docs]
@nb.njit()
def sample_conditional_spatial_prior(
current_Theta: np.ndarray,
spatial_list_edges: np.ndarray,
spatial_weights: np.ndarray,
idx_pix: np.ndarray,
k_mtm: int,
seed: int,
) -> np.ndarray:
r"""draw ``k_mtm`` iid samples for each candidate in the `ìdx_pix`` array.
Parameters
----------
current_Theta : np.ndarray
current iterate
spatial_list_edges : np.ndarray
list of edges, defined as pairs of pixel indices. Generated by the class:`SpatialPrior`.
spatial_weights : np.ndarray
regularization weights of the spatial prior
indicator_lower_bounds : np.ndarray
vector of lower bounds for the D physical parameters
indicator_upper_bounds : np.ndarray
vector of upper bounds for the D physical parameters
indicator_indicator_margin_scale : float
scaling parameter, quantifies how much values out of the validity intervals are penalized
idx_pix : np.ndarray
array of indices of pixels for which candidates are to be proposed
k_mtm : int
number of candidates in the MTM kernel
seed : int
random seed, used for sampling reproducibility
Returns
-------
np.ndarray
samples drawn from the conditional prior
"""
np.random.seed(seed)
(N, D) = current_Theta.shape
n_pix = idx_pix.size * 1
samples = np.zeros((n_pix, k_mtm, D))
for i, idx_1_pix in enumerate(idx_pix):
# * sample from around neighbors
neighbors = get_neighboring_pixels(current_Theta, spatial_list_edges, idx_1_pix)
N_neighbors = neighbors.shape[0]
if N_neighbors == 0:
continue
Theta = np.zeros((k_mtm, D))
for k in range(k_mtm):
# select which combination f neighbors is going to be used
arr_use_neighbors = np.random.binomial(n=1, p=0.5, size=N_neighbors)
while np.max(arr_use_neighbors) == 0:
arr_use_neighbors = np.random.binomial(n=1, p=0.5, size=N_neighbors)
used_neighbors = neighbors[arr_use_neighbors == 1]
N_used_neighbors = used_neighbors.shape[0]
# sigma_mtm_eff = 1 / (2 * np.sqrt(N_neighbors * spatial_weights)) # (D,)
sigma_mtm_eff = 1 / (N_used_neighbors * np.sqrt(spatial_weights)) # (D,)
# initialize array of candidates
mean = np.zeros((D,))
for d in range(D):
mean[d] = np.mean(used_neighbors[:, d])
# mean[d] = np.mean(neighbors[:, d])
Theta[k] = mean + sigma_mtm_eff * np.random.standard_normal(size=(D,))
samples[i, :, :] = Theta * 1 # (k_mtm, D)
return samples
[docs]
@nb.njit()
def compute_sum_subsets_mean(dists: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
r"""Computes the sums of the elements of each subset of neighbors, including the empty set,
ordered such that the last element is the complement of the first, and so on. It uses bitwise operation and take advantage of the tree structure of the problem.
Parameters
----------
dists : np.ndarray of shape (N_neighbors, k_mtm, D)
Squared distances from a point to each of its neighbors.
Returns
-------
np.ndarray of shape: (2**N_neighbors, k_mtm, D)
Array containing the sum of distances for all subsets of neighbors, including the empty set.
np.ndarray
Array containing the weight of each subset of neighbors
"""
N_neighbors, k_mtm, D = dists.shape
num_subsets = 2**N_neighbors
# Initialize the output array
sums_ = np.zeros((num_subsets, k_mtm, D))
neighbors_ = np.zeros((num_subsets,))
# One loop to compute the sums using the tree structure
for subset_idx in range(1, num_subsets):
# Determine the index of the last added neighbor, e.g. 1010 -> 0010
last_neighbor_idx = int(np.log2(subset_idx & -subset_idx))
# Add the contribution of the corresponding neighbor to the subset sum, i.e. we retrieve the index in sums for 1000 and add 0010 (the 2 second distance.)
sums_[subset_idx] = (
sums_[subset_idx ^ (1 << last_neighbor_idx)] + dists[last_neighbor_idx]
)
neighbors_[subset_idx] += neighbors_[subset_idx ^ (1 << last_neighbor_idx)] + 1
return sums_, neighbors_[:, None, None]
[docs]
@nb.njit(fastmath=True)
def numba_logsumexp_stable(x: np.ndarray, weights: np.ndarray) -> float:
r"""stable numba implementation of the ``logsumexp`` function. Used in step 4 of ``compute_nlratio_prior_proposal``.
Parameters
----------
x : np.ndarray
array of real values
Returns
-------
float
result of the ``logsumexp`` function
"""
if weights is None:
weights = np.ones(x.shape)
elif weights.size != x.size:
raise ValueError(
f"weights must have the same size as x. Got {weights.size} and {x.size}"
)
x_max = np.max(x)
res = 0.0
for j in range(x.size):
res += weights[j] * np.exp(x[j] - x_max)
res = np.log(res) + x_max
return res
[docs]
@nb.njit()
def compute_nlpdf_spatial_proposal(
candidates: np.ndarray,
spatial_list_edges: np.ndarray,
spatial_weights: np.ndarray,
idx_pix: np.ndarray,
) -> np.ndarray:
"""evaluates the negative log ratio of prior proposal in the MTM-chromatic Gibbs kernel in ``MySampler``
Note
----
The function works in four steps :
* step 1: get the values of the neighbors of each pixel in ``idx_pix`` using ``get_neighboring_pixels``
* step 2: compute the vector distances between the candidate and the pixel neighbors
* step 3: compute list of sums of subsets using ``compute_sum_subsets_norms``
* step 4: evaluate ``LogSumExp`` to obtain the log pdf of the Gaussian mixture, using the stable ``numba`` implementation.
Parameters
----------
candidates : np.ndarray
iterate
spatial_list_edges : np.ndarray
list of edges, defined as pairs of pixel indices. Generated by the class:`SpatialPrior`.
spatial_weights : np.ndarray
regularization weights of the spatial prior
idx_pix : np.ndarray of shape (n_pix,)
array of indices of pixels for which candidates are to be proposed
Returns
-------
np.ndarray of shape (n_pix, k_mtm)
negative log ratio of prior proposal
"""
N, k_mtm, D = candidates.shape
n_pix = idx_pix.size
nl_ratio = np.zeros((n_pix, k_mtm))
for i in range(n_pix):
idx_1_pix = idx_pix[i]
# * step 1: get neighbors
neighbors = get_neighboring_pixels(
candidates[:, -1],
spatial_list_edges,
idx_1_pix, # Each candidate has the same value the pixels not belonging idx_pix so we can pick any candidates for the neighbors of pixels in idx_pix.
) # (N_neighbors, D)
N_neighbors = neighbors.shape[0]
# * step 2: compute list of means of subsets
sum_neighbors, weight_neighbors = compute_sum_subsets_mean(
np.expand_dims(neighbors, 1)
)
mean_neighbors = (
sum_neighbors[1:] / weight_neighbors[1:]
) # (2 ** N_neighbors - 1, 1, D) Remove the empty set.
# * step 3: compute distances between candidate and the means of subsets of neighbors
dists_with_mean = (
np.expand_dims(candidates[idx_1_pix], 0) - mean_neighbors
) ** 2
assert dists_with_mean.shape == (2**N_neighbors - 1, k_mtm, D)
# * step 4: compute weights of each mode
weights = np.sqrt(spatial_weights) * weight_neighbors[1:, :, :]
# * step 5: LogSumExp
inter_ = -dists_with_mean * weights**2
# (2 ** N_neighbors - 1, k_mtm, D)
for k in range(k_mtm):
for d in range(D):
nl_ratio[i, k] += numba_logsumexp_stable(
inter_[:, k, d], weights=weights[:, 0, d]
)
return nl_ratio