From 3932cf22ce9e9c97d93e4f350731637c7806f554 Mon Sep 17 00:00:00 2001 From: Alex Moreo Date: Tue, 27 Feb 2024 18:38:39 +0100 Subject: [PATCH] added Sebastiani's method and Pablo's method --- ClassifierAccuracy/commons.py | 118 ++++++++ ClassifierAccuracy/gen_tables.py | 3 + ClassifierAccuracy/main.py | 109 ++++---- ClassifierAccuracy/main2.py | 97 +++++++ ClassifierAccuracy/models_multiclass.py | 321 ++++++++++++---------- ClassifierAccuracy/tabular.py | 349 ++++++++++++++++++++++++ ClassifierAccuracy/utils.py | 151 +++++++++- quapy/data/base.py | 15 +- quapy/functional.py | 15 +- 9 files changed, 970 insertions(+), 208 deletions(-) create mode 100644 ClassifierAccuracy/commons.py create mode 100644 ClassifierAccuracy/gen_tables.py create mode 100644 ClassifierAccuracy/main2.py create mode 100644 ClassifierAccuracy/tabular.py diff --git a/ClassifierAccuracy/commons.py b/ClassifierAccuracy/commons.py new file mode 100644 index 0000000..c054a4a --- /dev/null +++ b/ClassifierAccuracy/commons.py @@ -0,0 +1,118 @@ +from collections import defaultdict + +from sklearn.base import BaseEstimator +from sklearn.linear_model import LogisticRegression +import numpy as np +from time import time +from sklearn.metrics import confusion_matrix +from sklearn.naive_bayes import GaussianNB +from sklearn.svm import SVC, LinearSVC + +from method.aggregative import PACC, EMQ, ACC +from utils import * + +import quapy.data.datasets +import quapy as qp +from models_multiclass import * +from quapy.data import LabelledCollection +from quapy.protocol import UPP +from quapy.data.datasets import fetch_UCIMulticlassLabelledCollection, UCI_MULTICLASS_DATASETS + + +def split(data: LabelledCollection): + train_val, test = data.split_stratified(train_prop=0.66, random_state=0) + train, val = train_val.split_stratified(train_prop=0.5, random_state=0) + return train, val, test + + +def gen_classifiers(): + yield 'LR', LogisticRegression() + #yield 'NB', GaussianNB() + #yield 'SVM(rbf)', SVC() + #yield 'SVM(linear)', LinearSVC() + + +def gen_datasets()-> [str,[LabelledCollection,LabelledCollection,LabelledCollection]]: + for dataset_name in UCI_MULTICLASS_DATASETS: + dataset = fetch_UCIMulticlassLabelledCollection(dataset_name) + yield dataset_name, split(dataset) + + +def gen_CAP(h, acc_fn)->[str, ClassifierAccuracyPrediction]: + yield 'SebCAP', SebastianiCAP(h, acc_fn, ACC) + yield 'SebCAPweight', SebastianiCAP(h, acc_fn, ACC, alpha=0) + yield 'PabCAP', PabloCAP(h, acc_fn, ACC) + yield 'PabCAP-SLD-median', PabloCAP(h, acc_fn, EMQ, aggr='median') + +def gen_CAP_cont_table(h)->[str,CAPContingencyTable]: + acc_fn = None + # yield 'Naive', NaiveCAP(h, acc_fn) + yield 'CT-PPS-EMQ', ContTableTransferCAP(h, acc_fn, EMQ(LogisticRegression())) + #yield 'CT-PPSh-ACC', ContTableWithHTransferCAP(h, acc_fn, ACC) + yield 'Equations-ACCh', NsquaredEquationsCAP(h, acc_fn, ACC, reuse_h=True) + # yield 'Equations-ACC', NsquaredEquationsCAP(h, acc_fn, ACC) + yield 'Equations-SLD', NsquaredEquationsCAP(h, acc_fn, EMQ) + +def gen_acc_measure(): + yield 'vanilla_accuracy', vanilla_acc_fn + yield 'macro-F1', macrof1 + + +def true_acc(h:BaseEstimator, acc_fn: callable, U: LabelledCollection): + y_pred = h.predict(U.X) + y_true = U.y + conf_table = confusion_matrix(y_true, y_pred=y_pred, labels=U.classes_) + return acc_fn(conf_table) + + +def vanilla_acc_fn(cont_table): + return np.diag(cont_table).sum() / cont_table.sum() + + +def _f1_bin(tp, fp, fn): + if tp + fp + fn == 0: + return 1 + else: + return (2 * tp) / (2 * tp + fp + fn) + + +def 
macrof1(cont_table): + n = cont_table.shape[0] + + if n==2: + tp = cont_table[1,1] + fp = cont_table[0,1] + fn = cont_table[1,0] + return _f1_bin(tp, fp, fn) + + f1_per_class = [] + for i in range(n): + tp = cont_table[i,i] + fp = cont_table[:,i].sum() - tp + fn = cont_table[i,:].sum() - tp + f1_per_class.append(_f1_bin(tp, fp, fn)) + return np.mean(f1_per_class) + + +def microf1(cont_table): + n = cont_table.shape[0] + + if n == 2: + tp = cont_table[1, 1] + fp = cont_table[0, 1] + fn = cont_table[1, 0] + return _f1_bin(tp, fp, fn) + + tp, fp, fn = 0, 0, 0 + for i in range(n): + tp += cont_table[i, i] + fp += cont_table[:, i] - tp + fn += cont_table[i, :] - tp + return _f1_bin(tp, fp, fn) + + +def cap_errors(true_acc, estim_acc): + true_acc = np.asarray(true_acc) + estim_acc = np.asarray(estim_acc) + #return (true_acc - estim_acc)**2 + return np.abs(true_acc - estim_acc) \ No newline at end of file diff --git a/ClassifierAccuracy/gen_tables.py b/ClassifierAccuracy/gen_tables.py new file mode 100644 index 0000000..e5bd279 --- /dev/null +++ b/ClassifierAccuracy/gen_tables.py @@ -0,0 +1,3 @@ +from utils import gen_tables + +gen_tables() \ No newline at end of file diff --git a/ClassifierAccuracy/main.py b/ClassifierAccuracy/main.py index 1af44a2..16d2864 100644 --- a/ClassifierAccuracy/main.py +++ b/ClassifierAccuracy/main.py @@ -1,75 +1,66 @@ from collections import defaultdict - -from sklearn.base import BaseEstimator -from sklearn.linear_model import LogisticRegression -import numpy as np -from sklearn.metrics import confusion_matrix - -from method.aggregative import PACC, EMQ +from time import time from utils import * - -import quapy.data.datasets -import quapy as qp from models_multiclass import * -from quapy.data import LabelledCollection from quapy.protocol import UPP -from quapy.data.datasets import fetch_UCIMulticlassLabelledCollection, UCI_MULTICLASS_DATASETS +from commons import * -def split(data: LabelledCollection): - train_val, test = data.split_stratified(train_prop=0.66) - train, val = train_val.split_stratified(train_prop=0.5) - return train, val, test +qp.environ['SAMPLE_SIZE'] = 250 +NUM_TEST = 100 + +for cls_name, h in gen_classifiers(): + print(cls_name) + + acc_trues = defaultdict(lambda : []) # acc_name : list of results + acc_predicted = defaultdict(lambda : defaultdict(lambda : [])) # acc_name : method_name : list of results + + for dataset_name, (L, V, U) in gen_datasets(): + print(dataset_name) + + h.fit(*L.Xy) + + # test generation protocol + test_prot = UPP(U, repeats=NUM_TEST, return_type='labelled_collection', random_state=0) + + # compute some stats of the dataset + get_dataset_stats(f'dataset_stats/{dataset_name}.json', test_prot, L, V) + + # precompute the actual accuracy values + dataset_true_accs = {} + for acc_name, acc_fn in gen_acc_measure(): + dataset_true_accs[acc_name] = [true_acc(h, acc_fn, Ui) for Ui in test_prot()] + acc_trues[acc_name].extend(dataset_true_accs[acc_name]) -def gen_datasets()-> [str,[LabelledCollection,LabelledCollection,LabelledCollection]]: - for dataset_name in UCI_MULTICLASS_DATASETS: - dataset = fetch_UCIMulticlassLabelledCollection(dataset_name) - yield dataset_name, split(dataset) + for method_name, method in gen_CAP(h, vanilla_acc_fn): + print('PARCHEADO con vanilla accuracy') + # training + tinit = time() + method.fit(V) + t_train = time()-tinit + # predictions + dataset_method_accs, t_test_ave = get_method_predictions(method, test_prot, gen_acc_measure) -def gen_CAP(h, acc_fn)->[str,ClassifierAccuracyPrediction]: - yield 
'Naive', NaiveCAP(h, acc_fn) - yield 'CT-PPS-PACC', ContTableTransferCAP(h, acc_fn, PACC(LogisticRegression())) - yield 'CT-PPSh-PACC', ContTableWithHTransferCAP(h, acc_fn, PACC) + # accumulate results across datasets + for acc_name, _ in gen_acc_measure(): + acc_predicted[acc_name][method_name].extend(dataset_method_accs[acc_name]) + print(f'\t{method_name} took train={t_train:.2f}s test(ave)={t_test_ave:.2f}s') + result = { + 't_train': t_train, + 't_test_ave': t_test_ave, + 'true_acc': dataset_true_accs[acc_name], + 'estim_acc': dataset_method_accs[acc_name] + } + save_json_file(f"results/{cls_name}/{acc_name}/{dataset_name}/{method_name}.json", result) -def true_acc(h:BaseEstimator, acc_fn: callable, U: LabelledCollection): - y_pred = h.predict(U.X) - y_true = U.y - conf_table = confusion_matrix(y_true, y_pred=y_pred, labels=U.classes_) - return acc_fn(conf_table) + for acc_name, _ in gen_acc_measure(): + acc_predicted_ = list(acc_predicted[acc_name].items()) + plot_diagonal(cls_name, acc_name, acc_trues[acc_name], acc_predicted_) - -def acc_fn(cont_table): - return np.diag(cont_table).sum() / cont_table.sum() - - -qp.environ['SAMPLE_SIZE'] = 100 - -h = LogisticRegression() - -acc_trues = [] -acc_predicted = defaultdict(lambda :[]) - -for dataset_name, (L, V, U) in gen_datasets(): - print(dataset_name) - - h.fit(*L.Xy) - - test_prot = UPP(U, repeats=100, return_type='labelled_collection') - - acc_trues.extend(true_acc(h, acc_fn, Ui) for Ui in test_prot()) - - for method_name, method in gen_CAP(h, acc_fn): - method.fit(V) - - for Ui in test_prot(): - acc_hat = method.predict(Ui.X) - acc_predicted[method_name].append(acc_hat) - -acc_predicted = list(acc_predicted.items()) -plot_diagonal('./plots/diagonal.png', acc_trues, acc_predicted) +gen_tables() diff --git a/ClassifierAccuracy/main2.py b/ClassifierAccuracy/main2.py new file mode 100644 index 0000000..a3b8bbf --- /dev/null +++ b/ClassifierAccuracy/main2.py @@ -0,0 +1,97 @@ +import itertools +import os.path +from collections import defaultdict +from time import time +from utils import * +from models_multiclass import * +from quapy.protocol import UPP +from commons import * + + +def fit_method(method, V): + tinit = time() + method.fit(V) + t_train = time() - tinit + return method, t_train + + +def predictionsCAP(method, test_prot): + tinit = time() + estim_accs = [method.predict(Ui.X) for Ui in test_prot()] + t_test_ave = (time() - tinit) / test_prot.total() + return estim_accs, t_test_ave + + +def predictionsCAPcont_table(method, test_prot, gen_acc_measure): + estim_accs_dict = {} + tinit = time() + estim_tables = [method.predict_ct(Ui.X) for Ui in test_prot()] + for acc_name, acc_fn in gen_acc_measure(): + estim_accs_dict[acc_name] = [acc_fn(cont_table) for cont_table in estim_tables] + t_test_ave = (time() - tinit) / test_prot.total() + return estim_accs_dict, t_test_ave + + +def any_missing(cls_name, dataset_name, method_name): + for acc_name, _ in gen_acc_measure(): + if not os.path.exists(getpath(cls_name, acc_name, dataset_name, method_name)): + return True + return False + + +qp.environ['SAMPLE_SIZE'] = 250 +NUM_TEST = 100 + + +for (cls_name, h), (dataset_name, (L, V, U)) in itertools.product(gen_classifiers(), gen_datasets()): + print(f'training {cls_name} in {dataset_name}') + h.fit(*L.Xy) + + # test generation protocol + test_prot = UPP(U, repeats=NUM_TEST, return_type='labelled_collection', random_state=0) + + # compute some stats of the dataset + get_dataset_stats(f'dataset_stats/{dataset_name}.json', test_prot, L, V) + + # 
precompute the actual accuracy values + true_accs = {} + for acc_name, acc_fn in gen_acc_measure(): + true_accs[acc_name] = [true_acc(h, acc_fn, Ui) for Ui in test_prot()] + + # instances of ClassifierAccuracyPrediction are bound to the evaluation measure, so they + # must be nested in the acc-for + for acc_name, acc_fn in gen_acc_measure(): + for (method_name, method) in gen_CAP(h, acc_fn): + result_path = getpath(cls_name, acc_name, dataset_name, method_name) + if os.path.exists(result_path): + print(f'\t{method_name}-{acc_name} exists, skipping') + continue + + print(f'\t{method_name}-{acc_name} computing...') + method, t_train = fit_method(method, V) + estim_accs, t_test_ave = predictionsCAP(method, test_prot) + save_json_result(result_path, true_accs[acc_name], estim_accs, t_train, t_test_ave) + + # instances of CAPContingencyTable instead are generic, and the evaluation measure can + # be nested to the predictions to speed up things + for (method_name, method) in gen_CAP_cont_table(h): + if not any_missing(cls_name, dataset_name, method_name): + print(f'\tmethod {method_name} has all results already computed. Skipping.') + continue + + print(f'\tmethod {method_name} computing...') + + method, t_train = fit_method(method, V) + estim_accs_dict, t_test_ave = predictionsCAPcont_table(method, test_prot, gen_acc_measure) + for acc_name in estim_accs_dict.keys(): + result_path = getpath(cls_name, acc_name, dataset_name, method_name) + save_json_result(result_path, true_accs[acc_name], estim_accs_dict[acc_name], t_train, t_test_ave) + + print() + +# generate diagonal plots +for (cls_name, _), (acc_name, _) in itertools.product(gen_classifiers(), gen_acc_measure()): + results = open_results(cls_name, acc_name) + plot_diagonal(cls_name, acc_name, results) + + diff --git a/ClassifierAccuracy/models_multiclass.py b/ClassifierAccuracy/models_multiclass.py index 0d098db..ce67d16 100644 --- a/ClassifierAccuracy/models_multiclass.py +++ b/ClassifierAccuracy/models_multiclass.py @@ -1,21 +1,49 @@ import numpy as np from sklearn.base import BaseEstimator +from sklearn.linear_model import LogisticRegression import quapy as qp from sklearn import clone from sklearn.metrics import confusion_matrix import scipy from scipy.sparse import issparse, csr_matrix + from data import LabelledCollection from abc import ABC, abstractmethod from sklearn.model_selection import cross_val_predict +from quapy.protocol import UPP from quapy.method.base import BaseQuantifier from quapy.method.aggregative import PACC import quapy.functional as F class ClassifierAccuracyPrediction(ABC): + def __init__(self, h: BaseEstimator, acc: callable): + self.h = h + self.acc = acc + + @abstractmethod + def fit(self, val: LabelledCollection): + ... + + def predict(self, X): + """ + Evaluates the accuracy function on the predicted contingency table + + :param X: test data + :return: float + """ + return ... 
+ + def true_acc(self, sample: LabelledCollection): + y_pred = self.h.predict(sample.X) + y_true = sample.y + conf_table = confusion_matrix(y_true, y_pred=y_pred, labels=sample.classes_) + return self.acc(conf_table) + + +class CAPContingencyTable(ClassifierAccuracyPrediction): def __init__(self, h: BaseEstimator, acc: callable): self.h = h @@ -32,7 +60,10 @@ class ClassifierAccuracyPrediction(ABC): :param X: test data :return: float """ - return self.acc(self.predict_ct(X)) + cont_table = self.predict_ct(X) + raw_acc = self.acc(cont_table) + norm_acc = np.clip(raw_acc, 0, 1) + return norm_acc @abstractmethod def predict_ct(self, X): @@ -45,7 +76,7 @@ class ClassifierAccuracyPrediction(ABC): ... -class NaiveCAP(ClassifierAccuracyPrediction): +class NaiveCAP(CAPContingencyTable): """ The Naive CAP is a method that relies on the IID assumption, and thus uses the estimation in the validation data as an estimate for the test data. @@ -71,7 +102,7 @@ class NaiveCAP(ClassifierAccuracyPrediction): return self.cont_table -class ContTableTransferCAP(ClassifierAccuracyPrediction): +class ContTableTransferCAP(CAPContingencyTable): """ """ @@ -97,7 +128,7 @@ class ContTableTransferCAP(ClassifierAccuracyPrediction): return self.cont_table * adjustment[:, np.newaxis] -class ContTableWithHTransferCAP(ClassifierAccuracyPrediction): +class ContTableWithHTransferCAP(CAPContingencyTable): """ """ @@ -123,37 +154,41 @@ class ContTableWithHTransferCAP(ClassifierAccuracyPrediction): return self.cont_table * adjustment[:, np.newaxis] -class NsquaredEquationsCAP(ClassifierAccuracyPrediction): +class NsquaredEquationsCAP(CAPContingencyTable): """ """ - def __int__(self, h: BaseEstimator, acc: callable, q_class): + def __init__(self, h: BaseEstimator, acc: callable, q_class, reuse_h=False): super().__init__(h, acc) - self.q = q_class(classifier=h) + self.reuse_h = reuse_h + if reuse_h: + self.q = q_class(classifier=h) + else: + self.q = q_class(classifier=LogisticRegression()) def fit(self, val: LabelledCollection): y_hat = self.h.predict(val.X) y_true = val.y self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_) - self.q.fit(val, fit_classifier=False, val_split=val) + if self.reuse_h: + self.q.fit(val, fit_classifier=False, val_split=val) + else: + self.q.fit(val) + self.A, self.partial_b = self._construct_equations() return self - def predict_ct(self, test): - """ - :param test: test collection (ignored) - :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix` - """ - + def _construct_equations(self): # we need a n x n matrix of unknowns - n = self.cont_table.shape[1] - I = np.arange(n*n).reshape(n,n) - h_label_preds = self.h.predict(test) - cc_prev_estim = F.prevalence_from_labels(h_label_preds, self.h.classes_) - q_prev_estim = self.q.quantify(test) - A = np.zeros_like(self.cont_table) - b = np.zeros(n) + # I is the matrix of indexes of unknowns. 
For example, if we need the counts of + # all instances belonging to class i that have been classified as belonging to 0, 1, ..., n: + # the indexes of the corresponding unknowns are given by I[i,:] + I = np.arange(n * n).reshape(n, n) + + # system of equations: Ax=b, A.shape=(n*n, n*n,), b.shape=(n*n,) + A = np.zeros(shape=(n * n, n * n)) + b = np.zeros(shape=(n * n)) # first equation: the sum of all unknowns is 1 eq_no = 0 @@ -161,133 +196,141 @@ class NsquaredEquationsCAP(ClassifierAccuracyPrediction): b[eq_no] = 1 eq_no += 1 - # n-1 equations: the sum of class-cond predictions must equal the sum of predictions - for i in range(n-1): - A[eq_no + i, I[:, i+1]] = 1 - b[eq_no + i] = cc_prev_estim[i+1] - eq_no += (n-1) + # (n-1)*(n-1) equations: the class cond rations should be the same in training and in test due to the + # PPS assumptions. Example in three classes, a ratio: a/(a+b+c) [test] = ar [a ratio in training] + # a / (a + b + c) = ar + # a = (a + b + c) * ar + # a = a ar + b ar + c ar + # a - a ar - b ar - c ar = 0 + # a (1-ar) + b (-ar) + c (-ar) = 0 + class_cond_ratios_tr = self.cont_table / self.cont_table.sum(axis=1, keepdims=True) + for i in range(1, n): + for j in range(1, n): + ratio_ij = class_cond_ratios_tr[i, j] + A[eq_no, I[i, :]] = -ratio_ij + A[eq_no, I[i, j]] = 1 - ratio_ij + b[eq_no] = 0 + eq_no += 1 + + # n-1 equations: the sum of class-cond counts must equal the C&C prevalence prediction + for i in range(1, n): + A[eq_no, I[:, i]] = 1 + #b[eq_no] = cc_prev_estim[i] + eq_no += 1 # n-1 equations: the sum of true true class-conditional positives must equal the class prev label in test - for i in range(n-1): - A[eq_no + i, I[i+1, :]] = 1 - b[eq_no + i] = q_prev_estim[i+1] + for i in range(1, n): + A[eq_no, I[i, :]] = 1 + #b[eq_no] = q_prev_estim[i] + eq_no += 1 - # (n-1)*(n-1) equations: the class cond rations should be the same in training and in test due to the - # PPS assumptions - + return A, b - - - - - -class UpperBound(ClassifierAccuracyPrediction): - def __init__(self, classifier, y_test): - self.classifier = classifier - self.y_test = y_test - - def fit(self, train: LabelledCollection): - self.classifier.fit(*train.Xy) - self.classes = train.classes_ - return self - - def show_true_labels(self, y_test): - self.y_test = y_test - - def predict(self, test): - predictions = self.classifier.predict(test) - return confusion_matrix(self.y_test, predictions, labels=self.classes) - - -def get_counters(y_true, y_pred): - counters = np.full(shape=y_true.shape, fill_value=-1) - counters[np.logical_and(y_true == 1, y_pred == 1)] = 0 - counters[np.logical_and(y_true == 1, y_pred == 0)] = 1 - counters[np.logical_and(y_true == 0, y_pred == 1)] = 2 - counters[np.logical_and(y_true == 0, y_pred == 0)] = 3 - class_map = { - 0:'tp', - 1:'fn', - 2:'fp', - 3:'tn' - } - return counters, class_map - - -def safehstack(matrix, posteriors): - if issparse(matrix): - instances = csr_matrix(scipy.sparse.hstack([matrix, posteriors])) - else: - instances = np.hstack([matrix, posteriors]) - return instances - - -class QuantificationCMPredictor(ClassifierAccuracyPrediction): - """ - """ - def __init__(self, classifier, quantifier, strategy='kfcv', **kwargs): - assert strategy in ['kfcv'], 'unknown strategy' - if strategy=='kfcv': - assert 'k' in kwargs, 'strategy "kfcv" requires "k" to be passed as an argument' - self.classifier = clone(classifier) - self.quantifier = quantifier - self.strategy = strategy - self.kwargs = kwargs - - def sout(self, msg): - if 'verbose' in self.kwargs: - 
print(msg) - - def fit(self, train: LabelledCollection): - X, y = train.Xy - if self.strategy == 'kfcv': - k=self.kwargs['k'] - n_jobs = self.kwargs['n_jobs'] if 'n_jobs' in self.kwargs else 1 - self.sout(f'{self.__class__.__name__}: ' - f'running cross_val_predict with k={k} n_jobs={n_jobs}') - predictions = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict') - posteriors = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict_proba') - self.classifier.fit(X, y) - instances = safehstack(train.instances, posteriors) - counters, class_map = get_counters(train.labels, predictions) - q_data = LabelledCollection(instances=instances, labels=counters, classes_=[0,1,2,3]) - print('counters prevalence', q_data.counts()) - self.quantifier.fit(q_data) - return self - - def predict(self, test): + def predict_ct(self, test): """ - :param test: test collection (ignored) :return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix` """ - posteriors = self.classifier.predict_proba(test) - instances = safehstack(test, posteriors) - counters = self.quantifier.quantify(instances) - tp, fn, fp, tn = counters - conf_matrix = np.asarray([[tn, fp], [fn, tp]]) - return conf_matrix - def quantify(self, test): - posteriors = self.classifier.predict_proba(test) - instances = safehstack(test, posteriors) - counters = self.quantifier.quantify(instances) - tp, fn, fp, tn = counters - den_tpr = (tp+fn) - if den_tpr>0: - tpr = tp/den_tpr + n = self.cont_table.shape[1] + + h_label_preds = self.h.predict(test) + cc_prev_estim = F.prevalence_from_labels(h_label_preds, self.h.classes_) + q_prev_estim = self.q.quantify(test) + + A = self.A + b = self.partial_b + + # b is partially filled; we finish the vector by plugin in the classify and count + # prevalence estimates (n-1 values only), and the quantification estimates (n-1 values only) + + b[-2*(n-1):-(n-1)] = cc_prev_estim[1:] + b[-(n-1):] = q_prev_estim[1:] + + x = np.linalg.solve(A, b) + + cont_table_test = x.reshape(n,n) + return cont_table_test + + +class SebastianiCAP(ClassifierAccuracyPrediction): + + def __init__(self, h, acc_fn, q_class, n_val_samples=500, alpha=0.3): + self.h = h + self.acc = acc_fn + self.q = q_class(h) + self.n_val_samples = n_val_samples + self.alpha = alpha + self.sample_size = qp.environ['SAMPLE_SIZE'] + + def fit(self, val: LabelledCollection): + v2, v1 = val.split_stratified(train_prop=0.5) + self.q.fit(v1, fit_classifier=False, val_split=v1) + + # precompute classifier predictions on samples + gen_samples = UPP(v2, repeats=self.n_val_samples, sample_size=self.sample_size, return_type='labelled_collection') + self.sigma_acc = [self.true_acc(sigma_i) for sigma_i in gen_samples()] + + # precompute prevalence predictions on samples + gen_samples.on_preclassified_instances(self.q.classify(v2.X), in_place=True) + self.sigma_pred_prevs = [self.q.aggregate(sigma_i.X) for sigma_i in gen_samples()] + + def predict(self, X): + test_pred_prev = self.q.quantify(X) + + if self.alpha > 0: + # select samples from V2 with predicted prevalence close to the predicted prevalence for U + selected_accuracies = [] + for pred_prev_i, acc_i in zip(self.sigma_pred_prevs, self.sigma_acc): + max_discrepancy = np.max(np.abs(pred_prev_i - test_pred_prev)) + if max_discrepancy < self.alpha: + selected_accuracies.append(acc_i) + + return np.median(selected_accuracies) + else: + # mean average, weights samples from V2 according to the closeness of predicted prevalence in U + accum_weight = 0 + 
moving_mean = 0 + epsilon = 10E-4 + for pred_prev_i, acc_i in zip(self.sigma_pred_prevs, self.sigma_acc): + max_discrepancy = np.max(np.abs(pred_prev_i - test_pred_prev)) + weight = -np.log(max_discrepancy+epsilon) + accum_weight += weight + moving_mean += (weight*acc_i) + + return moving_mean/accum_weight + + +class PabloCAP(ClassifierAccuracyPrediction): + + def __init__(self, h, acc_fn, q_class, n_val_samples=50, aggr='mean'): + self.h = h + self.acc = acc_fn + self.q = q_class(h) + self.n_val_samples = n_val_samples + self.aggr = aggr + assert aggr in ['mean', 'median'], 'unknown aggregation function, use mean or median' + + def fit(self, val: LabelledCollection): + self.q.fit(val) + label_predictions = self.h.predict(val.X) + self.pre_classified = LabelledCollection(instances=label_predictions, labels=val.labels) + + def predict(self, X): + pred_prev = F.smooth(self.q.quantify(X)) + X_size = X.shape[0] + acc_estim = [] + for _ in range(self.n_val_samples): + sigma_i = self.pre_classified.sampling(X_size, *pred_prev[:-1]) + y_pred, y_true = sigma_i.Xy + conf_table = confusion_matrix(y_true, y_pred=y_pred, labels=sigma_i.classes_) + acc_i = self.acc(conf_table) + acc_estim.append(acc_i) + if self.aggr == 'mean': + return np.mean(acc_estim) + elif self.aggr == 'median': + return np.median(acc_estim) else: - tpr = 1 + raise ValueError('unknown aggregation function') - den_fpr = (fp+tn) - if den_fpr>0: - fpr = fp / den_fpr - else: - fpr = 0 - pcc = posteriors.sum(axis=0)[1] - pacc = (pcc-fpr)/(tpr-fpr) - pacc = np.clip(pacc, 0, 1) - - q = tp+fn - return q \ No newline at end of file diff --git a/ClassifierAccuracy/tabular.py b/ClassifierAccuracy/tabular.py new file mode 100644 index 0000000..8cca81f --- /dev/null +++ b/ClassifierAccuracy/tabular.py @@ -0,0 +1,349 @@ +import numpy as np +import itertools +from scipy.stats import ttest_ind_from_stats, wilcoxon + + +class Table: + VALID_TESTS = [None, "wilcoxon", "ttest"] + + def __init__(self, benchmarks, methods, lower_is_better=True, ttest='ttest', prec_mean=3, + clean_zero=False, show_std=False, prec_std=3, average=True, missing=None, missing_str='--', color=True, + maxtone=50): + assert ttest in self.VALID_TESTS, f'unknown test, valid are {self.VALID_TESTS}' + + self.benchmarks = np.asarray(benchmarks) + self.benchmark_index = {row: i for i, row in enumerate(benchmarks)} + + self.methods = np.asarray(methods) + self.method_index = {col: j for j, col in enumerate(methods)} + + self.map = {} + # keyed (#rows,#cols)-ndarrays holding computations from self.map['values'] + self._addmap('values', dtype=object) + self.lower_is_better = lower_is_better + self.ttest = ttest + self.prec_mean = prec_mean + self.clean_zero = clean_zero + self.show_std = show_std + self.prec_std = prec_std + self.add_average = average + self.missing = missing + self.missing_str = missing_str + self.color = color + self.maxtone = maxtone + + self.touch() + + @property + def nbenchmarks(self): + return len(self.benchmarks) + + @property + def nmethods(self): + return len(self.methods) + + def touch(self): + self._modif = True + + def update(self): + if self._modif: + self.compute() + + def _getfilled(self): + return np.argwhere(self.map['fill']) + + @property + def values(self): + return self.map['values'] + + def _indexes(self): + return itertools.product(range(self.nbenchmarks), range(self.nmethods)) + + def _addmap(self, map, dtype, func=None): + self.map[map] = np.empty((self.nbenchmarks, self.nmethods), dtype=dtype) + if func is None: + return + m = self.map[map] 
+ f = func + indexes = self._indexes() if map == 'fill' else self._getfilled() + for i, j in indexes: + m[i, j] = f(self.values[i, j]) + + def _addrank(self): + for i in range(self.nbenchmarks): + filled_cols_idx = np.argwhere(self.map['fill'][i]).flatten() + col_means = [self.map['mean'][i, j] for j in filled_cols_idx] + ranked_cols_idx = filled_cols_idx[np.argsort(col_means)] + if not self.lower_is_better: + ranked_cols_idx = ranked_cols_idx[::-1] + self.map['rank'][i, ranked_cols_idx] = np.arange(1, len(filled_cols_idx) + 1) + + def _addcolor(self): + for i in range(self.nbenchmarks): + filled_cols_idx = np.argwhere(self.map['fill'][i]).flatten() + if filled_cols_idx.size == 0: + continue + col_means = [self.map['mean'][i, j] for j in filled_cols_idx] + # col_means = [self.map['rank'][i, j] for j in filled_cols_idx] + + minval = min(col_means) + maxval = max(col_means) + + for col_idx in filled_cols_idx: + val = self.map['mean'][i, col_idx] + norm = (maxval - minval) + if norm > 0: + normval = (val - minval) / norm + else: + normval = 0.5 + + if self.lower_is_better: + normval = 1 - normval + + normval = np.clip(normval, 0, 1) + + self.map['color'][i, col_idx] = color_red2green_01(normval, self.maxtone) + + def _run_ttest(self, row, col1, col2): + mean1 = self.map['mean'][row, col1] + std1 = self.map['std'][row, col1] + nobs1 = self.map['nobs'][row, col1] + mean2 = self.map['mean'][row, col2] + std2 = self.map['std'][row, col2] + nobs2 = self.map['nobs'][row, col2] + _, p_val = ttest_ind_from_stats(mean1, std1, nobs1, mean2, std2, nobs2) + return p_val + + def _run_wilcoxon(self, row, col1, col2): + values1 = self.map['values'][row, col1] + values2 = self.map['values'][row, col2] + try: + _, p_val = wilcoxon(values1, values2) + except ValueError: + p_val = 0 + return p_val + + def _add_statistical_test(self): + if self.ttest is None: + return + self.some_similar = [False] * self.nmethods + for i in range(self.nbenchmarks): + filled_cols_idx = np.argwhere(self.map['fill'][i]).flatten() + if len(filled_cols_idx) <= 1: + continue + col_means = [self.map['mean'][i, j] for j in filled_cols_idx] + best_pos = filled_cols_idx[np.argmin(col_means)] + + for j in filled_cols_idx: + if j == best_pos: + continue + if self.ttest == 'ttest': + p_val = self._run_ttest(i, best_pos, j) + else: + p_val = self._run_wilcoxon(i, best_pos, j) + + pval_outcome = pval_interpretation(p_val) + self.map['ttest'][i, j] = pval_outcome + if pval_outcome != 'Diff': + self.some_similar[j] = True + + def compute(self): + self._addmap('fill', dtype=bool, func=lambda x: x is not None) + self._addmap('mean', dtype=float, func=np.mean) + self._addmap('std', dtype=float, func=np.std) + self._addmap('nobs', dtype=float, func=len) + self._addmap('rank', dtype=int, func=None) + self._addmap('color', dtype=object, func=None) + self._addmap('ttest', dtype=object, func=None) + self._addmap('latex', dtype=object, func=None) + self._addrank() + self._addcolor() + self._add_statistical_test() + if self.add_average: + self._addave() + self._modif = False + + def _is_column_full(self, col): + return all(self.map['fill'][:, self.method_index[col]]) + + def _addave(self): + ave = Table(['ave'], self.methods, + lower_is_better=self.lower_is_better, + ttest=self.ttest, + average=False, + missing=self.missing, + missing_str=self.missing_str, + prec_mean=self.prec_mean, + prec_std=self.prec_std, + clean_zero=self.clean_zero, + show_std=self.show_std, + color=self.color, + maxtone=self.maxtone) + for col in self.methods: + values = None + if 
self._is_column_full(col): + if self.ttest == 'ttest': + # values = np.asarray(self.map['mean'][:, self.method_index[col]]) + values = np.concatenate(self.values[:, self.method_index[col]]) + else: # wilcoxon + # values = np.asarray(self.map['mean'][:, self.method_index[col]]) + values = np.concatenate(self.values[:, self.method_index[col]]) + ave.add('ave', col, values) + self.average = ave + + def add(self, benchmark, method, values): + if values is not None: + values = np.asarray(values) + if values.ndim == 0: + values = values.flatten() + rid, cid = self._coordinates(benchmark, method) + self.map['values'][rid, cid] = values + self.touch() + + def get(self, benchmark, method, attr='mean'): + self.update() + assert attr in self.map, f'unknwon attribute {attr}' + rid, cid = self._coordinates(benchmark, method) + if self.map['fill'][rid, cid]: + v = self.map[attr][rid, cid] + if v is None or (isinstance(v, float) and np.isnan(v)): + return self.missing + return v + else: + return self.missing + + def _coordinates(self, benchmark, method): + assert benchmark in self.benchmark_index, f'benchmark {benchmark} out of range' + assert method in self.method_index, f'method {method} out of range' + rid = self.benchmark_index[benchmark] + cid = self.method_index[method] + return rid, cid + + def get_average(self, method, attr='mean'): + self.update() + if self.add_average: + return self.average.get('ave', method, attr=attr) + return None + + def get_color(self, benchmark, method): + color = self.get(benchmark, method, attr='color') + if color is None: + return '' + return color + + def latex(self, benchmark, method): + self.update() + i, j = self._coordinates(benchmark, method) + if self.map['fill'][i, j] == False: + return self.missing_str + + mean = self.map['mean'][i, j] + l = f" {mean:.{self.prec_mean}f}" + if self.clean_zero: + l = l.replace(' 0.', '.') + + isbest = self.map['rank'][i, j] == 1 + if isbest: + l = "\\textbf{" + l.strip() + "}" + + stat = '' if self.ttest is None else '^{\phantom{\ddag}}' + if self.ttest is not None and self.some_similar[j]: + test_label = self.map['ttest'][i, j] + if test_label == 'Sim': + stat = '^{\dag}' + elif test_label == 'Same': + stat = '^{\ddag}' + elif isbest or test_label == 'Diff': + stat = '^{\phantom{\ddag}}' + + std = '' + if self.show_std: + std = self.map['std'][i, j] + std = f" {std:.{self.prec_std}f}" + if self.clean_zero: + std = std.replace(' 0.', '.') + std = f"\pm {std:{self.prec_std}}" + + if stat != '' or std != '': + l = f'{l}${stat}{std}$' + + if self.color: + l += ' ' + self.map['color'][i, j] + + return l + + def latexTabular(self, benchmark_replace={}, method_replace={}, aslines=False, endl='\\\\\hline'): + lines = [] + l = '\multicolumn{1}{c|}{} & ' + l += ' & '.join([method_replace.get(col, col) for col in self.methods]) + l += ' \\\\\hline' + lines.append(l) + + for row in self.benchmarks: + rowname = benchmark_replace.get(row, row) + l = rowname + ' & ' + l += self.latexRow(row, endl=endl) + lines.append(l) + + if self.add_average: + # l += '\hline\n' + l = '\hline \n \\textit{Average} & ' + l += self.latexAverage(endl=endl) + lines.append(l) + if not aslines: + lines = '\n'.join(lines) + return lines + + def latexRow(self, benchmark, endl='\\\\\hline\n'): + s = [self.latex(benchmark, col) for col in self.methods] + s = ' & '.join(s) + s += ' ' + endl + return s + + def latexAverage(self, endl='\\\\\hline\n'): + if self.add_average: + return self.average.latexRow('ave', endl=endl) + + def getRankTable(self, prec_mean=0): + t = 
Table(benchmarks=self.benchmarks, methods=self.methods, prec_mean=prec_mean, average=True, + maxtone=self.maxtone, ttest=None) + for rid, cid in self._getfilled(): + row = self.benchmarks[rid] + col = self.methods[cid] + t.add(row, col, self.get(row, col, 'rank')) + t.compute() + return t + + def dropMethods(self, methods): + drop_index = [self.method_index[m] for m in methods] + new_methods = np.delete(self.methods, drop_index) + new_index = {col: j for j, col in enumerate(new_methods)} + + self.map['values'] = self.values[:, np.asarray([self.method_index[m] for m in new_methods], dtype=int)] + self.methods = new_methods + self.method_index = new_index + self.touch() + + +def pval_interpretation(p_val): + if 0.005 >= p_val: + return 'Diff' + elif 0.05 >= p_val > 0.005: + return 'Sim' + elif p_val > 0.05: + return 'Same' + + +def color_red2green_01(val, maxtone=50): + if np.isnan(val): return None + assert 0 <= val <= 1, f'val {val} out of range [0,1]' + + # rescale to [-1,1] + val = val * 2 - 1 + if val < 0: + color = 'red' + tone = maxtone * (-val) + else: + color = 'green' + tone = maxtone * val + return '\cellcolor{' + color + f'!{int(tone)}' + '}' diff --git a/ClassifierAccuracy/utils.py b/ClassifierAccuracy/utils.py index 944dc2e..257f69f 100644 --- a/ClassifierAccuracy/utils.py +++ b/ClassifierAccuracy/utils.py @@ -1,12 +1,27 @@ +import itertools +import os +from collections import defaultdict + import matplotlib.pyplot as plt from pathlib import Path from os import makedirs +from os.path import join import numpy as np +import json +from scipy.stats import pearsonr +from sklearn.linear_model import LogisticRegression +from time import time +import quapy as qp +from glob import glob + +from commons import cap_errors +from models_multiclass import ClassifierAccuracyPrediction, CAPContingencyTable -def plot_diagonal(outpath, xs, predictions:list): +def plot_diagonal(cls_name, measure_name, results, base_dir='plots'): - makedirs(Path(outpath).parent, exist_ok=True) + makedirs(base_dir, exist_ok=True) + makedirs(join(base_dir, measure_name), exist_ok=True) # Create scatter plot plt.figure(figsize=(10, 10)) @@ -14,16 +29,136 @@ def plot_diagonal(outpath, xs, predictions:list): plt.ylim(0, 1) plt.plot([0, 1], [0, 1], color='black', linestyle='--') - for method_name, ys in predictions: - pear_cor = np.corrcoef(xs, ys)[0, 1] - plt.scatter(xs, ys, label=f'{method_name} {pear_cor:.2f}') + for method_name in results.keys(): + print(method_name, measure_name) + xs = results[method_name]['true_acc'] + ys = results[method_name]['estim_acc'] + print('max xs', np.max(xs)) + print('max ys', np.max(ys)) + err = cap_errors(xs, ys).mean() + #pear_cor, _ = 0, 0 #pearsonr(xs, ys) + plt.scatter(xs, ys, label=f'{method_name} {err:.3f}', alpha=0.6) plt.legend() # Add labels and title - plt.xlabel('True Accuracy') - plt.ylabel('Estimated Accuracy') + plt.xlabel(f'True {measure_name}') + plt.ylabel(f'Estimated {measure_name}') # Display the plot # plt.show() - plt.savefig(outpath) + plt.savefig(join(base_dir, measure_name, 'diagonal_'+cls_name+'.png')) + + +def getpath(cls_name, acc_name, dataset_name, method_name): + return f"results/{cls_name}/{acc_name}/{dataset_name}/{method_name}.json" + + +def open_results(cls_name, acc_name, dataset_name='*', method_name='*'): + path = getpath(cls_name, acc_name, dataset_name, method_name) + results = defaultdict(lambda : {'true_acc':[], 'estim_acc':[]}) + for file in glob(path): + #print(file) + method = Path(file).name.replace('.json','') + result = 
json.load(open(file, 'r')) + results[method]['true_acc'].extend(result['true_acc']) + results[method]['estim_acc'].extend(result['estim_acc']) + return results + + +def save_json_file(path, data): + os.makedirs(Path(path).parent, exist_ok=True) + with open(path, 'w') as f: + json.dump(data, f) + + +def save_json_result(path, true_accs, estim_accs, t_train, t_test): + result = { + 't_train': t_train, + 't_test_ave': t_test, + 'true_acc': true_accs, + 'estim_acc': estim_accs + } + save_json_file(path, result) + + +def get_dataset_stats(path, test_prot, L, V): + test_prevs = [Ui.prevalence() for Ui in test_prot()] + shifts = [qp.error.ae(L.prevalence(), Ui_prev) for Ui_prev in test_prevs] + info = { + 'n_classes': L.n_classes, + 'n_train': len(L), + 'n_val': len(V), + 'train_prev': L.prevalence().tolist(), + 'val_prev': V.prevalence().tolist(), + 'test_prevs': [x.tolist() for x in test_prevs], + 'shifts': [x.tolist() for x in shifts], + 'sample_size': test_prot.sample_size, + 'num_samples': test_prot.total() + } + save_json_file(path, info) + + +def gen_tables(): + from commons import gen_datasets, gen_classifiers, gen_acc_measure, gen_CAP, gen_CAP_cont_table + from tabular import Table + + mock_h = LogisticRegression(), + methods = [method for method, _ in gen_CAP(mock_h, None)] + [method for method, _ in gen_CAP_cont_table(mock_h)] + datasets = [dataset for dataset, _ in gen_datasets()] + classifiers = [classifier for classifier, _ in gen_classifiers()] + measures = [measure for measure, _ in gen_acc_measure()] + + os.makedirs('tables', exist_ok=True) + + tex_doc = """ + \\documentclass[10pt,a4paper]{article} + \\usepackage[utf8]{inputenc} + \\usepackage{amsmath} + \\usepackage{amsfonts} + \\usepackage{amssymb} + \\usepackage{graphicx} + \\usepackage{tabularx} + \\usepackage{color} + \\usepackage{colortbl} + \\usepackage{xcolor} + \\begin{document} + """ + + classifier = classifiers[0] + metric = "vanilla_accuracy" + + table = Table(datasets, methods) + for method, dataset in itertools.product(methods, datasets): + path = f'results/{classifier}/{metric}/{dataset}/{method}.json' + results = json.load(open(path, 'r')) + true_acc = results['true_acc'] + estim_acc = np.asarray(results['estim_acc']) + if any(np.isnan(estim_acc)) or any(estim_acc>1) or any(estim_acc<0): + print(f'error in {method=} {dataset=}') + continue + errors = cap_errors(true_acc, estim_acc) + table.add(dataset, method, errors) + + tex = table.latexTabular() + table_name = f'{classifier}_{metric}.tex' + with open(f'./tables/{table_name}', 'wt') as foo: + foo.write('\\resizebox{\\textwidth}{!}{%\n') + foo.write('\\begin{tabular}{c|'+('c'*len(methods))+'}\n') + foo.write(tex) + foo.write('\\end{tabular}%\n') + foo.write('}\n') + + tex_doc += "\input{" + table_name + "}\n" + + tex_doc += """ + \\end{document} + """ + with open(f'./tables/main.tex', 'wt') as foo: + foo.write(tex_doc) + + print("[Tables Done] runing latex") + os.chdir('./tables/') + os.system('pdflatex main.tex') + os.system('rm main.aux main.bbl main.blg main.log main.out main.dvi') + diff --git a/quapy/data/base.py b/quapy/data/base.py index 9cc6441..ce05b95 100644 --- a/quapy/data/base.py +++ b/quapy/data/base.py @@ -236,11 +236,24 @@ class LabelledCollection: :return: two instances of :class:`LabelledCollection`, the first one with `train_prop` elements, and the second one with `1-train_prop` elements """ + instances = self.instances + labels = self.labels + remainder = None + for idx in np.argwhere(self.counts()==1): + class_with_1 = 
self.classes_[idx.item()] + if remainder is None: + remainder = LabelledCollection(instances[labels==class_with_1], [class_with_1], classes=self.classes_) + else: + remainder += LabelledCollection(instances[labels==class_with_1], [class_with_1], classes=self.classes_) + instances = instances[labels!=class_with_1] + labels = labels[labels!=class_with_1] tr_docs, te_docs, tr_labels, te_labels = train_test_split( - self.instances, self.labels, train_size=train_prop, stratify=self.labels, random_state=random_state + instances, labels, train_size=train_prop, stratify=labels, random_state=random_state ) training = LabelledCollection(tr_docs, tr_labels, classes=self.classes_) test = LabelledCollection(te_docs, te_labels, classes=self.classes_) + if remainder is not None: + training += remainder return training, test def split_random(self, train_prop=0.6, random_state=None): diff --git a/quapy/functional.py b/quapy/functional.py index c6dc351..b6a3f99 100644 --- a/quapy/functional.py +++ b/quapy/functional.py @@ -362,4 +362,17 @@ def linear_search(loss, n_classes): if min_score is None or score < min_score: prev_selected, min_score = prev, score - return np.asarray([1 - prev_selected, prev_selected]) \ No newline at end of file + return np.asarray([1 - prev_selected, prev_selected]) + + +def smooth(prevalences, epsilon=1E-5): + """ + Smooths a prevalence vector. + + :param prevalences: np.ndarray + :param epsilon: float, a small quantity (default 1E-5) + :return: smoothed prevalence vector + """ + prevalences = prevalences+epsilon + prevalences /= prevalences.sum() + return prevalences \ No newline at end of file
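
A note on the micro-F1 helper added in commons.py: when computed from a single multiclass contingency table, the pooled true positives are the diagonal and the pooled false positives and false negatives are both the off-diagonal mass, so micro-F1 coincides with vanilla accuracy. The loop in microf1 adds whole columns/rows (arrays) into fp and fn rather than scalar counts; a minimal scalar sketch of the intended computation, using a made-up example table, could look like this:

import numpy as np

def microf1_scalar(cont_table):
    # rows = true class, columns = predicted class
    cont_table = np.asarray(cont_table, dtype=float)
    tp = np.trace(cont_table)
    total = cont_table.sum()
    fp = total - tp   # off-diagonal mass, pooled over columns
    fn = total - tp   # off-diagonal mass, pooled over rows
    if tp + fp + fn == 0:
        return 1.0
    return (2 * tp) / (2 * tp + fp + fn)

# made-up 3-class table, for illustration only
ct = np.array([[50, 3, 2], [4, 40, 6], [1, 5, 39]])
print(microf1_scalar(ct))        # equals np.trace(ct) / ct.sum()
print(np.trace(ct) / ct.sum())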
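
The comment block in NsquaredEquationsCAP._construct_equations describes a system of n*n linear equations over the (normalized) cells of the test contingency table: one equation forcing the cells to sum to 1, (n-1)^2 equations preserving the validation class-conditional ratios under the PPS assumption, n-1 equations matching the classify-and-count estimates of the predicted-class proportions, and n-1 equations matching the quantifier's estimates of the true-class prevalences. The following standalone sketch writes that same system out by hand for the binary case; the validation table and the two test-side estimates (cc_pos, q_pos) are made-up numbers for illustration, not values produced by the patch:

import numpy as np

# validation contingency table (rows = true class, columns = predicted class)
val_ct = np.array([[40., 10.],
                   [ 5., 45.]])

# hypothetical test-side estimates (in the class these come from classify-and-count
# and from the auxiliary quantifier, respectively)
cc_pos = 0.55   # estimated proportion of test items *predicted* as class 1
q_pos  = 0.60   # estimated prevalence of *true* class 1 in the test sample

# class-conditional ratio P(y_hat=1 | y=1), assumed invariant under prior probability shift
r = val_ct[1, 1] / val_ct[1, :].sum()

# unknowns x = [x00, x01, x10, x11]: normalized test contingency table, row-major
A = np.array([
    [1., 1., 1., 1.],       # all cells sum to 1
    [0., 0., -r, 1. - r],   # x11 - r * (x10 + x11) = 0  (ratio preserved)
    [0., 1., 0., 1.],       # column-1 sum matches the classify-and-count estimate
    [0., 0., 1., 1.],       # row-1 sum matches the quantifier estimate
])
b = np.array([1., 0., cc_pos, q_pos])

test_ct = np.linalg.solve(A, b).reshape(2, 2)
vanilla_acc = np.diag(test_ct).sum()   # estimated accuracy for this test sample
print(test_ct)
print(vanilla_acc)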