193 lines
6.6 KiB
Python
193 lines
6.6 KiB
Python
import math
|
|
from abc import abstractmethod
|
|
|
|
import numpy as np
|
|
import quapy as qp
|
|
from quapy.data import LabelledCollection
|
|
from quapy.method.aggregative import CC, SLD
|
|
from quapy.model_selection import GridSearchQ
|
|
from quapy.protocol import UPP
|
|
from sklearn.base import BaseEstimator
|
|
from sklearn.linear_model import LogisticRegression
|
|
from sklearn.model_selection import cross_val_predict
|
|
|
|
from quacc.data import ExtendedCollection
|
|
|
|
|
|
class AccuracyEstimator:
    """Base class for estimating a classifier's accuracy via quantification.

    Subclasses must implement :meth:`fit` and :meth:`estimate`; they are also
    expected to set ``self.c_model`` (the underlying, already-trained
    classifier) before :meth:`extend` is used.
    """

    def __init__(self):
        # Best validation score found during grid search (None when no GS ran).
        self.fit_score = None

    def _gs_params(self, t_val: LabelledCollection):
        """Return the keyword arguments for a GridSearchQ run validated on *t_val*."""
        return {
            "param_grid": {
                "classifier__C": np.logspace(-3, 3, 7),
                "classifier__class_weight": [None, "balanced"],
                "recalib": [None, "bcts"],
            },
            "protocol": UPP(t_val, repeats=1000),
            "error": qp.error.mae,
            "refit": False,
            "timeout": -1,
            "n_jobs": None,
            "verbose": True,
        }

    def extend(self, base: LabelledCollection, pred_proba=None) -> ExtendedCollection:
        """Extend *base* with posterior probabilities.

        If *pred_proba* is not given, posteriors are computed with
        ``self.c_model``. Returns the extended collection together with the
        posteriors that were used.
        """
        # BUG FIX: was ``if not pred_proba:`` — truthiness of a numpy array of
        # posteriors raises "truth value of an array is ambiguous"; the intent
        # is clearly "compute posteriors only when none were supplied".
        if pred_proba is None:
            pred_proba = self.c_model.predict_proba(base.X)
        return ExtendedCollection.extend_collection(base, pred_proba), pred_proba

    @abstractmethod
    def fit(self, train: LabelledCollection | ExtendedCollection):
        ...

    @abstractmethod
    def estimate(self, instances, ext=False):
        ...
|
|
|
|
|
|
AE = AccuracyEstimator
|
|
|
|
|
|
class MulticlassAccuracyEstimator(AccuracyEstimator):
    """Accuracy estimator that fits one multiclass quantifier on the extended
    (instance x predicted-label) collection produced from the training data."""

    def __init__(self, c_model: BaseEstimator, q_model="SLD", gs=False, recalib=None):
        """
        :param c_model: the (already trained) classifier whose accuracy is estimated
        :param q_model: name of the quantifier to use ("SLD" or "CC")
        :param gs: if True, model-select the SLD quantifier with GridSearchQ
        :param recalib: recalibration method forwarded to SLD (unused when gs=True,
            since "recalib" is part of the grid-search parameter grid)
        """
        super().__init__()
        self.c_model = c_model
        self._q_model_name = q_model.upper()
        self.e_train = None
        self.gs = gs
        self.recalib = recalib

    def fit(self, train: LabelledCollection | ExtendedCollection):
        """Fit the quantifier on *train*.

        A plain LabelledCollection is first extended with cross-validated
        posteriors of ``c_model``; an ExtendedCollection is used as-is.
        ``c_model`` itself is assumed to be already fit.

        :raises ValueError: if the quantifier name given at construction is unknown
        """
        if isinstance(train, LabelledCollection):
            pred_prob_train = cross_val_predict(
                self.c_model, *train.Xy, method="predict_proba"
            )
            self.e_train = ExtendedCollection.extend_collection(train, pred_prob_train)
        else:
            self.e_train = train

        if self._q_model_name == "SLD":
            if self.gs:
                t_train, t_val = self.e_train.split_stratified(0.6, random_state=0)
                gs_params = self._gs_params(t_val)
                self.q_model = GridSearchQ(
                    SLD(LogisticRegression()),
                    **gs_params,
                )
                self.q_model.fit(t_train)
                self.fit_score = self.q_model.best_score_
            else:
                self.q_model = SLD(LogisticRegression(), recalib=self.recalib)
                self.q_model.fit(self.e_train)
        elif self._q_model_name == "CC":
            self.q_model = CC(LogisticRegression())
            self.q_model.fit(self.e_train)
        else:
            # Previously an unknown name silently left ``q_model`` unset and
            # only failed later with an opaque AttributeError in ``estimate``.
            raise ValueError(f"unknown quantifier: {self._q_model_name}")

    def estimate(self, instances, ext=False):
        """Quantify over *instances* and return the estimated prevalence vector.

        :param instances: raw instances, or an already-extended matrix if ``ext``
        :param ext: whether *instances* are already extended with posteriors
        """
        if not ext:
            pred_prob = self.c_model.predict_proba(instances)
            e_inst = ExtendedCollection.extend_instances(instances, pred_prob)
        else:
            e_inst = instances

        estim_prev = self.q_model.quantify(e_inst)

        return self._check_prevalence_classes(
            self.e_train.classes_, self.q_model, estim_prev
        )

    def _check_prevalence_classes(self, true_classes, q_model, estim_prev):
        """Pad *estim_prev* with zeros for classes the quantifier never saw.

        NOTE(review): ``np.insert`` uses the class label itself as the insertion
        index, which assumes labels are consecutive integers 0..n-1 — confirm
        against ExtendedCollection's class encoding.
        """
        if isinstance(q_model, GridSearchQ):
            estim_classes = q_model.best_model().classes_
        else:
            estim_classes = q_model.classes_
        for _cls in true_classes:
            if _cls not in estim_classes:
                estim_prev = np.insert(estim_prev, _cls, [0.0], axis=0)
        return estim_prev
