diff --git a/ClassifierAccuracy/deprecated/main_binary.py b/ClassifierAccuracy/deprecated/main_binary.py
new file mode 100644
index 0000000..d3ca226
--- /dev/null
+++ b/ClassifierAccuracy/deprecated/main_binary.py
@@ -0,0 +1,149 @@
+from collections import defaultdict
+
+from sklearn.calibration import CalibratedClassifierCV
+from sklearn.svm import LinearSVC
+from tqdm import tqdm
+from sklearn.linear_model import LogisticRegression
+import os
+import quapy as qp
+from method.aggregative import PACC, EMQ, PCC, CC, ACC, HDy
+from models_binary import *
+import matplotlib.pyplot as plt
+from pathlib import Path
+
+
+def clf():
+    # return CalibratedClassifierCV(LinearSVC(class_weight=None))
+    return LogisticRegression(class_weight=None)
+
+
+def F1(contingency_table):
+    # tn = contingency_table[0, 0]
+    tp = contingency_table[1, 1]
+    fp = contingency_table[0, 1]
+    fn = contingency_table[1, 0]
+    den = (2*tp+fp+fn)
+    if den>0:
+        return 2*tp/den
+    else:
+        return 1
+
+
+def accuracy(contingency_table):
+    tn = contingency_table[0, 0]
+    tp = contingency_table[1, 1]
+    fp = contingency_table[0, 1]
+    fn = contingency_table[1, 0]
+    return (tp+tn)/(tp+fp+fn+tn)
+
+
+def plot_series(series, repeats, metric_name, train_prev=None, savepath=None):
+
+    for key in series:
+        print(series[key])
+
+    fig, ax = plt.subplots()
+
+    def bin(v):
+        mat = np.asarray(v).reshape(-1, repeats)
+        return mat.mean(axis=1), mat.std(axis=1)
+
+    x = series['prev']
+    x,_ = bin(x)
+
+    for serie in series:
+        if serie=='prev': continue
+        values = series[serie]
+        print(serie, values)
+        val_mean, val_std = bin(values)
+        ax.errorbar(x, val_mean, label=serie, fmt='-', marker='o')
+        ax.fill_between(x, val_mean - val_std, val_mean + val_std, alpha=0.25)
+
+    if train_prev is not None:
+        ax.axvline(x=train_prev, label='tr-prev', color='k', linestyle='--')
+        # ax.scatter(train_prev, train_prev, c='c', label='tr-prev', linewidth=2, edgecolor='k', s=100, zorder=3)
+
+    ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
+
+    ax.grid()
+    ax.set_title(metric_name)
+    ax.set(xlabel='$p_U(\oplus)$', ylabel='estimated '+metric_name,
+           title='Classifier accuracy in terms of '+metric_name)
+
+    if savepath is None:
+        plt.show()
+    else:
+        os.makedirs(Path(savepath).parent, exist_ok=True)
+        plt.savefig(savepath, bbox_inches='tight')
+
+
+dataset='imdb'
+data = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=5, pickle=True)
+
+# qp.data.preprocessing.reduce_columns(data, min_df=5, inplace=True)
+# print('num_features', data.training.instances.shape[1])
+
+train = data.training
+test = data.test
+
+upper = UpperBound(clf(), y_test=None).fit(train)
+
+mlcfe = MLCMEstimator(clf(), strategy='kfcv', k=5, n_jobs=-1).fit(train)
+
+emq_quant = QuantificationCMPredictor(clf(), EMQ(LogisticRegression()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
+# cc_quant = QuantificationCMPredictor(clf(), CC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
+# pcc_quant = QuantificationCMPredictor(clf(), PCC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
+# acc_quant = QuantificationCMPredictor(clf(), ACC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
+pacc_quant = QuantificationCMPredictor(clf(), PACC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
+# hdy_quant = QuantificationCMPredictor(clf(), HDy(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
+
+sld = EMQ(LogisticRegression()).fit(train)
+pacc = PACC(clf()).fit(train)
+
+contenders = [
+    ('kFCV+MLPE', mlcfe),
+    ('SLD', emq_quant),
+    # ('CC', cc_quant),
+    # ('PCC', pcc_quant),
+    # ('ACC', acc_quant),
+    ('PACC', pacc_quant),
+    # ('HDy', hdy_quant)
+]
+
+metric = F1
+# metric = accuracy
+
+repeats = 10
+with qp.util.temp_seed(42):
+    samples_idx = [idx for idx in test.artificial_sampling_index_generator(sample_size=500, n_prevalences=21, repeats=repeats)]
+
+
+series = defaultdict(lambda: [])
+for idx in tqdm(samples_idx, desc='generating predictions'):
+    sample = test.sampling_from_index(idx)
+
+    upper.show_true_labels(sample.labels)
+    upper_conf_matrix = upper.predict(sample.instances)
+    metric_true = metric(upper_conf_matrix)
+    series['Upper'].append(metric_true)
+
+    for mname, method in contenders:
+        conf_matrix = method.predict(sample.instances)
+        estim_metric = metric(conf_matrix)
+        series[mname].append(estim_metric)
+        if hasattr(method, 'quantify'):
+            series[mname+'-prev'].append(method.quantify(sample.instances))
+
+    series['binsld-prev'].append(sld.quantify(sample.instances)[1])
+    series['binpacc-prev'].append(pacc.quantify(sample.instances)[1])
+    series['optimal-prev'].append(sample.prevalence()[1])
+    series['prev'].append(sample.prevalence()[1])
+
+metricname = metric.__name__
+plot_series(series, repeats, metric_name=metricname, train_prev=train.prevalence()[1], savepath='./plots/'+dataset+'_LinearSVC_'+metricname+'.pdf')
+
+
+
+
+
diff --git a/ClassifierAccuracy/models.py b/ClassifierAccuracy/deprecated/models_binary.py
similarity index 100%
rename from ClassifierAccuracy/models.py
rename to ClassifierAccuracy/deprecated/models_binary.py
diff --git a/ClassifierAccuracy/main.py b/ClassifierAccuracy/main.py
index 1984951..1af44a2 100644
--- a/ClassifierAccuracy/main.py
+++ b/ClassifierAccuracy/main.py
@@ -1,148 +1,75 @@
 from collections import defaultdict
 
-from sklearn.calibration import CalibratedClassifierCV
-from sklearn.svm import LinearSVC
-from tqdm import tqdm
+from sklearn.base import BaseEstimator
 from sklearn.linear_model import LogisticRegression
-import os
+import numpy as np
+from sklearn.metrics import confusion_matrix
+
+from method.aggregative import PACC, EMQ
+from utils import *
+
+import quapy.data.datasets
 import quapy as qp
-from method.aggregative import PACC, EMQ, PCC, CC, ACC, HDy
-from models import *
-import matplotlib.pyplot as plt
-from pathlib import Path
+from models_multiclass import *
+from quapy.data import LabelledCollection
+from quapy.protocol import UPP
+from quapy.data.datasets import fetch_UCIMulticlassLabelledCollection, UCI_MULTICLASS_DATASETS
 
 
-def clf():
-    # return CalibratedClassifierCV(LinearSVC(class_weight=None))
-    return LogisticRegression(class_weight=None)
+def split(data: LabelledCollection):
+    train_val, test = data.split_stratified(train_prop=0.66)
+    train, val = train_val.split_stratified(train_prop=0.5)
+    return train, val, test
 
 
-def F1(contingency_table):
-    # tn = contingency_table[0, 0]
-    tp = contingency_table[1, 1]
-    fp = contingency_table[0, 1]
-    fn = contingency_table[1, 0]
-    den = (2*tp+fp+fn)
-    if den>0:
-        return 2*tp/den
-    else:
-        return 1
+def gen_datasets()-> [str,[LabelledCollection,LabelledCollection,LabelledCollection]]:
+    for dataset_name in UCI_MULTICLASS_DATASETS:
+        dataset = fetch_UCIMulticlassLabelledCollection(dataset_name)
+        yield dataset_name, split(dataset)
 
 
-def accuracy(contingency_table):
-    tn = contingency_table[0, 0]
-    tp = contingency_table[1, 1]
-    fp = contingency_table[0, 1]
-    fn = contingency_table[1, 0]
-    return (tp+tn)/(tp+fp+fn+tn)
+def gen_CAP(h, acc_fn)->[str,ClassifierAccuracyPrediction]:
+    yield 'Naive', NaiveCAP(h, acc_fn)
+    yield 'CT-PPS-PACC', ContTableTransferCAP(h, acc_fn, PACC(LogisticRegression()))
+    yield 'CT-PPSh-PACC', ContTableWithHTransferCAP(h, acc_fn, PACC)
 
 
-def plot_series(series, repeats, metric_name, train_prev=None, savepath=None):
-
-    for key in series:
-        print(series[key])
-
-    fig, ax = plt.subplots()
-
-    def bin(v):
-        mat = np.asarray(v).reshape(-1, repeats)
-        return mat.mean(axis=1), mat.std(axis=1)
-
-    x = series['prev']
-    x,_ = bin(x)
-
-    for serie in series:
-        if serie=='prev': continue
-        values = series[serie]
-        print(serie, values)
-        val_mean, val_std = bin(values)
-        ax.errorbar(x, val_mean, label=serie, fmt='-', marker='o')
-        ax.fill_between(x, val_mean - val_std, val_mean + val_std, alpha=0.25)
-
-    if train_prev is not None:
-        ax.axvline(x=train_prev, label='tr-prev', color='k', linestyle='--')
-        # ax.scatter(train_prev, train_prev, c='c', label='tr-prev', linewidth=2, edgecolor='k', s=100, zorder=3)
-
-    ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
-
-    ax.grid()
-    ax.set_title(metric_name)
-    ax.set(xlabel='$p_U(\oplus)$', ylabel='estimated '+metric_name,
-           title='Classifier accuracy in terms of '+metric_name)
-
-    if savepath is None:
-        plt.show()
-    else:
-        os.makedirs(Path(savepath).parent, exist_ok=True)
-        plt.savefig(savepath, bbox_inches='tight')
+def true_acc(h:BaseEstimator, acc_fn: callable, U: LabelledCollection):
+    y_pred = h.predict(U.X)
+    y_true = U.y
+    conf_table = confusion_matrix(y_true, y_pred=y_pred, labels=U.classes_)
+    return acc_fn(conf_table)
 
 
-dataset='imdb'
-data = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=5, pickle=True)
-
-# qp.data.preprocessing.reduce_columns(data, min_df=5, inplace=True)
-# print('num_features', data.training.instances.shape[1])
-
-train = data.training
-test = data.test
-
-upper = UpperBound(clf(), y_test=None).fit(train)
-
-mlcfe = MLCMEstimator(clf(), strategy='kfcv', k=5, n_jobs=-1).fit(train)
-
-emq_quant = QuantificationCMPredictor(clf(), EMQ(LogisticRegression()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
-# cc_quant = QuantificationCMPredictor(clf(), CC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
-# pcc_quant = QuantificationCMPredictor(clf(), PCC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
-# acc_quant = QuantificationCMPredictor(clf(), ACC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
-pacc_quant = QuantificationCMPredictor(clf(), PACC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
-# hdy_quant = QuantificationCMPredictor(clf(), HDy(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
-
-sld = EMQ(LogisticRegression()).fit(train)
-pacc = PACC(clf()).fit(train)
-
-contenders = [
-    ('kFCV+MLPE', mlcfe),
-    ('SLD', emq_quant),
-    # ('CC', cc_quant),
-    # ('PCC', pcc_quant),
-    # ('ACC', acc_quant),
-    ('PACC', pacc_quant),
-    # ('HDy', hdy_quant)
-]
-
-metric = F1
-# metric = accuracy
-
-repeats = 10
-with qp.util.temp_seed(42):
-    samples_idx = [idx for idx in test.artificial_sampling_index_generator(sample_size=500, n_prevalences=21, repeats=repeats)]
+def acc_fn(cont_table):
+    return np.diag(cont_table).sum() / cont_table.sum()
 
 
-series = defaultdict(lambda: [])
-for idx in tqdm(samples_idx, desc='generating predictions'):
-    sample = test.sampling_from_index(idx)
+qp.environ['SAMPLE_SIZE'] = 100
 
-    upper.show_true_labels(sample.labels)
-    upper_conf_matrix = upper.predict(sample.instances)
-    metric_true = metric(upper_conf_matrix)
-    series['Upper'].append(metric_true)
+h = LogisticRegression()
 
-    for mname, method in contenders:
-        conf_matrix = method.predict(sample.instances)
-        estim_metric = metric(conf_matrix)
-        series[mname].append(estim_metric)
-        if hasattr(method, 'quantify'):
-            series[mname+'-prev'].append(method.quantify(sample.instances))
-
-    series['binsld-prev'].append(sld.quantify(sample.instances)[1])
-    series['binpacc-prev'].append(pacc.quantify(sample.instances)[1])
-    series['optimal-prev'].append(sample.prevalence()[1])
-    series['prev'].append(sample.prevalence()[1])
-
-metricname = metric.__name__
-plot_series(series, repeats, metric_name=metricname, train_prev=train.prevalence()[1], savepath='./plots/'+dataset+'_LinearSVC_'+metricname+'.pdf')
+acc_trues = []
+acc_predicted = defaultdict(lambda :[])
+for dataset_name, (L, V, U) in gen_datasets():
+    print(dataset_name)
+    h.fit(*L.Xy)
+
+    test_prot = UPP(U, repeats=100, return_type='labelled_collection')
+
+    acc_trues.extend(true_acc(h, acc_fn, Ui) for Ui in test_prot())
+
+    for method_name, method in gen_CAP(h, acc_fn):
+        method.fit(V)
+
+        for Ui in test_prot():
+            acc_hat = method.predict(Ui.X)
+            acc_predicted[method_name].append(acc_hat)
+
+acc_predicted = list(acc_predicted.items())
+plot_diagonal('./plots/diagonal.png', acc_trues, acc_predicted)
diff --git a/ClassifierAccuracy/models_multiclass.py b/ClassifierAccuracy/models_multiclass.py
new file mode 100644
index 0000000..ba1fde7
--- /dev/null
+++ b/ClassifierAccuracy/models_multiclass.py
@@ -0,0 +1,236 @@
+import numpy as np
+from sklearn.base import BaseEstimator
+
+import quapy as qp
+from sklearn import clone
+from sklearn.metrics import confusion_matrix
+import scipy
+from scipy.sparse import issparse, csr_matrix
+from data import LabelledCollection
+from abc import ABC, abstractmethod
+from sklearn.model_selection import cross_val_predict
+
+from quapy.method.base import BaseQuantifier
+from quapy.method.aggregative import PACC
+
+
+class ClassifierAccuracyPrediction(ABC):
+
+    def __init__(self, h: BaseEstimator, acc: callable):
+        self.h = h
+        self.acc = acc
+
+    @abstractmethod
+    def fit(self, val: LabelledCollection):
+        ...
+
+    def predict(self, X):
+        """
+        Evaluates the accuracy function on the predicted contingency table
+
+        :param X: test data
+        :return: float
+        """
+        return self.acc(self.predict_ct(X))
+
+    @abstractmethod
+    def predict_ct(self, X):
+        """
+        Predicts the contingency table for the test data
+
+        :param X: test data
+        :return: a contingency table
+        """
+        ...
+
+
+class NaiveCAP(ClassifierAccuracyPrediction):
+    """
+    The Naive CAP is a method that relies on the IID assumption, and thus uses the estimation in the validation data
+    as an estimate for the test data.
+    """
+    def __init__(self, h: BaseEstimator, acc: callable):
+        super().__init__(h, acc)
+
+    def fit(self, val: LabelledCollection):
+        y_hat = self.h.predict(val.X)
+        y_true = val.y
+        self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
+        return self
+
+    def predict_ct(self, test):
+        """
+        This method disregards the test set, under the assumption that it is IID wrt the training. This means that
+        the confusion matrix for the test data should coincide with the one computed for training (using any cross
+        validation strategy).
+
+        :param test: test collection (ignored)
+        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
+        """
+        return self.cont_table
+
+
+class ContTableTransferCAP(ClassifierAccuracyPrediction):
+    """
+    Reweights the rows of the validation contingency table according to the test prevalence estimated by a quantifier.
+    """
+    def __init__(self, h: BaseEstimator, acc: callable, q: BaseQuantifier):
+        super().__init__(h, acc)
+        self.q = q
+
+    def fit(self, val: LabelledCollection):
+        y_hat = self.h.predict(val.X)
+        y_true = val.y
+        self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
+        self.train_prev = val.prevalence()
+        self.q.fit(val)
+        return self
+
+    def predict_ct(self, test):
+        """
+        :param test: test instances
+        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
+        """
+        prev_hat = self.q.quantify(test)
+        adjustment = prev_hat / self.train_prev
+        return self.cont_table * adjustment[:, np.newaxis]
+
+
+class ContTableWithHTransferCAP(ClassifierAccuracyPrediction):
+    """
+    Variant of ContTableTransferCAP in which the quantifier is built on top of the (already trained) classifier h.
+    """
+    def __init__(self, h: BaseEstimator, acc: callable, q_class):
+        super().__init__(h, acc)
+        self.q = q_class(classifier=h)
+
+    def fit(self, val: LabelledCollection):
+        y_hat = self.h.predict(val.X)
+        y_true = val.y
+        self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
+        self.train_prev = val.prevalence()
+        self.q.fit(val, fit_classifier=False, val_split=val)
+        return self
+
+    def predict_ct(self, test):
+        """
+        :param test: test instances
+        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
+        """
+        prev_hat = self.q.quantify(test)
+        adjustment = prev_hat / self.train_prev
+        return self.cont_table * adjustment[:, np.newaxis]
+
+
+
+
+class UpperBound(ClassifierAccuracyPrediction):
+    def __init__(self, classifier, y_test):
+        self.classifier = classifier
+        self.y_test = y_test
+
+    def fit(self, train: LabelledCollection):
+        self.classifier.fit(*train.Xy)
+        self.classes = train.classes_
+        return self
+
+    def show_true_labels(self, y_test):
+        self.y_test = y_test
+
+    def predict(self, test):
+        predictions = self.classifier.predict(test)
+        return confusion_matrix(self.y_test, predictions, labels=self.classes)
+
+
+def get_counters(y_true, y_pred):
+    counters = np.full(shape=y_true.shape, fill_value=-1)
+    counters[np.logical_and(y_true == 1, y_pred == 1)] = 0
+    counters[np.logical_and(y_true == 1, y_pred == 0)] = 1
+    counters[np.logical_and(y_true == 0, y_pred == 1)] = 2
+    counters[np.logical_and(y_true == 0, y_pred == 0)] = 3
+    class_map = {
+        0:'tp',
+        1:'fn',
+        2:'fp',
+        3:'tn'
+    }
+    return counters, class_map
+
+
+def safehstack(matrix, posteriors):
+    if issparse(matrix):
+        instances = csr_matrix(scipy.sparse.hstack([matrix, posteriors]))
+    else:
+        instances = np.hstack([matrix, posteriors])
+    return instances
+
+
+class QuantificationCMPredictor(ClassifierAccuracyPrediction):
+    """
+    """
+    def __init__(self, classifier, quantifier, strategy='kfcv', **kwargs):
+        assert strategy in ['kfcv'], 'unknown strategy'
+        if strategy=='kfcv':
+            assert 'k' in kwargs, 'strategy "kfcv" requires "k" to be passed as an argument'
+        self.classifier = clone(classifier)
+        self.quantifier = quantifier
+        self.strategy = strategy
+        self.kwargs = kwargs
+
+    def sout(self, msg):
+        if 'verbose' in self.kwargs:
+            print(msg)
+
+    def fit(self, train: LabelledCollection):
+        X, y = train.Xy
+        if self.strategy == 'kfcv':
+            k=self.kwargs['k']
+            n_jobs = self.kwargs['n_jobs'] if 'n_jobs' in self.kwargs else 1
+            self.sout(f'{self.__class__.__name__}: '
+                      f'running cross_val_predict with k={k} n_jobs={n_jobs}')
+            predictions = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict')
+            posteriors = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict_proba')
+        self.classifier.fit(X, y)
+        instances = safehstack(train.instances, posteriors)
+        counters, class_map = get_counters(train.labels, predictions)
+        q_data = LabelledCollection(instances=instances, labels=counters, classes_=[0,1,2,3])
+        print('counters prevalence', q_data.counts())
+        self.quantifier.fit(q_data)
+        return self
+
+    def predict(self, test):
+        """
+        Predicts the confusion matrix for the test data by quantifying the prevalence of its four cells
+        :param test: test instances
+        :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
+        """
+        posteriors = self.classifier.predict_proba(test)
+        instances = safehstack(test, posteriors)
+        counters = self.quantifier.quantify(instances)
+        tp, fn, fp, tn = counters
+        conf_matrix = np.asarray([[tn, fp], [fn, tp]])
+        return conf_matrix
+
+    def quantify(self, test):
+        posteriors = self.classifier.predict_proba(test)
+        instances = safehstack(test, posteriors)
+        counters = self.quantifier.quantify(instances)
+        tp, fn, fp, tn = counters
+        den_tpr = (tp+fn)
+        if den_tpr>0:
+            tpr = tp/den_tpr
+        else:
+            tpr = 1
+
+        den_fpr = (fp+tn)
+        if den_fpr>0:
+            fpr = fp / den_fpr
+        else:
+            fpr = 0
+
+        pcc = posteriors.sum(axis=0)[1]
+        pacc = (pcc-fpr)/(tpr-fpr)
+        pacc = np.clip(pacc, 0, 1)
+
+        q = tp+fn
+        return q
\ No newline at end of file
diff --git a/ClassifierAccuracy/notes.md b/ClassifierAccuracy/notes.md
new file mode 100644
index 0000000..a515559
--- /dev/null
+++ b/ClassifierAccuracy/notes.md
@@ -0,0 +1,17 @@
+# Notes
+
+Branch for research on classifier accuracy prediction.
+
+I had some work done for binary (models_binary.py and main_binary.py).
+I would like to approach the multiclass case directly now.
+
+I think I will frame the problem setting as follows.
+A Classifier Accuracy Prediction (CAP) method is a method that receives as input:
+- h: a classifier (already trained),
+- V: a labelled collection (for training the CAP),
+- acc_func: a callable; any function that works on a contingency table
+
+And implements:
+- fit: trains the CAP
+- predict: predicts the evaluation measure on unseen data (provided by the base class: it calls predict_ct and then acc_func)
+- predict_ct: predicts the contingency table
\ No newline at end of file
diff --git a/ClassifierAccuracy/utils.py b/ClassifierAccuracy/utils.py
new file mode 100644
index 0000000..944dc2e
--- /dev/null
+++ b/ClassifierAccuracy/utils.py
@@ -0,0 +1,29 @@
+import matplotlib.pyplot as plt
+from pathlib import Path
+from os import makedirs
+import numpy as np
+
+
+def plot_diagonal(outpath, xs, predictions:list):
+
+    makedirs(Path(outpath).parent, exist_ok=True)
+
+    # Create scatter plot
+    plt.figure(figsize=(10, 10))
+    plt.xlim(0, 1)
+    plt.ylim(0, 1)
+    plt.plot([0, 1], [0, 1], color='black', linestyle='--')
+
+    for method_name, ys in predictions:
+        pear_cor = np.corrcoef(xs, ys)[0, 1]
+        plt.scatter(xs, ys, label=f'{method_name} {pear_cor:.2f}')
+
+    plt.legend()
+
+    # Add labels and title
+    plt.xlabel('True Accuracy')
+    plt.ylabel('Estimated Accuracy')
+
+    # Display the plot
+    # plt.show()
+    plt.savefig(outpath)
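The row-reweighting step in `ContTableTransferCAP.predict_ct` can be read off a small numeric example. The sketch below is not part of the patch: it uses made-up counts and a made-up prevalence estimate in place of a real quantifier, and only mirrors the arithmetic of the patch (validation confusion-matrix rows scaled by the ratio of estimated test prevalence to validation prevalence), not the QuaPy API.

```python
import numpy as np

# Validation confusion matrix (rows = true class, cols = predicted class),
# in the layout returned by sklearn.metrics.confusion_matrix; toy counts for 3 classes.
val_cont_table = np.array([[40,  5,  5],
                           [ 4, 30,  6],
                           [ 2,  3, 45]])

val_prev = val_cont_table.sum(axis=1) / val_cont_table.sum()  # validation class prevalence
prev_hat = np.array([0.2, 0.5, 0.3])                          # hypothetical test prevalence from a quantifier

# Reweight each row (true class) by how much more/less frequent that class is
# estimated to be in the test sample, as in ContTableTransferCAP.predict_ct.
adjustment = prev_hat / val_prev
test_cont_table_hat = val_cont_table * adjustment[:, np.newaxis]

# Accuracy read off the predicted contingency table (same acc_fn as in main.py).
acc_hat = np.diag(test_cont_table_hat).sum() / test_cont_table_hat.sum()
print(acc_hat)
```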
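Similarly, the binary `QuantificationCMPredictor` recasts the four confusion-matrix cells (tp, fn, fp, tn) as the classes of a quantification problem via `get_counters`. A minimal sketch of that mapping, with toy arrays and plain numpy standing in for the actual quantifier:

```python
import numpy as np

def get_counters(y_true, y_pred):
    # Same cell encoding as in models_multiclass.py: 0=tp, 1=fn, 2=fp, 3=tn
    counters = np.full(shape=y_true.shape, fill_value=-1)
    counters[np.logical_and(y_true == 1, y_pred == 1)] = 0
    counters[np.logical_and(y_true == 1, y_pred == 0)] = 1
    counters[np.logical_and(y_true == 0, y_pred == 1)] = 2
    counters[np.logical_and(y_true == 0, y_pred == 0)] = 3
    return counters

y_true = np.array([1, 1, 0, 0, 1, 0])
y_pred = np.array([1, 0, 0, 1, 1, 0])

counters = get_counters(y_true, y_pred)  # -> [0 1 3 2 0 3]

# A quantifier trained on these four "classes" estimates their prevalences on unseen
# data; rescaling by the sample size yields an estimated confusion matrix.
prevalences = np.bincount(counters, minlength=4) / len(counters)
tp, fn, fp, tn = prevalences * len(counters)
conf_matrix = np.asarray([[tn, fp], [fn, tp]])  # same layout as QuantificationCMPredictor.predict
print(conf_matrix)
```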