scripting experiments for binary and multiclass

This commit is contained in:
Alejandro Moreo Fernandez 2025-11-17 12:22:40 +01:00
parent 2f83a520c7
commit 12b431ef4b
3 changed files with 126 additions and 35 deletions

View File

@@ -1,13 +1,16 @@
+import os
 import warnings
-from os.path import join
+from pathlib import Path
 from sklearn.linear_model import LogisticRegression
 import quapy as qp
 from BayesianKDEy._bayeisan_kdey import BayesianKDEy
-from method.aggregative import AggregativeQuantifier
+from quapy.method.base import BinaryQuantifier
 from quapy.model_selection import GridSearchQ
 from quapy.data import Dataset
 # from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
-from quapy.method.confidence import ConfidenceIntervals, BayesianCC, PQ
+from quapy.method.confidence import ConfidenceIntervals, BayesianCC, PQ, WithConfidenceABC
 from quapy.functional import strprev
 from quapy.method.aggregative import KDEyML
 from quapy.protocol import UPP
@@ -15,27 +18,33 @@ import quapy.functional as F
 import numpy as np
 from tqdm import tqdm
 from scipy.stats import dirichlet
+from collections import defaultdict
+from time import time
+from sklearn.base import clone


 def new_classifier():
-    lr_hyper = {
-        'classifier__C': np.logspace(-3,3,7),
-        'classifier__class_weight': ['balanced', None]
-    }
+    # lr_hyper = {
+    #     'classifier__C': np.logspace(-3,3,7),
+    #     'classifier__class_weight': ['balanced', None]
+    # }
+    lr_hyper = {}
     lr = LogisticRegression()
     return lr, lr_hyper


 def methods():
     cls, cls_hyper = new_classifier()
-    # yield 'BayesianACC', BayesianCC(cls, mcmc_seed=0), cls_hyper
-    # yield 'BayesianHDy', PQ(cls, stan_seed=0), {**cls_hyper, 'n_bins': [3,4,5,8,16,32]}
-    yield 'BayesianKDEy', BayesianKDEy(cls, mcmc_seed=0), {**cls_hyper, 'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]}
+    # yield 'BayesianACC', BayesianCC(clone(cls), mcmc_seed=0), cls_hyper
+    # yield 'BayesianHDy', PQ(clone(cls), stan_seed=0), {**cls_hyper, 'n_bins': [3,4,5,8,16,32]}
+    yield 'BayesianKDEy', BayesianKDEy(clone(cls), mcmc_seed=0), {**cls_hyper, 'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]}


-def experiment(dataset: Dataset, method: AggregativeQuantifier, method_name: str, grid: dict):
+def experiment(dataset: Dataset, method: WithConfidenceABC, grid: dict):
     with qp.util.temp_seed(0):
         # model selection
         train, test = dataset.train_test
+        train_prevalence = train.prevalence()

+        if len(grid)>0:
             train, val = train.split_stratified(train_prop=0.6, random_state=0)
             mod_sel = GridSearchQ(
                 model=method,
@@ -46,29 +55,80 @@ def experiment(dataset: Dataset, method: AggregativeQuantifier, method_name: str
                 verbose=True
             ).fit(*train.Xy)
             optim_quantifier = mod_sel.best_model()
-            optim_hyper = mod_sel.best_params_
-            print(f'model_selection for {method_name} ended: chosen hyper-params {optim_hyper}')
+            best_params = mod_sel.best_params_
+            best_score = mod_sel.best_score_
+            tr_time = mod_sel.refit_time_
+        else:
+            t_init = time()
+            method.fit(*train.Xy)
+            tr_time = time() - t_init
+            best_params, best_score = {}, -1
+            optim_quantifier = method

         # test
-        report = qp.evaluation.evaluation_report(
-            optim_quantifier,
-            protocol=UPP(test, repeats=500, random_state=0),
-            verbose=True
-        )
+        results = defaultdict(list)
+        test_generator = UPP(test, repeats=500, random_state=0)
+        for i, (sample_X, true_prevalence) in enumerate(test_generator()):
+            t_init = time()
+            point_estimate, region = optim_quantifier.predict_conf(sample_X)
+            ttime = time()-t_init
+            results['true-prevs'].append(true_prevalence)
+            results['point-estim'].append(point_estimate)
+            results['shift'].append(qp.error.ae(true_prevalence, train_prevalence))
+            results['ae'].append(qp.error.ae(prevs_true=true_prevalence, prevs_hat=point_estimate))
+            results['rae'].append(qp.error.rae(prevs_true=true_prevalence, prevs_hat=point_estimate))
+            results['coverage'].append(region.coverage(true_prevalence))
+            results['amplitude'].append(region.montecarlo_proportion(n_trials=50_000))
+            results['test-time'].append(ttime)
+
+        report = {
+            'optim_hyper': best_params,
+            'optim_score': best_score,
+            'refit_time': tr_time,
+            'train-prev': train_prevalence,
+            'results': {k: np.asarray(v) for k, v in results.items()}
+        }

     return report


+def experiment_path(dir: Path, dataset_name: str, method_name: str):
+    os.makedirs(dir, exist_ok=True)
+    return dir / f'{dataset_name}__{method_name}.pkl'
 if __name__ == '__main__':
-    qp.environ["SAMPLE_SIZE"] = 500
-    datasets = qp.datasets.UCI_BINARY_DATASETS
-    for dataset in datasets:
-        data = qp.datasets.fetch_UCIBinaryDataset(dataset)
+    binary = {
+        'datasets': qp.datasets.UCI_BINARY_DATASETS,
+        'fetch_fn': qp.datasets.fetch_UCIBinaryDataset,
+        'sample_size': 500
+    }
+    multiclass = {
+        'datasets': qp.datasets.UCI_MULTICLASS_DATASETS,
+        'fetch_fn': qp.datasets.fetch_UCIMulticlassDataset,
+        'sample_size': 1000
+    }
+
+    result_dir = Path('./results')
+
+    for setup in [binary, multiclass]:
+        qp.environ['SAMPLE_SIZE'] = setup['sample_size']
+        for data_name in setup['datasets']:
+            data = setup['fetch_fn'](data_name)
+            is_binary = data.n_classes==2
+            result_subdir = result_dir / ('binary' if is_binary else 'multiclass')
             for method_name, method, hyper_params in methods():
-            report = experiment(data, method, method_name, hyper_params)
-            print(f'{method_name=} got {report.mean(numeric_only=True)}')
+                if isinstance(method, BinaryQuantifier) and not is_binary:
+                    continue
+                result_path = experiment_path(result_subdir, data_name, method_name)
+                report = qp.util.pickled_resource(result_path, experiment, data, method, hyper_params)
+                print(f'dataset={data_name}, '
+                      f'method={method_name}: '
+                      f'mae={report["results"]["ae"].mean():.3f}, '
+                      f'coverage={report["results"]["coverage"].mean():.3f}, '
+                      f'amplitude={report["results"]["amplitude"].mean():.3f}')
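A note on the two region metrics recorded in the evaluation loop: 'coverage' checks whether the true prevalence vector falls inside the confidence region returned by predict_conf, and 'amplitude' (via montecarlo_proportion) estimates how much of the probability simplex the region occupies. The region internals are not part of this diff, so the following is only an illustrative sketch of the idea using a toy per-class interval region; uniform samples on the simplex are drawn from a flat Dirichlet.

# Illustrative sketch only -- not quapy's implementation.
import numpy as np
from scipy.stats import dirichlet

class ToyIntervalRegion:
    """Toy confidence region: one [low, high] interval per class, built
    from a collection of prevalence estimates (rows = estimates)."""
    def __init__(self, prev_samples, confidence_level=0.95):
        alpha = 1 - confidence_level
        self.low = np.percentile(prev_samples, 100 * (alpha / 2), axis=0)
        self.high = np.percentile(prev_samples, 100 * (1 - alpha / 2), axis=0)

    def coverage(self, true_prev):
        # 1.0 if the true prevalence lies within every per-class interval
        return float(np.all((self.low <= true_prev) & (true_prev <= self.high)))

    def montecarlo_proportion(self, n_trials=50_000, random_state=0):
        # fraction of the simplex inside the region, estimated by sampling
        # uniformly from the simplex (Dirichlet with all alphas equal to 1)
        n_classes = len(self.low)
        points = dirichlet.rvs([1.0] * n_classes, size=n_trials, random_state=random_state)
        inside = np.all((self.low <= points) & (points <= self.high), axis=1)
        return inside.mean()

Under this reading, a good method keeps coverage near the nominal confidence level while keeping amplitude small, i.e., its regions are both calibrated and informative.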

View File

@@ -0,0 +1,31 @@
+import pickle
+from collections import defaultdict
+
+import pandas as pd
+from glob import glob
+from pathlib import Path
+
+
+for setup in ['binary', 'multiclass']:
+    path = f'./results/{setup}/*.pkl'
+
+    table = defaultdict(list)
+    for file in glob(path):
+        file = Path(file)
+        dataset, method = file.name.replace('.pkl', '').split('__')
+        report = pickle.load(open(file, 'rb'))
+        results = report['results']
+        n_samples = len(results['ae'])
+        table['method'].extend([method] * n_samples)
+        table['dataset'].extend([dataset] * n_samples)
+        table['ae'].extend(results['ae'])
+        table['coverage'].extend(results['coverage'])
+        table['amplitude'].extend(results['amplitude'])
+
+    pd.set_option('display.max_columns', None)
+    pd.set_option('display.width', 1000)
+    pd.set_option('display.max_rows', None)
+
+    df = pd.DataFrame(table)
+    pv = pd.pivot_table(df, index='dataset', columns='method', values=['ae', 'coverage', 'amplitude'])
+    print(f'{setup=}')
+    print(pv)
+    print()
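One detail worth keeping in mind about the aggregation above: pd.pivot_table aggregates with the mean by default, so each cell of the printed table is the per-dataset average of a metric across all test samples for that method. A small optional variant, spelling the aggregation out and adding a dispersion column (aggfunc accepting a list of names is standard pandas; df is the frame built in the script above):

# equivalent to the default, but explicit; 'std' added to gauge dispersion
pv = pd.pivot_table(
    df, index='dataset', columns='method',
    values=['ae', 'coverage', 'amplitude'],
    aggfunc=['mean', 'std'],
)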

View File

@@ -112,7 +112,7 @@ class WithConfidenceABC(ABC):
         return self.predict_conf(instances=instances, confidence_level=confidence_level)

     @classmethod
-    def construct_region(cls, prev_estims, confidence_level=0.95, method='intervals'):
+    def construct_region(cls, prev_estims, confidence_level=0.95, method='intervals') -> ConfidenceRegionABC:
        """
        Construct a confidence region given many prevalence estimations.
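For context, the annotated classmethod can be exercised directly on the ABC (classmethods do not require instantiation). A hedged usage sketch, where prev_estims is assumed to be an (n_estimates, n_classes) array of prevalence vectors, e.g., bootstrap replicates:

# hypothetical usage; argument names follow the signature above
import numpy as np
prev_estims = np.random.default_rng(0).dirichlet([1, 1, 1], size=1000)  # stand-in estimates
region = WithConfidenceABC.construct_region(prev_estims, confidence_level=0.95, method='intervals')
# the returned ConfidenceRegionABC is what the experiment script queries via
# region.coverage(...) and region.montecarlo_proportion(...)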