import pickle
from pathlib import Path
from typing import List, Tuple

import numpy as np
import pandas as pd

from quacc import plot
from quacc.environment import env
from quacc.utils import fmt_line_md


def _get_metric(metric: str):
    """Return a column selector for ``metric``; ``None`` selects every metric."""
    return slice(None) if metric is None else metric


def _get_estimators(estimators: List[str], cols: np.ndarray):
    """Return the entries of ``cols`` listed in ``estimators``.

    ``None`` selects every estimator. The order of ``cols`` is preserved.
    """
    if estimators is None:
        return slice(None)
    # np.isin is the supported spelling; np.in1d is deprecated since NumPy 2.0.
    return cols[np.isin(cols, estimators)]


def _select_columns(
    frame: pd.DataFrame, metric: str = None, estimators: List[str] = None
) -> pd.DataFrame:
    """Slice a (metric, estimator) column MultiIndex down to the requested part.

    When exactly one level-0 entry survives the selection, that level is
    dropped so the result has plain estimator columns.
    """
    _metric = _get_metric(metric)
    _estimators = _get_estimators(estimators, frame.columns.unique(1))
    selected: pd.DataFrame = frame.loc[:, (_metric, _estimators)]
    if len(selected.columns.unique(0)) == 1:
        selected = selected.droplevel(level=0, axis=1)
    return selected


def _occurrence_index(labels) -> np.ndarray:
    """Number the rows that share a label 0, 1, 2, ... in order of appearance.

    Used as the second level of a MultiIndex so that repeated labels stay
    uniquely addressable.
    """
    labels = np.asarray(labels)
    # NOTE(review): the dtype literal was cut off after '"<' in the source this
    # was recovered from; "<i4" is assumed - confirm against version control.
    counters = np.empty(shape=labels.shape, dtype="<i4")
    for label in np.unique(labels):
        where = np.where(labels == label)[0]
        counters[where] = np.arange(where.shape[0], dtype="<i4")
    return counters


class EvaluationReport:
    """Row-by-row collector of evaluation results for one named method.

    ``data`` is ``None`` until the first row is appended; afterwards it is a
    DataFrame indexed by ``(prevalence, running index)``.
    """

    def __init__(self, name=None):
        self.data: pd.DataFrame | None = None
        self.fit_score = None
        self.name = name if name is not None else "default"

    def append_row(self, basep: np.ndarray | Tuple, **row):
        """Append one result row for the prevalence ``basep[1]``.

        Args:
            basep: prevalence pair; only the element at position 1 is used
                as the first index level.
            **row: column name -> value for the new row. Columns not yet
                present in ``data`` are created and filled with NaN for the
                rows that already exist.
        """
        bp = basep[1]

        if self.data is None:
            self.data = pd.DataFrame(
                {k: [v] for k, v in row.items()},
                index=pd.MultiIndex.from_tuples([(bp, 0)]),
                columns=list(row),
            )
            return

        # Next free running index among the rows already stored for ``bp``.
        _idx = len(self.data.loc[(bp,), :]) if (bp,) in self.data.index else 0
        not_in_data = np.setdiff1d(list(row.keys()), self.data.columns.unique(0))
        self.data.loc[:, not_in_data] = np.nan
        self.data.loc[(bp, _idx), :] = row

    @property
    def columns(self) -> np.ndarray:
        """Unique level-0 column labels of ``data``."""
        return self.data.columns.unique(0)

    @property
    def prevs(self):
        """Sorted unique level-0 index values (prevalences) of ``data``."""
        return np.sort(self.data.index.unique(0))


