QuaPy/BayesianKDEy/full_experiments.py

import os
import warnings
from os.path import join
from pathlib import Path
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression as LR
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from copy import deepcopy as cp
import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from quapy.method.aggregative import DistributionMatchingY as DMy
from quapy.method.base import BinaryQuantifier
from quapy.model_selection import GridSearchQ
from quapy.data import Dataset
# from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from quapy.method.confidence import ConfidenceIntervals, BayesianCC, PQ, WithConfidenceABC, AggregativeBootstrap
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML, ACC
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet
from collections import defaultdict
from time import time
from sklearn.base import clone


# def new_classifier(training):
#     print('optimizing hyperparameters of Logistic Regression')
#     mod_sel = GridSearchCV(
#         estimator=LogisticRegression(),
#         param_grid={
#             'C': np.logspace(-4, 4, 9),
#             'class_weight': ['balanced', None]
#         },
#         cv=StratifiedKFold(n_splits=10, shuffle=True, random_state=0),
#         n_jobs=-1,
#         refit=False,
#     )
#     mod_sel.fit(*training.Xy)
#     # optim = LogisticRegression(**mod_sel.best_params_)
#     print(f'Done: hyperparameters chosen={mod_sel.best_params_}')
#     # calib = CalibratedClassifierCV(optim, cv=10, n_jobs=-1, ensemble=False).fit(*training.Xy)
#     # return calib
#     return LogisticRegression(**mod_sel.best_params_)
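

# Yields (name, quantifier, hyperparameter grid) triples: each point quantifier is wrapped with a
# confidence-estimation strategy (bootstrap resampling or Bayesian posterior sampling), together
# with the grid explored during model selection. Commented-out entries are alternative
# configurations that can be re-enabled.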
def methods():
    acc_hyper = {}
    hdy_hyper = {'n_bins': [3, 4, 5, 8, 16, 32]}
    kdey_hyper = {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]}
    yield 'BootstrapACC', AggregativeBootstrap(ACC(LR()), n_test_samples=1000, random_state=0), acc_hyper
    # yield 'BootstrapHDy', AggregativeBootstrap(DMy(LR(), divergence='HD'), n_test_samples=1000, random_state=0), hdy_hyper
    # yield 'BootstrapKDEy', AggregativeBootstrap(KDEyML(LR()), n_test_samples=1000, random_state=0), kdey_hyper
    # yield 'BayesianACC', BayesianCC(LR(), mcmc_seed=0), acc_hyper
    # yield 'BayesianHDy', PQ(LR(), stan_seed=0), hdy_hyper
    yield 'BayesianKDEy', BayesianKDEy(LR(), mcmc_seed=0), kdey_hyper
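

# Runs a single (dataset, method) experiment. When a hyperparameter grid is given, model selection
# is performed with GridSearchQ over a held-out validation split (UPP protocol, 250 repeats);
# otherwise the method is fit directly on the training data. The selected quantifier is then
# evaluated on 500 UPP test samples, recording point-estimate errors (AE, RAE), the prior shift
# w.r.t. the training prevalence, and the coverage and amplitude of the confidence region returned
# by predict_conf. The method_name argument is only used to label the progress bar.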
def experiment(dataset: Dataset, method: WithConfidenceABC, grid: dict, method_name: str):
    with qp.util.temp_seed(0):
        # model selection
        train, test = dataset.train_test
        train_prevalence = train.prevalence()
        if len(grid) > 0:
            train, val = train.split_stratified(train_prop=0.6, random_state=0)
            mod_sel = GridSearchQ(
                model=method,
                param_grid=grid,
                protocol=qp.protocol.UPP(val, repeats=250, random_state=0),
                refit=True,
                n_jobs=-1,
                verbose=True
            ).fit(*train.Xy)
            optim_quantifier = mod_sel.best_model()
            best_params = mod_sel.best_params_
            best_score = mod_sel.best_score_
            tr_time = mod_sel.refit_time_
        else:
            t_init = time()
            method.fit(*train.Xy)
            tr_time = time() - t_init
            best_params, best_score = {}, -1
            optim_quantifier = method

        # test
        results = defaultdict(list)
        test_generator = UPP(test, repeats=500, random_state=0)
        for i, (sample_X, true_prevalence) in tqdm(enumerate(test_generator()), total=test_generator.total(), desc=f'{method_name} predictions'):
            t_init = time()
            point_estimate, region = optim_quantifier.predict_conf(sample_X)
            ttime = time() - t_init

            results['true-prevs'].append(true_prevalence)
            results['point-estim'].append(point_estimate)
            results['shift'].append(qp.error.ae(true_prevalence, train_prevalence))
            results['ae'].append(qp.error.ae(prevs_true=true_prevalence, prevs_hat=point_estimate))
            results['rae'].append(qp.error.rae(prevs_true=true_prevalence, prevs_hat=point_estimate))
            results['coverage'].append(region.coverage(true_prevalence))
            results['amplitude'].append(region.montecarlo_proportion(n_trials=50_000))
            results['test-time'].append(ttime)
            # TODO: store the posterior/bootstrap samples (the attribute to read them from is left unspecified)
            # results['samples'].append(optim_quantifier.)

        report = {
            'optim_hyper': best_params,
            'optim_score': best_score,
            'refit_time': tr_time,
            'train-prev': train_prevalence,
            'results': {k: np.asarray(v) for k, v in results.items()}
        }

    return report
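

# Builds the output path for a (dataset, method) pair and makes sure the results directory exists.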
def experiment_path(dir: Path, dataset_name: str, method_name: str):
    os.makedirs(dir, exist_ok=True)
    return dir / f'{dataset_name}__{method_name}.pkl'
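

# Main driver: evaluates every method on the UCI binary and multiclass collections, caching each
# report via qp.util.pickled_resource (already-computed experiments are skipped) and printing a
# per-dataset/method summary of MAE, coverage and amplitude.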
if __name__ == '__main__':

    binary = {
        'datasets': qp.datasets.UCI_BINARY_DATASETS,
        'fetch_fn': qp.datasets.fetch_UCIBinaryDataset,
        'sample_size': 500
    }
    multiclass = {
        'datasets': qp.datasets.UCI_MULTICLASS_DATASETS,
        'fetch_fn': qp.datasets.fetch_UCIMulticlassDataset,
        'sample_size': 1000
    }

    result_dir = Path('./results')

    for setup in [binary, multiclass]:
        qp.environ['SAMPLE_SIZE'] = setup['sample_size']
        for data_name in setup['datasets']:
            data = setup['fetch_fn'](data_name)
            is_binary = data.n_classes == 2
            result_subdir = result_dir / ('binary' if is_binary else 'multiclass')
            for method_name, method, hyper_params in methods():
                if isinstance(method, BinaryQuantifier) and not is_binary:
                    continue
                result_path = experiment_path(result_subdir, data_name, method_name)
                report = qp.util.pickled_resource(result_path, experiment, data, method, hyper_params, method_name)
                print(f'dataset={data_name}, '
                      f'method={method_name}: '
                      f'mae={report["results"]["ae"].mean():.3f}, '
                      f'coverage={report["results"]["coverage"].mean():.3f}, '
                      f'amplitude={report["results"]["amplitude"].mean():.3f}')