creating class BayesianKDEy

parent 8a8988d2de
commit 4255098ba7

@@ -0,0 +1,148 @@
from sklearn.base import BaseEstimator
import numpy as np
from quapy.method._kdey import KDEBase
from quapy.method.confidence import WithConfidenceABC, ConfidenceRegionABC
from quapy.method.aggregative import AggregativeSoftQuantifier
from tqdm import tqdm
import quapy.functional as F


class BayesianKDEy(AggregativeSoftQuantifier, KDEBase, WithConfidenceABC):
    """
    Bayesian version of KDEy.

    :param classifier: a scikit-learn's BaseEstimator, or None, in which case the classifier is taken to be
        the one indicated in `qp.environ['DEFAULT_CLS']`
    :param fit_classifier: whether the classifier is to be trained on the training data (default True)
    :param val_split: specifies the data used for generating classifier predictions. This specification
        can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
        be extracted from the training set; or as an integer (default 5), indicating that the predictions
        are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
        for `k`); or as a tuple `(X,y)` defining the specific set of data to use for validation. Set to
        None when the method does not require any validation data, in order to avoid that some portion of
        the training data be wasted.
    :param kernel: the kernel used for the kernel density estimates (default 'gaussian')
    :param bandwidth: the bandwidth of the kernel (default 0.1)
    :param num_warmup: number of warmup iterations for the MCMC sampler (default 500)
    :param num_samples: number of samples to draw from the posterior (default 1000)
    :param mcmc_seed: random seed for the MCMC sampler (default 0)
    :param confidence_level: float in [0,1] to construct a confidence region around the point estimate (default 0.95)
    :param region: string, set to `intervals` for constructing confidence intervals (default), or to
        `ellipse` for constructing an ellipse in the probability simplex, or to `ellipse-clr` for
        constructing an ellipse in the Centered-Log Ratio (CLR) unconstrained space.
    :param verbose: bool, whether to display a progress bar during MCMC sampling (default False)
    """

    def __init__(self,
                 classifier: BaseEstimator=None,
                 fit_classifier=True,
                 val_split: int = 5,
                 kernel='gaussian',
                 bandwidth=0.1,
                 num_warmup: int = 500,
                 num_samples: int = 1_000,
                 mcmc_seed: int = 0,
                 confidence_level: float = 0.95,
                 region: str = 'intervals',
                 verbose: bool = False):

        if num_warmup <= 0:
            raise ValueError(f'parameter {num_warmup=} must be a positive integer')
        if num_samples <= 0:
            raise ValueError(f'parameter {num_samples=} must be a positive integer')

        super().__init__(classifier, fit_classifier, val_split)
        self.bandwidth = KDEBase._check_bandwidth(bandwidth)
        self.kernel = self._check_kernel(kernel)
        self.num_warmup = num_warmup
        self.num_samples = num_samples
        self.mcmc_seed = mcmc_seed
        self.confidence_level = confidence_level
        self.region = region
        self.verbose = verbose

    def aggregation_fit(self, classif_predictions, labels):
        # fit one KDE per class on the validation predictions of the classifier
        self.mix_densities = self.get_mixture_components(classif_predictions, labels, self.classes_, self.bandwidth, self.kernel)
        return self

    def aggregate(self, classif_predictions):
        # draw posterior samples of the prevalence vector and return their mean as the point estimate
        self.prevalence_samples = self._bayesian_kde(classif_predictions, init=None, verbose=self.verbose)
        return self.prevalence_samples.mean(axis=0)

    def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
        if confidence_level is None:
            confidence_level = self.confidence_level
        classif_predictions = self.classify(instances)
        point_estimate = self.aggregate(classif_predictions)
        samples = self.prevalence_samples  # available after calling the "aggregate" method
        region = WithConfidenceABC.construct_region(samples, confidence_level=confidence_level, method=self.region)
        return point_estimate, region

    def _bayesian_kde(self, X_probs, init=None, verbose=False):
        """
        Bayes' rule:
            P(prev|data) = P(data|prev) P(prev) / P(data)
        i.e.,
            posterior = likelihood * prior / evidence
        We assume the likelihood to be:
            prev @ [kde_i_likelihood(data) for i in 1..n]
        and the prior to be uniform on the simplex.
        """
        rng = np.random.default_rng(self.mcmc_seed)
        kdes = self.mix_densities
        test_densities = np.asarray([self.pdf(kde_i, X_probs, self.kernel) for kde_i in kdes])

        def log_likelihood(prev, epsilon=1e-10):
            test_likelihoods = prev @ test_densities
            test_loglikelihood = np.log(test_likelihoods + epsilon)
            return np.sum(test_loglikelihood)

        # def log_prior(prev):
        #     todo: adapt to arbitrary prior knowledge (e.g., something around the training prevalence)
        #     return 1/np.sum((prev-init)**2)  # it is not 1, but we assume a uniform prior, so it is a useless constant anyway

        # def log_prior(prev, alpha_scale=1000):
        #     alpha = np.array(init) * alpha_scale
        #     return dirichlet.logpdf(prev, alpha)

        def log_prior(prev):
            # uniform prior on the simplex: constant, hence 0 in log-space
            return 0

        def sample_neighbour(prev, step_size=0.05):
            # random-walk Metropolis-Hastings proposal, mapped back onto the simplex
            dir_noise = rng.normal(scale=step_size, size=len(prev))
            neighbour = F.normalize_prevalence(prev + dir_noise, method='mapsimplex')
            return neighbour

        n_classes = X_probs.shape[1]
        current_prev = F.uniform_prevalence(n_classes) if init is None else init
        current_likelihood = log_likelihood(current_prev) + log_prior(current_prev)

        # Metropolis-Hastings with adaptive step size
        step_size = 0.05
        target_acceptance = 0.3
        adapt_rate = 0.01
        acceptance_history = []

        samples = []
        total_steps = self.num_samples + self.num_warmup
        for i in tqdm(range(total_steps), total=total_steps, disable=not verbose):
            proposed_prev = sample_neighbour(current_prev, step_size)

            # log of the acceptance probability
            proposed_likelihood = log_likelihood(proposed_prev) + log_prior(proposed_prev)
            acceptance = proposed_likelihood - current_likelihood

            # decide acceptance
            accepted = np.log(rng.random()) < acceptance
            if accepted:
                current_prev = proposed_prev
                current_likelihood = proposed_likelihood

            samples.append(current_prev)
            acceptance_history.append(1. if accepted else 0.)

            # during warmup, adapt the step size towards the target acceptance rate
            if i < self.num_warmup and i % 10 == 0 and len(acceptance_history) >= 100:
                recent_accept_rate = np.mean(acceptance_history[-100:])
                step_size *= np.exp(adapt_rate * (recent_accept_rate - target_acceptance))

        # remove the initial "warmup" iterations
        samples = np.asarray(samples[self.num_warmup:])
        return samples
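
For intuition, `_bayesian_kde` is a plain random-walk Metropolis-Hastings sampler over the simplex. The following is a minimal, self-contained sketch (illustration only, not part of this commit) of the same acceptance rule applied to a toy one-dimensional target:

import numpy as np

rng = np.random.default_rng(0)

def log_density(x):
    # standard normal target, up to an additive constant
    return -0.5 * x**2

x, log_p = 0.0, log_density(0.0)
samples = []
for _ in range(5_000):
    x_new = x + rng.normal(scale=0.5)             # random-walk proposal
    log_p_new = log_density(x_new)
    if np.log(rng.random()) < log_p_new - log_p:  # accept with probability min(1, p_new/p)
        x, log_p = x_new, log_p_new
    samples.append(x)

print(np.mean(samples[1_000:]), np.std(samples[1_000:]))  # roughly 0 and 1 after discarding warmup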

@@ -1,127 +0,0 @@
from sklearn.linear_model import LogisticRegression
import quapy as qp
from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from method.confidence import ConfidenceIntervals
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet


def bayesian(kdey, data, probabilistic_classifier, init=None, MAX_ITER=100_000, warmup=3_000):
    """
    Bayes' rule:
        P(prev|data) = P(data|prev) P(prev) / P(data)
    i.e.,
        posterior = likelihood * prior / evidence
    We assume the likelihood to be:
        prev @ [kde_i_likelihood(data) for i in 1..n]
    and the prior to be uniform on the simplex.
    """
    def pdf(kde, X):
        # todo: remove exp, since we are then taking the log again every time? (not sure)
        return np.exp(kde.score_samples(X))

    X = probabilistic_classifier.predict_proba(data)
    kdes = kdey.mix_densities
    test_densities = np.asarray([pdf(kde_i, X) for kde_i in kdes])

    def log_likelihood(prev, epsilon=1e-10):
        test_likelihoods = prev @ test_densities
        test_loglikelihood = np.log(test_likelihoods + epsilon)
        return np.sum(test_loglikelihood)

    # def log_prior(prev):
    #     todo: adapt to arbitrary prior knowledge (e.g., something around the training prevalence)
    #     return 1/np.sum((prev-init)**2)  # it is not 1, but we assume a uniform prior, so it is a useless constant anyway

    # def log_prior(prev, alpha_scale=1000):
    #     alpha = np.array(init) * alpha_scale
    #     return dirichlet.logpdf(prev, alpha)

    def log_prior(prev):
        return 0

    def sample_neighbour(prev, step_size=0.05):
        # random-walk Metropolis-Hastings
        dir_noise = np.random.normal(scale=step_size, size=len(prev))
        neighbour = F.normalize_prevalence(prev + dir_noise, method='mapsimplex')
        return neighbour

    n_classes = len(probabilistic_classifier.classes_)
    current_prev = F.uniform_prevalence(n_classes) if init is None else init
    current_likelihood = log_likelihood(current_prev) + log_prior(current_prev)

    # Metropolis-Hastings with adaptive step size
    step_size = 0.05
    target_acceptance = 0.3
    adapt_rate = 0.01
    acceptance_history = []

    samples = []
    for i in tqdm(range(MAX_ITER), total=MAX_ITER):
        proposed_prev = sample_neighbour(current_prev, step_size)

        # log of the acceptance probability
        proposed_likelihood = log_likelihood(proposed_prev) + log_prior(proposed_prev)
        acceptance = proposed_likelihood - current_likelihood

        # decide acceptance
        accepted = np.log(np.random.rand()) < acceptance
        if accepted:
            current_prev = proposed_prev
            current_likelihood = proposed_likelihood

        samples.append(current_prev)
        acceptance_history.append(1. if accepted else 0.)

        if i < warmup and i % 10 == 0 and len(acceptance_history) >= 100:
            recent_accept_rate = np.mean(acceptance_history[-100:])
            # print(f'{i=} recent_accept_rate={recent_accept_rate:.4f} (current step_size={step_size:.4f})')
            step_size *= np.exp(adapt_rate * (recent_accept_rate - target_acceptance))

    # remove the initial "warmup" iterations
    samples = np.asarray(samples[warmup:])
    return samples


if __name__ == '__main__':
    qp.environ["SAMPLE_SIZE"] = 500
    cls = LogisticRegression()
    kdey = KDEyML(cls)

    train, test = qp.datasets.fetch_UCIMulticlassDataset('academic-success', standardize=True).train_test

    with qp.util.temp_seed(2):
        print('fitting KDEy')
        kdey.fit(*train.Xy)

        # shifted = test.sampling(500, *[0.7, 0.1, 0.2])
        # shifted = test.sampling(500, *test.prevalence()[::-1])
        shifted = test.sampling(500, *F.uniform_prevalence_sampling(train.n_classes))
        prev_hat = kdey.predict(shifted.X)
        mae = qp.error.mae(shifted.prevalence(), prev_hat)
        print(f'true_prev={strprev(shifted.prevalence())}')
        print(f'prev_hat={strprev(prev_hat)}, {mae=:.4f}')

        h = kdey.classifier
        samples = bayesian(kdey, shifted.X, h, init=None, MAX_ITER=5_000, warmup=3_000)

        conf_interval = ConfidenceIntervals(samples, confidence_level=0.95)
        mae = qp.error.mae(shifted.prevalence(), conf_interval.point_estimate())
        print(f'mean posterior {strprev(samples.mean(axis=0))}, {mae=:.4f}')
        print(f'CI={conf_interval}')
        print(f'\tcontains true={conf_interval.coverage(true_value=shifted.prevalence())==1}')
        print(f'\tamplitude={conf_interval.montecarlo_proportion(50_000)*100.:.20f}%')

        if train.n_classes == 3:
            plot_prev_points(samples, true_prev=shifted.prevalence(), point_estim=prev_hat, train_prev=train.prevalence())
            # plot_prev_points_matplot(samples)

        # report = qp.evaluation.evaluation_report(kdey, protocol=UPP(test), verbose=True)
        # print(report.mean(numeric_only=True))

@@ -0,0 +1,56 @@
import warnings

from sklearn.linear_model import LogisticRegression
import quapy as qp
from BayesianKDEy._bayeisan_kdey import BayesianKDEy
from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from method.confidence import ConfidenceIntervals
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet


if __name__ == '__main__':
    qp.environ["SAMPLE_SIZE"] = 500
    cls = LogisticRegression()
    bayes_kdey = BayesianKDEy(cls, bandwidth=.3, kernel='aitchison', mcmc_seed=0)

    datasets = qp.datasets.UCI_BINARY_DATASETS
    train, test = qp.datasets.fetch_UCIBinaryDataset(datasets[0]).train_test

    # train, test = qp.datasets.fetch_UCIMulticlassDataset('academic-success', standardize=True).train_test

    with qp.util.temp_seed(0):
        print('fitting BayesianKDEy')
        bayes_kdey.fit(*train.Xy)

        shifted = test.sampling(500, *[0.2, 0.8])
        # shifted = test.sampling(500, *test.prevalence()[::-1])
        # shifted = test.sampling(500, *F.uniform_prevalence_sampling(train.n_classes))
        prev_hat = bayes_kdey.predict(shifted.X)
        mae = qp.error.mae(shifted.prevalence(), prev_hat)
        print(f'true_prev={strprev(shifted.prevalence())}')
        print(f'prev_hat={strprev(prev_hat)}, {mae=:.4f}')

        prev_hat, conf_interval = bayes_kdey.predict_conf(shifted.X)

        mae = qp.error.mae(shifted.prevalence(), prev_hat)
        print(f'mean posterior {strprev(prev_hat)}, {mae=:.4f}')
        print(f'CI={conf_interval}')
        print(f'\tcontains true={conf_interval.coverage(true_value=shifted.prevalence())==1}')
        print(f'\tamplitude={conf_interval.montecarlo_proportion(50_000)*100.:.3f}%')

        if train.n_classes == 3:
            plot_prev_points(bayes_kdey.prevalence_samples, true_prev=shifted.prevalence(), point_estim=prev_hat, train_prev=train.prevalence())
            # plot_prev_points_matplot(samples)

        # report = qp.evaluation.evaluation_report(kdey, protocol=UPP(test), verbose=True)
        # print(report.mean(numeric_only=True))
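
As a variant of the demo above (a sketch under the same assumed API, not part of this commit), the `region` parameter of BayesianKDEy can request an ellipse-shaped region in CLR space instead of the default intervals, e.g.:

bayes_kdey_clr = BayesianKDEy(LogisticRegression(), bandwidth=.3, kernel='aitchison',
                              region='ellipse-clr', mcmc_seed=0)
bayes_kdey_clr.fit(*train.Xy)
prev_hat, conf_region = bayes_kdey_clr.predict_conf(shifted.X)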