CalibrationViaQuantification/main.py

180 lines
6.7 KiB
Python
Raw Normal View History

2024-12-17 10:10:09 +01:00
import pickle
from collections import defaultdict
from crypt import methods
import warnings
import quapy as qp
import numpy as np
from numpy.ma.core import shape
from quapy.method.aggregative import ExpectationMaximizationQuantifier, DistributionMatchingY, KDEyML
from quapy.protocol import UPP
from sklearn.calibration import CalibratedClassifierCV
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import LogisticRegression
import quapy.functional as F
warnings.filterwarnings('ignore', category=ConvergenceWarning)
def calibration_error(y, posteriors, nbins=10, isometric=True):
if not isometric:
raise NotImplementedError('only isometric=True is supported at the moment')
nclasses = posteriors.shape[1]
if nclasses==2:
mean_error = _calibration_binary_error(y, posteriors[:, 1], nbins=nbins, isometric=isometric)
else:
errors = []
for class_i in range(nclasses):
binary_label = (y==class_i).astype(int)
class_err = _calibration_binary_error(binary_label, posteriors[:, class_i], nbins=nbins, isometric=isometric)
errors.append(class_err)
mean_error = np.mean(errors)
return mean_error
def _calibration_binary_error(y, class_posteriors, nbins=10, isometric=True):
if not isometric:
raise NotImplementedError('only isometric=True is supperted at the moment')
bins = np.linspace(0., 1., nbins+1)
bin_indices = np.digitize(class_posteriors, bins) - 1
unique_bins = np.unique(bin_indices)
err = 0.
for bin_idx in unique_bins:
sel = (bin_indices==bin_idx)
sel_count = sum(sel)
y_bin = y[sel]
post_bin = class_posteriors[sel]
expected_positives = np.mean(post_bin)
true_positives = np.mean(y_bin)
err += sel_count * (expected_positives-true_positives)**2
err /= len(y)
return err
# dataset = 'spambase' # qp.datasets.UCI_BINARY_DATASETS[6]
# data = qp.datasets.fetch_UCIBinaryDataset(dataset)
dataset = qp.datasets.UCI_MULTICLASS_DATASETS[5]
print('loading', dataset)
data = qp.datasets.fetch_UCIMulticlassDataset(dataset)
train, test = data.train_test
NBINS = 10
EPSILON = 1e-8
SAMPLE_SIZE = 100
qp.environ['SAMPLE_SIZE']=SAMPLE_SIZE
print(f'test prevalence (orig): {F.strprev(test.prevalence())}')
drift = []
results = defaultdict(lambda :[])
with qp.util.temp_seed(0):
lr = LogisticRegression()
lr.fit(*train.Xy)
lr_kfcv = CalibratedClassifierCV(LogisticRegression())
lr_kfcv.fit(*train.Xy)
emq = ExpectationMaximizationQuantifier(LogisticRegression())
emq.fit(train)
#
# dm = DistributionMatchingY(nbins=nbins)
# dm.fit(train)
#
kdey = KDEyML(LogisticRegression(), bandwidth=0.01)
# kdey.fit(train)
devel, val = train.split_stratified(0.6, random_state=0)
bandwidths = np.linspace(0.001, 0.2, 40)
kdey = qp.model_selection.GridSearchQ(
kdey, param_grid={'bandwidth': bandwidths}, protocol=UPP(val),
refit=True, n_jobs=-1, verbose=False
).fit(train)
print('best params', kdey.best_params_)
kdey = kdey.best_model_
# kdes = []
# for bandwidth in np.linspace(0.01, 0.2, 5):
# kdei = KDEyML(bandwidth=bandwidth)
# kdei.fit(train)
# kdes.append(kdei)
prot = qp.protocol.UPP(test, repeats=100, random_state=0, return_type='labelled_collection')
for test_i in prot():
print(f'test prevalence (shifted): {F.strprev(test_i.prevalence())}')
drift_i = qp.error.ae(test_i.prevalence(), train.prevalence())
print(f'drift={drift_i:.4f}')
drift.append(drift_i)
#true labels
y = test_i.y
# uncalibrated LR
posteriors = lr.predict_proba(test_i.X)
err = calibration_error(y, posteriors, nbins=NBINS)
results['lr'].append(err)
print(f'LR {err=:.5f}')
# calibrated LR (assuming iid)
posteriors = lr_kfcv.predict_proba(test_i.X)
err = calibration_error(y, posteriors, nbins=NBINS)
results['lr-kfcv'].append(err)
print(f'kFCV-LR {err=:.5f}')
posteriors = emq.predict_proba(test_i.X)
err = calibration_error(y, posteriors, nbins=NBINS)
results['emq'].append(err)
print(f'EMQ-LR {err=:.5f}')
# estim_prev = dm.quantify(test_i.X)
# # estim_prev = np.expand_dims(estim_prev, axis=-1)
# hist_neg, hist_pos = dm.validation_distribution
# # because the histograms were computed wrt the posterior of the first class (the negative one!), we invert the order
# # which is equivalent to computing the histogram wrt the positive class
# hist_neg = hist_neg.flatten()[::-1]
# hist_pos = hist_pos.flatten()[::-1]
# hist_neg = hist_neg * estim_prev[0] + eps
# hist_pos = hist_pos * estim_prev[1] + eps
# corrected_posteriors_bins = hist_pos / (hist_neg + hist_pos)
# # corrected_posteriors_bins = 1 - corrected_posteriors_bins # because the histograms were computed wrt the posterior of the first class (the negative one!)
# corrected_posteriors_bins = np.concatenate(([0.], corrected_posteriors_bins, [1.]))
# x_coords = np.concatenate(([0.], (np.linspace(0., 1., nbins+1)[:-1]+0.5/nbins), [1.])) # this assumes binning=isometric
# uncalibrated_posteriors_pos = dm.classifier.predict_proba(test_i.X)[:,1]
# posteriors = np.interp(uncalibrated_posteriors_pos, x_coords, corrected_posteriors_bins)
# posteriors = np.asarray([1-posteriors, posteriors]).T
# err = calibration_binary_error(y, posteriors, nbins=nbins)
# results['dm'].append(err)
# print(f'DM-LR {err=:.5f}')
estim_prev = kdey.quantify(test_i.X)
class_densities = kdey.mix_densities
uncalibrated_posteriors = kdey.classifier.predict_proba(test_i.X)
test_densities = np.asarray([kdey.pdf(kde_i, posteriors)*prior_i for kde_i, prior_i in zip(class_densities, estim_prev)]).T
test_densities += EPSILON
posteriors = test_densities / test_densities.sum(axis=1, keepdims=True)
err = calibration_error(y, posteriors, nbins=NBINS)
results[f'kde'].append(err)
print(f'KDEy-LR {err=:.5f}')
print()
with open('./results.pkl', 'wb') as foo:
pickle.dump((drift,dict(results)), foo, pickle.HIGHEST_PROTOCOL)
for method, errors in results.items():
print(f'{method=} got {np.mean(errors):.5f}')
all_methods = list(results.keys())
all_results = []
for method in all_methods:
all_results.append(results[method])
all_results = np.asarray(all_results)
print()
from scipy.stats import rankdata
ranks = np.apply_along_axis(rankdata, axis=0, arr=all_results, method='ordinal')
for method, ranks_i in zip(all_methods, ranks):
print(f'{method=} got rank {np.mean(ranks_i):.5f}')