class CompReport:
    """Side-by-side combination of several :class:`EvaluationReport` objects.

    The combined frame keeps the reports' row index and has two column
    levels: the reports' own column labels (level 0) and the report names
    (level 1).
    """

    def __init__(
        self,
        reports: List[EvaluationReport],
        name="default",
        train_prev=None,
        valid_prev=None,
        times=None,
    ):
        """Build the combined frame.

        Args:
            reports: reports to combine; each contributes its ``data`` under
                its ``name``.
            name: accepted for interface compatibility; not stored.
            train_prev: training prevalence pair (position 1 is read by
                ``shift_data`` and ``to_md``).
            valid_prev: validation prevalence, only echoed by ``to_md``.
            times: optional mapping label -> seconds, echoed by ``to_md``.
        """
        self._data = (
            pd.concat(
                [er.data for er in reports],
                keys=[er.name for er in reports],
                axis=1,
            )
            .swaplevel(0, 1, axis=1)
            .sort_index(axis=1, level=0, sort_remaining=False)
            .sort_index(axis=0, level=0)
        )
        self.fit_scores = {
            er.name: er.fit_score for er in reports if er.fit_score is not None
        }
        self.train_prev = train_prev
        self.valid_prev = valid_prev
        self.times = times

    @property
    def prevs(self) -> np.ndarray:
        """Sorted unique level-0 index values (prevalences)."""
        return np.sort(self._data.index.unique(0))

    @property
    def np_prevs(self) -> np.ndarray:
        """``prevs`` expanded to ``(1 - p, p)`` pairs, rounded to 2 decimals."""
        return np.around([(1.0 - p, p) for p in self.prevs], decimals=2)

    def data(self, metric: str = None, estimators: List[str] = None) -> pd.DataFrame:
        """Return a copy of the combined data restricted to metric/estimators."""
        return _select_columns(self._data.copy(), metric, estimators)

    def shift_data(
        self, metric: str = None, estimators: List[str] = None
    ) -> pd.DataFrame:
        """Return the data re-indexed by distance from the training prevalence.

        Level 0 of the result is ``|prevalence - train_prev[1]|`` rounded to
        two decimals; level 1 is a running counter within each distance.
        """
        shift_idx_0 = np.around(
            np.abs(
                self._data.index.get_level_values(0).to_numpy() - self.train_prev[1]
            ),
            decimals=2,
        )

        # NOTE(review): everything from here to the end of this method was
        # lost in the recovered source (text was cut at 'dtype="'). It is
        # rebuilt from how get_plots(mode="shift") consumes the result
        # (groupby on level 0, single-level estimator columns) - verify
        # against version control.
        shift_idx_1 = _occurrence_index(shift_idx_0)

        shift_data = self._data.copy()
        shift_data.index = pd.MultiIndex.from_arrays([shift_idx_0, shift_idx_1])
        shift_data = shift_data.sort_index(axis=0, level=0)

        return _select_columns(shift_data, metric, estimators)

    # NOTE(review): the header of avg_by_prevs was lost with the text above;
    # the name and signature are taken from the calls in get_plots and from
    # the sibling stdev_by_prevs.
    def avg_by_prevs(
        self, metric: str = None, estimators: List[str] = None
    ) -> pd.DataFrame:
        """Mean of the selected data for each prevalence (index level 0)."""
        f_dict = self.data(metric=metric, estimators=estimators)
        return f_dict.groupby(level=0).mean()

    def stdev_by_prevs(
        self, metric: str = None, estimators: List[str] = None
    ) -> pd.DataFrame:
        """Standard deviation of the selected data for each prevalence."""
        f_dict = self.data(metric=metric, estimators=estimators)
        return f_dict.groupby(level=0).std()

    def table(self, metric: str = None, estimators: List[str] = None) -> pd.DataFrame:
        """Per-prevalence means plus a final ``"avg"`` row over all rows."""
        f_data = self.data(metric=metric, estimators=estimators)
        avg_p = f_data.groupby(level=0).mean()
        avg_p.loc["avg", :] = f_data.mean()
        return avg_p

    def get_plots(
        self,
        mode="delta",
        metric="acc",
        estimators=None,
        conf="default",
        return_fig=False,
    ) -> List[Tuple[str, Path]]:
        """Render one plot through :mod:`quacc.plot` and return its result.

        Args:
            mode: ``"delta"``, ``"delta_stdev"``, ``"diagonal"`` or
                ``"shift"``. Any other value yields ``None``.
            metric: metric to plot; ``"diagonal"`` reads ``metric + "_score"``.
            estimators: estimator names to include, ``None`` for all.
            conf: passed to the plotting function as ``name``.
            return_fig: forwarded unchanged to the plotting function.

        Returns:
            Whatever the plotting function returns. NOTE(review): ``to_md``
            treats it as a single ``Path``, which does not match the
            annotation - confirm against ``quacc.plot``.
        """
        if mode in ("delta", "delta_stdev"):
            avg_data = self.avg_by_prevs(metric=metric, estimators=estimators)
            extra = {}
            if mode == "delta_stdev":
                st_data = self.stdev_by_prevs(metric=metric, estimators=estimators)
                extra["stdevs"] = st_data.T.to_numpy()
            return plot.plot_delta(
                base_prevs=self.np_prevs,
                columns=avg_data.columns.to_numpy(),
                data=avg_data.T.to_numpy(),
                metric=metric,
                name=conf,
                train_prev=self.train_prev,
                return_fig=return_fig,
                **extra,
            )

        if mode == "diagonal":
            f_data = self.data(metric=metric + "_score", estimators=estimators)
            ref: pd.Series = f_data.loc[:, "ref"]
            # Non-inplace drop: f_data is a slice, so mutate a fresh frame.
            f_data = f_data.drop(columns=["ref"])
            return plot.plot_diagonal(
                reference=ref.to_numpy(),
                columns=f_data.columns.to_numpy(),
                data=f_data.T.to_numpy(),
                metric=metric,
                name=conf,
                train_prev=self.train_prev,
                return_fig=return_fig,
            )

        if mode == "shift":
            _shift_data = self.shift_data(metric=metric, estimators=estimators)
            shift_avg = _shift_data.groupby(level=0).mean()
            shift_counts = _shift_data.groupby(level=0).count()
            shift_prevs = np.around(
                [(1.0 - p, p) for p in np.sort(shift_avg.index.unique(0))],
                decimals=2,
            )
            return plot.plot_shift(
                shift_prevs=shift_prevs,
                columns=shift_avg.columns.to_numpy(),
                data=shift_avg.T.to_numpy(),
                metric=metric,
                name=conf,
                train_prev=self.train_prev,
                counts=shift_counts.T.to_numpy(),
                return_fig=return_fig,
            )

        return None

    def to_md(self, conf="default", metric="acc", estimators=None, stdev=False) -> str:
        """Render this report as a Markdown fragment.

        The fragment holds a header with the training positive rate, the
        train/validation prevalences, the recorded times (if any), the
        HTML table from :meth:`table` and one image link per plot mode.
        ``stdev=True`` adds the ``"delta_stdev"`` plot right after ``"delta"``.
        """
        # round() instead of a bare int(): 0.29 * 100 is 28.999..., which
        # int() would truncate to 28.
        positives = int(
            round(float(np.around(self.train_prev, decimals=2)[1]) * 100)
        )
        parts = [
            f"## {positives}% positives\n",
            fmt_line_md(f"train: {str(self.train_prev)}"),
            fmt_line_md(f"validation: {str(self.valid_prev)}"),
        ]
        # ``times`` defaults to None in __init__; treat that as "no timings".
        parts.extend(
            fmt_line_md(f"{k}: {v:.3f}s") for k, v in (self.times or {}).items()
        )
        parts.append("\n")
        parts.append(
            self.table(metric=metric, estimators=estimators).to_html() + "\n\n"
        )

        plot_modes = ["delta", "diagonal", "shift"]
        if stdev:
            plot_modes.insert(plot_modes.index("delta") + 1, "delta_stdev")

        for mode in plot_modes:
            op = self.get_plots(
                mode=mode,
                metric=metric,
                estimators=estimators,
                conf=conf,
            )
            parts.append(
                f"![plot_{mode}]({op.relative_to(env.OUT_DIR).as_posix()})\n"
            )

        return "".join(parts)


