Some refactoring and a prior-effect script

This commit is contained in:
Alejandro Moreo Fernandez 2026-01-13 12:19:05 +01:00
parent 4b7fc77e90
commit 323cd383f9
10 changed files with 236 additions and 85 deletions

View File

@ -21,3 +21,4 @@
- consider W as a measure of quantification error (the current e.g., w-CI is the winkler...) - consider W as a measure of quantification error (the current e.g., w-CI is the winkler...)
- optimize also C and class_weight? [I don't think so, but could be done easily now] - optimize also C and class_weight? [I don't think so, but could be done easily now]
- remove wikis from repo

79
BayesianKDEy/commons.py Normal file
View File

@ -0,0 +1,79 @@
import os
from pathlib import Path
from sklearn.base import BaseEstimator
import quapy as qp
import numpy as np
from method.aggregative import KDEyML
from quapy.functional import l1_norm, ILRtransformation
from scipy.stats import entropy
def fetch_UCI_multiclass(data_name):
    # Wrapper fixing min_class_support: classes with <1% training support are discarded
    return qp.datasets.fetch_UCIMulticlassDataset(data_name, min_class_support=0.01)
def fetch_UCI_binary(data_name):
    # Wrapper kept for symmetry with fetch_UCI_multiclass (used as 'fetch_fn' in the setups below)
    return qp.datasets.fetch_UCIBinaryDataset(data_name)
# global configurations: each setup bundles the dataset-name list, the loader
# function, and the sample size to be used by the evaluation protocol
binary = {
    'datasets': qp.datasets.UCI_BINARY_DATASETS,
    'fetch_fn': fetch_UCI_binary,
    'sample_size': 500
}

multiclass = {
    'datasets': qp.datasets.UCI_MULTICLASS_DATASETS,
    'fetch_fn': fetch_UCI_multiclass,
    'sample_size': 1000
}

# NOTE(review): these remove() calls mutate qp.datasets.UCI_MULTICLASS_DATASETS in
# place (the dict stores a reference, not a copy), so any module that reads that
# list after importing commons sees the reduced list — confirm this is intended
multiclass['datasets'].remove('poker_hand')  # random performance
multiclass['datasets'].remove('hcv')  # random performance
multiclass['datasets'].remove('letter')  # many classes
multiclass['datasets'].remove('isolet')  # many classes
# utils
def experiment_path(dir: Path, dataset_name: str, method_name: str) -> Path:
    """
    Return the pickle path for a (dataset, method) experiment, creating the
    destination directory (including parents) if it does not exist.

    :param dir: destination directory (a Path or anything Path() accepts)
    :param dataset_name: dataset identifier
    :param method_name: method identifier
    :return: ``dir/'{dataset_name}__{method_name}.pkl'``
    """
    dir = Path(dir)  # accept str as well, for robustness
    # pathlib idiom; parents=True matches the original os.makedirs behavior
    dir.mkdir(parents=True, exist_ok=True)
    return dir / f'{dataset_name}__{method_name}.pkl'
def normalized_entropy(p):
    """
    Normalized Shannon entropy in [0, 1].

    :param p: array-like, prevalence vector (sums to 1; scipy renormalizes otherwise)
    :return: H(p) / log(K) clipped to [0, 1]; returns 0.0 when K <= 1, since a
        single-class prevalence carries no entropy (and log(1)=0 would otherwise
        yield a 0/0 division)
    """
    p = np.asarray(p)
    if p.size <= 1:
        # degenerate case: H_max = log(1) = 0; avoid nan from 0/0
        return 0.0
    H = entropy(p)  # Shannon entropy (natural log)
    H_max = np.log(len(p))  # maximum entropy: the uniform distribution
    return np.clip(H / H_max, 0, 1)
def antagonistic_prevalence(p, strength=1):
    """
    Return the prevalence vector "opposite" to ``p`` in the simplex: ``p`` is
    mapped to ILR coordinates, reflected through the origin (scaled by
    ``strength``), and mapped back to the simplex.

    :param p: prevalence vector
    :param strength: reflection scale (1 = exact antagonist in ILR space)
    :return: the antagonistic prevalence vector
    """
    transform = ILRtransformation()
    reflected = -strength * transform(p)
    return transform.inverse(reflected)
