Source code for beetroots.modelling.priors.abstract_spatial_prior

import abc
import os
from collections import defaultdict
from typing import List, Union

import numpy as np
import pandas as pd

from beetroots.modelling.priors.abstract_prior import PriorProbaDistribution
from beetroots.modelling.priors.spatial_prior_params import SpatialPriorParams


[docs] def build_list_edges( df: pd.DataFrame, use_next_nearest_neighbors: bool = True, ) -> np.ndarray: r"""builds the list of edges necessary in the rest of the functions of this file Parameters ---------- df : pandas.DataFrame observation map data. The index needs to contain 2 columns (one for the x-coordinate of the pixel and one for its y-coordinate) use_next_nearest_neighbors : bool, optional wether or not to use the next nearest neighbors, i.e., in diagonal, by default True Returns ------- list_edges : numpy.array of shape (-1, 2) list of 2 by 2 neighboring relations without duplicates (each pair is ordered by lowest id to highest id) """ df_clusters = df.copy() df_clusters = df_clusters.reset_index().set_index("idx") df_clusters["clusters"] = 0 df_clusters = df_clusters["clusters"] list_edges = [] list_considered_neighbors = ( [(1, 0), (0, 1), (1, 1), (-1, 1)] if use_next_nearest_neighbors else [(1, 0), (0, 1)] ) for x, y in list(df.index): idx = df.at[(x, y), "idx"] cluster_current = df_clusters.at[idx] for delta_x, delta_y in list_considered_neighbors: x2 = x + delta_x y2 = y + delta_y if (x2, y2) in df.index: idx2 = df.at[(x2, y2), "idx"] if df_clusters.at[idx2] == cluster_current: list_edges.append([idx, idx2]) list_edges = np.array(list_edges) # list_edges = list_edges[list_edges[:, 0].argsort()] print(f"total number of edges in img graph : {list_edges.size // 2}") return list_edges
[docs] class SpatialPrior(PriorProbaDistribution): r"""Abstract Base Class for a spatial regularizing prior""" __slots__ = ( "D", "N", "use_next_nearest_neighbors", "list_edges", "dict_neighbors", "dict_sites", "initial_weights", "weights", ) def __init__( self, spatial_prior_params: SpatialPriorParams, cloud_name: str, D: int, N: int, df: pd.DataFrame, list_idx_sampling: List[int], ) -> None: super().__init__(D, N) # list of neighboring relations self.use_next_nearest_neighbors = ( spatial_prior_params.use_next_nearest_neighbors ) r"""bool: wether to use the next nearest neighbors (i.e., neighbors in diagonal)""" self.list_edges = build_list_edges(df, self.use_next_nearest_neighbors) r"""np.ndarray: set of edges in the graph induced by the spatial regularization""" self.dict_neighbors = defaultdict(list) for edge in self.list_edges: self.dict_neighbors[edge[0]].append(edge[1]) self.dict_neighbors[edge[1]].append(edge[0]) self.dict_neighbors = dict(self.dict_neighbors) self.dict_sites = self.build_sites(df) """dict[int, np.ndarray]: sites of the graph induced the spatial regularization. A site is a set of nodes that are conditionally independent.""" # one weight per dimension initial_weights = spatial_prior_params.initial_regu_weights * 1 if initial_weights is not None: if isinstance(initial_weights, list): initial_weights = np.array(initial_weights) initial_weights = initial_weights[list_idx_sampling] * 1 weights = initial_weights[list_idx_sampling] * 1 # assert self.initial_weights.shape == ( # self.D, # ), f"{self.initial_weights.shape} should be size {self.D}" else: initial_weights = np.ones((D,)) weights = np.ones((D,)) self.initial_weights = initial_weights r"""np.ndarray of shape (D,): initial weights of the spatial regularization :math:`\tau`""" self.weights = weights r"""np.ndarray of shape (D,): current value of weights of the spatial regularization :math:`\tau`""" return
[docs] def build_sites(self, df: pd.DataFrame) -> dict[int, np.ndarray]: """builds the site from the DataFrame of positions Parameters ---------- df : pd.DataFrame positions of the pixels Returns ------- dict[int, np.ndarray] set of sites and corresponding pixels. Forms a partition of the full set of pixels. """ if self.use_next_nearest_neighbors: dict_sites_raw = {i: [] for i in range(4)} for x, y in list(df.index): idx_site = 2 * (x % 2) + y % 2 dict_sites_raw[idx_site].append(df.at[(x, y), "idx"]) else: dict_sites_raw = {i: [] for i in range(2)} for x, y in list(df.index): idx_site = (x + y) % 2 dict_sites_raw[idx_site].append(df.at[(x, y), "idx"]) dict_sites = {} for k, list_idx in dict_sites_raw.items(): dict_sites[k] = np.sort(list_idx) return dict_sites
[docs] @abc.abstractmethod def neglog_pdf( self, Theta: np.ndarray, with_weights: bool = True, **kwargs, ) -> np.ndarray: r"""computes the negative log pdf (defined up to some multiplicative constant) Parameters ---------- Theta : np.array of shape (N, D) set of D maps on which we want to evaluate the neg log prior Returns ------- neglog_p : np.array of shape (D,) set of the D neg log priors evaluation """ pass
[docs] @abc.abstractmethod def gradient_neglog_pdf(self, Theta: np.ndarray) -> np.ndarray: r"""Computes the gradient of the spatial regularization Parameters ---------- Theta : np.array of shape (N, D) current iterate Returns ------- np.array of shape (N, D) gradient of the spatial regularization """ pass
[docs] @abc.abstractmethod def hess_diag_neglog_pdf(self, Theta: np.ndarray) -> np.ndarray: r"""Computes the diagonal of the hessian of the spatial regularization Parameters ---------- Theta : np.array of shape (N, D) current iterate Returns ------- np.array of shape (N, D) diagonal of the hessian of the spatial regularization """ pass