class DatasetReport:
    """Collection of :class:`CompReport` objects belonging to one dataset."""

    def __init__(self, name, crs=None):
        self.name = name
        self.crs: List[CompReport] = [] if crs is None else crs

    def data(self, metric: str = None, estimators: List[str] = None) -> pd.DataFrame:
        """Stack the data of every contained report.

        Each report's rows are keyed by its training prevalence
        (``train_prev[1]``, rounded to two decimals) as a new outermost index
        level. Frames are concatenated widest-first, so the widest frame's
        columns come first in the result.
        """
        _crs_sorted = sorted(
            ((cr.train_prev[1], cr.data(metric, estimators)) for cr in self.crs),
            key=lambda pair: len(pair[1].columns),
            reverse=True,
        )
        _crs_train, _crs_data = zip(*_crs_sorted)
        _data = pd.concat(_crs_data, axis=0, keys=np.around(_crs_train, decimals=2))
        return _data.sort_index(axis=0, level=0)

    def shift_data(
        self, metric: str = None, estimators: List[str] = None
    ) -> pd.DataFrame:
        """Stack the shift-indexed data of every contained report.

        Frames are concatenated widest-first; the second index level is then
        renumbered so rows with the same shift stay uniquely addressable.
        """
        _shift_data: pd.DataFrame = pd.concat(
            sorted(
                [cr.shift_data(metric, estimators) for cr in self.crs],
                key=lambda d: len(d.columns),
                reverse=True,
            ),
            axis=0,
        )

        shift_idx_0 = _shift_data.index.get_level_values(0)

        # NOTE(review): the rest of this method was lost in the recovered
        # source (text was cut at 'dtype="'); it is rebuilt to mirror
        # CompReport.shift_data - verify against version control.
        shift_idx_1 = _occurrence_index(shift_idx_0)

        _shift_data.index = pd.MultiIndex.from_arrays([shift_idx_0, shift_idx_1])
        _shift_data = _shift_data.sort_index(axis=0, level=0)

        return _shift_data

    # NOTE(review): the recovered source is missing an unknown amount of text
    # after shift_data. Further DatasetReport methods may have existed here
    # (the module-level ``pickle`` import is otherwise unused). They are NOT
    # reconstructed - restore them from version control.