class KDEyCLR(KDEyML):
    """KDEyML variant with the kernel fixed to 'aitchison' (CLR-based)."""

    def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
        # forward everything to KDEyML, pinning the kernel choice
        base_args = dict(
            classifier=classifier,
            fit_classifier=fit_classifier,
            val_split=val_split,
            bandwidth=bandwidth,
            random_state=random_state,
        )
        super().__init__(kernel='aitchison', **base_args)
class KDEyILR(KDEyML):
    """KDEyML variant with the kernel fixed to 'ilr'."""

    def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
        # forward everything to KDEyML, pinning the kernel choice
        base_args = dict(
            classifier=classifier,
            fit_classifier=fit_classifier,
            val_split=val_split,
            bandwidth=bandwidth,
            random_state=random_state,
        )
        super().__init__(kernel='ilr', **base_args)

View File

@ -1,48 +1,23 @@
import os
import warnings
from os.path import join
from pathlib import Path from pathlib import Path
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression as LR from sklearn.linear_model import LogisticRegression as LR
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from copy import deepcopy as cp from copy import deepcopy as cp
import quapy as qp import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from BayesianKDEy.commons import multiclass, experiment_path, KDEyCLR
from BayesianKDEy.temperature_calibration import temp_calibration from BayesianKDEy.temperature_calibration import temp_calibration
from build.lib.quapy.data import LabelledCollection from build.lib.quapy.data import LabelledCollection
from quapy.method.aggregative import DistributionMatchingY as DMy, AggregativeQuantifier, EMQ from quapy.method.aggregative import DistributionMatchingY as DMy, AggregativeQuantifier, EMQ
from quapy.method.base import BinaryQuantifier, BaseQuantifier
from quapy.model_selection import GridSearchQ from quapy.model_selection import GridSearchQ
from quapy.data import Dataset from quapy.data import Dataset
# from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot # from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from quapy.method.confidence import ConfidenceIntervals, BayesianCC, PQ, WithConfidenceABC, AggregativeBootstrap from quapy.method.confidence import BayesianCC, AggregativeBootstrap
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML, ACC from quapy.method.aggregative import KDEyML, ACC
from quapy.protocol import UPP from quapy.protocol import UPP
import quapy.functional as F
import numpy as np import numpy as np
from tqdm import tqdm from tqdm import tqdm
from scipy.stats import dirichlet
from collections import defaultdict from collections import defaultdict
from time import time from time import time
from sklearn.base import clone, BaseEstimator
class KDEyCLR(KDEyML):
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
super().__init__(
classifier=classifier, fit_classifier=fit_classifier, val_split=val_split, bandwidth=bandwidth,
random_state=random_state, kernel='aitchison'
)
class KDEyILR(KDEyML):
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=1., random_state=None):
super().__init__(
classifier=classifier, fit_classifier=fit_classifier, val_split=val_split, bandwidth=bandwidth,
random_state=random_state, kernel='ilr'
)
def methods(): def methods():
@ -160,33 +135,8 @@ def experiment(dataset: Dataset, point_quantifier: AggregativeQuantifier, method
return report return report
def experiment_path(dir:Path, dataset_name:str, method_name:str):
os.makedirs(dir, exist_ok=True)
return dir/f'{dataset_name}__{method_name}.pkl'
def fetch_UCI_binary(data_name):
return qp.datasets.fetch_UCIBinaryDataset(data_name)
def fetch_UCI_multiclass(data_name):
return qp.datasets.fetch_UCIMulticlassDataset(data_name, min_class_support=0.01)
if __name__ == '__main__': if __name__ == '__main__':
binary = {
'datasets': qp.datasets.UCI_BINARY_DATASETS,
'fetch_fn': fetch_UCI_binary,
'sample_size': 500
}
multiclass = {
'datasets': qp.datasets.UCI_MULTICLASS_DATASETS,
'fetch_fn': fetch_UCI_multiclass,
'sample_size': 1000
}
result_dir = Path('./results') result_dir = Path('./results')
for setup in [multiclass]: # [binary, multiclass]: for setup in [multiclass]: # [binary, multiclass]:
@ -200,15 +150,15 @@ if __name__ == '__main__':
is_binary = data.n_classes==2 is_binary = data.n_classes==2
result_subdir = result_dir / ('binary' if is_binary else 'multiclass') result_subdir = result_dir / ('binary' if is_binary else 'multiclass')
hyper_subdir = result_dir / 'hyperparams' / ('binary' if is_binary else 'multiclass') hyper_subdir = result_dir / 'hyperparams' / ('binary' if is_binary else 'multiclass')
for method_name, method, hyper_params, withconf_constructor, method_scope in methods(): for method_name, surrogate_quant, hyper_params, withconf_constructor, method_scope in methods():
if method_scope == 'only_binary' and not is_binary: if method_scope == 'only_binary' and not is_binary:
continue continue
if method_scope == 'only_multiclass' and is_binary: if method_scope == 'only_multiclass' and is_binary:
continue continue
result_path = experiment_path(result_subdir, data_name, method_name) result_path = experiment_path(result_subdir, data_name, method_name)
hyper_path = experiment_path(hyper_subdir, data_name, method.__class__.__name__) hyper_path = experiment_path(hyper_subdir, data_name, surrogate_quant.__class__.__name__)
report = qp.util.pickled_resource( report = qp.util.pickled_resource(
result_path, experiment, data, method, method_name, hyper_params, withconf_constructor, hyper_path result_path, experiment, data, surrogate_quant, method_name, hyper_params, withconf_constructor, hyper_path
) )
print(f'dataset={data_name}, ' print(f'dataset={data_name}, '
f'method={method_name}: ' f'method={method_name}: '

View File

@ -7,7 +7,7 @@ import pandas as pd
from glob import glob from glob import glob
from pathlib import Path from pathlib import Path
import quapy as qp import quapy as qp
from BayesianKDEy.full_experiments import fetch_UCI_multiclass, fetch_UCI_binary from BayesianKDEy.commons import fetch_UCI_binary, fetch_UCI_multiclass
from error import dist_aitchison from error import dist_aitchison
from quapy.method.confidence import ConfidenceIntervals from quapy.method.confidence import ConfidenceIntervals
from quapy.method.confidence import ConfidenceEllipseSimplex, ConfidenceEllipseCLR, ConfidenceEllipseILR, ConfidenceIntervals, ConfidenceRegionABC from quapy.method.confidence import ConfidenceEllipseSimplex, ConfidenceEllipseCLR, ConfidenceEllipseILR, ConfidenceIntervals, ConfidenceRegionABC

View File

@ -7,6 +7,7 @@ import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap from matplotlib.colors import ListedColormap
from scipy.stats import gaussian_kde from scipy.stats import gaussian_kde
from BayesianKDEy.commons import antagonistic_prevalence
from method.confidence import (ConfidenceIntervals as CI, from method.confidence import (ConfidenceIntervals as CI,
ConfidenceEllipseSimplex as CE, ConfidenceEllipseSimplex as CE,
ConfidenceEllipseCLR as CLR, ConfidenceEllipseCLR as CLR,
@ -302,6 +303,7 @@ def plot_simplex(
plt.tight_layout() plt.tight_layout()
if save_path: if save_path:
os.makedirs(Path(save_path).parent, exist_ok=True)
plt.savefig(save_path) plt.savefig(save_path)
else: else:
plt.show() plt.show()
@ -377,17 +379,17 @@ if __name__ == '__main__':
# ], # ],
# save_path=f'./plots/prior_test/uniform.png' # save_path=f'./plots/prior_test/uniform.png'
# ) # )
#
alpha = [40, 10, 10] # alpha = [40, 10, 10]
train_prevs = np.random.dirichlet(alpha=alpha, size=n) # train_prevs = np.random.dirichlet(alpha=alpha, size=n)
test_prevs = np.random.dirichlet(alpha=alpha, size=n) # test_prevs = np.random.dirichlet(alpha=alpha, size=n)
plot_simplex( # plot_simplex(
point_layers=[ # point_layers=[
{"points": train_prevs, "label": "train", "style": train_style}, # {"points": train_prevs, "label": "train", "style": train_style},
{"points": test_prevs, "label": "test", "style": test_style}, # {"points": test_prevs, "label": "test", "style": test_style},
], # ],
save_path=f'./plots/prior_test/informative.png' # save_path=f'./plots/prior_test/informative.png'
) # )
# train_prevs = np.random.dirichlet(alpha=[8, 1, 1], size=n) # train_prevs = np.random.dirichlet(alpha=[8, 1, 1], size=n)
# test_prevs = np.random.dirichlet(alpha=[1, 8, 1], size=n) # test_prevs = np.random.dirichlet(alpha=[1, 8, 1], size=n)
@ -402,13 +404,16 @@ if __name__ == '__main__':
p = 0.6 p = 0.6
K = 3 K = 3
alpha = [p] + [(1. - p) / (K - 1)] * (K - 1) # alpha = [p] + [(1. - p) / (K - 1)] * (K - 1)
alpha = [0.095, 0.246, 0.658]
alpha = np.array(alpha) alpha = np.array(alpha)
for c in [100, 500, 1_000]:
alpha_c = alpha * c for c in [50, 500, 5_000]:
train_prevs = np.random.dirichlet(alpha=alpha_c, size=n) alpha_tr = alpha * c
test_prevs = np.random.dirichlet(alpha=alpha_c[::-1], size=n) alpha_te = antagonistic_prevalence(alpha, strength=1) * c
train_prevs = np.random.dirichlet(alpha=alpha_tr, size=n)
test_prevs = np.random.dirichlet(alpha=alpha_te, size=n)
plot_simplex( plot_simplex(
point_layers=[ point_layers=[
{"points": train_prevs, "label": "train", "style": train_style}, {"points": train_prevs, "label": "train", "style": train_style},

View File

@ -1,11 +1,125 @@
import numpy as np from collections import defaultdict
n = 3 import model_selection
import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from BayesianKDEy.temperature_calibration import temp_calibration
from commons import *
from data import Dataset
from protocol import DirichletProtocol
from quapy.method.confidence import BayesianCC
from quapy.method.aggregative import ACC, AggregativeQuantifier
from sklearn.linear_model import LogisticRegression as LR
from copy import deepcopy as cp
from tqdm import tqdm
p = 0.5
def select_imbalanced_datasets(top_m=5):
    """
    Return the names of the ``top_m`` most imbalanced multiclass datasets, i.e.,
    those whose training prevalence has the lowest normalized entropy.

    :param top_m: number of dataset names to return
    :return: list of dataset names, most imbalanced first
    """
    scored = [
        (name, normalized_entropy(multiclass['fetch_fn'](name).training.prevalence()))
        for name in multiclass['datasets']
    ]
    # ascending entropy: most imbalanced (lowest entropy) first
    scored.sort(key=lambda pair: pair[1])
    print(scored)
    return [name for name, _ in scored[:top_m]]
for c in [1_000, 5_000, 10_000]:
def methods():
    """
    Yield (method_name, surrogate_quantifier, hyperparam_grid, bayesian_constructor,
    method_scope) tuples for the prior-effect study.
    """
    acc_hyper = {}
    kdey_hyper = {'bandwidth': [0.001, 0.005, 0.01, 0.05, 0.1, 0.2]}
    kdey_hyper_clr = {'bandwidth': [0.05, 0.1, 0.5, 1., 2., 5.]}

    # yield 'BayesianACC', ACC(LR()), acc_hyper, lambda hyper: BayesianCC(LR(), mcmc_seed=0)

    # fix: the __main__ loop unpacks FIVE values (..., method_scope); the original
    # yielded only four, which raises ValueError at unpack time
    bakde_constructor = lambda hyper: BayesianKDEy(kernel='aitchison',
                                                   mcmc_seed=0,
                                                   engine='numpyro',
                                                   temperature=None,
                                                   prior='uniform',
                                                   **hyper)
    yield 'BaKDE-Ait', KDEyCLR(LR()), kdey_hyper_clr, bakde_constructor, 'only_multiclass'
def run_test(test, alpha_test, alpha_train, concentration, prior_type, bay_quant, train_prev, results):
    """
    Evaluate ``bay_quant`` on 100 test samples drawn via a Dirichlet(alpha_test)
    protocol and append per-sample metrics to ``results`` (mutated in place).

    :param test: test collection to resample from
    :param alpha_test: Dirichlet parameters generating the test prevalences
    :param alpha_train: Dirichlet parameters used as the quantifier's prior (logged)
    :param concentration: concentration factor c (alpha = prevalence * c), logged
    :param prior_type: 'informative' or 'wrong', logged
    :param bay_quant: fitted quantifier exposing predict_conf
    :param train_prev: training prevalence, used for shift and SRE computation
    :param results: defaultdict(list) accumulator of per-sample records
    """
    test_generator = DirichletProtocol(test, alpha=alpha_test, repeats=100, random_state=0)
    # fix: the progress label referenced the module-global `method_name` (undefined when
    # this function is imported) and always read "informative"; describe the actual setting
    progress = tqdm(test_generator(), total=test_generator.total(),
                    desc=f'{prior_type} alpha with {concentration=}')
    for sample_X, true_prev in progress:
        estim_prev, region = bay_quant.predict_conf(sample_X)
        results['prior-type'].append(prior_type)
        results['train-prev'].append(train_prev)
        results['concentration'].append(concentration)
        results['train-alpha'].append(alpha_train)
        results['test-alpha'].append(alpha_test)
        results['true-prevs'].append(true_prev)
        results['point-estim'].append(estim_prev)
        results['shift'].append(qp.error.ae(true_prev, train_prev))
        results['ae'].append(qp.error.ae(prevs_true=true_prev, prevs_hat=estim_prev))
        results['sre'].append(qp.error.sre(prevs_true=true_prev, prevs_hat=estim_prev, prevs_train=train_prev))
        results['rae'].append(qp.error.rae(prevs_true=true_prev, prevs_hat=estim_prev))
        results['coverage'].append(region.coverage(true_prev))
        results['amplitude'].append(region.montecarlo_proportion(n_trials=50_000))
        results['samples'].append(region.samples)
def experiment(dataset: Dataset, point_quantifier: AggregativeQuantifier, grid: dict, bay_constructor, hyper_choice_path: Path):
    """
    Run the prior-effect experiment on one dataset: model-select the surrogate
    quantifier, build the Bayesian quantifier, calibrate its temperature if
    requested, and evaluate it under an informative and an antagonistic ("wrong")
    Dirichlet prior at several concentration levels.

    :param dataset: Dataset providing train/test splits
    :param point_quantifier: surrogate quantifier used for hyperparameter selection
    :param grid: hyperparameter grid explored during model selection
    :param bay_constructor: callable mapping the chosen hyperparams to a Bayesian quantifier
    :param hyper_choice_path: pickle path caching the model-selection outcome
    :return: report dict with the chosen hyperparams, training prevalence, and result arrays
    """
    with qp.util.temp_seed(0):
        training, test = dataset.train_test

        # model selection (cached on disk)
        # NOTE(review): `model_selection` here resolves to the imported module of that
        # name and is invoked as a callable — confirm a function (not the module) is in scope
        best_hyperparams = qp.util.pickled_resource(
            hyper_choice_path, model_selection, training, cp(point_quantifier), grid
        )

        bay_quant = bay_constructor(best_hyperparams)

        if hasattr(bay_quant, 'temperature') and bay_quant.temperature is None:
            # fix: split the dataset received as argument; the original read the
            # module-global `data`, which only exists when run as __main__
            train, val = training.split_stratified(train_prop=0.6, random_state=0)
            temperature = temp_calibration(bay_quant, train, val, temp_grid=[.5, 1., 1.5, 2., 5., 10., 100.], n_jobs=-1)
            bay_quant.temperature = temperature

        bay_quant.fit(*training.Xy)

        # test
        train_prev = training.prevalence()
        results = defaultdict(list)
        for concentration in [50, 500, 5_000]:
            alpha_train = train_prev * concentration
            bay_quant.prior = alpha_train

            # informative prior: test prevalences centered on the training prevalence
            alpha_test_informative = alpha_train
            prior_type = 'informative'
            run_test(test, alpha_test_informative, alpha_train, concentration, prior_type, bay_quant, train_prev, results)

            # wrong (antagonistic) prior: test prevalences centered opposite to training
            alpha_test_wrong = antagonistic_prevalence(train_prev, strength=1) * concentration
            prior_type = 'wrong'
            run_test(test, alpha_test_wrong, alpha_train, concentration, prior_type, bay_quant, train_prev, results)

        report = {
            'optim_hyper': best_hyperparams,
            'train-prev': train_prev,
            'results': {k: np.asarray(v) for k, v in results.items()}
        }

    return report
if __name__ == '__main__':
    result_dir = Path('./results/prior_effect')

    # restrict the study to the most imbalanced multiclass datasets
    selected = select_imbalanced_datasets()

    qp.environ['SAMPLE_SIZE'] = multiclass['sample_size']

    for data_name in selected:
        data = multiclass['fetch_fn'](data_name)
        for method_name, surrogate_quant, hyper_params, bay_constructor, method_scope in methods():
            result_path = experiment_path(result_dir, data_name, method_name)
            hyper_path = experiment_path(result_dir / 'hyperparams', data_name, surrogate_quant.__class__.__name__)
            print(f'Launching {method_name} in dataset {data_name}')
            # fix: result_path was computed but never used, so the experiment report was
            # recomputed and discarded on every run; cache it as the sibling scripts do
            report = qp.util.pickled_resource(
                result_path, experiment, data, surrogate_quant, hyper_params, bay_constructor, hyper_path
            )

View File

@ -10,7 +10,8 @@ from sklearn.model_selection import GridSearchCV, StratifiedKFold
from copy import deepcopy as cp from copy import deepcopy as cp
import quapy as qp import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from BayesianKDEy.full_experiments import experiment, experiment_path, KDEyCLR from BayesianKDEy.full_experiments import experiment
from BayesianKDEy.commons import experiment_path, KDEyCLR
from build.lib.quapy.data import LabelledCollection from build.lib.quapy.data import LabelledCollection
from quapy.method.aggregative import DistributionMatchingY as DMy, AggregativeQuantifier from quapy.method.aggregative import DistributionMatchingY as DMy, AggregativeQuantifier
from quapy.method.base import BinaryQuantifier, BaseQuantifier from quapy.method.base import BinaryQuantifier, BaseQuantifier

View File

@ -5,7 +5,7 @@ from pathlib import Path
import pandas as pd import pandas as pd
import quapy as qp import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from BayesianKDEy.full_experiments import experiment_path from BayesianKDEy.commons import experiment_path
from quapy.protocol import UPP from quapy.protocol import UPP
import numpy as np import numpy as np
from tqdm import tqdm from tqdm import tqdm

View File

@ -13,7 +13,7 @@ import copy
def temp_calibration(method:WithConfidenceABC, def temp_calibration(method:WithConfidenceABC,
train:LabelledCollection, train:LabelledCollection,
val:LabelledCollection, val:LabelledCollection,
temp_grid=[1, 1.5, 2], temp_grid=[.5, 1., 1.5, 2., 5., 10., 100.],
num_samples=100, num_samples=100,
nominal_coverage=0.95, nominal_coverage=0.95,
amplitude_threshold='auto', amplitude_threshold='auto',

View File

@ -3,6 +3,7 @@ import unittest
from sklearn.linear_model import LogisticRegression from sklearn.linear_model import LogisticRegression
import BayesianKDEy.commons
import quapy as qp import quapy as qp
from quapy.method.aggregative import ACC from quapy.method.aggregative import ACC
from quapy.method.meta import Ensemble from quapy.method.meta import Ensemble
@ -47,7 +48,7 @@ class TestMethods(unittest.TestCase):
learner.fit(*dataset.training.Xy) learner.fit(*dataset.training.Xy)
for model in AGGREGATIVE_METHODS: for model in AGGREGATIVE_METHODS:
if not dataset.binary and model in BINARY_METHODS: if not BayesianKDEy.commons.binary and model in BINARY_METHODS:
print(f'skipping the test of binary model {model.__name__} on multiclass dataset {dataset.name}') print(f'skipping the test of binary model {model.__name__} on multiclass dataset {dataset.name}')
continue continue
@ -61,7 +62,7 @@ class TestMethods(unittest.TestCase):
for dataset in TestMethods.datasets: for dataset in TestMethods.datasets:
for model in NON_AGGREGATIVE_METHODS: for model in NON_AGGREGATIVE_METHODS:
if not dataset.binary and model in BINARY_METHODS: if not BayesianKDEy.commons.binary and model in BINARY_METHODS:
print(f'skipping the test of binary model {model.__name__} on multiclass dataset {dataset.name}') print(f'skipping the test of binary model {model.__name__} on multiclass dataset {dataset.name}')
continue continue
@ -76,7 +77,7 @@ class TestMethods(unittest.TestCase):
base_quantifier = ACC(LogisticRegression()) base_quantifier = ACC(LogisticRegression())
for dataset, policy in itertools.product(TestMethods.datasets, Ensemble.VALID_POLICIES): for dataset, policy in itertools.product(TestMethods.datasets, Ensemble.VALID_POLICIES):
if not dataset.binary and policy == 'ds': if not BayesianKDEy.commons.binary and policy == 'ds':
print(f'skipping the test of binary policy ds on non-binary dataset {dataset}') print(f'skipping the test of binary policy ds on non-binary dataset {dataset}')
continue continue