import pickle
import warnings
from collections import defaultdict

import numpy as np
from scipy.stats import rankdata
from sklearn.calibration import CalibratedClassifierCV
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import LogisticRegression

import quapy as qp
import quapy.functional as F
from quapy.method.aggregative import ExpectationMaximizationQuantifier, DistributionMatchingY, KDEyML
from quapy.protocol import UPP

warnings.filterwarnings('ignore', category=ConvergenceWarning)


def calibration_error(y, posteriors, nbins=10, isometric=True):
    """Binned expected calibration error (count-weighted squared version);
    macro-averaged over one-vs-rest problems in the multiclass case."""
    if not isometric:
        raise NotImplementedError('only isometric=True is supported at the moment')
    nclasses = posteriors.shape[1]
    if nclasses == 2:
        mean_error = _calibration_binary_error(y, posteriors[:, 1], nbins=nbins, isometric=isometric)
    else:
        errors = []
        for class_i in range(nclasses):
            binary_labels = (y == class_i).astype(int)
            class_err = _calibration_binary_error(binary_labels, posteriors[:, class_i], nbins=nbins, isometric=isometric)
            errors.append(class_err)
        mean_error = np.mean(errors)
    return mean_error


def _calibration_binary_error(y, class_posteriors, nbins=10, isometric=True):
    if not isometric:
        raise NotImplementedError('only isometric=True is supported at the moment')
    bins = np.linspace(0., 1., nbins + 1)
    # np.digitize maps a posterior of exactly 1.0 beyond the last bin; clip it back
    bin_indices = np.clip(np.digitize(class_posteriors, bins) - 1, 0, nbins - 1)
    err = 0.
    for bin_idx in np.unique(bin_indices):
        sel = (bin_indices == bin_idx)
        expected_positives = np.mean(class_posteriors[sel])
        true_positives = np.mean(y[sel])
        err += sel.sum() * (expected_positives - true_positives) ** 2
    err /= len(y)
    return err


# dataset = 'spambase'  # qp.datasets.UCI_BINARY_DATASETS[6]
# data = qp.datasets.fetch_UCIBinaryDataset(dataset)
dataset = qp.datasets.UCI_MULTICLASS_DATASETS[5]
print('loading', dataset)
data = qp.datasets.fetch_UCIMulticlassDataset(dataset)
train, test = data.train_test

NBINS = 10
EPSILON = 1e-8
SAMPLE_SIZE = 100
qp.environ['SAMPLE_SIZE'] = SAMPLE_SIZE

print(f'test prevalence (orig): {F.strprev(test.prevalence())}')

drift = []
results = defaultdict(list)

with qp.util.temp_seed(0):

    # uncalibrated LR
    lr = LogisticRegression()
    lr.fit(*train.Xy)

    # LR calibrated with k-fold cross-validation (assumes iid data)
    lr_kfcv = CalibratedClassifierCV(LogisticRegression())
    lr_kfcv.fit(*train.Xy)

    emq = ExpectationMaximizationQuantifier(LogisticRegression())
    emq.fit(train)

    # dm = DistributionMatchingY(nbins=NBINS)
    # dm.fit(train)

    # model selection for KDEy's bandwidth: candidate configurations are fit on
    # devel and evaluated on val; refit=True retrains the winner on devel+val
    devel, val = train.split_stratified(0.6, random_state=0)
    kdey = KDEyML(LogisticRegression())
    bandwidths = np.linspace(0.001, 0.2, 40)
    kdey = qp.model_selection.GridSearchQ(
        kdey,
        param_grid={'bandwidth': bandwidths},
        protocol=UPP(val),
        refit=True,
        n_jobs=-1,
        verbose=False
    ).fit(devel)
    print('best params', kdey.best_params_)
    kdey = kdey.best_model_

    # kdes = []
    # for bandwidth in np.linspace(0.01, 0.2, 5):
    #     kdei = KDEyML(bandwidth=bandwidth)
    #     kdei.fit(train)
    #     kdes.append(kdei)

    prot = qp.protocol.UPP(test, repeats=100, random_state=0, return_type='labelled_collection')
    for test_i in prot():
        print(f'test prevalence (shifted): {F.strprev(test_i.prevalence())}')
        drift_i = qp.error.ae(test_i.prevalence(), train.prevalence())
        print(f'drift={drift_i:.4f}')
        drift.append(drift_i)

        # true labels
        y = test_i.y

        # uncalibrated LR
        posteriors = lr.predict_proba(test_i.X)
        err = calibration_error(y, posteriors, nbins=NBINS)
        results['lr'].append(err)
        print(f'LR {err=:.5f}')

        # calibrated LR (assuming iid)
        posteriors = lr_kfcv.predict_proba(test_i.X)
        err = calibration_error(y, posteriors, nbins=NBINS)
        results['lr-kfcv'].append(err)
        print(f'kFCV-LR {err=:.5f}')
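        # The methods below are quantification-based: they estimate the test
        # prevalence and use it to correct the posteriors under prior shift.
        # EMQ (a.k.a. SLD) performs this correction internally: its
        # predict_proba returns posteriors already re-scaled by the
        # EM-estimated test prior.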
        posteriors = emq.predict_proba(test_i.X)
        err = calibration_error(y, posteriors, nbins=NBINS)
        results['emq'].append(err)
        print(f'EMQ-LR {err=:.5f}')

        # estim_prev = dm.quantify(test_i.X)
        # # estim_prev = np.expand_dims(estim_prev, axis=-1)
        # hist_neg, hist_pos = dm.validation_distribution
        # # because the histograms were computed wrt the posterior of the first class (the negative one!),
        # # we invert the order, which is equivalent to computing the histogram wrt the positive class
        # hist_neg = hist_neg.flatten()[::-1]
        # hist_pos = hist_pos.flatten()[::-1]
        # hist_neg = hist_neg * estim_prev[0] + EPSILON
        # hist_pos = hist_pos * estim_prev[1] + EPSILON
        # corrected_posteriors_bins = hist_pos / (hist_neg + hist_pos)
        # # corrected_posteriors_bins = 1 - corrected_posteriors_bins  # because the histograms were computed wrt the posterior of the first class (the negative one!)
        # corrected_posteriors_bins = np.concatenate(([0.], corrected_posteriors_bins, [1.]))
        # x_coords = np.concatenate(([0.], np.linspace(0., 1., NBINS + 1)[:-1] + 0.5 / NBINS, [1.]))  # this assumes binning=isometric
        # uncalibrated_posteriors_pos = dm.classifier.predict_proba(test_i.X)[:, 1]
        # posteriors = np.interp(uncalibrated_posteriors_pos, x_coords, corrected_posteriors_bins)
        # posteriors = np.asarray([1 - posteriors, posteriors]).T
        # err = calibration_error(y, posteriors, nbins=NBINS)
        # results['dm'].append(err)
        # print(f'DM-LR {err=:.5f}')

        # KDEy: recompute the posteriors via Bayes' rule, i.e., proportional to
        # p(classifier posteriors | y) * estimated p(y), where the class-conditional
        # densities are the fitted KDEs and p(y) comes from the quantifier
        estim_prev = kdey.quantify(test_i.X)
        class_densities = kdey.mix_densities
        uncalibrated_posteriors = kdey.classifier.predict_proba(test_i.X)
        test_densities = np.asarray(
            [kdey.pdf(kde_i, uncalibrated_posteriors) * prior_i
             for kde_i, prior_i in zip(class_densities, estim_prev)]).T
        test_densities += EPSILON
        posteriors = test_densities / test_densities.sum(axis=1, keepdims=True)
        err = calibration_error(y, posteriors, nbins=NBINS)
        results['kde'].append(err)
        print(f'KDEy-LR {err=:.5f}')

        print()

with open('./results.pkl', 'wb') as foo:
    pickle.dump((drift, dict(results)), foo, pickle.HIGHEST_PROTOCOL)

for method, errors in results.items():
    print(f'{method=} got {np.mean(errors):.5f}')

all_methods = list(results.keys())
all_results = np.asarray([results[method] for method in all_methods])

print()
# rank the methods (1=best) within each test sample according to calibration error
ranks = np.apply_along_axis(rankdata, axis=0, arr=all_results, method='ordinal')
for method, ranks_i in zip(all_methods, ranks):
    print(f'{method=} got rank {np.mean(ranks_i):.5f}')
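
# ----------------------------------------------------------------------------
# Illustrative follow-up (a sketch, not part of the experiment above): the
# pickle pairs each UPP sample's drift with the per-method calibration errors,
# so the saved file can be reloaded to contrast methods under low vs. high
# prior drift. The 0.1 split point below is an arbitrary choice for
# illustration, as are the names drift_loaded/results_loaded.
# ----------------------------------------------------------------------------
with open('./results.pkl', 'rb') as fin:
    drift_loaded, results_loaded = pickle.load(fin)

drift_loaded = np.asarray(drift_loaded)
low_drift = drift_loaded < 0.1  # arbitrary threshold separating low from high drift
for method, errors in results_loaded.items():
    errors = np.asarray(errors)
    print(f'{method}: low-drift error={errors[low_drift].mean():.5f}, '
          f'high-drift error={errors[~low_drift].mean():.5f}')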