def __test():
    """Scratch routine printing the outcome of a few pandas/NumPy operations."""
    # NOTE(review): the original set-up of ``a``, ``whb`` and ``df`` was lost
    # in the recovered source. The fixture below is a stand-in chosen only so
    # that the surviving statements can run - replace it with the original.
    a = np.array(["a", "b", "c"], dtype="object")
    whb = np.where(a == "b")[0]
    df = pd.DataFrame(
        [[0.1, 0.2, 0.3, 0.4], [0.5, 0.6, 0.7, 0.8], [0.9, 1.0, 1.1, 1.2]],
        index=pd.MultiIndex.from_tuples([(0.25, 0), (0.25, 1), (0.75, 0)]),
        columns=pd.MultiIndex.from_product([["a", "b"], ["v", "e"]]),
    )

    if len(whb) > 0:
        a = np.insert(a, whb + 1, "pippo")
    print(a)
    print("-" * 100)

    dff: pd.DataFrame = df.loc[:, ("a",)]
    print(dff.to_dict(orient="list"))
    dff = dff.drop(columns=["v"])
    print(dff)
    s: pd.Series = dff.loc[:, "e"]
    print(s)
    print(s.to_numpy())
    print(type(s.to_numpy()))
    print("-" * 100)

    df3 = pd.concat([df, df], axis=0, keys=[0.5, 0.3]).sort_index(axis=0, level=0)
    print(df3)
    df3n = pd.concat([df, df], axis=0).sort_index(axis=0, level=0)
    print(df3n)
    df = df3
    print("-" * 100)

    print(df.groupby(level=1).mean(), df.groupby(level=1).count())
    print("-" * 100)

    print(df)
    for ls in df.T.to_numpy():
        print(ls)
    print("-" * 100)


if __name__ == "__main__":
    __test()