import os
import warnings
from typing import Tuple
import numpy as np
import pandas as pd
from beetroots.inversion.plots.map_shaper import MapShaper
from beetroots.modelling.forward_maps.abstract_base import ForwardMap
from beetroots.simulations.astro.observation.abstract_observation import (
SimulationObservation,
)
from beetroots.space_transform.transform import MyScaler
[docs]
class SimulationToyCase(SimulationObservation):
[docs]
def setup_observation(
self,
scaler: MyScaler,
forward_map: ForwardMap,
sigma_a: np.ndarray,
sigma_m: np.ndarray,
omega: np.ndarray,
) -> Tuple[pd.DataFrame, np.ndarray]:
data_int_path = f"{self.DATA_PATH}/{self.cloud_name}"
assert os.path.isfile(f"{data_int_path}.pkl") or os.path.isfile(
f"{data_int_path}.csv"
), f"There is no csv or pkl file called {self.DATA_PATH}/{self.cloud_name}."
if os.path.isfile(f"{data_int_path}.pkl"):
syn_map = pd.read_pickle(f"{data_int_path}.pkl")
else:
syn_map = pd.read_csv(f"{data_int_path}.csv")
warnings.warn(
"For faster loading, consider using the `.pkl` format instead `.csv`."
)
for col in ["idx", "X", "Y"] + self.list_names:
assert col in list(syn_map.columns), f"{col} not in {list(syn_map.columns)}"
syn_map = syn_map.sort_values("idx")
syn_map = syn_map.set_index(["X", "Y"])
syn_map.loc[:, self.list_names] = scaler.from_lin_to_scaled(
syn_map.loc[:, self.list_names].values
)
# * generate observations for specified lines
self.Theta_true_scaled = syn_map.loc[:, self.list_names].values
self.Theta_true_scaled = self.Theta_true_scaled[:, self.list_idx_sampling]
syn_map.loc[:, self.list_lines_fit] = forward_map.evaluate(
self.Theta_true_scaled,
)
# * save observations
syn_map_to_csv = syn_map.copy()
syn_map_to_csv.loc[:, self.list_names] = scaler.from_scaled_to_lin(
syn_map_to_csv.loc[:, self.list_names].values,
)
syn_map_to_csv.to_csv(
f"{self.path_data_csv_in}/true_params_and_emissions_maps.csv"
)
self.map_shaper = MapShaper(syn_map_to_csv)
# * generation of noisy observation
rng = np.random.default_rng(2023)
eps_a = rng.normal(loc=0.0, scale=sigma_a)
eps_m = rng.lognormal(
mean=-(sigma_m**2) / 2,
sigma=sigma_m,
)
assert eps_a.shape == (self.N, self.L), f"{(self.N, self.L)}"
assert eps_m.shape == (self.N, self.L), f"{(self.N, self.L)}"
f_Theta = forward_map.evaluate(self.Theta_true_scaled)
y = np.maximum(omega, eps_m * f_Theta + eps_a)
# rv_a = norm(loc=0, scale=sigma_a)
# rv_m = lognorm(scale=np.exp(-(sigma_m ** 2) / 2), s=sigma_m)
# y0 =forward_map.evaluate(self.Theta_true_scaled)
# eps_a = rv_a.rvs(random_state=42)
# eps_m = rv_m.rvs(random_state=42)
# assert eps_a.shape == (self.N, self.L)
# assert eps_m.shape == (self.N, self.L)
# y = np.maximum(omega, eps_m * y0 + eps_a) # (N,L)
# * save observation
df_observation = syn_map.copy()
df_observation = df_observation.drop(columns=self.list_names)
df_observation.loc[:, self.list_lines_fit] = y * 1
df_observation.to_csv(f"{self.path_data_csv_in}/observation_maps.csv")
return syn_map, y