forked from moreo/QuaPy
fgsld
This commit is contained in:
parent
f76a507e14
commit
eabfb34626
|
@ -0,0 +1,116 @@
|
|||
import numpy as np
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
|
||||
from sklearn.metrics import brier_score_loss
|
||||
from sklearn.preprocessing import MultiLabelBinarizer
|
||||
|
||||
from metrics import smoothmacroF1, isometric_brier_decomposition, isomerous_brier_decomposition
|
||||
|
||||
History = namedtuple('History', ('posteriors', 'priors', 'y', 'iteration', 'stopping_criterium'))
|
||||
MeasureSingleHistory = namedtuple('MeasureSingleHistory', (
|
||||
'soft_acc', 'soft_f1', 'abs_errors', 'test_priors', 'train_priors', 'predict_priors', 'brier',
|
||||
'isometric_ref_loss', 'isometric_cal_loss', 'isomerous_ref_loss', 'isomerous_cal_loss'
|
||||
))
|
||||
|
||||
|
||||
def get_measures_single_history(history: History, multi_class) -> MeasureSingleHistory:
|
||||
y = history.y
|
||||
|
||||
y_bin = MultiLabelBinarizer(classes=list(range(history.posteriors.shape[1]))).fit_transform(np.expand_dims(y, 1))
|
||||
|
||||
soft_acc = soft_accuracy(y, history.posteriors)
|
||||
f1 = smoothmacroF1(y_bin, history.posteriors)
|
||||
|
||||
if multi_class:
|
||||
test_priors = np.mean(y_bin, 0)
|
||||
abs_errors = abs(test_priors - history.priors)
|
||||
train_priors = history.priors
|
||||
predict_priors = np.mean(history.posteriors, 0)
|
||||
brier = 0
|
||||
else:
|
||||
test_priors = np.mean(y_bin, 0)[1]
|
||||
abs_errors = abs(test_priors - history.priors[1])
|
||||
train_priors = history.priors[1]
|
||||
predict_priors = np.mean(history.posteriors[:, 1])
|
||||
brier = brier_score_loss(y, history.posteriors[:, 1])
|
||||
|
||||
isometric_cal_loss, isometric_ref_loss = isometric_brier_decomposition(y, history.posteriors)
|
||||
isomerous_em_cal_loss, isomerous_em_ref_loss = isomerous_brier_decomposition(y, history.posteriors)
|
||||
|
||||
return MeasureSingleHistory(
|
||||
soft_acc, f1, abs_errors, test_priors, train_priors, predict_priors, brier, isometric_ref_loss,
|
||||
isometric_cal_loss, isomerous_em_ref_loss, isomerous_em_cal_loss
|
||||
)
|
||||
|
||||
|
||||
def soft_accuracy(y, posteriors):
|
||||
return sum(posteriors[y == c][:, c].sum() for c in range(posteriors.shape[1])) / posteriors.sum()
|
||||
|
||||
|
||||
def soft_f1(y, posteriors):
|
||||
cont_matrix = {
|
||||
'TPM': posteriors[y == 1][:, 1].sum(),
|
||||
'TNM': posteriors[y == 0][:, 0].sum(),
|
||||
'FPM': posteriors[y == 0][:, 1].sum(),
|
||||
'FNM': posteriors[y == 1][:, 0].sum()
|
||||
}
|
||||
precision = cont_matrix['TPM'] / (cont_matrix['TPM'] + cont_matrix['FPM'])
|
||||
recall = cont_matrix['TPM'] / (cont_matrix['TPM'] + cont_matrix['FNM'])
|
||||
return 2 * (precision * recall / (precision + recall))
|
||||
|
||||
|
||||
def em(y, posteriors_zero, priors_zero, epsilon=1e-6, multi_class=False, return_posteriors_hist=False):
|
||||
"""
|
||||
Implements the prior correction method based on EM presented in:
|
||||
"Adjusting the Outputs of a Classifier to New a Priori Probabilities: A Simple Procedure"
|
||||
Saerens, Latinne and Decaestecker, 2002
|
||||
http://www.isys.ucl.ac.be/staff/marco/Publications/Saerens2002a.pdf
|
||||
|
||||
:param y: true labels of test items, to measure accuracy, precision and recall.
|
||||
:param posteriors_zero: posterior probabilities on test items, as returned by a classifier. A 2D-array with shape
|
||||
Ø(items, classes).
|
||||
:param priors_zero: prior probabilities measured on training set.
|
||||
:param epsilon: stopping threshold.
|
||||
:param multi_class: whether the algorithm is running in a multi-label multi-class context or not.
|
||||
:param return_posteriors_hist: whether posteriors for each iteration should be returned or not. If true, the returned
|
||||
posteriors_s will actually be the list of posteriors for every iteration.
|
||||
:return: posteriors_s, priors_s, history: final adjusted posteriors, final adjusted priors, a list of length s
|
||||
where each element is a tuple with the step counter, the current priors (as list), the stopping criterium value,
|
||||
accuracy, precision and recall.
|
||||
"""
|
||||
s = 0
|
||||
priors_s = np.copy(priors_zero)
|
||||
posteriors_s = np.copy(posteriors_zero)
|
||||
if return_posteriors_hist:
|
||||
posteriors_hist = [posteriors_s.copy()]
|
||||
val = 2 * epsilon
|
||||
history = list()
|
||||
history.append(get_measures_single_history(History(posteriors_zero, priors_zero, y, s, 1), multi_class))
|
||||
while not val < epsilon and s < 999:
|
||||
# M step
|
||||
priors_s_minus_one = priors_s.copy()
|
||||
priors_s = posteriors_s.mean(0)
|
||||
|
||||
# E step
|
||||
ratios = priors_s / priors_zero
|
||||
denominators = 0
|
||||
for c in range(priors_zero.shape[0]):
|
||||
denominators += ratios[c] * posteriors_zero[:, c]
|
||||
for c in range(priors_zero.shape[0]):
|
||||
posteriors_s[:, c] = ratios[c] * posteriors_zero[:, c] / denominators
|
||||
|
||||
# check for stop
|
||||
val = 0
|
||||
for i in range(len(priors_s_minus_one)):
|
||||
val += abs(priors_s_minus_one[i] - priors_s[i])
|
||||
|
||||
logging.debug(f"Em iteration: {s}; Val: {val}")
|
||||
s += 1
|
||||
if return_posteriors_hist:
|
||||
posteriors_hist.append(posteriors_s.copy())
|
||||
history.append(get_measures_single_history(History(posteriors_s, priors_s, y, s, val), multi_class))
|
||||
|
||||
if return_posteriors_hist:
|
||||
return posteriors_hist, priors_s, history
|
||||
return posteriors_s, priors_s, history
|
|
@ -0,0 +1,75 @@
|
|||
from sklearn.calibration import CalibratedClassifierCV
|
||||
from sklearn.svm import LinearSVC
|
||||
|
||||
from NewMethods.fgsld.fine_grained_sld import FineGrainedSLD
|
||||
from method.aggregative import EMQ, CC
|
||||
from quapy.data import LabelledCollection
|
||||
from quapy.method.base import BaseQuantifier
|
||||
import quapy as qp
|
||||
import quapy.functional as F
|
||||
from sklearn.linear_model import LogisticRegression
|
||||
|
||||
|
||||
class FakeFGLSD(BaseQuantifier):
|
||||
def __init__(self, learner, nbins, isomerous):
|
||||
self.learner = learner
|
||||
self.nbins = nbins
|
||||
self.isomerous = isomerous
|
||||
|
||||
def fit(self, data: LabelledCollection):
|
||||
self.Xtr, self.ytr = data.Xy
|
||||
self.learner.fit(self.Xtr, self.ytr)
|
||||
return self
|
||||
|
||||
def quantify(self, instances):
|
||||
tr_priors = F.prevalence_from_labels(self.ytr, n_classes=2)
|
||||
fgsld = FineGrainedSLD(self.Xtr, instances, self.ytr, tr_priors, self.learner, n_bins=self.nbins)
|
||||
priors, posteriors = fgsld.run(self.isomerous)
|
||||
return priors
|
||||
|
||||
def get_params(self, deep=True):
|
||||
pass
|
||||
|
||||
def set_params(self, **parameters):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
qp.environ['SAMPLE_SIZE'] = 500
|
||||
|
||||
dataset = qp.datasets.fetch_reviews('hp')
|
||||
qp.data.preprocessing.text2tfidf(dataset, min_df=5, inplace=True)
|
||||
|
||||
training = dataset.training
|
||||
test = dataset.test
|
||||
|
||||
cls = CalibratedClassifierCV(LinearSVC())
|
||||
|
||||
|
||||
method_names, true_prevs, estim_prevs, tr_prevs = [], [], [], []
|
||||
|
||||
for model, model_name in [
|
||||
(CC(cls), 'CC'),
|
||||
(FakeFGLSD(cls, nbins=1, isomerous=False), 'FGSLD-1'),
|
||||
(FakeFGLSD(cls, nbins=2, isomerous=False), 'FGSLD-2'),
|
||||
#(FakeFGLSD(cls, nbins=5, isomerous=False), 'FGSLD-5'),
|
||||
#(FakeFGLSD(cls, nbins=10, isomerous=False), 'FGSLD-10'),
|
||||
#(FakeFGLSD(cls, nbins=50, isomerous=False), 'FGSLD-50'),
|
||||
#(FakeFGLSD(cls, nbins=100, isomerous=False), 'FGSLD-100'),
|
||||
# (FakeFGLSD(cls, nbins=1, isomerous=False), 'FGSLD-1'),
|
||||
#(FakeFGLSD(cls, nbins=10, isomerous=True), 'FGSLD-10-ISO'),
|
||||
# (FakeFGLSD(cls, nbins=50, isomerous=False), 'FGSLD-50'),
|
||||
(EMQ(cls), 'SLD'),
|
||||
]:
|
||||
print('running ', model_name)
|
||||
model.fit(training)
|
||||
true_prev, estim_prev = qp.evaluation.artificial_sampling_prediction(
|
||||
model, test, qp.environ['SAMPLE_SIZE'], n_repetitions=10, n_prevpoints=21, n_jobs=-1
|
||||
)
|
||||
method_names.append(model_name)
|
||||
true_prevs.append(true_prev)
|
||||
estim_prevs.append(estim_prev)
|
||||
tr_prevs.append(training.prevalence())
|
||||
|
||||
|
||||
qp.plot.binary_diagonal(method_names, true_prevs, estim_prevs, train_prev=tr_prevs[0], savepath='./plot_fglsd.png')
|
|
@ -0,0 +1,107 @@
|
|||
import numpy as np
|
||||
from metrics import isomerous_bins, isometric_bins
|
||||
from em import History, get_measures_single_history
|
||||
|
||||
|
||||
class FineGrainedSLD:
|
||||
def __init__(self, x_tr, x_te, y_tr, tr_priors, clf, n_bins=10):
|
||||
self.y_tr = y_tr
|
||||
self.clf = clf
|
||||
self.tr_priors = tr_priors
|
||||
self.tr_preds = clf.predict_proba(x_tr)
|
||||
self.te_preds = clf.predict_proba(x_te)
|
||||
self.n_bins = n_bins
|
||||
self.history: [History] = []
|
||||
self.multi_class = False
|
||||
|
||||
def run(self, isomerous_binning, epsilon=1e-6, compute_bins_at_every_iter=False, return_posteriors_hist=False):
|
||||
"""
|
||||
Run the FGSLD algorithm.
|
||||
|
||||
:param isomerous_binning: whether to use isomerous or isometric binning.
|
||||
:param epsilon: stopping condition.
|
||||
:param compute_bins_at_every_iter: whether FGSLD should recompute the posterior bins at every iteration or not.
|
||||
:param return_posteriors_hist: whether to return posteriors at every iteration or not.
|
||||
:return: If `return_posteriors_hist` is true, the returned posteriors will be a list of numpy arrays, else a single numpy array with posteriors at last iteration.
|
||||
"""
|
||||
smoothing_tr = 1 / (2 * self.y_tr.shape[0])
|
||||
smoothing_te = smoothing_tr
|
||||
s = 0
|
||||
tr_bin_priors = np.zeros((self.n_bins, self.tr_preds.shape[1]), dtype=np.float)
|
||||
te_bin_priors = np.zeros((self.n_bins, self.te_preds.shape[1]), dtype=np.float)
|
||||
tr_bins = self.__create_bins(training=True, isomerous_binning=isomerous_binning)
|
||||
te_bins = self.__create_bins(training=False, isomerous_binning=isomerous_binning)
|
||||
self.__compute_bins_priors(tr_bin_priors, self.tr_preds, tr_bins, smoothing_tr)
|
||||
|
||||
val = 2 * epsilon
|
||||
if return_posteriors_hist:
|
||||
posteriors_hist = [self.te_preds.copy()]
|
||||
while not val < epsilon and s < 1000:
|
||||
assert np.all(np.around(self.te_preds.sum(axis=1), 4) == 1), f"Probabilities do not sum to 1:\ns={s}, " \
|
||||
f"probs={self.te_preds.sum(axis=1)}"
|
||||
if compute_bins_at_every_iter:
|
||||
te_bins = self.__create_bins(training=False, isomerous_binning=isomerous_binning)
|
||||
|
||||
if s == 0:
|
||||
te_bin_priors_prev = tr_bin_priors.copy()
|
||||
else:
|
||||
te_bin_priors_prev = te_bin_priors.copy()
|
||||
self.__compute_bins_priors(te_bin_priors, self.te_preds, te_bins, smoothing_te)
|
||||
|
||||
te_preds_cp = self.te_preds.copy()
|
||||
for label_idx, bins in te_bins.items():
|
||||
for i, bin_ in enumerate(bins):
|
||||
if bin_.shape[0] == 0:
|
||||
continue
|
||||
self.te_preds[:, label_idx][bin_] = (te_preds_cp[:, label_idx][bin_]) * \
|
||||
(te_bin_priors[i][label_idx] / te_bin_priors_prev[i][label_idx])
|
||||
|
||||
# Normalization step
|
||||
self.te_preds = (self.te_preds.T / self.te_preds.sum(axis=1)).T
|
||||
|
||||
val = 0
|
||||
for label_idx in range(te_bin_priors.shape[1]):
|
||||
if (temp := max(abs((te_bin_priors[:, label_idx] / te_bin_priors_prev[:, label_idx]) - 1))) > val:
|
||||
val = temp
|
||||
s += 1
|
||||
if return_posteriors_hist:
|
||||
posteriors_hist.append(self.te_preds.copy())
|
||||
if return_posteriors_hist:
|
||||
return self.te_preds.mean(axis=0), posteriors_hist
|
||||
return self.te_preds.mean(axis=0), self.te_preds
|
||||
|
||||
def __compute_bins_priors(self, bin_priors_placeholder, posteriors, bins, smoothing):
|
||||
for label_idx, bins in bins.items():
|
||||
for i, bin_ in enumerate(bins):
|
||||
if bin_.shape[0] == 0:
|
||||
bin_priors_placeholder[i, label_idx] = smoothing
|
||||
continue
|
||||
numerator = posteriors[:, label_idx][bin_].mean()
|
||||
bin_prior = (numerator + smoothing) / (1 + self.n_bins * smoothing) # normalize priors
|
||||
bin_priors_placeholder[i, label_idx] = bin_prior
|
||||
|
||||
def __find_bin_idx(self, label_bins: [np.array], idx: int or list):
|
||||
if hasattr(idx, '__len__'):
|
||||
idxs = np.zeros(len(idx), dtype=np.int)
|
||||
for i, bin_ in enumerate(label_bins):
|
||||
for j, id_ in enumerate(idx):
|
||||
if id_ in bin_:
|
||||
idxs[j] = i
|
||||
return idxs
|
||||
else:
|
||||
for i, bin_ in enumerate(label_bins):
|
||||
if idx in bin_:
|
||||
return i
|
||||
|
||||
def __create_bins(self, training: bool, isomerous_binning: bool):
|
||||
bins = {}
|
||||
preds = self.tr_preds if training else self.te_preds
|
||||
if isomerous_binning:
|
||||
for label_idx in range(preds.shape[1]):
|
||||
bins[label_idx] = isomerous_bins(label_idx, preds, self.n_bins)
|
||||
else:
|
||||
intervals = np.linspace(0., 1., num=self.n_bins, endpoint=False)
|
||||
for label_idx in range(preds.shape[1]):
|
||||
bins_ = isometric_bins(label_idx, preds, intervals, 0.1)
|
||||
bins[label_idx] = [bins_[i] for i in intervals]
|
||||
return bins
|
|
@ -0,0 +1,260 @@
|
|||
import numpy as np
|
||||
|
||||
"""
|
||||
Scikit learn provides a full set of evaluation metrics, but they treat special cases differently.
|
||||
I.e., when the number of true positives, false positives, and false negatives ammount to 0, all
|
||||
affected metrics (precision, recall, and thus f1) output 0 in Scikit learn.
|
||||
We adhere to the common practice of outputting 1 in this case since the classifier has correctly
|
||||
classified all examples as negatives.
|
||||
"""
|
||||
|
||||
|
||||
def isometric_brier_decomposition(true_labels, predicted_labels, bin_intervals=np.arange(0., 1.1, 0.1), step=0.1):
|
||||
"""
|
||||
The Isometric Brier decomposition or score is obtained by partitioning U into intervals I_1j,...,I_bj that
|
||||
have equal length, where U is the total size of our test set (i.e., true_labels.shape[0]). This means that,
|
||||
if b=10 then I_1j = [0.0,0.1), I_2j = [0.2, 0.3),...,I_bj = [0.9,1.0).
|
||||
|
||||
bin_intervals is a numpy.array containing the range of the different intervals. Since it is a single dimensional
|
||||
array, for every interval I_n we take the posterior probabilities Pr_n(x) such that I_n <= Pr_n(x) < I_n + step.
|
||||
This variable defaults to np.arange(0., 1.0, 0.1), i.e. an array like [0.1, 0.2, ..., 1.0].
|
||||
|
||||
:return: a tuple (calibration score, refinement score)
|
||||
"""
|
||||
labels = set(true_labels)
|
||||
calibration_score, refinement_score = 0.0, 0.0
|
||||
for i in range(len(labels)):
|
||||
bins = isometric_bins(i, predicted_labels, bin_intervals, step)
|
||||
c_score, r_score = brier_decomposition(bins.values(), true_labels, predicted_labels, class_=i)
|
||||
calibration_score += c_score
|
||||
refinement_score += r_score
|
||||
return calibration_score, refinement_score
|
||||
|
||||
|
||||
def isomerous_brier_decomposition(true_labels, predicted_labels, n=10):
|
||||
"""
|
||||
The Isomerous Brier decomposition or score is obtained by partitioning U into intervals I_1j,...,I_bj such that
|
||||
the corresponding bins B_1j,...,B_bj have equal size, where U is our test set. This means that, for every x' in
|
||||
B_sj and x'' in B_tj with s < t, it holds that Pr(c_j|x') <= Pr(c_j|x'') and |B_sj| == |B_tj|, for any s,t in
|
||||
{1,...,b}.
|
||||
|
||||
The n variable holds the number of bins we want (defaults to 10). Notice that we perform a numpy.array_split on
|
||||
the predicted_labels, creating l % n sub-arrays of size l//n + 1 and the rest of size l//n, where l is the length
|
||||
of the array.
|
||||
|
||||
:return: a tuple (calibration score, refinement score)
|
||||
"""
|
||||
|
||||
labels = set(true_labels)
|
||||
calibration_score, refinement_score = 0.0, 0.0
|
||||
for i in range(len(labels)):
|
||||
bins = isomerous_bins(i, predicted_labels, n)
|
||||
c_score, r_score = brier_decomposition(bins, true_labels, predicted_labels, class_=i)
|
||||
calibration_score += c_score
|
||||
refinement_score += r_score
|
||||
return calibration_score, refinement_score
|
||||
|
||||
|
||||
def brier_decomposition(bins, true_labels, predicted_labels, class_=1):
|
||||
"""
|
||||
:param bins: must be an array of indices
|
||||
:return: a tuple (calibration_score, refinement_score)
|
||||
"""
|
||||
calibration_score = 0
|
||||
refinement_score = 0
|
||||
for bin_ in bins:
|
||||
if bin_.size <= 0:
|
||||
continue
|
||||
v_x = (bin_.shape[0] / true_labels.shape[0])
|
||||
ro_x = np.mean(true_labels[bin_] == class_)
|
||||
calibration_score += v_x * (predicted_labels[bin_, class_].mean() - ro_x)**2
|
||||
refinement_score += (v_x * ro_x) * (1 - ro_x)
|
||||
labels_len = len(set(true_labels))
|
||||
return calibration_score / (labels_len * len(bins)), refinement_score / (labels_len * len(bins))
|
||||
|
||||
|
||||
def isometric_bins(label_index, predicted_labels, bin_intervals, step):
|
||||
predicted_class_label = predicted_labels[:, label_index]
|
||||
return {interv: np.where(np.logical_and(interv <= predicted_class_label, predicted_class_label < interv + step))[0]
|
||||
for interv in bin_intervals}
|
||||
|
||||
|
||||
def isomerous_bins(label_index, predicted_labels, n):
|
||||
sorted_indices = predicted_labels[:, label_index].argsort()
|
||||
return np.array_split(sorted_indices, n)
|
||||
|
||||
|
||||
# true_labels and predicted_labels are two matrices in sklearn.preprocessing.MultiLabelBinarizer format
|
||||
def macroF1(true_labels, predicted_labels):
|
||||
return macro_average(true_labels, predicted_labels, f1)
|
||||
|
||||
|
||||
# true_labels and predicted_labels are two matrices in sklearn.preprocessing.MultiLabelBinarizer format
|
||||
def microF1(true_labels, predicted_labels):
|
||||
return micro_average(true_labels, predicted_labels, f1)
|
||||
|
||||
|
||||
# true_labels and predicted_labels are two matrices in sklearn.preprocessing.MultiLabelBinarizer format
|
||||
def macroK(true_labels, predicted_labels):
|
||||
return macro_average(true_labels, predicted_labels, K)
|
||||
|
||||
|
||||
# true_labels and predicted_labels are two matrices in sklearn.preprocessing.MultiLabelBinarizer format
|
||||
def microK(true_labels, predicted_labels):
|
||||
return micro_average(true_labels, predicted_labels, K)
|
||||
|
||||
|
||||
# true_labels is a matrix in sklearn.preprocessing.MultiLabelBinarizer format and posterior_probabilities is a matrix
|
||||
# of the same shape containing real values in [0,1]
|
||||
def smoothmacroF1(true_labels, posterior_probabilities):
|
||||
return macro_average(true_labels, posterior_probabilities, f1, metric_statistics=soft_single_metric_statistics)
|
||||
|
||||
|
||||
# true_labels is a matrix in sklearn.preprocessing.MultiLabelBinarizer format and posterior_probabilities is a matrix
|
||||
# of the same shape containing real values in [0,1]
|
||||
def smoothmicroF1(true_labels, posterior_probabilities):
|
||||
return micro_average(true_labels, posterior_probabilities, f1, metric_statistics=soft_single_metric_statistics)
|
||||
|
||||
|
||||
# true_labels is a matrix in sklearn.preprocessing.MultiLabelBinarizer format and posterior_probabilities is a matrix
|
||||
# of the same shape containing real values in [0,1]
|
||||
def smoothmacroK(true_labels, posterior_probabilities):
|
||||
return macro_average(true_labels, posterior_probabilities, K, metric_statistics=soft_single_metric_statistics)
|
||||
|
||||
|
||||
# true_labels is a matrix in sklearn.preprocessing.MultiLabelBinarizer format and posterior_probabilities is a matrix
|
||||
# of the same shape containing real values in [0,1]
|
||||
def smoothmicroK(true_labels, posterior_probabilities):
|
||||
return micro_average(true_labels, posterior_probabilities, K, metric_statistics=soft_single_metric_statistics)
|
||||
|
||||
|
||||
class ContTable:
|
||||
def __init__(self, tp=0, tn=0, fp=0, fn=0):
|
||||
self.tp = tp
|
||||
self.tn = tn
|
||||
self.fp = fp
|
||||
self.fn = fn
|
||||
|
||||
def get_d(self): return self.tp + self.tn + self.fp + self.fn
|
||||
|
||||
def get_c(self): return self.tp + self.fn
|
||||
|
||||
def get_not_c(self): return self.tn + self.fp
|
||||
|
||||
def get_f(self): return self.tp + self.fp
|
||||
|
||||
def get_not_f(self): return self.tn + self.fn
|
||||
|
||||
def p_c(self): return (1.0 * self.get_c()) / self.get_d()
|
||||
|
||||
def p_not_c(self): return 1.0 - self.p_c()
|
||||
|
||||
def p_f(self): return (1.0 * self.get_f()) / self.get_d()
|
||||
|
||||
def p_not_f(self): return 1.0 - self.p_f()
|
||||
|
||||
def p_tp(self): return (1.0 * self.tp) / self.get_d()
|
||||
|
||||
def p_tn(self): return (1.0 * self.tn) / self.get_d()
|
||||
|
||||
def p_fp(self): return (1.0 * self.fp) / self.get_d()
|
||||
|
||||
def p_fn(self): return (1.0 * self.fn) / self.get_d()
|
||||
|
||||
def tpr(self):
|
||||
c = 1.0 * self.get_c()
|
||||
return self.tp / c if c > 0.0 else 0.0
|
||||
|
||||
def fpr(self):
|
||||
_c = 1.0 * self.get_not_c()
|
||||
return self.fp / _c if _c > 0.0 else 0.0
|
||||
|
||||
def __add__(self, other):
|
||||
return ContTable(tp=self.tp + other.tp, tn=self.tn + other.tn, fp=self.fp + other.fp, fn=self.fn + other.fn)
|
||||
|
||||
|
||||
def accuracy(cell):
|
||||
return (cell.tp + cell.tn) * 1.0 / (cell.tp + cell.fp + cell.fn + cell.tn)
|
||||
|
||||
|
||||
def f1(cell):
|
||||
num = 2.0 * cell.tp
|
||||
den = 2.0 * cell.tp + cell.fp + cell.fn
|
||||
if den > 0: return num / den
|
||||
# we define f1 to be 1 if den==0 since the classifier has correctly classified all instances as negative
|
||||
return 1.0
|
||||
|
||||
|
||||
def K(cell):
|
||||
specificity, recall = 0., 0.
|
||||
|
||||
AN = cell.tn + cell.fp
|
||||
if AN != 0:
|
||||
specificity = cell.tn * 1. / AN
|
||||
|
||||
AP = cell.tp + cell.fn
|
||||
if AP != 0:
|
||||
recall = cell.tp * 1. / AP
|
||||
|
||||
if AP == 0:
|
||||
return 2. * specificity - 1.
|
||||
elif AN == 0:
|
||||
return 2. * recall - 1.
|
||||
else:
|
||||
return specificity + recall - 1.
|
||||
|
||||
|
||||
# computes the (hard) counters tp, fp, fn, and tn fron a true and predicted vectors of hard decisions
|
||||
# true_labels and predicted_labels are two vectors of shape (number_documents,)
|
||||
def hard_single_metric_statistics(true_labels, predicted_labels):
|
||||
assert len(true_labels) == len(predicted_labels), "Format not consistent between true and predicted labels."
|
||||
nd = len(true_labels)
|
||||
tp = np.sum(predicted_labels[true_labels == 1])
|
||||
fp = np.sum(predicted_labels[true_labels == 0])
|
||||
fn = np.sum(true_labels[predicted_labels == 0])
|
||||
tn = nd - (tp + fp + fn)
|
||||
return ContTable(tp=tp, tn=tn, fp=fp, fn=fn)
|
||||
|
||||
|
||||
# computes the (soft) contingency table where tp, fp, fn, and tn are the cumulative masses for the posterioir
|
||||
# probabilitiesfron with respect to the true binary labels
|
||||
# true_labels and posterior_probabilities are two vectors of shape (number_documents,)
|
||||
def soft_single_metric_statistics(true_labels, posterior_probabilities):
|
||||
assert len(true_labels) == len(posterior_probabilities), "Format not consistent between true and predicted labels."
|
||||
pos_probs = posterior_probabilities[true_labels == 1]
|
||||
neg_probs = posterior_probabilities[true_labels == 0]
|
||||
tp = np.sum(pos_probs)
|
||||
fn = np.sum(1. - pos_probs)
|
||||
fp = np.sum(neg_probs)
|
||||
tn = np.sum(1. - neg_probs)
|
||||
return ContTable(tp=tp, tn=tn, fp=fp, fn=fn)
|
||||
|
||||
|
||||
# if the classifier is single class, then the prediction is a vector of shape=(nD,) which causes issues when compared
|
||||
# to the true labels (of shape=(nD,1)). This method increases the dimensions of the predictions.
|
||||
def __check_consistency_and_adapt(true_labels, predictions):
|
||||
if predictions.ndim == 1:
|
||||
return __check_consistency_and_adapt(true_labels, np.expand_dims(predictions, axis=1))
|
||||
if true_labels.ndim == 1:
|
||||
return __check_consistency_and_adapt(np.expand_dims(true_labels, axis=1), predictions)
|
||||
if true_labels.shape != predictions.shape:
|
||||
raise ValueError("True and predicted label matrices shapes are inconsistent %s %s."
|
||||
% (true_labels.shape, predictions.shape))
|
||||
_, nC = true_labels.shape
|
||||
return true_labels, predictions, nC
|
||||
|
||||
|
||||
def macro_average(true_labels, predicted_labels, metric, metric_statistics=hard_single_metric_statistics):
|
||||
true_labels, predicted_labels, nC = __check_consistency_and_adapt(true_labels, predicted_labels)
|
||||
return np.mean([metric(metric_statistics(true_labels[:, c], predicted_labels[:, c])) for c in range(nC)])
|
||||
|
||||
|
||||
def micro_average(true_labels, predicted_labels, metric, metric_statistics=hard_single_metric_statistics):
|
||||
true_labels, predicted_labels, nC = __check_consistency_and_adapt(true_labels, predicted_labels)
|
||||
|
||||
accum = ContTable()
|
||||
for c in range(nC):
|
||||
other = metric_statistics(true_labels[:, c], predicted_labels[:, c])
|
||||
accum = accum + other
|
||||
|
||||
return metric(accum)
|
Binary file not shown.
After Width: | Height: | Size: 162 KiB |
|
@ -1,4 +1,5 @@
|
|||
import numpy as np
|
||||
from sklearn.base import BaseEstimator
|
||||
from sklearn.decomposition import PCA
|
||||
from sklearn.preprocessing import StandardScaler
|
||||
|
||||
|
@ -10,6 +11,8 @@ from quapy.method.base import BaseQuantifier, BinaryQuantifier
|
|||
from quapy.method.aggregative import PACC, EMQ, HDy
|
||||
import quapy.functional as F
|
||||
from tqdm import tqdm
|
||||
from scipy.sparse import issparse, csr_matrix
|
||||
import scipy
|
||||
|
||||
|
||||
class PACCSLD(PACC):
|
||||
|
@ -123,3 +126,49 @@ class AveragePoolQuantification(BinaryQuantifier):
|
|||
|
||||
def get_params(self, deep=True):
|
||||
return self.learner.get_params(deep=deep)
|
||||
|
||||
|
||||
class WinnowOrthogonal(BaseEstimator):
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def fit(self, X, y):
|
||||
self.classes_ = np.asarray(sorted(np.unique(y)))
|
||||
w1 = np.asarray(X[y == 0].mean(axis=0)).flatten()
|
||||
w2 = np.asarray(X[y == 1].mean(axis=0)).flatten()
|
||||
diff = w2 - w1
|
||||
orth = np.ones_like(diff)
|
||||
orth[0] = -diff[1:].sum() / diff[0]
|
||||
orth /= np.linalg.norm(orth)
|
||||
self.w = orth
|
||||
self.b = w1.dot(orth)
|
||||
return self
|
||||
|
||||
def decision_function(self, X):
|
||||
if issparse(X):
|
||||
Z = X.dot(csr_matrix(self.w).T).toarray().flatten()
|
||||
return Z - self.b
|
||||
else:
|
||||
return np.matmul(X, self.w) - self.b
|
||||
|
||||
def predict(self, X):
|
||||
return 1 * (self.decision_function(X) > 0)
|
||||
|
||||
def split(self, X, y):
|
||||
s = self.predict(X)
|
||||
X0a = X[np.logical_and(y == 0, s == 0)]
|
||||
X0b = X[np.logical_and(y == 0, s == 1)]
|
||||
X1a = X[np.logical_and(y == 1, s == 0)]
|
||||
X1b = X[np.logical_and(y == 1, s == 1)]
|
||||
y0a = np.zeros(X0a.shape[0], dtype=np.int)
|
||||
y0b = np.zeros(X0b.shape[0], dtype=np.int)
|
||||
y1a = np.ones(X1a.shape[0], dtype=np.int)
|
||||
y1b = np.ones(X1b.shape[0], dtype=np.int)
|
||||
return X0a, X0b, X1a, X1b, y0a, y0b, y1a, y1b
|
||||
|
||||
def get_params(self):
|
||||
return {}
|
||||
|
||||
def set_params(self, **params):
|
||||
pass
|
||||
|
|
Loading…
Reference in New Issue