Compare commits

...

23 Commits

Author SHA1 Message Date
Alejandro Moreo Fernandez 6dfa1d3536 testing first gp in binary data, with pdf table 2024-04-17 11:50:37 +02:00
Alejandro Moreo Fernandez 820bdc8f18 not understanding anything about the jensen shannon div representation 2024-04-15 17:14:48 +02:00
Alejandro Moreo Fernandez 09ee9efb3f trying to understand GP 2024-04-11 18:30:32 +02:00
Alejandro Moreo Fernandez d5b8d68f03 switching 2024-04-11 11:33:48 +02:00
Alejandro Moreo Fernandez 6c3c604a43 cleaning 2024-04-02 11:05:02 +02:00
Alejandro Moreo Fernandez d0444d3bbb added artificial accuracy protocol 2024-03-08 17:04:10 +01:00
Alejandro Moreo Fernandez 8b9b8957f5 bugfix, some methods modified h 2024-03-08 12:32:45 +01:00
Alejandro Moreo Fernandez 07a29d4b60 generating tables with captions, added 20 newsgroups and lequa 2022 t1b 2024-03-06 18:10:47 +01:00
Alejandro Moreo Fernandez d81bf305a3 improving custom quantifier 2024-03-06 11:45:08 +01:00
Alejandro Moreo Fernandez ea7a574185 adding support for f1 to doc 2024-03-03 19:25:00 +01:00
Alejandro Moreo Fernandez 60151dbebb added DOC and ATC 2024-03-03 14:52:37 +01:00
Alejandro Moreo Fernandez 86d9ce849c added DOC and ATC 2024-03-03 14:52:12 +01:00
Alejandro Moreo Fernandez b9fed349f0 added quacc and a method to allow quantification training and predict with empty classes 2024-02-29 01:25:27 +01:00
Alejandro Moreo Fernandez e69496246e renaming main 2024-02-28 08:54:09 +01:00
Alejandro Moreo Fernandez 29eaa54d82 copying a modification from devel 2024-02-28 08:49:19 +01:00
Alejandro Moreo Fernandez 3932cf22ce added Sebastiani's method and Pablo's method 2024-02-27 18:38:39 +01:00
Alejandro Moreo Fernandez 79dcef35ae half iplementation of the equatons method 2024-02-24 14:36:04 +01:00
Alejandro Moreo Fernandez 9d64d18cd4 some good refactoring 2024-02-23 18:19:00 +01:00
Alejandro Moreo Fernandez caa7fd2884 cleaning branch 2024-02-23 16:55:14 +01:00
Alejandro Moreo Fernandez b3ccf71edb Merge branch 'devel' of github.com:HLT-ISTI/QuaPy into devel 2024-02-23 16:30:11 +01:00
Alejandro Moreo Fernandez 320b3eac38 small fixes in kdey (now should work with string labels) and EMQ (in case some training prior prob was 0, it broke) 2024-02-23 16:29:53 +01:00
Alejandro Moreo Fernandez 9542eaee61 doing some benchmarking 2024-02-22 15:10:45 +01:00
Alejandro Moreo Fernandez d50a86daf4 sketching readme system by Lu and King, Hopings and King 2024-02-16 17:34:10 +01:00
24 changed files with 2413 additions and 48 deletions

.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "result_table"]
path = result_table
url = gitea@gitea-s2i2s.isti.cnr.it:moreo/result_table.git

View File

@ -1,3 +1,9 @@
Change Log 0.1.9
----------------
<...>
Change Log 0.1.8
----------------

View File

@ -0,0 +1,315 @@
import numpy as np
import scipy.special
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.svm import LinearSVC
import quapy as qp
from model_selection import GridSearchQ
from quapy.protocol import APP
from quapy.method.aggregative import PACC, ACC, EMQ, PCC, CC, DMy, T50, MS2, KDEyML, KDEyCS, KDEyHD
from sklearn import clone
import quapy.functional as F
# datasets = qp.datasets.UCI_DATASETS
datasets = ['imdb']
# target = 'f1'
target = 'acc'
errors = []
def method_1(cls, q, train, val, sample, y=None, y_hat=None):
"""
Combines a misclassification matrix estimated in validation (i.e., under the training distribution P) with
a quantifier's estimate of the test prevalence to derive the corresponding contingency table in test
(i.e., under the test distribution Q), relying on the PPS assumptions.
:return: tuple (tn, fn, fp, tp,) of floats in [0,1] summing up to 1
"""
y_val = val.labels
y_hat_val = cls.predict(val.instances)
# q = EMQ(LogisticRegression(class_weight='balanced'))
# q.fit(val, fit_classifier=True)
# q = EMQ(cls)
# q.fit(train, fit_classifier=False)
# q = KDEyML(cls)
# q.fit(train, val_split=val, fit_classifier=False)
M_hat = ACC.getPteCondEstim(train.classes_, y_val, y_hat_val)
M_true = ACC.getPteCondEstim(train.classes_, y, y_hat)
p_hat = q.quantify(sample.instances)
cont_table_hat = p_hat * M_hat
# cont_table_hat = np.clip(cont_table_hat, 0, 1)
# cont_table_hat = cont_table_hat / cont_table_hat.sum()
print('true_prev: ', sample.prevalence())
print('estim_prev: ', p_hat)
print('M-true:\n', M_true)
print('M-hat:\n', M_hat)
print('cont_table:\n', cont_table_hat)
tp = cont_table_hat[1, 1]
tn = cont_table_hat[0, 0]
fn = cont_table_hat[0, 1]
fp = cont_table_hat[1, 0]
return tn, fn, fp, tp
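# --- illustrative sketch (not part of method_1; all numbers and names are hypothetical) ---
# Minimal numeric example of the idea above, assuming the same indexing convention:
# M[i, j] ~ P(pred=i | true=j), and the estimated test prevalence is broadcast over
# the columns (true classes), so that the product is the joint table P(pred, true).
def _sketch_pps_cont_table():
    import numpy as np
    M_val = np.asarray([[0.9, 0.2],    # P(pred=0 | true=0), P(pred=0 | true=1)
                        [0.1, 0.8]])   # P(pred=1 | true=0), P(pred=1 | true=1)
    p_test = np.asarray([0.3, 0.7])    # test prevalence estimated by some quantifier
    joint = p_test * M_val             # joint[i, j] ~ P(pred=i, true=j) under PPS
    tn, fn = joint[0, 0], joint[0, 1]
    fp, tp = joint[1, 0], joint[1, 1]
    assert np.isclose(joint.sum(), 1.0)
    return tn, fn, fp, tp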
def method_2(cls, train, val, sample, y=None, y_hat=None):
"""
Assume P and Q are the training and test distributions
Solves the following system of linear equations:
tp + fp = CC (the classify & count estimate, observed)
fn + tp = Q(Y=1) (this is not observed but is estimated via quantification)
tp + fp + fn + tn = 1 (trivial)
There are 4 unknowns and 3 equations. The fourth required one is established
by assuming that the PPS conditions hold, i.e., that P(X|Y)=Q(X|Y); note that
this implies P(hatY|Y)=Q(hatY|Y) if hatY is computed by any measurable function.
In particular, we consider that the tpr in P (estimated via validation, hereafter tpr) and
in Q (unknown, hereafter tpr_Q) should
be the same. This means:
tpr = tpr_Q = tp / (tp + fn)
after some manipulation:
tp (tpr-1) + fn (tpr) = 0 <-- our last equation
Note that the last equation relies on the estimate tpr. It is likely that, the more
positives we have, the more reliable this estimate is. This suggests that, in cases
in which we have more negatives in the validation set than positives, it might be
convenient to resort to the true negative rate (tnr) instead. This gives rise to
the alternative fourth equation:
tn (tnr-1) + fp (tnr) = 0
:return: tuple (tn, fn, fp, tp,) of floats in [0,1] summing up to 1
"""
y_val = val.labels
y_hat_val = cls.predict(val.instances)
q = ACC(cls)
q.fit(train, val_split=val, fit_classifier=False)
p_hat = q.quantify(sample.instances)
pos_prev = p_hat[1]
# pos_prev = sample.prevalence()[1]
cc = CC(cls)
cc.fit(train, fit_classifier=False)
cc_prev = cc.quantify(sample.instances)[1]
M_hat = ACC.getPteCondEstim(train.classes_, y_val, y_hat_val)
M_true = ACC.getPteCondEstim(train.classes_, y, y_hat)
cont_table_true = sample.prevalence() * M_true
if val.prevalence()[1] > 0.5:
# in this case, the tpr might be a more reliable estimate than tnr
tpr_hat = M_hat[1, 1]
A = np.asarray([
[0, 0, 1, 1],
[0, 1, 0, 1],
[1, 1, 1, 1],
[0, tpr_hat, 0, tpr_hat - 1]
])
else:
# in this case, the tnr might be a more reliable estimate than tpr
tnr_hat = M_hat[0, 0]
A = np.asarray([
[0, 0, 1, 1],
[0, 1, 0, 1],
[1, 1, 1, 1],
[tnr_hat-1, 0, tnr_hat, 0]
])
b = np.asarray(
[cc_prev, pos_prev, 1, 0]
)
tn, fn, fp, tp = np.linalg.solve(A, b)
cont_table_estim = np.asarray([
[tn, fn],
[fp, tp]
])
# if (cont_table_estim < 0).any() or (cont_table_estim>1).any():
# cont_table_estim = scipy.special.softmax(cont_table_estim)
print('true_prev: ', sample.prevalence())
print('estim_prev: ', p_hat)
print('true_cont_table:\n', cont_table_true)
print('estim_cont_table:\n', cont_table_estim)
# print('true_tpr', M_true[1,1])
# print('estim_tpr', tpr_hat)
return tn, fn, fp, tp
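# --- illustrative sketch (not part of method_2; all numbers are hypothetical) ---
# Quick self-check of the 4-equation system described in method_2's docstring, on a
# synthetic case where the exact contingency table is known. Unknowns are ordered
# (tn, fn, fp, tp), as above.
def _sketch_method2_system():
    import numpy as np
    tn, fn, fp, tp = 0.4, 0.1, 0.1, 0.4   # ground-truth table (sums to 1)
    cc = tp + fp                          # classify & count estimate
    pos = tp + fn                         # positive prevalence (via quantification)
    tpr = tp / (tp + fn)                  # tpr assumed shared between P and Q
    A = np.asarray([[0, 0, 1, 1],
                    [0, 1, 0, 1],
                    [1, 1, 1, 1],
                    [0, tpr, 0, tpr - 1]])
    b = np.asarray([cc, pos, 1, 0])
    tn_hat, fn_hat, fp_hat, tp_hat = np.linalg.solve(A, b)
    assert np.allclose([tn_hat, fn_hat, fp_hat, tp_hat], [tn, fn, fp, tp])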
def method_3(cls, train, val, sample, y=None, y_hat=None):
"""
This is just method 2 but without involving any quapy's quantifier.
:return: tuple (tn, fn, fp, tp,) of floats in [0,1] summing up to 1
"""
classes = val.classes_
y_val = val.labels
y_hat_val = cls.predict(val.instances)
M_hat = ACC.getPteCondEstim(classes, y_val, y_hat_val)
y_hat_test = cls.predict(sample.instances)
pos_prev_cc = F.prevalence_from_labels(y_hat_test, classes)[1]
tpr_hat = M_hat[1,1]
fpr_hat = M_hat[1,0]
tnr_hat = M_hat[0,0]
if tpr_hat!=fpr_hat:
pos_prev_test_hat = (pos_prev_cc - fpr_hat) / (tpr_hat - fpr_hat)
else:
print('-->', tpr_hat)
pos_prev_test_hat = pos_prev_cc
pos_prev_test_hat = np.clip(pos_prev_test_hat, 0, 1)
pos_prev_val = val.prevalence()[1]
if pos_prev_val > 0.5:
# in this case, the tpr might be a more reliable estimate than tnr
A = np.asarray([
[0, 0, 1, 1],
[0, 1, 0, 1],
[1, 1, 1, 1],
[0, tpr_hat, 0, tpr_hat - 1]
])
else:
# in this case, the tnr might be a more reliable estimate than tpr
A = np.asarray([
[0, 0, 1, 1],
[0, 1, 0, 1],
[1, 1, 1, 1],
[tnr_hat-1, 0, tnr_hat, 0]
])
b = np.asarray(
[pos_prev_cc, pos_prev_test_hat, 1, 0]
)
tn, fn, fp, tp = np.linalg.solve(A, b)
return tn, fn, fp, tp
def cls_eval_from_counters(tn, fn, fp, tp):
if target == 'acc':
acc_hat = (tp + tn)
else:
den = (2 * tp + fn + fp)
if den > 0:
acc_hat = 2 * tp / den
else:
acc_hat = 0
return acc_hat
def cls_eval_from_labels(y, y_hat):
if target == 'acc':
acc = (y_hat == y).mean()
else:
acc = f1_score(y, y_hat, zero_division=0)
return acc
for dataset_name in datasets:
train_orig, test = qp.datasets.fetch_reviews(dataset_name, tfidf=True, min_df=10).train_test
xs = []
ys_1 = []
ys_trval = []
ys_3 = []
train_prot = APP(train_orig, n_prevalences=11, repeats=1, return_type='labelled_collection', random_state=0, sample_size=10000)
for train in train_prot():
if np.prod(train.prevalence()) == 0:
# skip experiments with no positives or no negatives in training
continue
cls = LogisticRegression(class_weight='balanced', C=100)
# cls = CalibratedClassifierCV(LinearSVC())
train, val = train.split_stratified(train_prop=0.5, random_state=0)
print(f'dataset name = {dataset_name}')
print(f'#train = {len(train)}, prev={F.strprev(train.prevalence())}')
print(f'#val = {len(val)}, prev={F.strprev(val.prevalence())}')
print(f'#test = {len(test)}, prev={F.strprev(test.prevalence())}')
cls.fit(*train.Xy)
# q = KDEyML(cls)
q = ACC(LogisticRegression())
q.fit(train, val_split=val, fit_classifier=True)
# q = GridSearchQ(PACC(cls),
# param_grid={'classifier__C':np.logspace(-2,2,5)},
# protocol=APP(val, sample_size=1000),
# verbose=True,
# n_jobs=-1).fit(train)
acc_trval = cls_eval_from_labels(val.labels, cls.predict(val.instances))
for sample in APP(test, n_prevalences=21, repeats=1, sample_size=1000, return_type='labelled_collection')():
print('='*80)
y_hat = cls.predict(sample.instances)
y = sample.labels
acc_true = cls_eval_from_labels(y, y_hat)
xs.append(acc_true)
ys_trval.append(acc_trval)
tn, fn, fp, tp = method_1(cls, q, train, val, sample, y, y_hat)
acc_hat = cls_eval_from_counters(tn, fn, fp, tp)
ys_1.append(acc_hat)
tn, fn, fp, tp = method_3(cls, train, val, sample, y, y_hat)
acc_hat = cls_eval_from_counters(tn, fn, fp, tp)
ys_3.append(acc_hat)
error = abs(acc_true - acc_hat)
errors.append(error)
print(f'classifier accuracy={acc_true:.3f}')
print(f'estimated accuracy={acc_hat:.3f}')
print(f'estimation error={error:.4f}')
print('process end')
print('='*80)
print(f'mean error = {np.mean(errors)}')
print(f'std error = {np.std(errors)}')
import matplotlib.pyplot as plt
# Create scatter plot
plt.plot([0, 1], [0, 1], color='black', linestyle='--')
plt.scatter(xs, ys_1, label='method 1')
plt.scatter(xs, ys_3, label='method 3')
plt.scatter(xs, ys_trval, label='tr-val')
plt.legend()
# Add labels and title
plt.xlabel('True Accuracy')
plt.ylabel('Estim Accuracy')
# Display the plot
plt.show()

View File

@ -0,0 +1,149 @@
from collections import defaultdict
from sklearn.calibration import CalibratedClassifierCV
from sklearn.svm import LinearSVC
from tqdm import tqdm
from sklearn.linear_model import LogisticRegression
import os
import quapy as qp
from method.aggregative import PACC, EMQ, PCC, CC, ACC, HDy
from models_binary import *
import matplotlib.pyplot as plt
from pathlib import Path
def clf():
# return CalibratedClassifierCV(LinearSVC(class_weight=None))
return LogisticRegression(class_weight=None)
def F1(contingency_table):
# tn = contingency_table[0, 0]
tp = contingency_table[1, 1]
fp = contingency_table[0, 1]
fn = contingency_table[1, 0]
den = (2*tp+fp+fn)
if den>0:
return 2*tp/den
else:
return 1
def accuracy(contingency_table):
tn = contingency_table[0, 0]
tp = contingency_table[1, 1]
fp = contingency_table[0, 1]
fn = contingency_table[1, 0]
return (tp+tn)/(tp+fp+fn+tn)
def plot_series(series, repeats, metric_name, train_prev=None, savepath=None):
for key in series:
print(series[key])
fig, ax = plt.subplots()
def bin(v):
mat = np.asarray(v).reshape(-1, repeats)
return mat.mean(axis=1), mat.std(axis=1)
x = series['prev']
x,_ = bin(x)
for serie in series:
if serie=='prev': continue
values = series[serie]
print(serie, values)
val_mean, val_std = bin(values)
ax.errorbar(x, val_mean, label=serie, fmt='-', marker='o')
ax.fill_between(x, val_mean - val_std, val_mean + val_std, alpha=0.25)
if train_prev is not None:
ax.axvline(x=train_prev, label='tr-prev', color='k', linestyle='--')
# ax.scatter(train_prev, train_prev, c='c', label='tr-prev', linewidth=2, edgecolor='k', s=100, zorder=3)
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
ax.grid()
ax.set_title(metric_name)
ax.set(xlabel=r'$p_U(\oplus)$', ylabel='estimated '+metric_name,
title='Classifier accuracy in terms of '+metric_name)
if savepath is None:
plt.show()
else:
os.makedirs(Path(savepath).parent, exist_ok=True)
plt.savefig(savepath, bbox_inches='tight')
dataset='imdb'
data = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=5, pickle=True)
# qp.data.preprocessing.reduce_columns(data, min_df=5, inplace=True)
# print('num_features', data.training.instances.shape[1])
train = data.training
test = data.test
upper = UpperBound(clf(), y_test=None).fit(train)
mlcfe = MLCMEstimator(clf(), strategy='kfcv', k=5, n_jobs=-1).fit(train)
emq_quant = QuantificationCMPredictor(clf(), EMQ(LogisticRegression()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
# cc_quant = QuantificationCMPredictor(clf(), CC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
# pcc_quant = QuantificationCMPredictor(clf(), PCC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
# acc_quant = QuantificationCMPredictor(clf(), ACC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
pacc_quant = QuantificationCMPredictor(clf(), PACC(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
# hdy_quant = QuantificationCMPredictor(clf(), HDy(clf()), strategy='kfcv', k=5, n_jobs=-1).fit(train)
sld = EMQ(LogisticRegression()).fit(train)
pacc = PACC(clf()).fit(train)
contenders = [
('kFCV+MLPE', mlcfe),
('SLD', emq_quant),
# ('CC', cc_quant),
# ('PCC', pcc_quant),
# ('ACC', acc_quant),
('PACC', pacc_quant),
# ('HDy', hdy_quant)
]
metric = F1
# metric = accuracy
repeats = 10
with qp.util.temp_seed(42):
samples_idx = [idx for idx in test.artificial_sampling_index_generator(sample_size=500, n_prevalences=21, repeats=repeats)]
series = defaultdict(lambda: [])
for idx in tqdm(samples_idx, desc='generating predictions'):
sample = test.sampling_from_index(idx)
upper.show_true_labels(sample.labels)
upper_conf_matrix = upper.predict(sample.instances)
metric_true = metric(upper_conf_matrix)
series['Upper'].append(metric_true)
for mname, method in contenders:
conf_matrix = method.predict(sample.instances)
estim_metric = metric(conf_matrix)
series[mname].append(estim_metric)
if hasattr(method, 'quantify'):
series[mname+'-prev'].append(method.quantify(sample.instances))
series['binsld-prev'].append(sld.quantify(sample.instances)[1])
series['binpacc-prev'].append(pacc.quantify(sample.instances)[1])
series['optimal-prev'].append(sample.prevalence()[1])
series['prev'].append(sample.prevalence()[1])
metricname = metric.__name__
plot_series(series, repeats, metric_name=metricname, train_prev=train.prevalence()[1], savepath='./plots/'+dataset+'_LinearSVC_'+metricname+'.pdf')

View File

@ -0,0 +1,179 @@
import numpy as np
import quapy as qp
from sklearn import clone
from sklearn.metrics import confusion_matrix
import scipy
from scipy.sparse import issparse, csr_matrix
from data import LabelledCollection
from abc import ABC, abstractmethod
from sklearn.model_selection import cross_val_predict
class ConfusionMatrixPredictor(ABC):
"""
Abstract class of predictors of a confusion matrix for the performance of a classifier.
For the binary case, this amounts to predicting the 4-cell contingency table consisting of the
true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN) that
most evaluation metrics make use of.
"""
@abstractmethod
def fit(self, train: LabelledCollection):
pass
@abstractmethod
def predict(self, test):
pass
class MLCMEstimator(ConfusionMatrixPredictor):
"""
The Maximum Likelihood Confusion Matrix Estimator is a method that relies on the IID assumption, and thus
computes, via k-FCV (or any other technique) the counters of the confusion matrix, assuming that those are
good estimates for the test case.
"""
def __init__(self, classifier, strategy='kfcv', **kwargs):
assert strategy in ['kfcv'], 'unknown strategy'
if strategy=='kfcv':
assert 'k' in kwargs, 'strategy "kfcv" requires "k" to be passed as an argument'
self.classifier = classifier
self.strategy = strategy
self.kwargs = kwargs
def sout(self, msg):
if 'verbose' in self.kwargs:
print(msg)
def fit(self, train: LabelledCollection):
X, y = train.Xy
if self.strategy == 'kfcv':
k=self.kwargs['k']
n_jobs = self.kwargs['n_jobs'] if 'n_jobs' in self.kwargs else 1
predict = self.kwargs['predict'] if 'predict' in self.kwargs else 'predict'
self.sout(f'{self.__class__.__name__}: '
f'running cross_val_predict with k={k} n_jobs={n_jobs} predict={predict}')
predictions = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method=predict)
self.conf_matrix = confusion_matrix(y, predictions, labels=train.classes_)
return self
def predict(self, test):
"""
This method disregards the test set, under the assumption that it is IID wrt the training. This means that
the confusion matrix for the test data should coincide with the one computed for training (using any cross
validation strategy).
:param test: test collection (ignored)
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
return self.conf_matrix
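# --- illustrative usage sketch (assumes quapy's LabelledCollection as imported above; data is synthetic) ---
# MLCMEstimator estimates the confusion matrix on the training data via k-FCV and then
# returns that same matrix for any test set (IID assumption).
def _sketch_mlcm_usage():
    import numpy as np
    from sklearn.linear_model import LogisticRegression
    rng = np.random.RandomState(0)
    X = rng.randn(200, 5)
    y = (X[:, 0] + 0.2 * rng.randn(200) > 0).astype(int)
    train = LabelledCollection(X, y)
    estimator = MLCMEstimator(LogisticRegression(), strategy='kfcv', k=5).fit(train)
    conf = estimator.predict(None)   # the test data is ignored under the IID assumption
    print(conf)                      # same format as sklearn.metrics.confusion_matrix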
class UpperBound(ConfusionMatrixPredictor):
def __init__(self, classifier, y_test):
self.classifier = classifier
self.y_test = y_test
def fit(self, train: LabelledCollection):
self.classifier.fit(*train.Xy)
self.classes = train.classes_
return self
def show_true_labels(self, y_test):
self.y_test = y_test
def predict(self, test):
predictions = self.classifier.predict(test)
return confusion_matrix(self.y_test, predictions, labels=self.classes)
def get_counters(y_true, y_pred):
counters = np.full(shape=y_true.shape, fill_value=-1)
counters[np.logical_and(y_true == 1, y_pred == 1)] = 0
counters[np.logical_and(y_true == 1, y_pred == 0)] = 1
counters[np.logical_and(y_true == 0, y_pred == 1)] = 2
counters[np.logical_and(y_true == 0, y_pred == 0)] = 3
class_map = {
0:'tp',
1:'fn',
2:'fp',
3:'tn'
}
return counters, class_map
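# --- illustrative sketch ---
# get_counters maps each (true, predicted) pair to one of four "cell classes"; e.g., for
# y_true=[1,1,0,0] and y_pred=[1,0,1,0] the counters are [0,1,2,3], i.e. tp, fn, fp, tn
# according to class_map. A hypothetical self-check:
def _sketch_get_counters():
    import numpy as np
    counters, class_map = get_counters(np.asarray([1, 1, 0, 0]), np.asarray([1, 0, 1, 0]))
    assert counters.tolist() == [0, 1, 2, 3]
    assert [class_map[c] for c in counters] == ['tp', 'fn', 'fp', 'tn']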
def safehstack(matrix, posteriors):
if issparse(matrix):
instances = csr_matrix(scipy.sparse.hstack([matrix, posteriors]))
else:
instances = np.hstack([matrix, posteriors])
return instances
class QuantificationCMPredictor(ConfusionMatrixPredictor):
"""
"""
def __init__(self, classifier, quantifier, strategy='kfcv', **kwargs):
assert strategy in ['kfcv'], 'unknown strategy'
if strategy=='kfcv':
assert 'k' in kwargs, 'strategy "kfcv" requires "k" to be passed as an argument'
self.classifier = clone(classifier)
self.quantifier = quantifier
self.strategy = strategy
self.kwargs = kwargs
def sout(self, msg):
if 'verbose' in self.kwargs:
print(msg)
def fit(self, train: LabelledCollection):
X, y = train.Xy
if self.strategy == 'kfcv':
k=self.kwargs['k']
n_jobs = self.kwargs['n_jobs'] if 'n_jobs' in self.kwargs else 1
self.sout(f'{self.__class__.__name__}: '
f'running cross_val_predict with k={k} n_jobs={n_jobs}')
predictions = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict')
posteriors = cross_val_predict(self.classifier, X, y, cv=k, n_jobs=n_jobs, method='predict_proba')
self.classifier.fit(X, y)
instances = safehstack(train.instances, posteriors)
counters, class_map = get_counters(train.labels, predictions)
q_data = LabelledCollection(instances=instances, labels=counters, classes_=[0,1,2,3])
print('counters prevalence', q_data.counts())
self.quantifier.fit(q_data)
return self
def predict(self, test):
"""
:param test: test collection (ignored)
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
posteriors = self.classifier.predict_proba(test)
instances = safehstack(test, posteriors)
counters = self.quantifier.quantify(instances)
tp, fn, fp, tn = counters
conf_matrix = np.asarray([[tn, fp], [fn, tp]])
return conf_matrix
def quantify(self, test):
posteriors = self.classifier.predict_proba(test)
instances = safehstack(test, posteriors)
counters = self.quantifier.quantify(instances)
tp, fn, fp, tn = counters
den_tpr = (tp+fn)
if den_tpr>0:
tpr = tp/den_tpr
else:
tpr = 1
den_fpr = (fp+tn)
if den_fpr>0:
fpr = fp / den_fpr
else:
fpr = 0
pcc = posteriors.sum(axis=0)[1]
pacc = (pcc-fpr)/(tpr-fpr)
pacc = np.clip(pacc, 0, 1)
q = tp+fn
return q
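# --- illustrative sketch (not part of the class above; numbers are hypothetical) ---
# The pacc value computed inside quantify() follows the adjusted-count rationale: if positives
# are detected with rate tpr and negatives trigger false alarms with rate fpr, the observed
# positive rate is prev*tpr + (1-prev)*fpr, so prev can be recovered as (observed - fpr)/(tpr - fpr).
def _sketch_adjustment():
    tpr, fpr, prev = 0.8, 0.1, 0.3
    observed = prev * tpr + (1 - prev) * fpr       # = 0.31
    recovered = (observed - fpr) / (tpr - fpr)
    assert abs(recovered - prev) < 1e-12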

View File

@ -0,0 +1,83 @@
from ClassifierAccuracy.gen_tables import gen_tables
from ClassifierAccuracy.util.commons import *
from ClassifierAccuracy.util.generators import *
from ClassifierAccuracy.util.plotting import plot_diagonal
from quapy.protocol import UPP
PROBLEM = 'multiclass'
ORACLE = False
basedir = PROBLEM+('-oracle' if ORACLE else '')
if PROBLEM == 'binary':
qp.environ['SAMPLE_SIZE'] = 1000
NUM_TEST = 1000
gen_datasets = gen_bin_datasets
elif PROBLEM == 'multiclass':
qp.environ['SAMPLE_SIZE'] = 250
NUM_TEST = 1000
gen_datasets = gen_multi_datasets
elif PROBLEM == 'tweet':
qp.environ['SAMPLE_SIZE'] = 100
NUM_TEST = 1000
gen_datasets = gen_tweet_datasets
for (cls_name, h), (dataset_name, (L, V, U)) in itertools.product(gen_classifiers(), gen_datasets()):
print(f'training {cls_name} in {dataset_name}')
h.fit(*L.Xy)
# test generation protocol
test_prot = UPP(U, repeats=NUM_TEST, return_type='labelled_collection', random_state=0)
# compute some stats of the dataset
get_dataset_stats(f'dataset_stats/{dataset_name}.json', test_prot, L, V)
# precompute the actual accuracy values
true_accs = {}
for acc_name, acc_fn in gen_acc_measure():
true_accs[acc_name] = [true_acc(h, acc_fn, Ui) for Ui in test_prot()]
# instances of ClassifierAccuracyPrediction are bound to the evaluation measure, so they
# must be handled inside the loop over accuracy measures
for acc_name, acc_fn in gen_acc_measure():
print(f'\tfor measure {acc_name}')
for (method_name, method) in gen_CAP(h, acc_fn, with_oracle=ORACLE):
result_path = getpath(basedir, cls_name, acc_name, dataset_name, method_name)
if os.path.exists(result_path):
print(f'\t\t{method_name}-{acc_name} exists, skipping')
continue
print(f'\t\t{method_name} computing...')
method, t_train = fit_method(method, V)
estim_accs, t_test_ave = predictionsCAP(method, test_prot, ORACLE)
save_json_result(result_path, true_accs[acc_name], estim_accs, t_train, t_test_ave)
# instances of CAPContingencyTable instead are generic: the evaluation measures can
# be applied to the predicted contingency tables afterwards, which speeds things up
for (method_name, method) in gen_CAP_cont_table(h):
if not any_missing(basedir, cls_name, dataset_name, method_name, acc_measures):
print(f'\t\tmethod {method_name} has all results already computed. Skipping.')
continue
print(f'\t\tmethod {method_name} computing...')
method, t_train = fit_method(method, V)
estim_accs_dict, t_test_ave = predictionsCAPcont_table(method, test_prot, gen_acc_measure, ORACLE)
for acc_name in estim_accs_dict.keys():
result_path = getpath(basedir, cls_name, acc_name, dataset_name, method_name)
save_json_result(result_path, true_accs[acc_name], estim_accs_dict[acc_name], t_train, t_test_ave)
print()
# generate diagonal plots
print('generating plots')
for (cls_name, _), (acc_name, _) in itertools.product(gen_classifiers(), gen_acc_measure()):
plot_diagonal(basedir, cls_name, acc_name)
for dataset_name, _ in gen_datasets(only_names=True):
plot_diagonal(basedir, cls_name, acc_name, dataset_name=dataset_name)
print('generating tables')
gen_tables(basedir, datasets=[d for d,_ in gen_datasets(only_names=True)])

View File

@ -0,0 +1,273 @@
import os.path
import pickle
from pathlib import Path
from sklearn.linear_model import LogisticRegression
from method.aggregative import PACC, EMQ, KDEyML
"""
Ideas:
Try kernel based on feature covariance matrix, with dot product and with another kernel
Try Cauchy-Schwarz kernel
"""
import sklearn.metrics
from sklearn.gaussian_process import GaussianProcessRegressor
import numpy as np
from sklearn.gaussian_process.kernels import RBF, GenericKernelMixin, Kernel
from sklearn.metrics.pairwise import pairwise_distances, pairwise_kernels
from data import LabelledCollection
from protocol import UPP
from quapy.method.base import BaseQuantifier, BinaryQuantifier
import quapy.functional as F
from result_table.src.table import Table
np.random.seed(0)
class FeatCovKernel(GenericKernelMixin, Kernel):
def __init__(self, dimensions):
self.dimensions = dimensions
def _f(self, sample1, sample2):
"""
kernel value between a pair of samples
"""
sample1 = sample1.reshape(-1, self.dimensions)
sample2 = sample2.reshape(-1, self.dimensions)
featCov1 = pairwise_distances(sample1.T, metric='correlation')
featCov2 = pairwise_distances(sample2.T, metric='correlation')
featDiffNorm = np.linalg.norm(featCov1-featCov2)
simil = np.exp(-featDiffNorm)
return simil
def __call__(self, X, Y=None, eval_gradient=False):
if Y is None:
Y = X
if eval_gradient:
raise NotImplementedError()
else:
return np.array([[self._f(x, y) for y in Y] for x in X])
def diag(self, X):
return np.array([self._f(x, x) for x in X])
def is_stationary(self):
return True
class AveL2Kernel(GenericKernelMixin, Kernel):
"""
A kernel between samples (bags of instances) based on the average pairwise
L2 distance between their instances."""
def __init__(self, dimensions):
self.dimensions=dimensions
def _f(self, sample1, sample2):
"""
kernel value between a pair of sequences
"""
sample1 = sample1.reshape(-1, self.dimensions)
sample2 = sample2.reshape(-1, self.dimensions)
dist = pairwise_distances(sample1, sample2)
mean_dist = dist.mean()
closenest = np.exp(-mean_dist)
return closenest
def __call__(self, X, Y=None, eval_gradient=False):
if Y is None:
Y = X
if eval_gradient:
raise NotImplementedError()
else:
return np.array([[self._f(x, y) for y in Y] for x in X])
def diag(self, X):
return np.array([self._f(x, x) for x in X])
def is_stationary(self):
return True
class RJSDkernel(GenericKernelMixin, Kernel):
"""
A kernel between samples based on the (representation) Jensen-Shannon
divergence computed by RJSDk."""
def __init__(self):
pass
def _f(self, sample1, sample2):
"""
kernel value between a pair of sequences
"""
div = RJSDk(sample1, sample2)
closenest = np.exp(-div)
print(f'{closenest:.4f}')
return closenest
def __call__(self, X, Y=None, eval_gradient=False):
if Y is None:
Y = X
if eval_gradient:
raise NotImplementedError()
else:
return np.array([[self._f(x, y) for y in Y] for x in X])
def diag(self, X):
return np.array([self._f(x, x) for x in X])
def is_stationary(self):
return True
def RJSDk(sample_1, sample_2):
sample_1 = sample_1.reshape(-1, 3)
sample_2 = sample_2.reshape(-1, 3)
n1 = sample_1.shape[0]
n2 = sample_2.shape[0]
pi1 = n1 / (n1 + n2)
pi2 = n2 / (n1 + n2)
Z = np.concatenate([sample_1, sample_2])
Kz = pairwise_kernels(Z, metric='rbf', n_jobs=-1)
# Kz = pairwise_kernels(Z, metric='cosine', n_jobs=-1)
Kx = Kz[:n1, :n1]
Ky = Kz[n1:, n1:]
SKz = S(Kz)
SKx = S(Kx)
SKy = S(Ky)
return SKz - (pi1 * SKx + pi2 * SKy)
def S(K):
K = K/np.trace(K)
M = K @ np.log(K)
s = -np.trace(M)
return s
# eigval, _ = np.linalg.eig(K)
# accum = 0
# for lamda_i in eigval:
# accum += (lamda_i * np.log(lamda_i))
# return -accum
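# --- illustrative sketch (an assumption about intent, not a statement of the method above) ---
# Note that np.log in S() is applied element-wise; the von Neumann entropy -tr(rho log rho)
# that the commented-out code hints at requires the *matrix* logarithm, which for a PSD
# kernel matrix reduces to the Shannon entropy of its (normalized) eigenvalues. A sketch of
# that eigenvalue-based variant (hypothetical helper name):
def _vn_entropy(K):
    import numpy as np
    K = K / np.trace(K)
    eigval = np.linalg.eigvalsh(K)        # K is symmetric PSD (a kernel matrix)
    eigval = eigval[eigval > 1e-12]       # discard numerically-zero eigenvalues
    return float(-(eigval * np.log(eigval)).sum())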
def target_function(X):
X = X.reshape(-1,3)
return X[:,0]**3 + 2.1*X[:,1]**2 + X[:,0] + 0.1
# X = np.random.rand(14,3)
# X /= X.sum(axis=1, keepdims=True)
# Y = np.random.rand(10,3)
# Y /= Y.sum(axis=1, keepdims=True)
#
# X = X.flatten()
# Y = Y.flatten()
#
# d = RJSDk(X, Y)
#
# print(d)
#
# d = RJSDk(X, X)
#
# print(d)
#
# import sys ; sys.exit(0)
# X_train = [np.random.rand(10*3) for _ in range(50)]
# y_train = [target_function(X).mean() for X in X_train]
#
# X_test = [np.random.rand(10*3) for _ in range(20)]
# y_test = [target_function(X).mean() for X in X_test]
#
#
# print('fit')
# # kernel = 1 * RBF(length_scale=1.0, length_scale_bounds=(1e-2, 1e2))
# kernel = MinL2Kernel()
# # kernel = RJSDkernel()
# gaussian_process = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=9)
# gaussian_process.fit(X_train, y_train)
# print('[done]')
#
# print(gaussian_process.kernel_)
#
# y_pred = gaussian_process.predict(X_test)
#
# mse = np.mean((y_test - y_pred)**2)
#
# print(mse)
class GPQuantifier(BaseQuantifier):
def __init__(self, dimensions, kernel, num_tr_samples=20, size_tr_samples=50):
self.dimensions = dimensions
self.num_tr_samples = num_tr_samples
self.size_tr_samples = size_tr_samples
self.gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=9)
def fit(self, data: LabelledCollection):
sampler = UPP(data, sample_size=self.size_tr_samples, repeats=self.num_tr_samples)
Xs, ps = list(zip(*[(X,p) for X,p in sampler()]))
ps = [p[1] for p in ps]
Xs = [X.flatten() for X in Xs]
self.gp.fit(Xs, ps)
return self
def quantify(self, instances):
X = [instances.flatten()]
p = self.gp.predict(X)[0]
return F.as_binary_prevalence(p, clip_if_necessary=True)
import quapy as qp
from quapy.data.datasets import fetch_UCIBinaryDataset, UCI_BINARY_DATASETS
table = Table('avel2')
methodnames = ['AveL2','PACC', 'SLD', 'KDEyML']
for methodname in methodnames:
errors = []
for dataset_name in UCI_BINARY_DATASETS:
if dataset_name in ['balance.2']:
continue
result_path = f'./results_gp/{dataset_name}_{methodname}.pkl'
os.makedirs(Path(result_path).parent, exist_ok=True)
if os.path.exists(result_path):
aes = pickle.load(open(result_path, 'rb'))
else:
dataset = fetch_UCIBinaryDataset(dataset_name)
qp.data.preprocessing.standardize(dataset, inplace=True)
train, test = dataset.train_test
d = train.X.shape[1]
if methodname=='AveL2':
q = GPQuantifier(dimensions=d, kernel=AveL2Kernel(dimensions=d), num_tr_samples=150, size_tr_samples=100)
elif methodname=='PACC':
q = PACC(LogisticRegression())
elif methodname=='SLD':
q = EMQ(LogisticRegression())
elif methodname=='KDEyML':
q = KDEyML(LogisticRegression(), bandwidth=0.05)
else:
raise ValueError('unknown method ' + methodname)
q.fit(train)
aes = qp.evaluation.evaluate(q, UPP(test, sample_size=100), error_metric='ae', verbose=False)
pickle.dump(aes, open(result_path, 'wb'), pickle.HIGHEST_PROTOCOL)
mae = np.mean(aes)
print(f'{dataset_name}\t{np.mean(mae):.4f}')
errors.append(mae)
table.add(dataset_name, methodname, aes)
print(f'\nmean={np.mean(errors):.5f}')
table.format.show_std=False
table.format.mean_prec=4
table.LatexPDF('./table_gp/gp.pdf', tables=[table], resizebox=False)

View File

@ -0,0 +1,77 @@
def gen_tables(basedir, datasets):
mock_h = LogisticRegression()
methods = [method for method, _ in gen_CAP(mock_h, None)] + [method for method, _ in gen_CAP_cont_table(mock_h)]
classifiers = [classifier for classifier, _ in gen_classifiers()]
os.makedirs('./tables', exist_ok=True)
with_oracle = 'oracle' in basedir
tex_doc = """
\\documentclass[10pt,a4paper]{article}
\\usepackage[utf8]{inputenc}
\\usepackage{amsmath}
\\usepackage{amsfonts}
\\usepackage{amssymb}
\\usepackage{graphicx}
\\usepackage{tabularx}
\\usepackage{color}
\\usepackage{colortbl}
\\usepackage{xcolor}
\\begin{document}
"""
for classifier in classifiers:
for metric in [measure for measure, _ in gen_acc_measure()]:
table = Table(datasets, methods, prec_mean=5, clean_zero=True)
for method, dataset in itertools.product(methods, datasets):
path = getpath(basedir, classifier, metric, dataset, method)
if not os.path.exists(path):
print('missing ', path)
continue
results = json.load(open(path, 'r'))
true_acc = results['true_acc']
estim_acc = np.asarray(results['estim_acc'])
if any(np.isnan(estim_acc)):
print(f'nan values found in {method=} {dataset=}')
continue
if any(estim_acc>1.00001):
print(f'values >1 found in {method=} {dataset=} [max={estim_acc.max()}]')
continue
if any(estim_acc<-0.00001):
print(f'values <0 found in {method=} {dataset=} [min={estim_acc.min()}]')
continue
errors = cap_errors(true_acc, estim_acc)
table.add(dataset, method, errors)
tex = table.latexTabular()
table_name = f'{basedir}_{classifier}_{metric}.tex'
table_name = table_name.replace('/', '_')
with open(f'./tables/{table_name}', 'wt') as foo:
foo.write('\\begin{table}[h]\n')
foo.write('\\centering\n')
foo.write('\\resizebox{\\textwidth}{!}{%\n')
foo.write('\\begin{tabular}{c|'+('c'*len(methods))+'}\n')
foo.write(tex)
foo.write('\\end{tabular}%\n')
foo.write('}\n')
foo.write('\\caption{Classifier ' + classifier.replace('_', ' ') + ('(oracle)' if with_oracle else '') +
' evaluated in terms of ' + metric.replace('_', ' ') + '}\n')
foo.write('\\end{table}\n')
tex_doc += "\\input{" + table_name + "}\n\n"
tex_doc += """
\\end{document}
"""
with open(f'./tables/main.tex', 'wt') as foo:
foo.write(tex_doc)
print("[Tables Done] running latex")
os.chdir('./tables/')
os.system('pdflatex main.tex')
os.system('rm main.aux main.log')

View File

@ -0,0 +1,756 @@
from copy import deepcopy
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.linear_model import LogisticRegression, LinearRegression
import quapy as qp
from sklearn import clone
from sklearn.metrics import confusion_matrix
import scipy
from scipy.sparse import issparse, csr_matrix
from data import LabelledCollection
from abc import ABC, abstractmethod
from sklearn.model_selection import cross_val_predict
from quapy.protocol import UPP
from quapy.method.base import BaseQuantifier
from quapy.method.aggregative import PACC, AggregativeQuantifier
import quapy.functional as F
class ClassifierAccuracyPrediction(ABC):
def __init__(self, h: BaseEstimator, acc: callable):
self.h = h
self.acc = acc
@abstractmethod
def fit(self, val: LabelledCollection):
...
@abstractmethod
def predict(self, X, oracle_prev=None):
"""
Evaluates the accuracy function on the predicted contingency table
:param X: test data
:param oracle_prev: np.ndarray with the class prevalence of the test set as estimated by
an oracle. This is meant to test the effect of the errors in CAP that are explained by
the errors in quantification performance
:return: float
"""
return ...
def true_acc(self, sample: LabelledCollection):
y_pred = self.h.predict(sample.X)
y_true = sample.y
conf_table = confusion_matrix(y_true, y_pred=y_pred, labels=sample.classes_)
return self.acc(conf_table)
class CAPContingencyTable(ClassifierAccuracyPrediction):
def __init__(self, h: BaseEstimator, acc: callable):
self.h = h
self.acc = acc
def predict(self, X, oracle_prev=None):
"""
Evaluates the accuracy function on the predicted contingency table
:param X: test data
:param oracle_prev: np.ndarray with the class prevalence of the test set as estimated by
an oracle. This is meant to test the effect of the errors in CAP that are explained by
the errors in quantification performance
:return: float
"""
cont_table = self.predict_ct(X, oracle_prev)
raw_acc = self.acc(cont_table)
norm_acc = np.clip(raw_acc, 0, 1)
return norm_acc
@abstractmethod
def predict_ct(self, X, oracle_prev=None):
"""
Predicts the contingency table for the test data
:param X: test data
:param oracle_prev: np.ndarray with the class prevalence of the test set as estimated by
an oracle. This is meant to test the effect of the errors in CAP that are explained by
the errors in quantification performance
:return: a contingency table
"""
...
class NaiveCAP(CAPContingencyTable):
"""
The Naive CAP is a method that relies on the IID assumption, and thus uses the estimation in the validation data
as an estimate for the test data.
"""
def __init__(self, h: BaseEstimator, acc: callable):
super().__init__(h, acc)
def fit(self, val: LabelledCollection):
y_hat = self.h.predict(val.X)
y_true = val.y
self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
return self
def predict_ct(self, test, oracle_prev=None):
"""
This method disregards the test set, under the assumption that it is IID wrt the training. This means that
the confusion matrix for the test data should coincide with the one computed for training (using any cross
validation strategy).
:param test: test collection (ignored)
:param oracle_prev: ignored
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
return self.cont_table
class CAPContingencyTableQ(CAPContingencyTable):
def __init__(self, h: BaseEstimator, acc: callable, q_class: AggregativeQuantifier, reuse_h=False):
super().__init__(h, acc)
self.reuse_h = reuse_h
if reuse_h:
assert isinstance(q_class, AggregativeQuantifier), f'quantifier {q_class} is not of type aggregative'
self.q = deepcopy(q_class)
self.q.set_params(classifier=h)
else:
self.q = q_class
def quantifier_fit(self, val: LabelledCollection):
if self.reuse_h:
self.q.fit(val, fit_classifier=False, val_split=val)
else:
self.q.fit(val)
class ContTableTransferCAP(CAPContingencyTableQ):
"""
"""
def __init__(self, h: BaseEstimator, acc: callable, q_class, reuse_h=False):
super().__init__(h, acc, q_class, reuse_h)
def fit(self, val: LabelledCollection):
y_hat = self.h.predict(val.X)
y_true = val.y
self.cont_table = confusion_matrix(y_true=y_true, y_pred=y_hat, labels=val.classes_, normalize='all')
self.train_prev = val.prevalence()
self.quantifier_fit(val)
return self
def predict_ct(self, test, oracle_prev=None):
"""
:param test: test collection (ignored)
:param oracle_prev: np.ndarray with the class prevalence of the test set as estimated by
an oracle. This is meant to test the effect of the errors in CAP that are explained by
the errors in quantification performance
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
if oracle_prev is None:
prev_hat = self.q.quantify(test)
else:
prev_hat = oracle_prev
adjustment = prev_hat / self.train_prev
return self.cont_table * adjustment[:, np.newaxis]
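# --- illustrative sketch (not part of the class above; numbers are hypothetical) ---
# The adjustment rescales each row (a true class) of the validation joint table by the
# ratio between the estimated test prevalence and the validation prevalence of that class,
# keeping P(pred|true) fixed (the PPS assumption).
def _sketch_cont_table_transfer():
    import numpy as np
    val_joint = np.asarray([[0.45, 0.05],    # true class 0 (validation prevalence 0.5)
                            [0.10, 0.40]])   # true class 1 (validation prevalence 0.5)
    val_prev = val_joint.sum(axis=1)         # [0.5, 0.5]
    test_prev_hat = np.asarray([0.3, 0.7])   # as estimated by a quantifier
    test_joint = val_joint * (test_prev_hat / val_prev)[:, np.newaxis]
    assert np.isclose(test_joint.sum(), 1.0)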
class NsquaredEquationsCAP(CAPContingencyTableQ):
"""
"""
def __init__(self, h: BaseEstimator, acc: callable, q_class, reuse_h=False):
super().__init__(h, acc, q_class, reuse_h)
def fit(self, val: LabelledCollection):
y_hat = self.h.predict(val.X)
y_true = val.y
self.cont_table = confusion_matrix(y_true, y_pred=y_hat, labels=val.classes_)
self.quantifier_fit(val)
self.A, self.partial_b = self._construct_equations()
return self
def _construct_equations(self):
# we need a n x n matrix of unknowns
n = self.cont_table.shape[1]
# I is the matrix of indexes of unknowns. For example, if we need the counts of
# all instances belonging to class i that have been classified as belonging to 0, 1, ..., n:
# the indexes of the corresponding unknowns are given by I[i,:]
I = np.arange(n * n).reshape(n, n)
# system of equations: Ax=b, A.shape=(n*n, n*n,), b.shape=(n*n,)
A = np.zeros(shape=(n * n, n * n))
b = np.zeros(shape=(n * n))
# first equation: the sum of all unknowns is 1
eq_no = 0
A[eq_no, :] = 1
b[eq_no] = 1
eq_no += 1
# (n-1)*(n-1) equations: the class-conditional ratios should be the same in training and in test due to the
# PPS assumptions. Example in three classes, a ratio: a/(a+b+c) [test] = ar [a ratio in training]
# a / (a + b + c) = ar
# a = (a + b + c) * ar
# a = a ar + b ar + c ar
# a - a ar - b ar - c ar = 0
# a (1-ar) + b (-ar) + c (-ar) = 0
class_cond_ratios_tr = self.cont_table / self.cont_table.sum(axis=1, keepdims=True)
for i in range(1, n):
for j in range(1, n):
ratio_ij = class_cond_ratios_tr[i, j]
A[eq_no, I[i, :]] = -ratio_ij
A[eq_no, I[i, j]] = 1 - ratio_ij
b[eq_no] = 0
eq_no += 1
# n-1 equations: the sum of class-cond counts must equal the C&C prevalence prediction
for i in range(1, n):
A[eq_no, I[:, i]] = 1
#b[eq_no] = cc_prev_estim[i]
eq_no += 1
# n-1 equations: the sum of true class-conditional positives must equal the class prevalence estimated for the test
for i in range(1, n):
A[eq_no, I[i, :]] = 1
#b[eq_no] = q_prev_estim[i]
eq_no += 1
return A, b
def predict_ct(self, test, oracle_prev):
"""
:param test: test collection (ignored)
:param oracle_prev: np.ndarray with the class prevalence of the test set as estimated by
an oracle. This is meant to test the effect of the errors in CAP that are explained by
the errors in quantification performance
:return: a confusion matrix in the return format of `sklearn.metrics.confusion_matrix`
"""
n = self.cont_table.shape[1]
h_label_preds = self.h.predict(test)
cc_prev_estim = F.prevalence_from_labels(h_label_preds, self.h.classes_)
if oracle_prev is None:
q_prev_estim = self.q.quantify(test)
else:
q_prev_estim = oracle_prev
A = self.A
b = self.partial_b
# b is partially filled; we finish the vector by plugin in the classify and count
# prevalence estimates (n-1 values only), and the quantification estimates (n-1 values only)
b[-2*(n-1):-(n-1)] = cc_prev_estim[1:]
b[-(n-1):] = q_prev_estim[1:]
# try the fast solution (may not be valid)
x = np.linalg.solve(A, b)
if any(x < 0) or any(x > 1) or not np.isclose(x.sum(), 1):
print('L', end='')
# try the iterative solution
def loss(x):
return np.linalg.norm(A @ x - b, ord=2)
x = F.optim_minimize(loss, n_classes=n**2)
else:
print('.', end='')
cont_table_test = x.reshape(n,n)
return cont_table_test
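# --- illustrative sketch (not part of the class above) ---
# Sanity check on the number of equations described in _construct_equations:
# 1 (the mass sums to 1) + (n-1)^2 (shared class-conditional ratios)
# + (n-1) (classify & count estimates) + (n-1) (quantifier estimates) = n^2,
# which matches the n^2 unknown cells of the contingency table.
def _sketch_equation_count(n=4):
    total = 1 + (n - 1) ** 2 + (n - 1) + (n - 1)
    assert total == n ** 2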
class SebastianiCAP(ClassifierAccuracyPrediction):
def __init__(self, h, acc_fn, q_class, n_val_samples=500, alpha=0.3, predict_train_prev=True):
self.h = h
self.acc = acc_fn
self.q = q_class(h)
self.n_val_samples = n_val_samples
self.alpha = alpha
self.sample_size = qp.environ['SAMPLE_SIZE']
self.predict_train_prev = predict_train_prev
def fit(self, val: LabelledCollection):
v2, v1 = val.split_stratified(train_prop=0.5)
self.q.fit(v1, fit_classifier=False, val_split=v1)
# precompute classifier predictions on samples
gen_samples = UPP(v2, repeats=self.n_val_samples, sample_size=self.sample_size, return_type='labelled_collection')
self.sigma_acc = [self.true_acc(sigma_i) for sigma_i in gen_samples()]
# precompute prevalence predictions on samples
if self.predict_train_prev:
gen_samples.on_preclassified_instances(self.q.classify(v2.X), in_place=True)
self.sigma_pred_prevs = [self.q.aggregate(sigma_i.X) for sigma_i in gen_samples()]
else:
self.sigma_pred_prevs = [sigma_i.prevalence() for sigma_i in gen_samples()]
def predict(self, X, oracle_prev=None):
if oracle_prev is None:
test_pred_prev = self.q.quantify(X)
else:
test_pred_prev = oracle_prev
if self.alpha > 0:
# select samples from V2 with predicted prevalence close to the predicted prevalence for U
selected_accuracies = []
for pred_prev_i, acc_i in zip(self.sigma_pred_prevs, self.sigma_acc):
max_discrepancy = np.max(np.abs(pred_prev_i - test_pred_prev))
if max_discrepancy < self.alpha:
selected_accuracies.append(acc_i)
return np.median(selected_accuracies)
else:
# weighted average: weights samples from V2 according to how close their predicted prevalence is to the prevalence predicted for U
accum_weight = 0
moving_mean = 0
epsilon = 10E-4
for pred_prev_i, acc_i in zip(self.sigma_pred_prevs, self.sigma_acc):
max_discrepancy = np.max(np.abs(pred_prev_i - test_pred_prev))
weight = -np.log(max_discrepancy+epsilon)
accum_weight += weight
moving_mean += (weight*acc_i)
return moving_mean/accum_weight
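# --- illustrative sketch (not part of the class above; numbers are hypothetical) ---
# Minimal numeric example of the log-distance weighting used when alpha <= 0: validation
# samples whose predicted prevalence is closer to the test's predicted prevalence receive
# (much) larger weights, so the estimate leans towards their accuracies.
def _sketch_log_weighting():
    import numpy as np
    discrepancies = np.asarray([0.01, 0.10, 0.40])   # max prevalence discrepancies
    accuracies = np.asarray([0.90, 0.80, 0.60])      # accuracies on those samples
    weights = -np.log(discrepancies + 1e-4)
    estimate = float((weights * accuracies).sum() / weights.sum())
    assert 0.80 < estimate < 0.90                    # closer to 0.90 than the plain mean (0.77)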
class PabloCAP(ClassifierAccuracyPrediction):
def __init__(self, h, acc_fn, q_class, n_val_samples=100, aggr='mean'):
self.h = h
self.acc = acc_fn
self.q = q_class(deepcopy(h))
self.n_val_samples = n_val_samples
self.aggr = aggr
assert aggr in ['mean', 'median'], 'unknown aggregation function, use mean or median'
def fit(self, val: LabelledCollection):
self.q.fit(val)
label_predictions = self.h.predict(val.X)
self.pre_classified = LabelledCollection(instances=label_predictions, labels=val.labels)
def predict(self, X, oracle_prev=None):
if oracle_prev is None:
pred_prev = F.smooth(self.q.quantify(X))
else:
pred_prev = oracle_prev
X_size = X.shape[0]
acc_estim = []
for _ in range(self.n_val_samples):
sigma_i = self.pre_classified.sampling(X_size, *pred_prev[:-1])
y_pred, y_true = sigma_i.Xy
conf_table = confusion_matrix(y_true, y_pred=y_pred, labels=sigma_i.classes_)
acc_i = self.acc(conf_table)
acc_estim.append(acc_i)
if self.aggr == 'mean':
return np.mean(acc_estim)
elif self.aggr == 'median':
return np.median(acc_estim)
else:
raise ValueError('unknown aggregation function')
def get_posteriors_from_h(h, X):
if hasattr(h, 'predict_proba'):
P = h.predict_proba(X)
else:
n_classes = len(h.classes_)
dec_scores = h.decision_function(X)
if n_classes == 2:  # binary: decision_function returns a single score per instance
dec_scores = np.vstack([-dec_scores, dec_scores]).T
P = scipy.special.softmax(dec_scores, axis=1)
return P
def max_conf(P, keepdims=False):
mc = P.max(axis=1, keepdims=keepdims)
return mc
def neg_entropy(P, keepdims=False):
ne = scipy.stats.entropy(P, axis=1)
if keepdims:
ne = ne.reshape(-1, 1)
return ne
class QuAcc:
def _get_X_dot(self, X):
h = self.h
P = get_posteriors_from_h(h, X)
add_covs = []
if self.add_posteriors:
add_covs.append(P[:, 1:])
if self.add_maxconf:
mc = max_conf(P, keepdims=True)
add_covs.append(mc)
if self.add_negentropy:
ne = neg_entropy(P, keepdims=True)
add_covs.append(ne)
if self.add_maxinfsoft:
lgP = np.log(P)
mis = np.max(lgP -lgP.mean(axis=1, keepdims=True), axis=1, keepdims=True)
add_covs.append(mis)
if len(add_covs)>0:
X_dot = np.hstack(add_covs)
if self.add_X:
X_dot = safehstack(X, X_dot)
return X_dot
class QuAcc1xN2(CAPContingencyTableQ, QuAcc):
def __init__(self,
h: BaseEstimator,
acc: callable,
q_class: AggregativeQuantifier,
add_X=True,
add_posteriors=True,
add_maxconf=False,
add_negentropy=False,
add_maxinfsoft=False):
self.h = h
self.acc = acc
self.q = EmptySafeQuantifier(q_class)
self.add_X = add_X
self.add_posteriors = add_posteriors
self.add_maxconf = add_maxconf
self.add_negentropy = add_negentropy
self.add_maxinfsoft = add_maxinfsoft
def fit(self, val: LabelledCollection):
pred_labels = self.h.predict(val.X)
true_labels = val.y
n = val.n_classes
classes_dot = np.arange(n**2)
ct_class_idx = classes_dot.reshape(n, n)
X_dot = self._get_X_dot(val.X)
y_dot = ct_class_idx[true_labels, pred_labels]
val_dot = LabelledCollection(X_dot, y_dot, classes=classes_dot)
self.q.fit(val_dot)
def predict_ct(self, X, oracle_prev=None):
X_dot = self._get_X_dot(X)
return self.q.quantify(X_dot)
class QuAccNxN(CAPContingencyTableQ, QuAcc):
def __init__(self,
h: BaseEstimator,
acc: callable,
q_class: AggregativeQuantifier,
add_X=True,
add_posteriors=True,
add_maxconf=False,
add_negentropy=False,
add_maxinfsoft=False):
self.h = h
self.acc = acc
self.q_class = q_class
self.add_X = add_X
self.add_posteriors = add_posteriors
self.add_maxconf = add_maxconf
self.add_negentropy = add_negentropy
self.add_maxinfsoft = add_maxinfsoft
def fit(self, val: LabelledCollection):
pred_labels = self.h.predict(val.X)
true_labels = val.y
X_dot = self._get_X_dot(val.X)
self.q = []
for class_i in self.h.classes_:
X_dot_i = X_dot[pred_labels==class_i]
y_i = true_labels[pred_labels==class_i]
data_i = LabelledCollection(X_dot_i, y_i, classes=val.classes_)
q_i = EmptySafeQuantifier(deepcopy(self.q_class))
q_i.fit(data_i)
self.q.append(q_i)
def predict_ct(self, X, oracle_prev=None):
classes = self.h.classes_
pred_labels = self.h.predict(X)
X_dot = self._get_X_dot(X)
pred_prev = F.prevalence_from_labels(pred_labels, classes)
cont_table = []
for class_i, q_i, p_i in zip(classes, self.q, pred_prev):
X_dot_i = X_dot[pred_labels==class_i]
classcond_cond_table_prevs = q_i.quantify(X_dot_i)
cond_table_prevs = p_i * classcond_cond_table_prevs
cont_table.append(cond_table_prevs)
cont_table = np.vstack(cont_table)
return cont_table
def safehstack(X, P):
if issparse(X) or issparse(P):
XP = scipy.sparse.hstack([X, P])
XP = csr_matrix(XP)
else:
XP = np.hstack([X,P])
return XP
class EmptySafeQuantifier(BaseQuantifier):
def __init__(self, surrogate_quantifier: BaseQuantifier):
self.surrogate = surrogate_quantifier
def fit(self, data: LabelledCollection):
self.n_classes = data.n_classes
class_compact_data, self.old_class_idx = data.compact_classes()
if self.num_non_empty_classes() > 1:
self.surrogate.fit(class_compact_data)
return self
def quantify(self, instances):
num_instances = instances.shape[0]
if self.num_non_empty_classes() == 0 or num_instances==0:
# returns the uniform prevalence vector
uniform = np.full(fill_value=1./self.n_classes, shape=self.n_classes, dtype=float)
return uniform
elif self.num_non_empty_classes() == 1:
# returns a prevalence vector with 100% of the mass in the only non empty class
prev_vector = np.full(fill_value=0., shape=self.n_classes, dtype=float)
prev_vector[self.old_class_idx[0]] = 1
return prev_vector
else:
class_compact_prev = self.surrogate.quantify(instances)
prev_vector = np.full(fill_value=0., shape=self.n_classes, dtype=float)
prev_vector[self.old_class_idx] = class_compact_prev
return prev_vector
def num_non_empty_classes(self):
return len(self.old_class_idx)
def get_params(self, deep=True):
return self.surrogate.get_params(deep=deep)
def set_params(self, **params):
return self.surrogate.set_params(**params)
class EmptySafeAggregativeQuantifier(AggregativeQuantifier, EmptySafeQuantifier):
    pass  # empty class body
# Baselines:
class ATC(ClassifierAccuracyPrediction):
VALID_FUNCTIONS = {'maxconf', 'neg_entropy'}
def __init__(self, h, acc_fn, scoring_fn='maxconf'):
assert scoring_fn in ATC.VALID_FUNCTIONS, \
f'unknown scoring function, use any from {ATC.VALID_FUNCTIONS}'
#assert acc_fn == 'vanilla_accuracy', \
# 'use acc_fn=="vanilla_accuracy"; other metris are not yet tested in ATC'
self.h = h
self.acc_fn = acc_fn
self.scoring_fn = scoring_fn
def get_scores(self, P):
if self.scoring_fn == 'maxconf':
scores = max_conf(P)
else:
scores = neg_entropy(P)
return scores
def fit(self, val: LabelledCollection):
P = get_posteriors_from_h(self.h, val.X)
pred_labels = np.argmax(P, axis=1)
true_labels = val.y
scores = self.get_scores(P)
_, self.threshold = self.__find_ATC_threshold(scores=scores, labels=(pred_labels==true_labels))
def predict(self, X, oracle_prev=None):
P = get_posteriors_from_h(self.h, X)
scores = self.get_scores(P)
#assert self.acc_fn == 'vanilla_accuracy', \
# 'use acc_fn=="vanilla_accuracy"; other metris are not yet tested in ATC'
return self.__get_ATC_acc(self.threshold, scores)
def __find_ATC_threshold(self, scores, labels):
# code copy-pasted from https://github.com/saurabhgarg1996/ATC_code/blob/master/ATC_helper.py
sorted_idx = np.argsort(scores)
sorted_scores = scores[sorted_idx]
sorted_labels = labels[sorted_idx]
fp = np.sum(labels == 0)
fn = 0.0
min_fp_fn = np.abs(fp - fn)
thres = 0.0
for i in range(len(labels)):
if sorted_labels[i] == 0:
fp -= 1
else:
fn += 1
if np.abs(fp - fn) < min_fp_fn:
min_fp_fn = np.abs(fp - fn)
thres = sorted_scores[i]
return min_fp_fn, thres
def __get_ATC_acc(self, thres, scores):
# code copy-pasted from https://github.com/saurabhgarg1996/ATC_code/blob/master/ATC_helper.py
return np.mean(scores >= thres)
class DoC(ClassifierAccuracyPrediction):
def __init__(self, h, acc, sample_size, num_samples=500, clip_vals=(0,1)):
self.h = h
self.acc = acc
self.sample_size = sample_size
self.num_samples = num_samples
self.clip_vals = clip_vals
def _get_post_stats(self, X, y):
P = get_posteriors_from_h(self.h, X)
mc = max_conf(P)
pred_labels = np.argmax(P, axis=-1)
acc = self.acc(y, pred_labels)
return mc, acc
def _doc(self, mc1, mc2):
return mc2.mean() - mc1.mean()
def train_regression(self, v2_mcs, v2_accs):
docs = [self._doc(self.v1_mc, v2_mc_i) for v2_mc_i in v2_mcs]
target = [self.v1_acc - v2_acc_i for v2_acc_i in v2_accs]
docs = np.asarray(docs).reshape(-1,1)
target = np.asarray(target)
lin_reg = LinearRegression()
return lin_reg.fit(docs, target)
def predict_regression(self, test_mc):
docs = np.asarray([self._doc(self.v1_mc, test_mc)]).reshape(-1, 1)
pred_acc = self.reg_model.predict(docs)
return self.v1_acc - pred_acc
def fit(self, val: LabelledCollection):
v1, v2 = val.split_stratified(train_prop=0.5, random_state=0)
self.v1_mc, self.v1_acc = self._get_post_stats(*v1.Xy)
v2_prot = UPP(v2, sample_size=self.sample_size, repeats=self.num_samples, return_type='labelled_collection')
v2_stats = [self._get_post_stats(*sample.Xy) for sample in v2_prot()]
v2_mcs, v2_accs = list(zip(*v2_stats))
self.reg_model = self.train_regression(v2_mcs, v2_accs)
def predict(self, X, oracle_prev=None):
P = get_posteriors_from_h(self.h, X)
mc = max_conf(P)
acc_pred = self.predict_regression(mc)[0]
if self.clip_vals is not None:
acc_pred = np.clip(acc_pred, *self.clip_vals)
return acc_pred
"""
def doc(self,
c_model: BaseEstimator,
validation: LabelledCollection,
protocol: AbstractStochasticSeededProtocol,
predict_method="predict_proba"):
c_model_predict = getattr(c_model, predict_method)
f1_average = "binary" if validation.n_classes == 2 else "macro"
val1, val2 = validation.split_stratified(train_prop=0.5, random_state=env._R_SEED)
val1_probs = c_model_predict(val1.X)
val1_mc = np.max(val1_probs, axis=-1)
val1_preds = np.argmax(val1_probs, axis=-1)
val1_acc = metrics.accuracy_score(val1.y, val1_preds)
val1_f1 = metrics.f1_score(val1.y, val1_preds, average=f1_average)
val2_protocol = APP(
val2,
n_prevalences=21,
repeats=100,
return_type="labelled_collection",
)
val2_prot_mc = []
val2_prot_preds = []
val2_prot_y = []
for v2 in val2_protocol():
_probs = c_model_predict(v2.X)
_mc = np.max(_probs, axis=-1)
_preds = np.argmax(_probs, axis=-1)
val2_prot_mc.append(_mc)
val2_prot_preds.append(_preds)
val2_prot_y.append(v2.y)
val_scores = np.array([doclib.get_doc(val1_mc, v2_mc) for v2_mc in val2_prot_mc])
val_targets_acc = np.array(
[
val1_acc - metrics.accuracy_score(v2_y, v2_preds)
for v2_y, v2_preds in zip(val2_prot_y, val2_prot_preds)
]
)
reg_acc = LinearRegression().fit(val_scores[:, np.newaxis], val_targets_acc)
val_targets_f1 = np.array(
[
val1_f1 - metrics.f1_score(v2_y, v2_preds, average=f1_average)
for v2_y, v2_preds in zip(val2_prot_y, val2_prot_preds)
]
)
reg_f1 = LinearRegression().fit(val_scores[:, np.newaxis], val_targets_f1)
report = EvaluationReport(name="doc")
for test in protocol():
test_probs = c_model_predict(test.X)
test_preds = np.argmax(test_probs, axis=-1)
test_mc = np.max(test_probs, axis=-1)
acc_score = (
val1_acc
- reg_acc.predict(np.array([[doclib.get_doc(val1_mc, test_mc)]]))[0]
)
f1_score = (
val1_f1 - reg_f1.predict(np.array([[doclib.get_doc(val1_mc, test_mc)]]))[0]
)
meta_acc = abs(acc_score - metrics.accuracy_score(test.y, test_preds))
meta_f1 = abs(
f1_score - metrics.f1_score(test.y, test_preds, average=f1_average)
)
report.append_row(
test.prevalence(),
acc=meta_acc,
acc_score=acc_score,
f1=meta_f1,
f1_score=f1_score,
)
return report
def get_doc(probs1, probs2):
return np.mean(probs2) - np.mean(probs1)
"""

View File

@ -0,0 +1,32 @@
# Notes
Branch for research on classifier accuracy prediction.
I had some work done for binary (models_binary.py and main_binary.py).
I would like to approach the multiclass case directly now.
I think I will frame the problem setting as follows.
A Classifier Accuracy Prediction (CAP) method is a method that receives as input:
- h: classifier (already trained),
- V: labelled collection (for training the CAP),
- acc_func: callable: any function that works on a contingency table
And implements:
- fit: trains the CAP
- predict: predicts the evaluation measure on unseen data (provided by the base class; it calls predict_ct and then acc_func)
- predict_ct: predicts the contingency table
(a minimal sketch of this interface is given at the end of these notes)
Important:
- When the quantifiers' hyperparameters are optimized, we should make sure that the
classifier is not being reused, or that the hyperparameters do not include any from
the underlying classifier
TODO:
- Add additional covariates [done, check]
- Add model selection for CAP
- Add Doc [done]
- Add ATC [done]
- Add APP in training and adapt plots and tables
- Add plots: error by drift, etc
- Add characterization of classifiers in terms of accuracy and use this as a variable
analyzing results
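
A minimal sketch of the interface described above (illustrative only; passing V to fit rather than to the constructor is an assumption, not the final design):

```python
from abc import ABC, abstractmethod

class CAP(ABC):
    """Illustrative stub of the CAP interface sketched in these notes."""
    def __init__(self, h, acc_func):
        self.h = h                  # trained classifier
        self.acc_func = acc_func    # any function defined over a contingency table

    @abstractmethod
    def fit(self, V):               # V: labelled collection for training the CAP
        ...

    @abstractmethod
    def predict_ct(self, X):        # predicts the contingency table for X
        ...

    def predict(self, X):           # predicts the evaluation measure on unseen data
        return self.acc_func(self.predict_ct(X))
```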

View File

View File

@ -0,0 +1,203 @@
from sklearn.base import BaseEstimator
import quapy as qp
import itertools
import json
import os
from collections import defaultdict
from glob import glob
from pathlib import Path
from time import time
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from ClassifierAccuracy.util.tabular import Table
from quapy.protocol import OnLabelledCollectionProtocol, AbstractStochasticSeededProtocol
from quapy.data import LabelledCollection
def split(data: LabelledCollection):
train_val, test = data.split_stratified(train_prop=0.66, random_state=0)
train, val = train_val.split_stratified(train_prop=0.5, random_state=0)
return train, val, test
def fit_method(method, V):
tinit = time()
method.fit(V)
t_train = time() - tinit
return method, t_train
def predictionsCAP(method, test_prot, oracle=False):
tinit = time()
if not oracle:
estim_accs = [method.predict(Ui.X) for Ui in test_prot()]
else:
estim_accs = [method.predict(Ui.X, oracle_prev=Ui.prevalence()) for Ui in test_prot()]
t_test_ave = (time() - tinit) / test_prot.total()
return estim_accs, t_test_ave
def predictionsCAPcont_table(method, test_prot, gen_acc_measure, oracle=False):
estim_accs_dict = {}
tinit = time()
if not oracle:
estim_tables = [method.predict_ct(Ui.X) for Ui in test_prot()]
else:
estim_tables = [method.predict_ct(Ui.X, oracle_prev=Ui.prevalence()) for Ui in test_prot()]
for acc_name, acc_fn in gen_acc_measure():
estim_accs_dict[acc_name] = [acc_fn(cont_table) for cont_table in estim_tables]
t_test_ave = (time() - tinit) / test_prot.total()
return estim_accs_dict, t_test_ave
def any_missing(basedir, cls_name, dataset_name, method_name, acc_measures):
for acc_name in acc_measures():
if not os.path.exists(getpath(basedir, cls_name, acc_name, dataset_name, method_name)):
return True
return False
def true_acc(h:BaseEstimator, acc_fn: callable, U: LabelledCollection):
y_pred = h.predict(U.X)
y_true = U.y
conf_table = confusion_matrix(y_true, y_pred=y_pred, labels=U.classes_)
return acc_fn(conf_table)
def from_contingency_table(param1, param2):
if param2 is None and isinstance(param1, np.ndarray) and param1.ndim==2 and (param1.shape[0]==param1.shape[1]):
return True
elif isinstance(param1, np.ndarray) and isinstance(param2, np.ndarray) and param1.shape==param2.shape:
return False
else:
raise ValueError('parameters for evaluation function not understood')
def vanilla_acc_fn(param1, param2=None):
if from_contingency_table(param1, param2):
return _vanilla_acc_from_ct(param1)
else:
return accuracy_score(param1, param2)
def macrof1_fn(param1, param2=None):
if from_contingency_table(param1, param2):
return macro_f1_from_ct(param1)
else:
return f1_score(param1, param2, average='macro')
def _vanilla_acc_from_ct(cont_table):
return np.diag(cont_table).sum() / cont_table.sum()
def _f1_bin(tp, fp, fn):
if tp + fp + fn == 0:
return 1
else:
return (2 * tp) / (2 * tp + fp + fn)
def macro_f1_from_ct(cont_table):
n = cont_table.shape[0]
if n==2:
tp = cont_table[1,1]
fp = cont_table[0,1]
fn = cont_table[1,0]
return _f1_bin(tp, fp, fn)
f1_per_class = []
for i in range(n):
tp = cont_table[i,i]
fp = cont_table[:,i].sum() - tp
fn = cont_table[i,:].sum() - tp
f1_per_class.append(_f1_bin(tp, fp, fn))
return np.mean(f1_per_class)
def microf1(cont_table):
    n = cont_table.shape[0]
    if n == 2:
        tp = cont_table[1, 1]
        fp = cont_table[0, 1]
        fn = cont_table[1, 0]
        return _f1_bin(tp, fp, fn)
    # pool true positives, false positives and false negatives across all classes
    tp, fp, fn = 0, 0, 0
    for i in range(n):
        tp_i = cont_table[i, i]
        fp_i = cont_table[:, i].sum() - tp_i
        fn_i = cont_table[i, :].sum() - tp_i
        tp += tp_i
        fp += fp_i
        fn += fn_i
    return _f1_bin(tp, fp, fn)
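# Hedged sanity check (illustration only, not part of this module): the contingency-table helpers
# should agree with sklearn's metrics on a small example.
def _check_f1_helpers():
    y_true = [0, 0, 1, 1, 2, 2]
    y_pred = [0, 1, 1, 1, 2, 0]
    ct = confusion_matrix(y_true, y_pred)
    assert np.isclose(vanilla_acc_fn(ct), accuracy_score(y_true, y_pred))
    assert np.isclose(macrof1_fn(ct), f1_score(y_true, y_pred, average='macro'))
    assert np.isclose(microf1(ct), f1_score(y_true, y_pred, average='micro'))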
def cap_errors(true_acc, estim_acc):
true_acc = np.asarray(true_acc)
estim_acc = np.asarray(estim_acc)
#return (true_acc - estim_acc)**2
return np.abs(true_acc - estim_acc)
def getpath(basedir, cls_name, acc_name, dataset_name, method_name):
return f"results/{basedir}/{cls_name}/{acc_name}/{dataset_name}/{method_name}.json"
def open_results(basedir, cls_name, acc_name, dataset_name='*', method_name='*'):
results = defaultdict(lambda : {'true_acc':[], 'estim_acc':[]})
if isinstance(method_name, str):
method_name = [method_name]
if isinstance(dataset_name, str):
dataset_name = [dataset_name]
for dataset_, method_ in itertools.product(dataset_name, method_name):
path = getpath(basedir, cls_name, acc_name, dataset_, method_)
for file in glob(path):
#print(file)
method = Path(file).name.replace('.json','')
result = json.load(open(file, 'r'))
results[method]['true_acc'].extend(result['true_acc'])
results[method]['estim_acc'].extend(result['estim_acc'])
return results
def save_json_file(path, data):
os.makedirs(Path(path).parent, exist_ok=True)
with open(path, 'w') as f:
json.dump(data, f)
def save_json_result(path, true_accs, estim_accs, t_train, t_test):
result = {
't_train': t_train,
't_test_ave': t_test,
'true_acc': true_accs,
'estim_acc': estim_accs
}
save_json_file(path, result)
def get_dataset_stats(path, test_prot, L, V):
test_prevs = [Ui.prevalence() for Ui in test_prot()]
shifts = [qp.error.ae(L.prevalence(), Ui_prev) for Ui_prev in test_prevs]
info = {
'n_classes': L.n_classes,
'n_train': len(L),
'n_val': len(V),
'train_prev': L.prevalence().tolist(),
'val_prev': V.prevalence().tolist(),
'test_prevs': [x.tolist() for x in test_prevs],
'shifts': [x.tolist() for x in shifts],
'sample_size': test_prot.sample_size,
'num_samples': test_prot.total()
}
save_json_file(path, info)

View File

@ -0,0 +1,46 @@
from pathlib import Path
import matplotlib.pyplot as plt
from os import makedirs
from os.path import join
from ClassifierAccuracy.util.commons import get_method_names, open_results
def plot_diagonal(basedir, cls_name, measure_name, dataset_name='*'):
methods = get_method_names()
results = open_results(basedir, cls_name, measure_name, dataset_name=dataset_name, method_name=methods)
methods, xs, ys = [], [], []
for method_name in results.keys():
methods.append(method_name)
xs.append(results[method_name]['true_acc'])
ys.append(results[method_name]['estim_acc'])
plotsubdir = 'all' if dataset_name=='*' else dataset_name
save_path = join('plots', basedir, measure_name, plotsubdir, 'diagonal.png')
_plot_diagonal(methods, xs, ys, save_path, measure_name)
def _plot_diagonal(methods_names, true_xs, estim_ys, save_path, measure_name, title=None):
makedirs(Path(save_path).parent, exist_ok=True)
# Create scatter plot
plt.figure(figsize=(10, 10))
plt.xlim(0, 1)
plt.ylim(0, 1)
plt.plot([0, 1], [0, 1], color='black', linestyle='--')
for (method_name, xs, ys) in zip(methods_names, true_xs, estim_ys):
plt.scatter(xs, ys, label=f'{method_name}', alpha=0.5, linewidths=0)
plt.legend()
# Add labels and title
if title is not None:
plt.title(title)
plt.xlabel(f'True {measure_name}')
plt.ylabel(f'Estimated {measure_name}')
# Display the plot
plt.savefig(save_path)
plt.cla()

View File

@ -1,33 +1,79 @@
import quapy as qp
from quapy.data import LabelledCollection
from quapy.method.base import BinaryQuantifier
from quapy.method.base import BinaryQuantifier, BaseQuantifier
from quapy.model_selection import GridSearchQ
from quapy.method.aggregative import AggregativeSoftQuantifier
from quapy.protocol import APP
import numpy as np
from sklearn.linear_model import LogisticRegression
from time import time
# Define a custom quantifier: for this example, we will consider a new quantification algorithm that uses a
# logistic regressor for generating posterior probabilities, and then applies a custom threshold value to the
# posteriors. Since the quantifier internally uses a classifier, it is an aggregative quantifier; and since it
# relies on posterior probabilities, it is a probabilistic-aggregative quantifier. Note also it has an
# internal hyperparameter (let's say, alpha) which is the decision threshold. Let's also assume the quantifier
# is binary, for simplicity.
# relies on posterior probabilities, it is a probabilistic-aggregative quantifier (aka AggregativeSoftQuantifier).
# Note also it has an internal hyperparameter (let's say, alpha) which is the decision threshold.
#
# Let's also assume the quantifier is binary, for simplicity. Any quantifier (i.e., any subclass of BaseQuantifier)
# is required to implement the "fit" and "quantify" methods. Aggregative quantifiers are special subtypes of base
# quantifiers, i.e., are quantifiers that undertake a classification-phase followed by an aggregation-phase. QuaPy
# already implements most common functionality, and requires the developer to simply implement the "aggregation_fit"
# and the "aggregation" methods.
#
# We are providing two implementations of the same method to illustrate this characteristic of QuaPy. Let us begin
# with the general case, in which we implement a (base) quantifier
class MyQuantifier(BaseQuantifier):
class MyQuantifier(AggregativeSoftQuantifier, BinaryQuantifier):
def __init__(self, classifier, alpha=0.5):
self.alpha = alpha
# aggregative quantifiers have an internal self.classifier attribute
self.classifier = classifier
def fit(self, data: LabelledCollection, fit_classifier=True):
assert fit_classifier, 'this quantifier needs to fit the classifier!'
# in general, we would need to implement the method fit(self, data: LabelledCollection, fit_classifier=True,
# val_split=None); this would amount to:
def fit(self, data: LabelledCollection):
assert data.n_classes==2, \
'this quantifier is only valid for binary problems [abort]'
self.classifier.fit(*data.Xy)
return self
# in general, we would need to implement the method quantify(self, instances) but, since this method is of
# type aggregative, we can simply implement the method aggregate, which has the following interface
# in general, we would need to implement the method quantify(self, instances); this would amount to:
def quantify(self, instances):
assert hasattr(self.classifier, 'predict_proba'), \
'the underlying classifier is not probabilistic! [abort]'
posterior_probabilities = self.classifier.predict_proba(instances)
positive_probabilities = posterior_probabilities[:, 1]
crisp_decisions = positive_probabilities > self.alpha
pos_prev = crisp_decisions.mean()
neg_prev = 1 - pos_prev
return np.asarray([neg_prev, pos_prev])
# Note that the above implementation contains a lot of boilerplate code. Many parts can be omitted since QuaPy
# provides implementations for them. Some of these routines (like, for example, training a classifier and generating
# posterior probabilities) are often carried out in a k-fold cross-validation manner. These, along with many other
# common routines are already provided by highly-optimized routines in QuaPy. Let's see a much better implementation
# of the method, now adhering to the AggregativeSoftQuantifier:
class MyAggregativeSoftQuantifier(AggregativeSoftQuantifier, BinaryQuantifier):
def __init__(self, classifier, alpha=0.5):
# aggregative quantifiers have an internal attribute called self.classifier
self.classifier = classifier
self.alpha = alpha
# since this method is of type aggregative, we can simply implement the method aggregation_fit, which
# assumes the classifier has already been fitted properly and the predictions for the training set required
# to train the aggregation function have been properly generated (i.e., on a validation split, or using a
# k-fold cross validation strategy). What remains ahead is to learn an aggregation function. In our case
# this amounts to doing... nothing, since our method was pretty basic. BinaryQuantifier also adds some
# basic functionality for checking binary consistency.
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
pass
# since this method is of type aggregative, we can simply implement the method aggregate (i.e., we should
# only describe what to do with the classifier predictions --which in this case are posterior probabilities
# because we are inheriting from the "Soft" subtype). This comes down to:
def aggregate(self, classif_predictions: np.ndarray):
# the posterior probabilities have already been generated by the quantify method; we only need to
# specify what to do with them
@ -38,31 +84,68 @@ class MyQuantifier(AggregativeSoftQuantifier, BinaryQuantifier):
return np.asarray([neg_prev, pos_prev])
# a small example using these two implementations of our method
if __name__ == '__main__':
qp.environ['SAMPLE_SIZE'] = 100
# define an instance of our custom quantifier
quantifier = MyQuantifier(LogisticRegression(), alpha=0.5)
qp.environ['SAMPLE_SIZE'] = 250
# load the IMDb dataset
train, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5).train_test
train, val = train.split_stratified(train_prop=0.75) # let's create a validation set for optimizing hyperparams
# model selection
# let us assume we want to explore our hyperparameter alpha along with one hyperparameter of the classifier
train, val = train.split_stratified(train_prop=0.75)
param_grid = {
'alpha': np.linspace(0, 1, 11), # quantifier-dependent hyperparameter
'classifier__C': np.logspace(-2, 2, 5) # classifier-dependent hyperparameter
}
quantifier = GridSearchQ(quantifier, param_grid, protocol=APP(val), n_jobs=-1, verbose=True).fit(train)
def test_implementation(quantifier):
class_name = quantifier.__class__.__name__
print(f'\ntesting implementation {class_name}...')
# model selection
# let us assume we want to explore our hyperparameter alpha along with one hyperparameter of the classifier
tinit = time()
param_grid = {
'alpha': np.linspace(0, 1, 11), # quantifier-dependent hyperparameter
'classifier__C': np.logspace(-2, 2, 5) # classifier-dependent hyperparameter
}
gridsearch = GridSearchQ(quantifier, param_grid, protocol=APP(val), n_jobs=-1, verbose=False).fit(train)
t_modsel = time() - tinit
print(f'\tmodel selection took {t_modsel:.2f}s', flush=True)
# evaluation
mae = qp.evaluation.evaluate(quantifier, protocol=APP(test), error_metric='mae')
# evaluation
optimized_model = gridsearch.best_model_
mae = qp.evaluation.evaluate(
optimized_model,
protocol=APP(test, repeats=5000, sanity_check=None), # disable the check, we want to generate many tests!
error_metric='mae',
verbose=True)
print(f'MAE = {mae:.4f}')
t_eval = time() - t_modsel - tinit
print(f'\tevaluation took {t_eval:.2f}s [MAE = {mae:.4f}]')
# final remarks: this method is only for demonstration purposes and makes little sense in general. The method relies
# define an instance of our custom quantifier and test it!
quantifier = MyQuantifier(LogisticRegression(), alpha=0.5)
test_implementation(quantifier)
# define an instance of our custom quantifier, with the second implementation, and test it!
quantifier = MyAggregativeSoftQuantifier(LogisticRegression(), alpha=0.5)
test_implementation(quantifier)
# the output should look like this:
"""
testing implementation MyQuantifier...
model selection took 12.86s
predicting: 100%|| 105000/105000 [00:22<00:00, 4626.30it/s]
evaluation took 22.75s [MAE = 0.0630]
testing implementation MyAggregativeSoftQuantifier...
model selection took 3.10s
speeding up the prediction for the aggregative quantifier, total classifications 25000 instead of 26250000
predicting: 100%|| 105000/105000 [00:04<00:00, 22779.62it/s]
evaluation took 4.66s [MAE = 0.0630]
"""
# Note that the first implementation is much slower, both in terms of grid-search optimization and in terms of
# evaluation. The reason why is that QuaPy is highly optimized for aggregative quantifiers (by far, the most
# popular type of quantification methods), thus significantly speeding up model selection and test routines.
# Furthermore, it is simpler to extend an aggregation type since QuaPy implements boilerplate functions for you.
# Final remarks: this method is only for demonstration purposes and makes little sense in general. The method relies
# on a hyperparameter alpha for binarizing the posterior probabilities. A much better way of fulfilling this
# goal would be to calibrate the classifier (LogisticRegression is already reasonably well calibrated) and then
# simply cut at 0.5 (a sketch of this idea follows below).
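A hedged sketch of that remark (illustrative only, not part of the example above; it reuses the `train` and `test` splits defined in the main block): calibrate the classifier and let plain CC threshold the posteriors at 0.5.

from sklearn.calibration import CalibratedClassifierCV
from quapy.method.aggregative import CC

calibrated_lr = CalibratedClassifierCV(LogisticRegression(), cv=5)
cc_quantifier = CC(calibrated_lr).fit(train)
mae_cc = qp.evaluation.evaluate(cc_quantifier, protocol=APP(test), error_metric='mae')
print(f'CC with a calibrated classifier: MAE = {mae_cc:.4f}')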

View File

@ -11,7 +11,7 @@ from . import util
from . import model_selection
from . import classification
__version__ = '0.1.8'
__version__ = '0.1.9'
environ = {
'SAMPLE_SIZE': None,

View File

@ -108,8 +108,7 @@ class LabelledCollection:
"""
Returns an index to be used to extract a random sample of desired size and desired prevalence values. If the
prevalence values are not specified, then returns the index of a uniform sampling.
For each class, the sampling is drawn with replacement if the requested prevalence is larger than
the actual prevalence of the class, or without replacement otherwise.
For each class, the sampling is drawn with replacement.
:param size: integer, the requested size
:param prevs: the prevalence for each class; the prevalence value for the last class can be left empty since
@ -124,7 +123,7 @@ class LabelledCollection:
if len(prevs) == self.n_classes - 1:
prevs = prevs + (1 - sum(prevs),)
assert len(prevs) == self.n_classes, 'unexpected number of prevalences'
assert sum(prevs) == 1, f'prevalences ({prevs}) wrong range (sum={sum(prevs)})'
assert np.isclose(sum(prevs), 1), f'prevalences ({prevs}) wrong range (sum={sum(prevs)})'
# Decide how many instances should be taken for each class in order to satisfy the requested prevalence
# accurately, and the number of instances in the sample (exactly). If int(size * prevs[i]) (which is
@ -152,8 +151,10 @@ class LabelledCollection:
indexes_sample = []
for class_, n_requested in n_requests.items():
n_candidates = len(self.index[class_])
#print(n_candidates)
#print(n_requested, 'rq')
index_sample = self.index[class_][
np.random.choice(n_candidates, size=n_requested, replace=(n_requested > n_candidates))
np.random.choice(n_candidates, size=n_requested, replace=True)
] if n_requested > 0 else []
indexes_sample.append(index_sample)
@ -168,8 +169,7 @@ class LabelledCollection:
def uniform_sampling_index(self, size, random_state=None):
"""
Returns an index to be used to extract a uniform sample of desired size. The sampling is drawn
with replacement if the requested size is greater than the number of instances, or without replacement
otherwise.
with replacement.
:param size: integer, the size of the uniform sample
:param random_state: if specified, guarantees reproducibility of the split.
@ -179,13 +179,12 @@ class LabelledCollection:
ng = RandomState(seed=random_state)
else:
ng = np.random
return ng.choice(len(self), size, replace=size > len(self))
return ng.choice(len(self), size, replace=True)
def sampling(self, size, *prevs, shuffle=True, random_state=None):
"""
Return a random sample (an instance of :class:`LabelledCollection`) of desired size and desired prevalence
values. For each class, the sampling is drawn without replacement if the requested prevalence is larger than
the actual prevalence of the class, or with replacement otherwise.
values. For each class, the sampling is drawn with replacement.
:param size: integer, the requested size
:param prevs: the prevalence for each class; the prevalence value for the last class can be left empty since
@ -202,8 +201,7 @@ class LabelledCollection:
def uniform_sampling(self, size, random_state=None):
"""
Returns a uniform sample (an instance of :class:`LabelledCollection`) of desired size. The sampling is drawn
with replacement if the requested size is greater than the number of instances, or without replacement
otherwise.
with replacement.
:param size: integer, the requested size
:param random_state: if specified, guarantees reproducibility of the split.
@ -236,11 +234,24 @@ class LabelledCollection:
:return: two instances of :class:`LabelledCollection`, the first one with `train_prop` elements, and the
second one with `1-train_prop` elements
"""
instances = self.instances
labels = self.labels
remainder = None
for idx in np.argwhere(self.counts()==1):
class_with_1 = self.classes_[idx.item()]
if remainder is None:
remainder = LabelledCollection(instances[labels==class_with_1], [class_with_1], classes=self.classes_)
else:
remainder += LabelledCollection(instances[labels==class_with_1], [class_with_1], classes=self.classes_)
instances = instances[labels!=class_with_1]
labels = labels[labels!=class_with_1]
tr_docs, te_docs, tr_labels, te_labels = train_test_split(
self.instances, self.labels, train_size=train_prop, stratify=self.labels, random_state=random_state
instances, labels, train_size=train_prop, stratify=labels, random_state=random_state
)
training = LabelledCollection(tr_docs, tr_labels, classes=self.classes_)
test = LabelledCollection(te_docs, te_labels, classes=self.classes_)
if remainder is not None:
training += remainder
return training, test
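# Hedged illustration (not part of this file): with the modification above, a class holding a single
# instance no longer breaks sklearn's stratified split; its instance is simply moved to training.
#
# data = LabelledCollection(['neg a', 'neg b', 'pos a', 'pos b', 'rare doc'],
#                           ['neg', 'neg', 'pos', 'pos', 'rare'])  # 'rare' has a single instance
# training, test = data.split_stratified(train_prop=0.6, random_state=0)
# # the 'rare' instance ends up in `training`; both splits keep the full classes_ array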
def split_random(self, train_prop=0.6, random_state=None):
@ -418,6 +429,47 @@ class LabelledCollection:
test = self.sampling_from_index(test_index)
yield train, test
def empty_classes(self):
"""
Returns a np.ndarray of empty classes (classes present in self.classes_ but with
no positive instance). If there are no such classes, an empty np.ndarray is returned.
:return: np.ndarray
"""
idx = np.argwhere(self.counts()==0).flatten()
return self.classes_[idx]
def non_empty_classes(self):
"""
Returns a np.ndarray of non-empty classes (classes present in self.classes_ but with
at least one positive instance). If there are no such classes, an empty np.ndarray is returned.
:return: np.ndarray
"""
idx = np.argwhere(self.counts() > 0).flatten()
return self.classes_[idx]
def has_empty_classes(self):
"""
Checks whether the collection has empty classes
:return: boolean
"""
return len(self.empty_classes()) > 0
def compact_classes(self):
"""
Generates a new LabelledCollection object with no empty classes. It also returns an np.ndarray with
the positions that the retained (non-empty) classes occupied in the original self.classes_.
:return: (LabelledCollection, np.ndarray,)
"""
non_empty = self.non_empty_classes()
all_classes = self.classes_
old_pos = np.searchsorted(all_classes, non_empty)
non_empty_collection = LabelledCollection(*self.Xy, classes=non_empty)
return non_empty_collection, old_pos
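# Hedged usage illustration (not part of this file): these helpers allow training on the non-empty
# classes only and then scattering the estimates back to the original class space.
#
# lc = LabelledCollection(np.random.rand(6, 2), [0, 0, 1, 1, 3, 3], classes=[0, 1, 2, 3])
# lc.empty_classes()                              # -> array([2]); class 2 has no instances
# compact_lc, old_pos = lc.compact_classes()      # compact_lc.classes_ == [0, 1, 3]
# prevs_full = np.zeros(len(lc.classes_))
# prevs_full[old_pos] = compact_lc.prevalence()   # back to the original 4-class space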
class Dataset:
"""

View File

@ -20,8 +20,11 @@ TWITTER_SENTIMENT_DATASETS_TEST = ['gasp', 'hcr', 'omd', 'sanders',
TWITTER_SENTIMENT_DATASETS_TRAIN = ['gasp', 'hcr', 'omd', 'sanders',
'semeval', 'semeval16',
'sst', 'wa', 'wb']
UCI_BINARY_DATASETS = ['acute.a', 'acute.b',
'balance.1', 'balance.2', 'balance.3',
UCI_BINARY_DATASETS = [
#'acute.a', 'acute.b',
'balance.1',
#'balance.2',
'balance.3',
'breast-cancer',
'cmc.1', 'cmc.2', 'cmc.3',
'ctg.1', 'ctg.2', 'ctg.3',

View File

@ -362,4 +362,17 @@ def linear_search(loss, n_classes):
if min_score is None or score < min_score:
prev_selected, min_score = prev, score
return np.asarray([1 - prev_selected, prev_selected])
return np.asarray([1 - prev_selected, prev_selected])
def smooth(prevalences, epsilon=1E-5):
"""
Smooths a prevalence vector.
:param prevalences: np.ndarray
:param epsilon: float, a small quantity (default 1E-5)
:return: smoothed prevalence vector
"""
prevalences = prevalences+epsilon
prevalences /= prevalences.sum()
return prevalences
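# Hedged usage note (illustration only): smoothing yields strictly positive entries while preserving
# a unit sum, e.g.:
# p = smooth(np.asarray([0., 0.25, 0.75]))
# assert (p > 0).all() and np.isclose(p.sum(), 1)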

View File

@ -52,7 +52,7 @@ class KDEBase:
"""
return np.exp(kde.score_samples(X))
def get_mixture_components(self, X, y, n_classes, bandwidth):
def get_mixture_components(self, X, y, classes, bandwidth):
"""
Returns an array containing the mixture components, i.e., the KDE functions for each class.
@ -62,7 +62,7 @@ class KDEBase:
:param bandwidth: float, the bandwidth of the kernel
:return: a list of KernelDensity objects, each fitted with the corresponding class-specific covariates
"""
return [self.get_kde_function(X[y == cat], bandwidth) for cat in range(n_classes)]
return [self.get_kde_function(X[y == cat], bandwidth) for cat in classes]
@ -114,7 +114,7 @@ class KDEyML(AggregativeSoftQuantifier, KDEBase):
self.random_state=random_state
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.n_classes, self.bandwidth)
self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.classes_, self.bandwidth)
return self
def aggregate(self, posteriors: np.ndarray):
@ -196,7 +196,7 @@ class KDEyHD(AggregativeSoftQuantifier, KDEBase):
self.montecarlo_trials = montecarlo_trials
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.n_classes, self.bandwidth)
self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.classes_, self.bandwidth)
N = self.montecarlo_trials
rs = self.random_state

View File

@ -640,6 +640,8 @@ class EMQ(AggregativeSoftQuantifier):
raise ValueError('invalid param argument for recalibration method; available ones are '
'"nbvs", "bcts", "ts", and "vs".')
if not np.issubdtype(y.dtype, np.number):
y = np.searchsorted(data.classes_, y)
self.calibration_function = calibrator(P, np.eye(data.n_classes)[y], posterior_supplied=True)
if self.exact_train_prev:
@ -681,6 +683,11 @@ class EMQ(AggregativeSoftQuantifier):
"""
Px = posterior_probabilities
Ptr = np.copy(tr_prev)
if (Ptr == 0).any():  # some entry is 0; we should smooth the values to avoid division by zero
Ptr += epsilon
Ptr /= Ptr.sum()
qs = np.copy(Ptr) # qs (the running estimate) is initialized as the training prevalence
s, converged = 0, False

View File

@ -1,5 +1,6 @@
from typing import Union, Callable
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from quapy.functional import get_divergence
from quapy.data import LabelledCollection
@ -146,6 +147,53 @@ class DMx(BaseQuantifier):
return F.argmin_prevalence(loss, n_classes, method=self.search)
class ReadMe(BaseQuantifier):

    def __init__(self, bootstrap_trials=100, bootstrap_range=100, bagging_trials=100, bagging_range=25, **vectorizer_kwargs):
        self.bootstrap_trials = bootstrap_trials
        self.bootstrap_range = bootstrap_range
        self.bagging_trials = bagging_trials
        self.bagging_range = bagging_range
        self.vectorizer_kwargs = vectorizer_kwargs

    def fit(self, data: LabelledCollection):
        X, y = data.Xy
        self.vectorizer = CountVectorizer(binary=True, **self.vectorizer_kwargs)
        X = self.vectorizer.fit_transform(X)
        # keep one binary term-document matrix per class (used to estimate P(w|y))
        self.class_conditional_X = {i: X[y == i] for i in data.classes_}
        return self

    def quantify(self, instances):
        X = self.vectorizer.transform(instances)
        # number of documents and features
        num_docs, num_feats = X.shape
        # bootstrap over documents (test documents and, with indices of their own, the class-conditional matrices)
        p_boots = []
        for _ in range(self.bootstrap_trials):
            docs_idx = np.random.choice(num_docs, size=self.bootstrap_range, replace=False)
            class_conditional_X = {
                i: Xi[np.random.choice(Xi.shape[0], size=min(self.bootstrap_range, Xi.shape[0]), replace=False)]
                for i, Xi in self.class_conditional_X.items()
            }
            Xboot = X[docs_idx]
            # bagging over features
            p_bags = []
            for _ in range(self.bagging_trials):
                feat_idx = np.random.choice(num_feats, size=self.bagging_range, replace=False)
                class_conditional_Xbag = {i: Xi[:, feat_idx] for i, Xi in class_conditional_X.items()}
                Xbag = Xboot[:, feat_idx]
                p = self.std_constrained_linear_ls(Xbag, class_conditional_Xbag)
                p_bags.append(p)
            p_boots.append(np.mean(p_bags, axis=0))
        p_mean = np.mean(p_boots, axis=0)
        p_std = np.std(p_boots, axis=0)  # bootstrap dispersion (currently unused; could back confidence intervals)
        return p_mean

    def std_constrained_linear_ls(self, X, class_cond_X: dict):
        # not implemented yet in this commit; a hedged sketch of a possible implementation follows below
        pass
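# A hedged sketch (illustration only, not the repository's implementation) of what the constrained
# least-squares step of ReadMe could look like: solve mean(X) ~= M @ p subject to p >= 0 and
# sum(p) = 1, where column j of M holds the mean feature profile of class j.
def _std_constrained_linear_ls_sketch(X, class_cond_X: dict):
    from scipy.optimize import minimize  # assumed available; any QP/NNLS solver would do
    classes = sorted(class_cond_X.keys())
    target = np.asarray(X.mean(axis=0)).ravel()  # observed mean term profile, P(w)
    M = np.vstack([np.asarray(class_cond_X[c].mean(axis=0)).ravel() for c in classes]).T  # P(w|y) as columns
    n = len(classes)
    loss = lambda p: np.linalg.norm(M @ p - target)
    constraints = ({'type': 'eq', 'fun': lambda p: p.sum() - 1},)
    bounds = [(0, 1)] * n
    sol = minimize(loss, x0=np.full(n, 1 / n), bounds=bounds, constraints=constraints, method='SLSQP')
    return sol.x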
def _get_features_range(X):
feat_ranges = []

View File

@ -211,8 +211,9 @@ class GridSearchQ(BaseQuantifier):
self._sout(f'error={status}')
def fit(self, training: LabelledCollection):
""" Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing
the error metric.
"""
Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing
the error metric.
:param training: the training set on which to optimize the hyperparameters
:return: self

View File

@ -56,6 +56,7 @@ def parallel(func, args, n_jobs, seed=None, asarray=True, backend='loky'):
:param seed: the numeric seed
:param asarray: set to True to return a np.ndarray instead of a list
:param backend: indicates the backend used for handling parallel works
:param open_args: if True, then the delayed function is called on *args_i, instead of on args_i
"""
def func_dec(environ, seed, *args):
qp.environ = environ.copy()
@ -74,6 +75,40 @@ def parallel(func, args, n_jobs, seed=None, asarray=True, backend='loky'):
return out
def parallel_unpack(func, args, n_jobs, seed=None, asarray=True, backend='loky'):
"""
A wrapper of multiprocessing:
>>> Parallel(n_jobs=n_jobs)(
>>> delayed(func)(*args_i) for args_i in args
>>> )
that takes the `quapy.environ` variable as input silently.
Seeds the child processes to ensure reproducibility when n_jobs>1.
:param func: callable
:param args: an iterable of argument tuples; each tuple is unpacked when calling func
:param seed: the numeric seed
:param asarray: set to True to return a np.ndarray instead of a list
:param backend: indicates the backend used for handling parallel works
"""
def func_dec(environ, seed, *args):
qp.environ = environ.copy()
qp.environ['N_JOBS'] = 1
# set a context with a temporary seed to ensure results are reproducible in parallel
with ExitStack() as stack:
if seed is not None:
stack.enter_context(qp.util.temp_seed(seed))
return func(*args)
out = Parallel(n_jobs=n_jobs, backend=backend)(
delayed(func_dec)(qp.environ, None if seed is None else seed + i, *args_i) for i, args_i in enumerate(args)
)
if asarray:
out = np.asarray(out)
return out
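# Hedged usage sketch (illustration only): each tuple in `args` is unpacked into the function's
# parameters, e.g.:
# def add(a, b):
#     return a + b
# parallel_unpack(add, args=[(1, 2), (3, 4)], n_jobs=2)   # -> array([3, 7])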
@contextlib.contextmanager
def temp_seed(random_state):
"""

1
result_table Submodule

@ -0,0 +1 @@
Subproject commit 2e0e3d7fc0464f9c9b50ace3c7785dd8d97710a6