|
|
|
|
|
|
class BinaryQuantifierAccuracyEstimator(AccuracyEstimator):
    """Accuracy estimator that fits one binary quantifier per predicted class,
    splitting the extended collection by the classifier's predictions."""

    def __init__(self, c_model: BaseEstimator, q_model="SLD", gs=False, recalib=None):
        """
        :param c_model: the (already trained) classifier whose accuracy is estimated
        :param q_model: name of the quantifier to use ("SLD" or "CC")
        :param gs: if True, model-select each SLD quantifier with GridSearchQ
        :param recalib: recalibration method forwarded to SLD (unused when gs=True,
            since "recalib" is part of the grid-search parameter grid)
        """
        super().__init__()
        self.c_model = c_model
        self._q_model_name = q_model.upper()
        self.q_models = []
        self.gs = gs
        self.recalib = recalib
        self.e_train = None

    def fit(self, train: LabelledCollection | ExtendedCollection):
        """Fit one quantifier per predicted-class split of *train*.

        A plain LabelledCollection is first extended with cross-validated
        posteriors of ``c_model``; an ExtendedCollection is used as-is.
        ``c_model`` itself is assumed to be already fit.

        :raises ValueError: if the quantifier name given at construction is unknown
        """
        if isinstance(train, LabelledCollection):
            pred_prob_train = cross_val_predict(
                self.c_model, *train.Xy, method="predict_proba"
            )
            self.e_train = ExtendedCollection.extend_collection(train, pred_prob_train)
        elif isinstance(train, ExtendedCollection):
            self.e_train = train

        self.n_classes = self.e_train.n_classes
        # One sub-collection per predicted class; a quantifier is fit on each.
        e_trains = self.e_train.split_by_pred()

        if self._q_model_name == "SLD":
            fit_scores = []
            for e_train in e_trains:
                if self.gs:
                    t_train, t_val = e_train.split_stratified(0.6, random_state=0)
                    gs_params = self._gs_params(t_val)
                    q_model = GridSearchQ(
                        SLD(LogisticRegression()),
                        **gs_params,
                    )
                    q_model.fit(t_train)
                    fit_scores.append(q_model.best_score_)
                    self.q_models.append(q_model)
                else:
                    q_model = SLD(LogisticRegression(), recalib=self.recalib)
                    q_model.fit(e_train)
                    self.q_models.append(q_model)

            if self.gs:
                # Aggregate the per-split grid-search scores into one fit score.
                self.fit_score = np.mean(fit_scores)

        elif self._q_model_name == "CC":
            for e_train in e_trains:
                q_model = CC(LogisticRegression())
                q_model.fit(e_train)
                self.q_models.append(q_model)
        else:
            # Previously an unknown name silently left ``q_models`` empty,
            # making ``estimate`` return an empty prevalence vector.
            raise ValueError(f"unknown quantifier: {self._q_model_name}")

    def estimate(self, instances, ext=False):
        """Quantify each predicted-class split and return the concatenated,
        normalization-weighted prevalence vector.

        :param instances: raw instances, or an already-extended matrix if ``ext``
        :param ext: whether *instances* are already extended with posteriors
        """
        # TODO: test
        if not ext:
            pred_prob = self.c_model.predict_proba(instances)
            e_inst = ExtendedCollection.extend_instances(instances, pred_prob)
        else:
            e_inst = instances

        # n_classes of the extended collection is (original classes)^2,
        # so sqrt recovers the number of per-prediction splits.
        _ncl = int(math.sqrt(self.n_classes))
        s_inst, norms = ExtendedCollection.split_inst_by_pred(_ncl, e_inst)
        estim_prevs = [
            self._quantify_helper(inst, norm, q_model)
            for (inst, norm, q_model) in zip(s_inst, norms, self.q_models)
        ]

        # Interleave: for each component index, take that component from every
        # per-split prevalence vector (flattened column-major over splits).
        estim_prev = [prev for prev_row in zip(*estim_prevs) for prev in prev_row]
        return np.asarray(estim_prev)

    def _quantify_helper(self, inst, norm, q_model):
        """Quantify *inst* and scale the prevalence by its split weight *norm*;
        an empty split contributes a zero (binary) prevalence."""
        if inst.shape[0] > 0:
            return np.asarray(q_model.quantify(inst)) * norm
        else:
            return np.asarray([0.0, 0.0])
|