180 lines
6.7 KiB
Python
180 lines
6.7 KiB
Python
import pickle
|
|
from collections import defaultdict
|
|
from crypt import methods
|
|
import warnings
|
|
import quapy as qp
|
|
import numpy as np
|
|
from numpy.ma.core import shape
|
|
from quapy.method.aggregative import ExpectationMaximizationQuantifier, DistributionMatchingY, KDEyML
|
|
from quapy.protocol import UPP
|
|
from sklearn.calibration import CalibratedClassifierCV
|
|
from sklearn.exceptions import ConvergenceWarning
|
|
from sklearn.linear_model import LogisticRegression
|
|
import quapy.functional as F
|
|
|
|
warnings.filterwarnings('ignore', category=ConvergenceWarning)
|
|
|
|
|
|
def calibration_error(y, posteriors, nbins=10, isometric=True):
|
|
if not isometric:
|
|
raise NotImplementedError('only isometric=True is supported at the moment')
|
|
|
|
nclasses = posteriors.shape[1]
|
|
if nclasses==2:
|
|
mean_error = _calibration_binary_error(y, posteriors[:, 1], nbins=nbins, isometric=isometric)
|
|
else:
|
|
errors = []
|
|
for class_i in range(nclasses):
|
|
binary_label = (y==class_i).astype(int)
|
|
class_err = _calibration_binary_error(binary_label, posteriors[:, class_i], nbins=nbins, isometric=isometric)
|
|
errors.append(class_err)
|
|
mean_error = np.mean(errors)
|
|
return mean_error
|
|
|
|
|
|
def _calibration_binary_error(y, class_posteriors, nbins=10, isometric=True):
|
|
if not isometric:
|
|
raise NotImplementedError('only isometric=True is supperted at the moment')
|
|
|
|
bins = np.linspace(0., 1., nbins+1)
|
|
bin_indices = np.digitize(class_posteriors, bins) - 1
|
|
unique_bins = np.unique(bin_indices)
|
|
err = 0.
|
|
for bin_idx in unique_bins:
|
|
sel = (bin_indices==bin_idx)
|
|
sel_count = sum(sel)
|
|
y_bin = y[sel]
|
|
post_bin = class_posteriors[sel]
|
|
expected_positives = np.mean(post_bin)
|
|
true_positives = np.mean(y_bin)
|
|
err += sel_count * (expected_positives-true_positives)**2
|
|
err /= len(y)
|
|
return err
|
|
|
|
|
|
|
|
# dataset = 'spambase' # qp.datasets.UCI_BINARY_DATASETS[6]
|
|
# data = qp.datasets.fetch_UCIBinaryDataset(dataset)
|
|
dataset = qp.datasets.UCI_MULTICLASS_DATASETS[5]
|
|
print('loading', dataset)
|
|
data = qp.datasets.fetch_UCIMulticlassDataset(dataset)
|
|
train, test = data.train_test
|
|
NBINS = 10
|
|
EPSILON = 1e-8
|
|
SAMPLE_SIZE = 100
|
|
qp.environ['SAMPLE_SIZE']=SAMPLE_SIZE
|
|
|
|
print(f'test prevalence (orig): {F.strprev(test.prevalence())}')
|
|
|
|
drift = []
|
|
results = defaultdict(lambda :[])
|
|
|
|
with qp.util.temp_seed(0):
|
|
lr = LogisticRegression()
|
|
lr.fit(*train.Xy)
|
|
|
|
lr_kfcv = CalibratedClassifierCV(LogisticRegression())
|
|
lr_kfcv.fit(*train.Xy)
|
|
|
|
emq = ExpectationMaximizationQuantifier(LogisticRegression())
|
|
emq.fit(train)
|
|
#
|
|
# dm = DistributionMatchingY(nbins=nbins)
|
|
# dm.fit(train)
|
|
#
|
|
kdey = KDEyML(LogisticRegression(), bandwidth=0.01)
|
|
# kdey.fit(train)
|
|
devel, val = train.split_stratified(0.6, random_state=0)
|
|
bandwidths = np.linspace(0.001, 0.2, 40)
|
|
kdey = qp.model_selection.GridSearchQ(
|
|
kdey, param_grid={'bandwidth': bandwidths}, protocol=UPP(val),
|
|
refit=True, n_jobs=-1, verbose=False
|
|
).fit(train)
|
|
print('best params', kdey.best_params_)
|
|
kdey = kdey.best_model_
|
|
|
|
|
|
# kdes = []
|
|
# for bandwidth in np.linspace(0.01, 0.2, 5):
|
|
# kdei = KDEyML(bandwidth=bandwidth)
|
|
# kdei.fit(train)
|
|
# kdes.append(kdei)
|
|
|
|
|
|
prot = qp.protocol.UPP(test, repeats=100, random_state=0, return_type='labelled_collection')
|
|
for test_i in prot():
|
|
print(f'test prevalence (shifted): {F.strprev(test_i.prevalence())}')
|
|
|
|
drift_i = qp.error.ae(test_i.prevalence(), train.prevalence())
|
|
print(f'drift={drift_i:.4f}')
|
|
drift.append(drift_i)
|
|
|
|
#true labels
|
|
y = test_i.y
|
|
|
|
# uncalibrated LR
|
|
posteriors = lr.predict_proba(test_i.X)
|
|
err = calibration_error(y, posteriors, nbins=NBINS)
|
|
results['lr'].append(err)
|
|
print(f'LR {err=:.5f}')
|
|
|
|
# calibrated LR (assuming iid)
|
|
posteriors = lr_kfcv.predict_proba(test_i.X)
|
|
err = calibration_error(y, posteriors, nbins=NBINS)
|
|
results['lr-kfcv'].append(err)
|
|
print(f'kFCV-LR {err=:.5f}')
|
|
|
|
posteriors = emq.predict_proba(test_i.X)
|
|
err = calibration_error(y, posteriors, nbins=NBINS)
|
|
results['emq'].append(err)
|
|
print(f'EMQ-LR {err=:.5f}')
|
|
|
|
# estim_prev = dm.quantify(test_i.X)
|
|
# # estim_prev = np.expand_dims(estim_prev, axis=-1)
|
|
# hist_neg, hist_pos = dm.validation_distribution
|
|
# # because the histograms were computed wrt the posterior of the first class (the negative one!), we invert the order
|
|
# # which is equivalent to computing the histogram wrt the positive class
|
|
# hist_neg = hist_neg.flatten()[::-1]
|
|
# hist_pos = hist_pos.flatten()[::-1]
|
|
# hist_neg = hist_neg * estim_prev[0] + eps
|
|
# hist_pos = hist_pos * estim_prev[1] + eps
|
|
# corrected_posteriors_bins = hist_pos / (hist_neg + hist_pos)
|
|
# # corrected_posteriors_bins = 1 - corrected_posteriors_bins # because the histograms were computed wrt the posterior of the first class (the negative one!)
|
|
# corrected_posteriors_bins = np.concatenate(([0.], corrected_posteriors_bins, [1.]))
|
|
# x_coords = np.concatenate(([0.], (np.linspace(0., 1., nbins+1)[:-1]+0.5/nbins), [1.])) # this assumes binning=isometric
|
|
# uncalibrated_posteriors_pos = dm.classifier.predict_proba(test_i.X)[:,1]
|
|
# posteriors = np.interp(uncalibrated_posteriors_pos, x_coords, corrected_posteriors_bins)
|
|
# posteriors = np.asarray([1-posteriors, posteriors]).T
|
|
# err = calibration_binary_error(y, posteriors, nbins=nbins)
|
|
# results['dm'].append(err)
|
|
# print(f'DM-LR {err=:.5f}')
|
|
|
|
estim_prev = kdey.quantify(test_i.X)
|
|
class_densities = kdey.mix_densities
|
|
uncalibrated_posteriors = kdey.classifier.predict_proba(test_i.X)
|
|
test_densities = np.asarray([kdey.pdf(kde_i, posteriors)*prior_i for kde_i, prior_i in zip(class_densities, estim_prev)]).T
|
|
test_densities += EPSILON
|
|
posteriors = test_densities / test_densities.sum(axis=1, keepdims=True)
|
|
err = calibration_error(y, posteriors, nbins=NBINS)
|
|
results[f'kde'].append(err)
|
|
print(f'KDEy-LR {err=:.5f}')
|
|
|
|
print()
|
|
|
|
with open('./results.pkl', 'wb') as foo:
|
|
pickle.dump((drift,dict(results)), foo, pickle.HIGHEST_PROTOCOL)
|
|
|
|
for method, errors in results.items():
|
|
print(f'{method=} got {np.mean(errors):.5f}')
|
|
|
|
all_methods = list(results.keys())
|
|
all_results = []
|
|
for method in all_methods:
|
|
all_results.append(results[method])
|
|
all_results = np.asarray(all_results)
|
|
|
|
print()
|
|
from scipy.stats import rankdata
|
|
ranks = np.apply_along_axis(rankdata, axis=0, arr=all_results, method='ordinal')
|
|
for method, ranks_i in zip(all_methods, ranks):
|
|
print(f'{method=} got rank {np.mean(ranks_i):.5f}') |