adding fgsld

Alejandro Moreo Fernandez 2021-03-10 11:25:42 +01:00
parent eabfb34626
commit 168c109794
5 changed files with 0 additions and 558 deletions

View File

@ -1,116 +0,0 @@
import numpy as np
import logging
from collections import namedtuple
from sklearn.metrics import brier_score_loss
from sklearn.preprocessing import MultiLabelBinarizer
from metrics import smoothmacroF1, isometric_brier_decomposition, isomerous_brier_decomposition

History = namedtuple('History', ('posteriors', 'priors', 'y', 'iteration', 'stopping_criterium'))
MeasureSingleHistory = namedtuple('MeasureSingleHistory', (
    'soft_acc', 'soft_f1', 'abs_errors', 'test_priors', 'train_priors', 'predict_priors', 'brier',
    'isometric_ref_loss', 'isometric_cal_loss', 'isomerous_ref_loss', 'isomerous_cal_loss'
))


def get_measures_single_history(history: History, multi_class) -> MeasureSingleHistory:
    y = history.y
    y_bin = MultiLabelBinarizer(classes=list(range(history.posteriors.shape[1]))).fit_transform(np.expand_dims(y, 1))

    soft_acc = soft_accuracy(y, history.posteriors)
    f1 = smoothmacroF1(y_bin, history.posteriors)

    if multi_class:
        test_priors = np.mean(y_bin, 0)
        abs_errors = abs(test_priors - history.priors)
        train_priors = history.priors
        predict_priors = np.mean(history.posteriors, 0)
        brier = 0
    else:
        test_priors = np.mean(y_bin, 0)[1]
        abs_errors = abs(test_priors - history.priors[1])
        train_priors = history.priors[1]
        predict_priors = np.mean(history.posteriors[:, 1])
        brier = brier_score_loss(y, history.posteriors[:, 1])

    isometric_cal_loss, isometric_ref_loss = isometric_brier_decomposition(y, history.posteriors)
    isomerous_em_cal_loss, isomerous_em_ref_loss = isomerous_brier_decomposition(y, history.posteriors)

    return MeasureSingleHistory(
        soft_acc, f1, abs_errors, test_priors, train_priors, predict_priors, brier, isometric_ref_loss,
        isometric_cal_loss, isomerous_em_ref_loss, isomerous_em_cal_loss
    )


def soft_accuracy(y, posteriors):
    return sum(posteriors[y == c][:, c].sum() for c in range(posteriors.shape[1])) / posteriors.sum()


def soft_f1(y, posteriors):
    cont_matrix = {
        'TPM': posteriors[y == 1][:, 1].sum(),
        'TNM': posteriors[y == 0][:, 0].sum(),
        'FPM': posteriors[y == 0][:, 1].sum(),
        'FNM': posteriors[y == 1][:, 0].sum()
    }
    precision = cont_matrix['TPM'] / (cont_matrix['TPM'] + cont_matrix['FPM'])
    recall = cont_matrix['TPM'] / (cont_matrix['TPM'] + cont_matrix['FNM'])
    return 2 * (precision * recall / (precision + recall))
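
A toy check of the soft (probabilistic) counts above; the arrays and the hand-computed values in the comments are illustrative, not part of this commit:

    import numpy as np

    y = np.array([1, 0, 1])                 # three items, two classes
    posteriors = np.array([[0.2, 0.8],
                           [0.7, 0.3],
                           [0.4, 0.6]])     # per-item posterior probabilities
    print(soft_accuracy(y, posteriors))     # (0.7 + 0.8 + 0.6) / 3.0 = 0.7
    print(soft_f1(y, posteriors))           # TPM=1.4, FPM=0.3, FNM=0.6 -> ~0.757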
def em(y, posteriors_zero, priors_zero, epsilon=1e-6, multi_class=False, return_posteriors_hist=False):
    """
    Implements the prior correction method based on EM presented in:
    "Adjusting the Outputs of a Classifier to New a Priori Probabilities: A Simple Procedure"
    Saerens, Latinne and Decaestecker, 2002
    http://www.isys.ucl.ac.be/staff/marco/Publications/Saerens2002a.pdf

    :param y: true labels of test items, to measure accuracy, precision and recall.
    :param posteriors_zero: posterior probabilities on test items, as returned by a classifier. A 2D-array with shape
        (items, classes).
    :param priors_zero: prior probabilities measured on the training set.
    :param epsilon: stopping threshold.
    :param multi_class: whether the algorithm is running in a multi-label multi-class context or not.
    :param return_posteriors_hist: whether posteriors for each iteration should be returned or not. If true, the
        returned posteriors_s will actually be the list of posteriors for every iteration.
    :return: posteriors_s, priors_s, history: final adjusted posteriors, final adjusted priors, and a list of length s
        where each element is a tuple with the step counter, the current priors (as a list), the stopping criterion
        value, accuracy, precision and recall.
    """
    s = 0
    priors_s = np.copy(priors_zero)
    posteriors_s = np.copy(posteriors_zero)
    if return_posteriors_hist:
        posteriors_hist = [posteriors_s.copy()]
    val = 2 * epsilon
    history = list()
    history.append(get_measures_single_history(History(posteriors_zero, priors_zero, y, s, 1), multi_class))
    while not val < epsilon and s < 999:
        # M step: re-estimate the priors as the mean of the current posteriors
        priors_s_minus_one = priors_s.copy()
        priors_s = posteriors_s.mean(0)

        # E step: rescale the original posteriors by the ratio between the new and the original priors
        ratios = priors_s / priors_zero
        denominators = 0
        for c in range(priors_zero.shape[0]):
            denominators += ratios[c] * posteriors_zero[:, c]
        for c in range(priors_zero.shape[0]):
            posteriors_s[:, c] = ratios[c] * posteriors_zero[:, c] / denominators

        # check for stop: L1 distance between consecutive prior estimates
        val = 0
        for i in range(len(priors_s_minus_one)):
            val += abs(priors_s_minus_one[i] - priors_s[i])
        logging.debug(f"EM iteration: {s}; val: {val}")
        s += 1
        if return_posteriors_hist:
            posteriors_hist.append(posteriors_s.copy())
        history.append(get_measures_single_history(History(posteriors_s, priors_s, y, s, val), multi_class))

    if return_posteriors_hist:
        return posteriors_hist, priors_s, history
    return posteriors_s, priors_s, history
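
A minimal end-to-end sketch of `em`; the synthetic data, split, and classifier below are illustrative assumptions (only the `em` call itself comes from this module, assumed saved as em.py):

    import numpy as np
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    from em import em

    # fit a probabilistic classifier on a synthetic binary task
    X, y = make_classification(n_samples=2000, random_state=0)
    Xtr, ytr, Xte, yte = X[:1000], y[:1000], X[1000:], y[1000:]
    clf = LogisticRegression().fit(Xtr, ytr)

    # correct the test posteriors towards the EM-estimated test priors
    posteriors_zero = clf.predict_proba(Xte)     # uncorrected posteriors
    priors_zero = np.bincount(ytr) / len(ytr)    # training-set priors
    posteriors_s, priors_s, history = em(yte, posteriors_zero, priors_zero)
    print('EM-adjusted test priors:', priors_s)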

View File

@ -1,75 +0,0 @@
from sklearn.calibration import CalibratedClassifierCV
from sklearn.svm import LinearSVC
from NewMethods.fgsld.fine_grained_sld import FineGrainedSLD
from method.aggregative import EMQ, CC
from quapy.data import LabelledCollection
from quapy.method.base import BaseQuantifier
import quapy as qp
import quapy.functional as F
from sklearn.linear_model import LogisticRegression


class FakeFGLSD(BaseQuantifier):
    # minimal wrapper exposing FineGrainedSLD through QuaPy's quantifier interface
    def __init__(self, learner, nbins, isomerous):
        self.learner = learner
        self.nbins = nbins
        self.isomerous = isomerous

    def fit(self, data: LabelledCollection):
        self.Xtr, self.ytr = data.Xy
        self.learner.fit(self.Xtr, self.ytr)
        return self

    def quantify(self, instances):
        tr_priors = F.prevalence_from_labels(self.ytr, n_classes=2)
        fgsld = FineGrainedSLD(self.Xtr, instances, self.ytr, tr_priors, self.learner, n_bins=self.nbins)
        priors, posteriors = fgsld.run(self.isomerous)
        return priors

    def get_params(self, deep=True):
        pass

    def set_params(self, **parameters):
        pass


qp.environ['SAMPLE_SIZE'] = 500

dataset = qp.datasets.fetch_reviews('hp')
qp.data.preprocessing.text2tfidf(dataset, min_df=5, inplace=True)

training = dataset.training
test = dataset.test

cls = CalibratedClassifierCV(LinearSVC())

method_names, true_prevs, estim_prevs, tr_prevs = [], [], [], []

for model, model_name in [
        (CC(cls), 'CC'),
        (FakeFGLSD(cls, nbins=1, isomerous=False), 'FGSLD-1'),
        (FakeFGLSD(cls, nbins=2, isomerous=False), 'FGSLD-2'),
        # (FakeFGLSD(cls, nbins=5, isomerous=False), 'FGSLD-5'),
        # (FakeFGLSD(cls, nbins=10, isomerous=False), 'FGSLD-10'),
        # (FakeFGLSD(cls, nbins=50, isomerous=False), 'FGSLD-50'),
        # (FakeFGLSD(cls, nbins=100, isomerous=False), 'FGSLD-100'),
        # (FakeFGLSD(cls, nbins=10, isomerous=True), 'FGSLD-10-ISO'),
        (EMQ(cls), 'SLD'),
]:
    print('running', model_name)
    model.fit(training)
    true_prev, estim_prev = qp.evaluation.artificial_sampling_prediction(
        model, test, qp.environ['SAMPLE_SIZE'], n_repetitions=10, n_prevpoints=21, n_jobs=-1
    )
    method_names.append(model_name)
    true_prevs.append(true_prev)
    estim_prevs.append(estim_prev)
    tr_prevs.append(training.prevalence())

qp.plot.binary_diagonal(method_names, true_prevs, estim_prevs, train_prev=tr_prevs[0], savepath='./plot_fglsd.png')

View File

@ -1,107 +0,0 @@
import numpy as np
from typing import List
from metrics import isomerous_bins, isometric_bins
from em import History, get_measures_single_history


class FineGrainedSLD:
    def __init__(self, x_tr, x_te, y_tr, tr_priors, clf, n_bins=10):
        self.y_tr = y_tr
        self.clf = clf
        self.tr_priors = tr_priors
        self.tr_preds = clf.predict_proba(x_tr)
        self.te_preds = clf.predict_proba(x_te)
        self.n_bins = n_bins
        self.history: List[History] = []
        self.multi_class = False

    def run(self, isomerous_binning, epsilon=1e-6, compute_bins_at_every_iter=False, return_posteriors_hist=False):
        """
        Run the FGSLD algorithm.

        :param isomerous_binning: whether to use isomerous or isometric binning.
        :param epsilon: stopping condition.
        :param compute_bins_at_every_iter: whether FGSLD should recompute the posterior bins at every iteration or not.
        :param return_posteriors_hist: whether to return posteriors at every iteration or not.
        :return: if `return_posteriors_hist` is true, the returned posteriors will be a list of numpy arrays; else, a
            single numpy array with the posteriors at the last iteration.
        """
        smoothing_tr = 1 / (2 * self.y_tr.shape[0])
        smoothing_te = smoothing_tr
        s = 0
        tr_bin_priors = np.zeros((self.n_bins, self.tr_preds.shape[1]), dtype=float)
        te_bin_priors = np.zeros((self.n_bins, self.te_preds.shape[1]), dtype=float)
        tr_bins = self.__create_bins(training=True, isomerous_binning=isomerous_binning)
        te_bins = self.__create_bins(training=False, isomerous_binning=isomerous_binning)
        self.__compute_bins_priors(tr_bin_priors, self.tr_preds, tr_bins, smoothing_tr)

        val = 2 * epsilon
        if return_posteriors_hist:
            posteriors_hist = [self.te_preds.copy()]
        while not val < epsilon and s < 1000:
            assert np.all(np.around(self.te_preds.sum(axis=1), 4) == 1), \
                f"Probabilities do not sum to 1:\ns={s}, probs={self.te_preds.sum(axis=1)}"
            if compute_bins_at_every_iter:
                te_bins = self.__create_bins(training=False, isomerous_binning=isomerous_binning)

            if s == 0:
                te_bin_priors_prev = tr_bin_priors.copy()
            else:
                te_bin_priors_prev = te_bin_priors.copy()
            self.__compute_bins_priors(te_bin_priors, self.te_preds, te_bins, smoothing_te)

            te_preds_cp = self.te_preds.copy()
            for label_idx, bins in te_bins.items():
                for i, bin_ in enumerate(bins):
                    if bin_.shape[0] == 0:
                        continue
                    # rescale the posteriors in each bin by the ratio between the new and the previous bin priors
                    self.te_preds[:, label_idx][bin_] = (te_preds_cp[:, label_idx][bin_]) * \
                                                        (te_bin_priors[i][label_idx] / te_bin_priors_prev[i][label_idx])

            # normalization step
            self.te_preds = (self.te_preds.T / self.te_preds.sum(axis=1)).T

            # stopping criterion: largest relative change in any bin prior
            val = 0
            for label_idx in range(te_bin_priors.shape[1]):
                if (temp := max(abs((te_bin_priors[:, label_idx] / te_bin_priors_prev[:, label_idx]) - 1))) > val:
                    val = temp
            s += 1
            if return_posteriors_hist:
                posteriors_hist.append(self.te_preds.copy())

        if return_posteriors_hist:
            return self.te_preds.mean(axis=0), posteriors_hist
        return self.te_preds.mean(axis=0), self.te_preds

    def __compute_bins_priors(self, bin_priors_placeholder, posteriors, bins, smoothing):
        for label_idx, label_bins in bins.items():
            for i, bin_ in enumerate(label_bins):
                if bin_.shape[0] == 0:
                    bin_priors_placeholder[i, label_idx] = smoothing
                    continue
                numerator = posteriors[:, label_idx][bin_].mean()
                bin_prior = (numerator + smoothing) / (1 + self.n_bins * smoothing)  # normalize priors
                bin_priors_placeholder[i, label_idx] = bin_prior

    def __find_bin_idx(self, label_bins: List[np.ndarray], idx):
        if hasattr(idx, '__len__'):
            idxs = np.zeros(len(idx), dtype=int)
            for i, bin_ in enumerate(label_bins):
                for j, id_ in enumerate(idx):
                    if id_ in bin_:
                        idxs[j] = i
            return idxs
        else:
            for i, bin_ in enumerate(label_bins):
                if idx in bin_:
                    return i

    def __create_bins(self, training: bool, isomerous_binning: bool):
        bins = {}
        preds = self.tr_preds if training else self.te_preds
        if isomerous_binning:
            for label_idx in range(preds.shape[1]):
                bins[label_idx] = isomerous_bins(label_idx, preds, self.n_bins)
        else:
            intervals = np.linspace(0., 1., num=self.n_bins, endpoint=False)
            for label_idx in range(preds.shape[1]):
                # the step must match the interval width (was hard-coded to 0.1, which is only correct for n_bins=10)
                bins_ = isometric_bins(label_idx, preds, intervals, 1. / self.n_bins)
                bins[label_idx] = [bins_[i] for i in intervals]
        return bins
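
A direct-use sketch of the class above, outside the QuaPy wrapper shown earlier; the training data `Xtr, ytr`, test data `Xte`, and the classifier are illustrative assumptions:

    import quapy.functional as F
    from sklearn.linear_model import LogisticRegression

    clf = LogisticRegression().fit(Xtr, ytr)                # any classifier with predict_proba
    tr_priors = F.prevalence_from_labels(ytr, n_classes=2)  # training prevalence
    fgsld = FineGrainedSLD(Xtr, Xte, ytr, tr_priors, clf, n_bins=10)
    priors, posteriors = fgsld.run(isomerous_binning=False) # isometric bins
    print('estimated test priors:', priors)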

View File

@ -1,260 +0,0 @@
import numpy as np

"""
Scikit-learn provides a full set of evaluation metrics, but they treat special cases differently.
I.e., when the numbers of true positives, false positives, and false negatives amount to 0, all
affected metrics (precision, recall, and thus F1) output 0 in scikit-learn.
We adhere to the common practice of outputting 1 in this case, since the classifier has correctly
classified all examples as negatives.
"""


def isometric_brier_decomposition(true_labels, predicted_labels, bin_intervals=np.arange(0., 1.1, 0.1), step=0.1):
    """
    The isometric Brier decomposition (or score) is obtained by partitioning the interval [0,1] into intervals
    I_1j,...,I_bj of equal length, and grouping the U test items (U = true_labels.shape[0]) according to the interval
    their posterior probability for class j falls into. This means that, if b=10, then I_1j = [0.0,0.1),
    I_2j = [0.1,0.2), ..., I_bj = [0.9,1.0].

    bin_intervals is a numpy.array containing the left endpoints of the different intervals. Since it is a single
    dimensional array, for every endpoint I_n we take the posterior probabilities Pr_n(x) such that
    I_n <= Pr_n(x) < I_n + step. This variable defaults to np.arange(0., 1.1, 0.1), i.e. an array like
    [0.0, 0.1, ..., 1.0].

    :return: a tuple (calibration score, refinement score)
    """
    labels = set(true_labels)
    calibration_score, refinement_score = 0.0, 0.0
    for i in range(len(labels)):
        bins = isometric_bins(i, predicted_labels, bin_intervals, step)
        c_score, r_score = brier_decomposition(bins.values(), true_labels, predicted_labels, class_=i)
        calibration_score += c_score
        refinement_score += r_score
    return calibration_score, refinement_score
def isomerous_brier_decomposition(true_labels, predicted_labels, n=10):
    """
    The isomerous Brier decomposition (or score) is obtained by partitioning U into intervals I_1j,...,I_bj such that
    the corresponding bins B_1j,...,B_bj have equal size, where U is our test set. This means that, for every x' in
    B_sj and x'' in B_tj with s < t, it holds that Pr(c_j|x') <= Pr(c_j|x'') and |B_sj| == |B_tj|, for any s,t in
    {1,...,b}.

    The n variable holds the number of bins we want (defaults to 10). Notice that we perform a numpy.array_split on
    the predicted_labels, creating l % n sub-arrays of size l//n + 1 and the rest of size l//n, where l is the length
    of the array.

    :return: a tuple (calibration score, refinement score)
    """
    labels = set(true_labels)
    calibration_score, refinement_score = 0.0, 0.0
    for i in range(len(labels)):
        bins = isomerous_bins(i, predicted_labels, n)
        c_score, r_score = brier_decomposition(bins, true_labels, predicted_labels, class_=i)
        calibration_score += c_score
        refinement_score += r_score
    return calibration_score, refinement_score
def brier_decomposition(bins, true_labels, predicted_labels, class_=1):
    """
    :param bins: must be an iterable of arrays of indices (one array per bin)
    :return: a tuple (calibration score, refinement score)
    """
    calibration_score = 0
    refinement_score = 0
    for bin_ in bins:
        if bin_.size <= 0:
            continue
        v_x = (bin_.shape[0] / true_labels.shape[0])
        ro_x = np.mean(true_labels[bin_] == class_)
        calibration_score += v_x * (predicted_labels[bin_, class_].mean() - ro_x)**2
        refinement_score += (v_x * ro_x) * (1 - ro_x)
    labels_len = len(set(true_labels))
    return calibration_score / (labels_len * len(bins)), refinement_score / (labels_len * len(bins))


def isometric_bins(label_index, predicted_labels, bin_intervals, step):
    predicted_class_label = predicted_labels[:, label_index]
    return {interv: np.where(np.logical_and(interv <= predicted_class_label, predicted_class_label < interv + step))[0]
            for interv in bin_intervals}


def isomerous_bins(label_index, predicted_labels, n):
    sorted_indices = predicted_labels[:, label_index].argsort()
    return np.array_split(sorted_indices, n)
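
To make the two binning schemes concrete, a small sketch on toy arrays (the arrays and expected outputs in the comments are illustrative, not part of this commit):

    import numpy as np

    y_true = np.array([0, 1, 1, 0, 1, 0])
    p1 = np.array([0.05, 0.95, 0.65, 0.30, 0.80, 0.15])   # Pr(class 1 | x)
    post = np.column_stack([1 - p1, p1])

    # isometric: fixed-width intervals over [0,1); isomerous: equal-population bins
    by_width = isometric_bins(1, post, np.arange(0., 1., 0.5), 0.5)
    by_count = isomerous_bins(1, post, 2)
    print(by_width)   # {0.0: indices with p1 < 0.5, 0.5: indices with p1 >= 0.5}
    print(by_count)   # [3 lowest-p1 indices, 3 highest-p1 indices]

    # per-class Brier decomposition over the equal-population bins
    cal, ref = brier_decomposition(by_count, y_true, post, class_=1)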
# true_labels and predicted_labels are two matrices in sklearn.preprocessing.MultiLabelBinarizer format
def macroF1(true_labels, predicted_labels):
    return macro_average(true_labels, predicted_labels, f1)


# (same format as above)
def microF1(true_labels, predicted_labels):
    return micro_average(true_labels, predicted_labels, f1)


# (same format as above)
def macroK(true_labels, predicted_labels):
    return macro_average(true_labels, predicted_labels, K)


# (same format as above)
def microK(true_labels, predicted_labels):
    return micro_average(true_labels, predicted_labels, K)


# true_labels is a matrix in sklearn.preprocessing.MultiLabelBinarizer format and posterior_probabilities is a matrix
# of the same shape containing real values in [0,1]
def smoothmacroF1(true_labels, posterior_probabilities):
    return macro_average(true_labels, posterior_probabilities, f1, metric_statistics=soft_single_metric_statistics)


# (same format as above)
def smoothmicroF1(true_labels, posterior_probabilities):
    return micro_average(true_labels, posterior_probabilities, f1, metric_statistics=soft_single_metric_statistics)


# (same format as above)
def smoothmacroK(true_labels, posterior_probabilities):
    return macro_average(true_labels, posterior_probabilities, K, metric_statistics=soft_single_metric_statistics)


# (same format as above)
def smoothmicroK(true_labels, posterior_probabilities):
    return micro_average(true_labels, posterior_probabilities, K, metric_statistics=soft_single_metric_statistics)
class ContTable:
    def __init__(self, tp=0, tn=0, fp=0, fn=0):
        self.tp = tp
        self.tn = tn
        self.fp = fp
        self.fn = fn

    def get_d(self): return self.tp + self.tn + self.fp + self.fn

    def get_c(self): return self.tp + self.fn

    def get_not_c(self): return self.tn + self.fp

    def get_f(self): return self.tp + self.fp

    def get_not_f(self): return self.tn + self.fn

    def p_c(self): return (1.0 * self.get_c()) / self.get_d()

    def p_not_c(self): return 1.0 - self.p_c()

    def p_f(self): return (1.0 * self.get_f()) / self.get_d()

    def p_not_f(self): return 1.0 - self.p_f()

    def p_tp(self): return (1.0 * self.tp) / self.get_d()

    def p_tn(self): return (1.0 * self.tn) / self.get_d()

    def p_fp(self): return (1.0 * self.fp) / self.get_d()

    def p_fn(self): return (1.0 * self.fn) / self.get_d()

    def tpr(self):
        c = 1.0 * self.get_c()
        return self.tp / c if c > 0.0 else 0.0

    def fpr(self):
        _c = 1.0 * self.get_not_c()
        return self.fp / _c if _c > 0.0 else 0.0

    def __add__(self, other):
        return ContTable(tp=self.tp + other.tp, tn=self.tn + other.tn, fp=self.fp + other.fp, fn=self.fn + other.fn)
def accuracy(cell):
    return (cell.tp + cell.tn) * 1.0 / (cell.tp + cell.fp + cell.fn + cell.tn)


def f1(cell):
    num = 2.0 * cell.tp
    den = 2.0 * cell.tp + cell.fp + cell.fn
    if den > 0:
        return num / den
    # we define F1 to be 1 if den==0, since the classifier has correctly classified all instances as negative
    return 1.0


def K(cell):
    specificity, recall = 0., 0.

    AN = cell.tn + cell.fp
    if AN != 0:
        specificity = cell.tn * 1. / AN

    AP = cell.tp + cell.fn
    if AP != 0:
        recall = cell.tp * 1. / AP

    if AP == 0:
        return 2. * specificity - 1.
    elif AN == 0:
        return 2. * recall - 1.
    else:
        return specificity + recall - 1.
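
The special-case convention from the module docstring is easy to check directly; a toy sketch with arbitrary counts (not part of this commit):

    # every item correctly labeled negative: den == 0, so F1 is defined as 1
    print(f1(ContTable(tp=0, tn=5, fp=0, fn=0)))   # 1.0 (scikit-learn's f1_score returns 0.0 here)
    print(f1(ContTable(tp=3, tn=2, fp=1, fn=2)))   # 2*3 / (2*3 + 1 + 2) = 6/9 ~ 0.667
    print(K(ContTable(tp=0, tn=5, fp=0, fn=0)))    # AP == 0 -> 2*specificity - 1 = 1.0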
# computes the (hard) counters tp, fp, fn, and tn from true and predicted vectors of hard decisions;
# true_labels and predicted_labels are two vectors of shape (number_documents,)
def hard_single_metric_statistics(true_labels, predicted_labels):
    assert len(true_labels) == len(predicted_labels), "Format not consistent between true and predicted labels."
    nd = len(true_labels)
    tp = np.sum(predicted_labels[true_labels == 1])
    fp = np.sum(predicted_labels[true_labels == 0])
    fn = np.sum(true_labels[predicted_labels == 0])
    tn = nd - (tp + fp + fn)
    return ContTable(tp=tp, tn=tn, fp=fp, fn=fn)


# computes the (soft) contingency table where tp, fp, fn, and tn are the cumulative masses of the posterior
# probabilities with respect to the true binary labels;
# true_labels and posterior_probabilities are two vectors of shape (number_documents,)
def soft_single_metric_statistics(true_labels, posterior_probabilities):
    assert len(true_labels) == len(posterior_probabilities), "Format not consistent between true and predicted labels."
    pos_probs = posterior_probabilities[true_labels == 1]
    neg_probs = posterior_probabilities[true_labels == 0]
    tp = np.sum(pos_probs)
    fn = np.sum(1. - pos_probs)
    fp = np.sum(neg_probs)
    tn = np.sum(1. - neg_probs)
    return ContTable(tp=tp, tn=tn, fp=fp, fn=fn)
# if the classifier is single-class, then the prediction is a vector of shape=(nD,), which causes issues when compared
# to the true labels (of shape=(nD,1)). This method increases the dimensions of the predictions.
def __check_consistency_and_adapt(true_labels, predictions):
    if predictions.ndim == 1:
        return __check_consistency_and_adapt(true_labels, np.expand_dims(predictions, axis=1))
    if true_labels.ndim == 1:
        return __check_consistency_and_adapt(np.expand_dims(true_labels, axis=1), predictions)
    if true_labels.shape != predictions.shape:
        raise ValueError("True and predicted label matrices shapes are inconsistent %s %s."
                         % (true_labels.shape, predictions.shape))
    _, nC = true_labels.shape
    return true_labels, predictions, nC


def macro_average(true_labels, predicted_labels, metric, metric_statistics=hard_single_metric_statistics):
    true_labels, predicted_labels, nC = __check_consistency_and_adapt(true_labels, predicted_labels)
    return np.mean([metric(metric_statistics(true_labels[:, c], predicted_labels[:, c])) for c in range(nC)])


def micro_average(true_labels, predicted_labels, metric, metric_statistics=hard_single_metric_statistics):
    true_labels, predicted_labels, nC = __check_consistency_and_adapt(true_labels, predicted_labels)
    accum = ContTable()
    for c in range(nC):
        other = metric_statistics(true_labels[:, c], predicted_labels[:, c])
        accum = accum + other
    return metric(accum)
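
Finally, a sketch contrasting hard and smoothed macro-F1 on the same toy data (one-hot labels per MultiLabelBinarizer; arrays and the values in the comments are illustrative, not part of this commit):

    import numpy as np

    y_bin = np.array([[1, 0], [0, 1], [0, 1]])              # one-hot true labels, 2 classes
    posts = np.array([[0.9, 0.1], [0.4, 0.6], [0.2, 0.8]])  # posterior probabilities
    hard = (posts > 0.5).astype(int)

    print(macroF1(y_bin, hard))         # hard counts per class, then averaged -> 1.0
    print(smoothmacroF1(y_bin, posts))  # cumulative probability masses instead of counts -> 0.76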

Binary file not shown.

(deleted image: 162 KiB before, 0 B after)