
cleaning branch

Alejandro Moreo Fernandez 2024-02-23 16:55:14 +01:00
parent b3ccf71edb
commit caa7fd2884
3 changed files with 643 additions and 0 deletions

@@ -0,0 +1,315 @@
import numpy as np
import scipy.special
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.svm import LinearSVC
import quapy as qp
from quapy.model_selection import GridSearchQ
from quapy.protocol import APP
from quapy.method.aggregative import PACC, ACC, EMQ, PCC, CC, DMy, T50, MS2, KDEyML, KDEyCS, KDEyHD
from sklearn import clone
import quapy.functional as F
# datasets = qp.datasets.UCI_DATASETS
datasets = ['imdb']
# target = 'f1'
target = 'acc'
errors = []
def method_1(cls, q, train, val, sample, y=None, y_hat=None):
"""
Converts a misclassification matrix computed in validation (i.e., in the train distribution P) into
the corresponding equivalent misclassification matrix in test (i.e., in the test distribution Q)
by relying on the PPS assumptions.
:return: tuple (tn, fn, fp, tp,) of floats in [0,1] summing up to 1
"""
y_val = val.labels
y_hat_val = cls.predict(val.instances)
# q = EMQ(LogisticRegression(class_weight='balanced'))
# q.fit(val, fit_classifier=True)
# q = EMQ(cls)
# q.fit(train, fit_classifier=False)
# q = KDEyML(cls)
# q.fit(train, val_split=val, fit_classifier=False)
M_hat = ACC.getPteCondEstim(train.classes_, y_val, y_hat_val)
M_true = ACC.getPteCondEstim(train.classes_, y, y_hat)
p_hat = q.quantify(sample.instances)
cont_table_hat = p_hat * M_hat
# cont_table_hat = np.clip(cont_table_hat, 0, 1)
# cont_table_hat = cont_table_hat / cont_table_hat.sum()
print('true_prev: ', sample.prevalence())
print('estim_prev: ', p_hat)
print('M-true:\n', M_true)
print('M-hat:\n', M_hat)
print('cont_table:\n', cont_table_hat)
tp = cont_table_hat[1, 1]
tn = cont_table_hat[0, 0]
fn = cont_table_hat[0, 1]
fp = cont_table_hat[1, 0]
return tn, fn, fp, tp
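# Illustrative sketch of the reconstruction above (toy numbers, not part of the experiment): if the
# validation-estimated matrix is M_hat = [[0.9, 0.2], [0.1, 0.8]] (columns = true class, rows = predicted
# class, so tnr=0.9 and tpr=0.8) and the quantifier estimates p_hat = [0.3, 0.7] on the test sample, then
# p_hat * M_hat scales column j by p_hat[j], yielding the joint table [[0.27, 0.14], [0.03, 0.56]],
# i.e., tn=0.27, fn=0.14, fp=0.03, tp=0.56 (summing to 1), from which accuracy would be estimated
# as tp + tn = 0.83.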
def method_2(cls, train, val, sample, y=None, y_hat=None):
"""
Assume P and Q are the training and test distributions
Solves the following system of linear equations:
tp + fp = CC (the classify & count estimate, observed)
fn + tp = Q(Y=1) (this is not observed but is estimated via quantification)
tp + fp + fn + tn = 1 (trivial)
There are 4 unknowns and 3 equations. The fourth required one is established
by assuming that the PPS conditions hold, i.e., that P(X|Y)=Q(X|Y); note that
this implies P(hatY|Y)=Q(hatY|Y) if hatY is computed by any measurable function.
In particular, we consider that the tpr in P (estimated via validation, hereafter tpr) and
in Q (unknown, hereafter tpr_Q) should
be the same. This means:
    tpr = tpr_Q = tp / (tp + fn)
    Multiplying both sides by (tp + fn) gives tpr * (tp + fn) = tp, and rearranging:
    tp (tpr-1) + fn (tpr) = 0 <-- our last equation
    Note that the last equation relies on the estimate tpr. The more positives we have, the
    more reliable this estimate is likely to be. This suggests that, in cases
in which we have more negatives in the validation set than positives, it might be
convenient to resort to the true negative rate (tnr) instead. This gives rise to
the alternative fourth equation:
tn (tnr-1) + fp (tnr) = 0
:return: tuple (tn, fn, fp, tp,) of floats in [0,1] summing up to 1
"""
y_val = val.labels
y_hat_val = cls.predict(val.instances)
q = ACC(cls)
q.fit(train, val_split=val, fit_classifier=False)
p_hat = q.quantify(sample.instances)
pos_prev = p_hat[1]
# pos_prev = sample.prevalence()[1]
cc = CC(cls)
cc.fit(train, fit_classifier=False)
cc_prev = cc.quantify(sample.instances)[1]
M_hat = ACC.getPteCondEstim(train.classes_, y_val, y_hat_val)
M_true = ACC.getPteCondEstim(train.classes_, y, y_hat)
cont_table_true = sample.prevalence() * M_true
if val.prevalence()[1] > 0.5:
# in this case, the tpr might be a more reliable estimate than tnr
tpr_hat = M_hat[1, 1]
A = np.asarray([
[0, 0, 1, 1],
[0, 1, 0, 1],
[1, 1, 1, 1],
[0, tpr_hat, 0, tpr_hat - 1]
])
else:
# in this case, the tnr might be a more reliable estimate than tpr
tnr_hat = M_hat[0, 0]
A = np.asarray([
[0, 0, 1, 1],
[0, 1, 0, 1],
[1, 1, 1, 1],
[tnr_hat-1, 0, tnr_hat, 0]
])
b = np.asarray(
[cc_prev, pos_prev, 1, 0]
)
tn, fn, fp, tp = np.linalg.solve(A, b)
cont_table_estim = np.asarray([
[tn, fn],
[fp, tp]
])
# if (cont_table_estim < 0).any() or (cont_table_estim>1).any():
# cont_table_estim = scipy.special.softmax(cont_table_estim)
print('true_prev: ', sample.prevalence())
print('estim_prev: ', p_hat)
print('true_cont_table:\n', cont_table_true)
print('estim_cont_table:\n', cont_table_estim)
# print('true_tpr', M_true[1,1])
# print('estim_tpr', tpr_hat)
return tn, fn, fp, tp
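# Worked check of the linear system above (illustrative numbers only): suppose the true test table is
# tn=0.20, fn=0.10, fp=0.15, tp=0.55. Then cc_prev = fp + tp = 0.70, pos_prev = fn + tp = 0.65 and
# tpr = tp / (tp + fn) = 0.55 / 0.65 ~= 0.846. Solving
#   fp + tp = 0.70
#   fn + tp = 0.65
#   tn + fn + fp + tp = 1
#   fn * tpr + tp * (tpr - 1) = 0
# recovers exactly (tn, fn, fp, tp) = (0.20, 0.10, 0.15, 0.55); errors in tpr or in the quantified
# prevalence propagate to the recovered table.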
def method_3(cls, train, val, sample, y=None, y_hat=None):
"""
    This is just method 2, but without involving any of QuaPy's quantifiers.
:return: tuple (tn, fn, fp, tp,) of floats in [0,1] summing up to 1
"""
classes = val.classes_
y_val = val.labels
y_hat_val = cls.predict(val.instances)
M_hat = ACC.getPteCondEstim(classes, y_val, y_hat_val)
y_hat_test = cls.predict(sample.instances)
pos_prev_cc = F.prevalence_from_labels(y_hat_test, classes)[1]
tpr_hat = M_hat[1,1]
fpr_hat = M_hat[1,0]
tnr_hat = M_hat[0,0]
if tpr_hat!=fpr_hat:
pos_prev_test_hat = (pos_prev_cc - fpr_hat) / (tpr_hat - fpr_hat)
else:
print('-->', tpr_hat)
pos_prev_test_hat = pos_prev_cc
pos_prev_test_hat = np.clip(pos_prev_test_hat, 0, 1)
pos_prev_val = val.prevalence()[1]
if pos_prev_val > 0.5:
# in this case, the tpr might be a more reliable estimate than tnr
A = np.asarray([
[0, 0, 1, 1],
[0, 1, 0, 1],
[1, 1, 1, 1],
[0, tpr_hat, 0, tpr_hat - 1]
])
else:
# in this case, the tnr might be a more reliable estimate than tpr
A = np.asarray([
[0, 0, 1, 1],
[0, 1, 0, 1],
[1, 1, 1, 1],
[tnr_hat-1, 0, tnr_hat, 0]
])
b = np.asarray(
[pos_prev_cc, pos_prev_test_hat, 1, 0]
)
tn, fn, fp, tp = np.linalg.solve(A, b)
return tn, fn, fp, tp
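# Rationale for the adjustment used above (the standard ACC correction): under PPS the classify-and-count
# estimate decomposes as pos_prev_cc = tpr * Q(Y=1) + fpr * (1 - Q(Y=1)), so whenever tpr != fpr the test
# prevalence can be recovered as Q(Y=1) = (pos_prev_cc - fpr) / (tpr - fpr), which is then clipped to [0, 1].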
def cls_eval_from_counters(tn, fn, fp, tp):
if target == 'acc':
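        # the four counters sum to 1, so tp + tn directly gives the accuracy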
acc_hat = (tp + tn)
else:
den = (2 * tp + fn + fp)
if den > 0:
acc_hat = 2 * tp / den
else:
acc_hat = 0
return acc_hat
def cls_eval_from_labels(y, y_hat):
if target == 'acc':
acc = (y_hat == y).mean()
else:
acc = f1_score(y, y_hat, zero_division=0)
return acc
for dataset_name in datasets:
train_orig, test = qp.datasets.fetch_reviews(dataset_name, tfidf=True, min_df=10).train_test
xs = []
ys_1 = []
ys_trval = []
ys_3 = []
train_prot = APP(train_orig, n_prevalences=11, repeats=1, return_type='labelled_collection', random_state=0, sample_size=10000)
for train in train_prot():
        if np.prod(train.prevalence()) == 0:
# skip experiments with no positives or no negatives in training
continue
cls = LogisticRegression(class_weight='balanced', C=100)
# cls = CalibratedClassifierCV(LinearSVC())
train, val = train.split_stratified(train_prop=0.5, random_state=0)
print(f'dataset name = {dataset_name}')
print(f'#train = {len(train)}, prev={F.strprev(train.prevalence())}')
print(f'#val = {len(val)}, prev={F.strprev(val.prevalence())}')
print(f'#test = {len(test)}, prev={F.strprev(test.prevalence())}')
cls.fit(*train.Xy)
# q = KDEyML(cls)
q = ACC(LogisticRegression())
q.fit(train, val_split=val, fit_classifier=True)
# q = GridSearchQ(PACC(cls),
# param_grid={'classifier__C':np.logspace(-2,2,5)},
# protocol=APP(val, sample_size=1000),
# verbose=True,
# n_jobs=-1).fit(train)
acc_trval = cls_eval_from_labels(val.labels, cls.predict(val.instances))
for sample in APP(test, n_prevalences=21, repeats=1, sample_size=1000, return_type='labelled_collection')():
print('='*80)
y_hat = cls.predict(sample.instances)
y = sample.labels
acc_true = cls_eval_from_labels(y, y_hat)
xs.append(acc_true)
ys_trval.append(acc_trval)
tn, fn, fp, tp = method_1(cls, q, train, val, sample, y, y_hat)
acc_hat = cls_eval_from_counters(tn, fn, fp, tp)
ys_1.append(acc_hat)
tn, fn, fp, tp = method_3(cls, train, val, sample, y, y_hat)
acc_hat = cls_eval_from_counters(tn, fn, fp, tp)
ys_3.append(acc_hat)
error = abs(acc_true - acc_hat)
errors.append(error)
print(f'classifier accuracy={acc_true:.3f}')
print(f'estimated accuracy={acc_hat:.3f}')
print(f'estimation error={error:.4f}')
print('process end')
print('='*80)
print(f'mean error = {np.mean(errors)}')
print(f'std error = {np.std(errors)}')
import matplotlib.pyplot as plt
# Create scatter plot
plt.plot([0, 1], [0, 1], color='black', linestyle='--')
plt.scatter(xs, ys_1, label='method 1')
plt.scatter(xs, ys_3, label='method 3')
plt.scatter(xs, ys_trval, label='tr-val')
plt.legend()
# Add labels and title
plt.xlabel('True Accuracy')
plt.ylabel('Estim Accuracy')
# Display the plot
plt.show()

ClassifierAccuracy/main.py Normal file

@@ -0,0 +1,149 @@
from collections import defaultdict
from sklearn.calibration import CalibratedClassifierCV
from sklearn.svm import LinearSVC
from tqdm import tqdm
from sklearn.linear_model import LogisticRegression
import os
import quapy as qp
from quapy.method.aggregative import PACC, EMQ, PCC, CC, ACC, HDy
from models import *
import matplotlib.pyplot as plt
from pathlib import Path
def clf():
# return CalibratedClassifierCV(LinearSVC(class_weight=None))
return LogisticRegression(class_weight=None)
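# Note: the contingency tables handled below follow sklearn's confusion_matrix layout
# (rows = true class, columns = predicted class).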
def F1(contingency_table):
# tn = contingency_table[0, 0]
tp = contingency_table[1, 1]
fp = contingency_table[0, 1]
fn = contingency_table[1, 0]
den = (2*tp+fp+fn)
if den>0:
return 2*tp/den
else:
return 1
def accuracy(contingency_table):
tn = contingency_table[0, 0]
tp = contingency_table[1, 1]
fp = contingency_table[0, 1]
fn = contingency_table[1, 0]
return (tp+tn)/(tp+fp+fn+tn)
def plot_series(series, repeats, metric_name, train_prev=None, savepath=None):
for key in series:
print(series[key])
fig, ax = plt.subplots()
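    # aggregate every block of `repeats` consecutive values (one block per tested prevalence)
    # into its mean and standard deviation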
def bin(v):
mat = np.asarray(v).reshape(-1, repeats)
return mat.mean(axis=1), mat.std(axis=1)
x = series['prev']
x,_ = bin(x)
for serie in series:
if serie=='prev': continue
values = series[serie]
print(serie, values)
val_mean, val_std = bin(values)
ax.errorbar(x, val_mean, label=serie, fmt='-', marker='o')
ax.fill_between(x, val_mean - val_std, val_mean + val_std, alpha=0.25)
if train_prev is not None:
ax.axvline(x=train_prev, label='tr-prev', color='k', linestyle='--')
# ax.scatter(train_prev, train_prev, c='c', label='tr-prev', linewidth=2, edgecolor='k', s=100, zorder=3)
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
ax.grid()
ax.set_title(metric_name)
    ax.set(xlabel=r'$p_U(\oplus)$', ylabel='estimated '+metric_name,
title='Classifier accuracy in terms of '+metric_name)
if savepath is None:
plt.show()
else:
os.makedirs(Path(savepath).parent, exist_ok=True)
plt.savefig(savepath, bbox_inches='tight')
dataset='imdb'
data = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=5, pickle=True)
# qp.data.preprocessing.reduce_columns(data, min_df=5, inplace=True)
# print('num_features', data.training.instances.shape[1])
train = data.training
test = data.test
upper = UpperBound(clf(), y_test=None).fit(train)
mlcfe = MLCMEstimator(clf(), strategy='kfcv', k=5, n_jobs=-1).fit(train)
emq_quant = QuantificationCMPredictor(clf(), EMQ(LogisticRegression()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
# cc_quant = QuantificationCMPredictor(clf(), CC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
# pcc_quant = QuantificationCMPredictor(clf(), PCC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
# acc_quant = QuantificationCMPredictor(clf(), ACC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
pacc_quant = QuantificationCMPredictor(clf(), PACC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
# hdy_quant = QuantificationCMPredictor(clf(), HDy(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
sld = EMQ(LogisticRegression()).fit(train)
pacc = PACC(clf()).fit(train)
contenders = [
('kFCV+MLPE', mlcfe),
('SLD', emq_quant),
# ('CC', cc_quant),
# ('PCC', pcc_quant),
# ('ACC', acc_quant),
('PACC', pacc_quant),
# ('HDy', hdy_quant)
]
metric = F1
# metric = accuracy
repeats = 10
with qp.util.temp_seed(42):
samples_idx = [idx for idx in test.artificial_sampling_index_generator(sample_size=500, n_prevalences=21, repeats=repeats)]
series = defaultdict(lambda: [])
for idx in tqdm(samples_idx, desc='generating predictions'):
sample = test.sampling_from_index(idx)
upper.show_true_labels(sample.labels)
upper_conf_matrix = upper.predict(sample.instances)
metric_true = metric(upper_conf_matrix)
series['Upper'].append(metric_true)
for mname, method in contenders:
conf_matrix = method.predict(sample.instances)
estim_metric = metric(conf_matrix)
series[mname].append(estim_metric)
if hasattr(method, 'quantify'):
series[mname+'-prev'].append(method.quantify(sample.instances))
series['binsld-prev'].append(sld.quantify(sample.instances)[1])
series['binpacc-prev'].append(pacc.quantify(sample.instances)[1])
series['optimal-prev'].append(sample.prevalence()[1])
series['prev'].append(sample.prevalence()[1])
metricname = metric.__name__
plot_series(series, repeats, metric_name=metricname, train_prev=train.prevalence()[1], savepath='./plots/'+dataset+'_LinearSVC_'+metricname+'.pdf')


@@ -0,0 +1,179 @@
import numpy as np
import quapy as qp
from sklearn import clone
from sklearn.metrics import confusion_matrix
import scipy
from scipy.sparse import issparse, csr_matrix
from quapy.data import LabelledCollection
from abc import ABC, abstractmethod
from sklearn.model_selection import cross_val_predict
class ConfusionMatrixPredictor(ABC):
"""
Abstract class of predictors of a confusion matrix for the performance of a classifier.
For the binary case, this accounts to predicting the 4-cell contingency table consisting of the
true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN) that
most evaluation metrics make use of.
"""
@abstractmethod
def fit(self, train: LabelledCollection):
pass
@abstractmethod
def predict(self, test):
pass
class MLCMEstimator(ConfusionMatrixPredictor):
"""
The Maximum Likelihood Confusion Matrix Estimator is a method that relies on the IID assumption, and thus
computes, via k-FCV (or any other technique) the counters of the confusion matrix, assuming that those are
good estimates for the test case.
"""
def __init__(self, classifier, strategy='kfcv', **kwargs):
assert strategy in ['kfcv'], 'unknown strategy'
if strategy=='kfcv':
assert 'k' in kwargs, 'strategy "kfcv" requires "k" to be passed as an argument'
self.classifier = classifier
self.strategy = strategy
self.kwargs = kwargs
def sout(self, msg):
if 'verbose' in self.kwargs:
print(msg)
def fit(self, train: LabelledCollection):
X, y = train.Xy
if self.strategy == 'kfcv':
k=self.kwargs['k']
n_jobs = self.kwargs['n_jobs'] if 'n_jobs' in self.kwargs else 1
predict = self.kwargs['predict'] if 'predict' in self.kwargs else 'predict'
self.sout(f'{self.__class__.__name__}: '
f'running cross_val_predict with k={k} n_jobs={n_jobs} predict={predict}')
predictions = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method=predict)
self.conf_matrix = confusion_matrix(y, predictions, labels=train.classes_)
return self
def predict(self, test):
"""
        This method disregards the test set, under the assumption that it is IID wrt the training. This means
        that the confusion matrix for the test data should coincide with the one computed for training (using
        any cross-validation strategy).
:param test: test collection (ignored)
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
return self.conf_matrix
class UpperBound(ConfusionMatrixPredictor):
def __init__(self, classifier, y_test):
self.classifier = classifier
self.y_test = y_test
def fit(self, train: LabelledCollection):
self.classifier.fit(*train.Xy)
self.classes = train.classes_
return self
def show_true_labels(self, y_test):
self.y_test = y_test
def predict(self, test):
predictions = self.classifier.predict(test)
return confusion_matrix(self.y_test, predictions, labels=self.classes)
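# get_counters maps each (true, predicted) pair to one of four symbolic classes; e.g., for
# y_true=[1, 1, 0, 0] and y_pred=[1, 0, 1, 0] it returns counters=[0, 1, 2, 3], i.e., tp, fn, fp, tn
# according to class_map. These symbolic labels are what QuantificationCMPredictor quantifies over.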
def get_counters(y_true, y_pred):
counters = np.full(shape=y_true.shape, fill_value=-1)
counters[np.logical_and(y_true == 1, y_pred == 1)] = 0
counters[np.logical_and(y_true == 1, y_pred == 0)] = 1
counters[np.logical_and(y_true == 0, y_pred == 1)] = 2
counters[np.logical_and(y_true == 0, y_pred == 0)] = 3
class_map = {
0:'tp',
1:'fn',
2:'fp',
3:'tn'
}
return counters, class_map
def safehstack(matrix, posteriors):
if issparse(matrix):
instances = csr_matrix(scipy.sparse.hstack([matrix, posteriors]))
else:
instances = np.hstack([matrix, posteriors])
return instances
class QuantificationCMPredictor(ConfusionMatrixPredictor):
"""
"""
def __init__(self, classifier, quantifier, strategy='kfcv', **kwargs):
assert strategy in ['kfcv'], 'unknown strategy'
if strategy=='kfcv':
assert 'k' in kwargs, 'strategy "kfcv" requires "k" to be passed as an argument'
self.classifier = clone(classifier)
self.quantifier = quantifier
self.strategy = strategy
self.kwargs = kwargs
def sout(self, msg):
if 'verbose' in self.kwargs:
print(msg)
def fit(self, train: LabelledCollection):
X, y = train.Xy
if self.strategy == 'kfcv':
k=self.kwargs['k']
n_jobs = self.kwargs['n_jobs'] if 'n_jobs' in self.kwargs else 1
self.sout(f'{self.__class__.__name__}: '
f'running cross_val_predict with k={k} n_jobs={n_jobs}')
predictions = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict')
posteriors = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict_proba')
self.classifier.fit(X, y)
instances = safehstack(train.instances, posteriors)
counters, class_map = get_counters(train.labels, predictions)
q_data = LabelledCollection(instances=instances, labels=counters, classes_=[0,1,2,3])
print('counters prevalence', q_data.counts())
self.quantifier.fit(q_data)
return self
def predict(self, test):
"""
        :param test: test instances (a feature matrix)
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
posteriors = self.classifier.predict_proba(test)
instances = safehstack(test, posteriors)
counters = self.quantifier.quantify(instances)
tp, fn, fp, tn = counters
conf_matrix = np.asarray([[tn, fp], [fn, tp]])
return conf_matrix
def quantify(self, test):
posteriors = self.classifier.predict_proba(test)
instances = safehstack(test, posteriors)
counters = self.quantifier.quantify(instances)
tp, fn, fp, tn = counters
den_tpr = (tp+fn)
if den_tpr>0:
tpr = tp/den_tpr
else:
tpr = 1
den_fpr = (fp+tn)
if den_fpr>0:
fpr = fp / den_fpr
else:
fpr = 0
pcc = posteriors.sum(axis=0)[1]
pacc = (pcc-fpr)/(tpr-fpr)
pacc = np.clip(pacc, 0, 1)
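        # the estimated positive prevalence is the mass assigned to the tp and fn counters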
q = tp+fn
return q