diff --git a/ClassifierAccuracy/accuracy_prediction_via_quantification.py b/ClassifierAccuracy/accuracy_prediction_via_quantification.py new file mode 100644 index 0000000..b117f3a --- /dev/null +++ b/ClassifierAccuracy/accuracy_prediction_via_quantification.py @@ -0,0 +1,315 @@ +import numpy as np +import scipy.special +from sklearn.calibration import CalibratedClassifierCV +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import f1_score +from sklearn.svm import LinearSVC + +import quapy as qp +from model_selection import GridSearchQ +from quapy.protocol import APP +from quapy.method.aggregative import PACC, ACC, EMQ, PCC, CC, DMy, T50, MS2, KDEyML, KDEyCS, KDEyHD +from sklearn import clone +import quapy.functional as F + +# datasets = qp.datasets.UCI_DATASETS +datasets = ['imdb'] + +# target = 'f1' +target = 'acc' + +errors = [] + +def method_1(cls, q, train, val, sample, y=None, y_hat=None): + """ + Converts a misclassification matrix computed in validation (i.e., in the train distribution P) into + the corresponding equivalent misclassification matrix in test (i.e., in the test distribution Q) + by relying on the PPS assumptions. + + :return: tuple (tn, fn, fp, tp,) of floats in [0,1] summing up to 1 + """ + + y_val = val.labels + y_hat_val = cls.predict(val.instances) + + # q = EMQ(LogisticRegression(class_weight='balanced')) + # q.fit(val, fit_classifier=True) + # q = EMQ(cls) + # q.fit(train, fit_classifier=False) + + + # q = KDEyML(cls) + # q.fit(train, val_split=val, fit_classifier=False) + M_hat = ACC.getPteCondEstim(train.classes_, y_val, y_hat_val) + M_true = ACC.getPteCondEstim(train.classes_, y, y_hat) + p_hat = q.quantify(sample.instances) + cont_table_hat = p_hat * M_hat + # cont_table_hat = np.clip(cont_table_hat, 0, 1) + # cont_table_hat = cont_table_hat / cont_table_hat.sum() + + print('true_prev: ', sample.prevalence()) + print('estim_prev: ', p_hat) + print('M-true:\n', M_true) + print('M-hat:\n', M_hat) + print('cont_table:\n', cont_table_hat) + + tp = cont_table_hat[1, 1] + tn = cont_table_hat[0, 0] + fn = cont_table_hat[0, 1] + fp = cont_table_hat[1, 0] + + return tn, fn, fp, tp + + +def method_2(cls, train, val, sample, y=None, y_hat=None): + """ + Assume P and Q are the training and test distributions + Solves the following system of linear equations: + tp + fp = CC (the classify & count estimate, observed) + fn + tp = Q(Y=1) (this is not observed but is estimated via quantification) + tp + fp + fn + tn = 1 (trivial) + + There are 4 unknowns and 3 equations. The fourth required one is established + by assuming that the PPS conditions hold, i.e., that P(X|Y)=Q(X|Y); note that + this implies P(hatY|Y)=Q(hatY|Y) if hatY is computed by any measurable function. + In particular, we consider that the tpr in P (estimated via validation, hereafter tpr) and + in Q (unknown, hereafter tpr_Q) should + be the same. This means: + tpr = tpr_Q = tp / (tp + fn) + after some manipulation: + tp (tpr-1) + fn (tpr) = 0 <-- our last equation + + Note that the last equation relies on the estimate tpr. It is likely that, the more + positives we have, the more reliable this estimate is. This suggests that, in cases + in which we have more negatives in the validation set than positives, it might be + convenient to resort to the true negative rate (tnr) instead. 
This gives rise to + the alternative fourth equation: + tn (tnr-1) + fp (tnr) = 0 + + :return: tuple (tn, fn, fp, tp,) of floats in [0,1] summing up to 1 + """ + + y_val = val.labels + y_hat_val = cls.predict(val.instances) + + q = ACC(cls) + q.fit(train, val_split=val, fit_classifier=False) + p_hat = q.quantify(sample.instances) + pos_prev = p_hat[1] + # pos_prev = sample.prevalence()[1] + + cc = CC(cls) + cc.fit(train, fit_classifier=False) + cc_prev = cc.quantify(sample.instances)[1] + + M_hat = ACC.getPteCondEstim(train.classes_, y_val, y_hat_val) + M_true = ACC.getPteCondEstim(train.classes_, y, y_hat) + cont_table_true = sample.prevalence() * M_true + + if val.prevalence()[1] > 0.5: + + # in this case, the tpr might be a more reliable estimate than tnr + tpr_hat = M_hat[1, 1] + + A = np.asarray([ + [0, 0, 1, 1], + [0, 1, 0, 1], + [1, 1, 1, 1], + [0, tpr_hat, 0, tpr_hat - 1] + ]) + + else: + + # in this case, the tnr might be a more reliable estimate than tpr + tnr_hat = M_hat[0, 0] + + A = np.asarray([ + [0, 0, 1, 1], + [0, 1, 0, 1], + [1, 1, 1, 1], + [tnr_hat-1, 0, tnr_hat, 0] + ]) + + b = np.asarray( + [cc_prev, pos_prev, 1, 0] + ) + + tn, fn, fp, tp = np.linalg.solve(A, b) + + cont_table_estim = np.asarray([ + [tn, fn], + [fp, tp] + ]) + + # if (cont_table_estim < 0).any() or (cont_table_estim>1).any(): + # cont_table_estim = scipy.special.softmax(cont_table_estim) + + print('true_prev: ', sample.prevalence()) + print('estim_prev: ', p_hat) + print('true_cont_table:\n', cont_table_true) + print('estim_cont_table:\n', cont_table_estim) + # print('true_tpr', M_true[1,1]) + # print('estim_tpr', tpr_hat) + + + return tn, fn, fp, tp + + +def method_3(cls, train, val, sample, y=None, y_hat=None): + """ + This is just method 2 but without involving any quapy's quantifier. 
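+    Instead of relying on a quantifier, the positive prevalence of the test sample is estimated directly
+    via the ACC adjustment, i.e., p = (cc - fpr) / (tpr - fpr) clipped to [0, 1], where cc is the
+    classify & count estimate on the test sample and tpr, fpr are estimated on the validation split.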
+ + :return: tuple (tn, fn, fp, tp,) of floats in [0,1] summing up to 1 + """ + + classes = val.classes_ + y_val = val.labels + y_hat_val = cls.predict(val.instances) + M_hat = ACC.getPteCondEstim(classes, y_val, y_hat_val) + y_hat_test = cls.predict(sample.instances) + pos_prev_cc = F.prevalence_from_labels(y_hat_test, classes)[1] + tpr_hat = M_hat[1,1] + fpr_hat = M_hat[1,0] + tnr_hat = M_hat[0,0] + if tpr_hat!=fpr_hat: + pos_prev_test_hat = (pos_prev_cc - fpr_hat) / (tpr_hat - fpr_hat) + else: + print('-->', tpr_hat) + pos_prev_test_hat = pos_prev_cc + pos_prev_test_hat = np.clip(pos_prev_test_hat, 0, 1) + pos_prev_val = val.prevalence()[1] + + if pos_prev_val > 0.5: + # in this case, the tpr might be a more reliable estimate than tnr + A = np.asarray([ + [0, 0, 1, 1], + [0, 1, 0, 1], + [1, 1, 1, 1], + [0, tpr_hat, 0, tpr_hat - 1] + ]) + else: + # in this case, the tnr might be a more reliable estimate than tpr + A = np.asarray([ + [0, 0, 1, 1], + [0, 1, 0, 1], + [1, 1, 1, 1], + [tnr_hat-1, 0, tnr_hat, 0] + ]) + + b = np.asarray( + [pos_prev_cc, pos_prev_test_hat, 1, 0] + ) + + tn, fn, fp, tp = np.linalg.solve(A, b) + + return tn, fn, fp, tp + + +def cls_eval_from_counters(tn, fn, fp, tp): + if target == 'acc': + acc_hat = (tp + tn) + else: + den = (2 * tp + fn + fp) + if den > 0: + acc_hat = 2 * tp / den + else: + acc_hat = 0 + return acc_hat + + +def cls_eval_from_labels(y, y_hat): + if target == 'acc': + acc = (y_hat == y).mean() + else: + acc = f1_score(y, y_hat, zero_division=0) + return acc + + +for dataset_name in datasets: + + train_orig, test = qp.datasets.fetch_reviews(dataset_name, tfidf=True, min_df=10).train_test + + xs = [] + ys_1 = [] + ys_trval = [] + ys_3 = [] + + train_prot = APP(train_orig, n_prevalences=11, repeats=1, return_type='labelled_collection', random_state=0, sample_size=10000) + for train in train_prot(): + if np.product(train.prevalence()) == 0: + # skip experiments with no positives or no negatives in training + continue + + cls = LogisticRegression(class_weight='balanced', C=100) + # cls = CalibratedClassifierCV(LinearSVC()) + + train, val = train.split_stratified(train_prop=0.5, random_state=0) + + print(f'dataset name = {dataset_name}') + print(f'#train = {len(train)}, prev={F.strprev(train.prevalence())}') + print(f'#val = {len(val)}, prev={F.strprev(val.prevalence())}') + print(f'#test = {len(test)}, prev={F.strprev(test.prevalence())}') + + cls.fit(*train.Xy) + + # q = KDEyML(cls) + q = ACC(LogisticRegression()) + q.fit(train, val_split=val, fit_classifier=True) + # q = GridSearchQ(PACC(cls), + # param_grid={'classifier__C':np.logspace(-2,2,5)}, + # protocol=APP(val, sample_size=1000), + # verbose=True, + # n_jobs=-1).fit(train) + + acc_trval = cls_eval_from_labels(val.labels, cls.predict(val.instances)) + + + for sample in APP(test, n_prevalences=21, repeats=1, sample_size=1000, return_type='labelled_collection')(): + print('='*80) + y_hat = cls.predict(sample.instances) + y = sample.labels + acc_true = cls_eval_from_labels(y, y_hat) + xs.append(acc_true) + ys_trval.append(acc_trval) + + tn, fn, fp, tp = method_1(cls, q, train, val, sample, y, y_hat) + acc_hat = cls_eval_from_counters(tn, fn, fp, tp) + ys_1.append(acc_hat) + + tn, fn, fp, tp = method_3(cls, train, val, sample, y, y_hat) + acc_hat = cls_eval_from_counters(tn, fn, fp, tp) + ys_3.append(acc_hat) + + error = abs(acc_true - acc_hat) + errors.append(error) + + print(f'classifier accuracy={acc_true:.3f}') + print(f'estimated accuracy={acc_hat:.3f}') + print(f'estimation 
error={error:.4f}') + + +print('process end') +print('='*80) +print(f'mean error = {np.mean(errors)}') +print(f'std error = {np.std(errors)}') + +import matplotlib.pyplot as plt + +# Create scatter plot +plt.plot([0, 1], [0, 1], color='black', linestyle='--') +plt.scatter(xs, ys_1, label='method 1') +plt.scatter(xs, ys_3, label='method 3') +plt.scatter(xs, ys_trval, label='tr-val') +plt.legend() + +# Add labels and title +plt.xlabel('True Accuracy') +plt.ylabel('Estim Accuracy') + +# Display the plot +plt.show() + + + + + diff --git a/ClassifierAccuracy/main.py b/ClassifierAccuracy/main.py new file mode 100644 index 0000000..1984951 --- /dev/null +++ b/ClassifierAccuracy/main.py @@ -0,0 +1,149 @@ +from collections import defaultdict + +from sklearn.calibration import CalibratedClassifierCV +from sklearn.svm import LinearSVC +from tqdm import tqdm +from sklearn.linear_model import LogisticRegression +import os +import quapy as qp +from method.aggregative import PACC, EMQ, PCC, CC, ACC, HDy +from models import * +import matplotlib.pyplot as plt +from pathlib import Path + + +def clf(): + # return CalibratedClassifierCV(LinearSVC(class_weight=None)) + return LogisticRegression(class_weight=None) + + +def F1(contingency_table): + # tn = contingency_table[0, 0] + tp = contingency_table[1, 1] + fp = contingency_table[0, 1] + fn = contingency_table[1, 0] + den = (2*tp+fp+fn) + if den>0: + return 2*tp/den + else: + return 1 + + +def accuracy(contingency_table): + tn = contingency_table[0, 0] + tp = contingency_table[1, 1] + fp = contingency_table[0, 1] + fn = contingency_table[1, 0] + return (tp+tn)/(tp+fp+fn+tn) + + +def plot_series(series, repeats, metric_name, train_prev=None, savepath=None): + + for key in series: + print(series[key]) + + fig, ax = plt.subplots() + + def bin(v): + mat = np.asarray(v).reshape(-1, repeats) + return mat.mean(axis=1), mat.std(axis=1) + + x = series['prev'] + x,_ = bin(x) + + for serie in series: + if serie=='prev': continue + values = series[serie] + print(serie, values) + val_mean, val_std = bin(values) + ax.errorbar(x, val_mean, label=serie, fmt='-', marker='o') + ax.fill_between(x, val_mean - val_std, val_mean + val_std, alpha=0.25) + + if train_prev is not None: + ax.axvline(x=train_prev, label='tr-prev', color='k', linestyle='--') + # ax.scatter(train_prev, train_prev, c='c', label='tr-prev', linewidth=2, edgecolor='k', s=100, zorder=3) + + ax.legend(loc='center left', bbox_to_anchor=(1, 0.5)) + + ax.grid() + ax.set_title(metric_name) + ax.set(xlabel='$p_U(\oplus)$', ylabel='estimated '+metric_name, + title='Classifier accuracy in terms of '+metric_name) + + if savepath is None: + plt.show() + else: + os.makedirs(Path(savepath).parent, exist_ok=True) + plt.savefig(savepath, bbox_inches='tight') + + +dataset='imdb' +data = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=5, pickle=True) + +# qp.data.preprocessing.reduce_columns(data, min_df=5, inplace=True) +# print('num_features', data.training.instances.shape[1]) + +train = data.training +test = data.test + +upper = UpperBound(clf(), y_test=None).fit(train) + +mlcfe = MLCMEstimator(clf(), strategy='kfcv', k=5, n_jobs=-1).fit(train) + +emq_quant = QuantificationCMPredictor(clf(), EMQ(LogisticRegression()), strategy='kfcv', k=5, n_jobs=-1).fit(train) +# cc_quant = QuantificationCMPredictor(clf(), CC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train) +# pcc_quant = QuantificationCMPredictor(clf(), PCC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train) +# acc_quant = QuantificationCMPredictor(clf(), 
ACC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
+pacc_quant = QuantificationCMPredictor(clf(), PACC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
+# hdy_quant = QuantificationCMPredictor(clf(), HDy(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
+
+sld = EMQ(LogisticRegression()).fit(train)
+pacc = PACC(clf()).fit(train)
+
+contenders = [
+    ('kFCV+MLPE', mlcfe),
+    ('SLD', emq_quant),
+    # ('CC', cc_quant),
+    # ('PCC', pcc_quant),
+    # ('ACC', acc_quant),
+    ('PACC', pacc_quant),
+    # ('HDy', hdy_quant)
+]
+
+metric = F1
+# metric = accuracy
+
+repeats = 10
+with qp.util.temp_seed(42):
+    samples_idx = [idx for idx in test.artificial_sampling_index_generator(sample_size=500, n_prevalences=21, repeats=repeats)]
+
+
+series = defaultdict(lambda: [])
+for idx in tqdm(samples_idx, desc='generating predictions'):
+    sample = test.sampling_from_index(idx)
+
+    upper.show_true_labels(sample.labels)
+    upper_conf_matrix = upper.predict(sample.instances)
+    metric_true = metric(upper_conf_matrix)
+    series['Upper'].append(metric_true)
+
+    for mname, method in contenders:
+        conf_matrix = method.predict(sample.instances)
+        estim_metric = metric(conf_matrix)
+        series[mname].append(estim_metric)
+        if hasattr(method, 'quantify'):
+            series[mname+'-prev'].append(method.quantify(sample.instances))
+
+    series['binsld-prev'].append(sld.quantify(sample.instances)[1])
+    series['binpacc-prev'].append(pacc.quantify(sample.instances)[1])
+    series['optimal-prev'].append(sample.prevalence()[1])
+    series['prev'].append(sample.prevalence()[1])
+
+metricname = metric.__name__
+plot_series(series, repeats, metric_name=metricname, train_prev=train.prevalence()[1], savepath='./plots/'+dataset+'_LinearSVC_'+metricname+'.pdf')
+
+
+
+
+
diff --git a/ClassifierAccuracy/models.py b/ClassifierAccuracy/models.py
new file mode 100644
index 0000000..e0114cf
--- /dev/null
+++ b/ClassifierAccuracy/models.py
@@ -0,0 +1,179 @@
+import numpy as np
+import quapy as qp
+from sklearn import clone
+from sklearn.metrics import confusion_matrix
+import scipy
+from scipy.sparse import issparse, csr_matrix
+from quapy.data import LabelledCollection
+from abc import ABC, abstractmethod
+from sklearn.model_selection import cross_val_predict
+
+
+class ConfusionMatrixPredictor(ABC):
+    """
+    Abstract class of predictors of a confusion matrix for the performance of a classifier.
+    For the binary case, this amounts to predicting the 4-cell contingency table consisting of the
+    true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN) that
+    most evaluation metrics make use of.
+    """
+    @abstractmethod
+    def fit(self, train: LabelledCollection):
+        pass
+
+    @abstractmethod
+    def predict(self, test):
+        pass
+
+
+class MLCMEstimator(ConfusionMatrixPredictor):
+    """
+    The Maximum Likelihood Confusion Matrix Estimator is a method that relies on the IID assumption, and thus
+    computes, via k-FCV (or any other technique), the counters of the confusion matrix, assuming that those are
+    good estimates for the test case.
+    """
+    def __init__(self, classifier, strategy='kfcv', **kwargs):
+        assert strategy in ['kfcv'], 'unknown strategy'
+        if strategy=='kfcv':
+            assert 'k' in kwargs, 'strategy "kfcv" requires "k" to be passed as an argument'
+        self.classifier = classifier
+        self.strategy = strategy
+        self.kwargs = kwargs
+
+    def sout(self, msg):
+        if 'verbose' in self.kwargs:
+            print(msg)
+
+    def fit(self, train: LabelledCollection):
+        X, y = train.Xy
+        if self.strategy == 'kfcv':
+            k = self.kwargs['k']
+            n_jobs = self.kwargs['n_jobs'] if 'n_jobs' in self.kwargs else 1
+            predict = self.kwargs['predict'] if 'predict' in self.kwargs else 'predict'
+            self.sout(f'{self.__class__.__name__}: '
+                      f'running cross_val_predict with k={k} n_jobs={n_jobs} predict={predict}')
+            predictions = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method=predict)
+            self.conf_matrix = confusion_matrix(y, predictions, labels=train.classes_)
+        return self
+
+    def predict(self, test):
+        """
+        This method disregards the test set, under the assumption that it is IID with respect to the training
+        data, meaning that the confusion matrix estimated for the test data should coincide with the one
+        computed on the training data (using any cross-validation strategy).
+
+        :param test: test collection (ignored)
+        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
+        """
+        return self.conf_matrix
+
+
+class UpperBound(ConfusionMatrixPredictor):
+    def __init__(self, classifier, y_test):
+        self.classifier = classifier
+        self.y_test = y_test
+
+    def fit(self, train: LabelledCollection):
+        self.classifier.fit(*train.Xy)
+        self.classes = train.classes_
+        return self
+
+    def show_true_labels(self, y_test):
+        self.y_test = y_test
+
+    def predict(self, test):
+        predictions = self.classifier.predict(test)
+        return confusion_matrix(self.y_test, predictions, labels=self.classes)
+
+
+def get_counters(y_true, y_pred):
+    # recode each instance into the confusion-matrix cell it falls in (0=tp, 1=fn, 2=fp, 3=tn)
+    counters = np.full(shape=y_true.shape, fill_value=-1)
+    counters[np.logical_and(y_true == 1, y_pred == 1)] = 0
+    counters[np.logical_and(y_true == 1, y_pred == 0)] = 1
+    counters[np.logical_and(y_true == 0, y_pred == 1)] = 2
+    counters[np.logical_and(y_true == 0, y_pred == 0)] = 3
+    class_map = {
+        0:'tp',
+        1:'fn',
+        2:'fp',
+        3:'tn'
+    }
+    return counters, class_map
+
+
+def safehstack(matrix, posteriors):
+    if issparse(matrix):
+        instances = csr_matrix(scipy.sparse.hstack([matrix, posteriors]))
+    else:
+        instances = np.hstack([matrix, posteriors])
+    return instances
+
+
+class QuantificationCMPredictor(ConfusionMatrixPredictor):
+    """
+    Predicts the confusion matrix by casting it as a quantification problem with 4 classes (tp, fn, fp, tn):
+    each training instance, augmented with the classifier's posterior probabilities, is labelled with the
+    confusion-matrix cell it falls in (as determined via cross-validation), and a quantifier is trained to
+    estimate the prevalence of each cell in unseen test samples.
+    """
+    def __init__(self, classifier, quantifier, strategy='kfcv', **kwargs):
+        assert strategy in ['kfcv'], 'unknown strategy'
+        if strategy=='kfcv':
+            assert 'k' in kwargs, 'strategy "kfcv" requires "k" to be passed as an argument'
+        self.classifier = clone(classifier)
+        self.quantifier = quantifier
+        self.strategy = strategy
+        self.kwargs = kwargs
+
+    def sout(self, msg):
+        if 'verbose' in self.kwargs:
+            print(msg)
+
+    def fit(self, train: LabelledCollection):
+        X, y = train.Xy
+        if self.strategy == 'kfcv':
+            k = self.kwargs['k']
+            n_jobs = self.kwargs['n_jobs'] if 'n_jobs' in self.kwargs else 1
+            self.sout(f'{self.__class__.__name__}: '
+                      f'running cross_val_predict with k={k} n_jobs={n_jobs}')
+            predictions = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict')
+            posteriors = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict_proba')
+            self.classifier.fit(X, y)
+            instances = safehstack(train.instances, posteriors)
+            counters, class_map = get_counters(train.labels, predictions)
+            # quantification dataset with one "class" per confusion-matrix cell
+            q_data = LabelledCollection(instances=instances, labels=counters, classes_=[0,1,2,3])
+            print('counters prevalence', q_data.counts())
+            self.quantifier.fit(q_data)
+        return self
+
+    def predict(self, test):
+        """
+        Predicts the confusion matrix by quantifying the prevalence of each cell (tp, fn, fp, tn) in the
+        test sample.
+
+        :param test: test instances
+        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
+        """
+        posteriors = self.classifier.predict_proba(test)
+        instances = safehstack(test, posteriors)
+        counters = self.quantifier.quantify(instances)
+        tp, fn, fp, tn = counters
+        conf_matrix = np.asarray([[tn, fp], [fn, tp]])
+        return conf_matrix
+
+    def quantify(self, test):
+        posteriors = self.classifier.predict_proba(test)
+        instances = safehstack(test, posteriors)
+        counters = self.quantifier.quantify(instances)
+        tp, fn, fp, tn = counters
+        # tpr and fpr as implied by the quantified confusion-matrix cells
+        den_tpr = (tp+fn)
+        if den_tpr>0:
+            tpr = tp/den_tpr
+        else:
+            tpr = 1
+
+        den_fpr = (fp+tn)
+        if den_fpr>0:
+            fpr = fp / den_fpr
+        else:
+            fpr = 0
+
+        # PACC-style correction of the PCC estimate (mean posterior); computed but not returned
+        pcc = posteriors.mean(axis=0)[1]
+        pacc = (pcc-fpr)/(tpr-fpr)
+        pacc = np.clip(pacc, 0, 1)
+
+        # the positive prevalence as estimated by the 4-class quantifier (tp+fn)
+        q = tp+fn
+        return q
\ No newline at end of file
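
A minimal usage sketch of the predictors defined in models.py (variable names are illustrative; the data loading and the PACC quantifier mirror main.py, and F1 refers to the helper defined there):

    import quapy as qp
    from sklearn.linear_model import LogisticRegression
    from quapy.method.aggregative import PACC

    data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5)
    train, test = data.training, data.test

    # train the 4-class (tp, fn, fp, tn) quantifier on the training collection
    predictor = QuantificationCMPredictor(LogisticRegression(), PACC(LogisticRegression()), strategy='kfcv', k=5)
    predictor.fit(train)

    # estimate the confusion matrix for a test sample drawn at a fixed prevalence
    sample = test.sampling(500, 0.3)
    conf_matrix = predictor.predict(sample.instances)   # [[tn, fp], [fn, tp]]
    print('estimated F1:', F1(conf_matrix))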