from sklearn.base import BaseEstimator
import numpy as np
from quapy.method._kdey import KDEBase
from quapy.method.confidence import WithConfidenceABC, ConfidenceRegionABC
from quapy.method.aggregative import AggregativeSoftQuantifier
from tqdm import tqdm
import quapy.functional as F


class BayesianKDEy(AggregativeSoftQuantifier, KDEBase, WithConfidenceABC):
    """
    Bayesian version of KDEy. A usage sketch is provided at the end of this module.

    :param classifier: a scikit-learn's BaseEstimator, or None, in which case the classifier is taken to be
        the one indicated in `qp.environ['DEFAULT_CLS']`
    :param fit_classifier: bool, whether to fit the classifier or to use it as given (i.e., assuming it has
        already been fit); default is True
    :param val_split: specifies the data used for generating classifier predictions. This specification
        can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to
        be extracted from the training set; or as an integer (default 5), indicating that the predictions
        are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
        for `k`); or as a tuple `(X,y)` defining the specific set of data to use for validation. Set to
        None when the method does not require any validation data, in order to avoid that some portion of
        the training data be wasted.
    :param kernel: the kernel function for the KDE (default 'gaussian')
    :param bandwidth: the bandwidth of the kernel (default 0.1)
    :param num_warmup: number of warmup iterations for the MCMC sampler (default 500)
    :param num_samples: number of samples to draw from the posterior (default 1000)
    :param mcmc_seed: random seed for the MCMC sampler (default 0)
    :param confidence_level: float in [0,1] to construct a confidence region around the point estimate (default 0.95)
    :param region: string, set to `intervals` for constructing confidence intervals (default), or to
        `ellipse` for constructing an ellipse in the probability simplex, or to `ellipse-clr` for
        constructing an ellipse in the Centered-Log Ratio (CLR) unconstrained space.
    :param verbose: bool, whether to display a progress bar during MCMC sampling
    """

    def __init__(self,
                 classifier: BaseEstimator = None,
                 fit_classifier=True,
                 val_split: int = 5,
                 kernel='gaussian',
                 bandwidth=0.1,
                 num_warmup: int = 500,
                 num_samples: int = 1_000,
                 mcmc_seed: int = 0,
                 confidence_level: float = 0.95,
                 region: str = 'intervals',
                 verbose: bool = False):

        if num_warmup <= 0:
            raise ValueError(f'parameter {num_warmup=} must be a positive integer')
        if num_samples <= 0:
            raise ValueError(f'parameter {num_samples=} must be a positive integer')

        super().__init__(classifier, fit_classifier, val_split)
        self.bandwidth = KDEBase._check_bandwidth(bandwidth)
        self.kernel = self._check_kernel(kernel)
        self.num_warmup = num_warmup
        self.num_samples = num_samples
        self.mcmc_seed = mcmc_seed
        self.confidence_level = confidence_level
        self.region = region
        self.verbose = verbose

    def aggregation_fit(self, classif_predictions, labels):
        # fit one KDE per class on the (validation) posterior probabilities
        self.mix_densities = self.get_mixture_components(classif_predictions, labels, self.classes_, self.bandwidth, self.kernel)
        return self

    def aggregate(self, classif_predictions):
        # draw posterior samples of the prevalence vector; their mean is the point estimate
        self.prevalence_samples = self._bayesian_kde(classif_predictions, init=None, verbose=self.verbose)
        return self.prevalence_samples.mean(axis=0)

    def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
        if confidence_level is None:
            confidence_level = self.confidence_level
        classif_predictions = self.classify(instances)
        point_estimate = self.aggregate(classif_predictions)
        samples = self.prevalence_samples  # available after calling the `aggregate` method
        region = WithConfidenceABC.construct_region(samples, confidence_level=confidence_level, method=self.region)
        return point_estimate, region

    def _bayesian_kde(self, X_probs, init=None, verbose=False):
        """
        Bayes' rule:
            P(prev|data) = P(data|prev) * P(prev) / P(data)
        i.e.,
            posterior = likelihood * prior / evidence
        We assume the likelihood of the test data to be the mixture
            prev @ [kde_i(data) for i in 1..n]
        where kde_i is the KDE fit on the validation posteriors of class i,
        and the prior to be uniform over the probability simplex.
        """

        rng = np.random.default_rng(self.mcmc_seed)
        kdes = self.mix_densities
        # test_densities has shape (n_classes, n_test): entry (i, j) is the density
        # of the j-th test point under the KDE of the i-th class
        test_densities = np.asarray([self.pdf(kde_i, X_probs, self.kernel) for kde_i in kdes])

        def log_likelihood(prev, epsilon=1e-10):
            # mixture likelihood of each test point, smoothed by epsilon to avoid log(0)
            test_likelihoods = prev @ test_densities
            test_loglikelihood = np.log(test_likelihoods + epsilon)
            return np.sum(test_loglikelihood)

        # def log_prior(prev):
        #     todo: adapt to arbitrary prior knowledge (e.g., something around the training prevalence)
        #     return 1/np.sum((prev-init)**2)  # it is not 1, but we assume uniform, so anyway it is a useless constant

        # def log_prior(prev, alpha_scale=1000):
        #     alpha = np.array(init) * alpha_scale
        #     return dirichlet.logpdf(prev, alpha)

        def log_prior(prev):
            # uniform prior over the simplex: constant, hence 0 in log-space
            return 0

        def sample_neighbour(prev, step_size=0.05):
            # random-walk Metropolis-Hastings proposal: Gaussian perturbation
            # projected back onto the probability simplex
            dir_noise = rng.normal(scale=step_size, size=len(prev))
            neighbour = F.normalize_prevalence(prev + dir_noise, method='mapsimplex')
            return neighbour

        n_classes = X_probs.shape[1]
        current_prev = F.uniform_prevalence(n_classes) if init is None else init
        current_likelihood = log_likelihood(current_prev) + log_prior(current_prev)  # i.e., the unnormalized log-posterior

        # Metropolis-Hastings with adaptive step size: during warmup, the step size
        # is tuned so that the empirical acceptance rate approaches target_acceptance
        step_size = 0.05
        target_acceptance = 0.3
        adapt_rate = 0.01
        acceptance_history = []

        samples = []
        total_steps = self.num_samples + self.num_warmup
        for i in tqdm(range(total_steps), total=total_steps, disable=not verbose):
            proposed_prev = sample_neighbour(current_prev, step_size)

            # log of the acceptance ratio (the proposal is assumed symmetric)
            proposed_likelihood = log_likelihood(proposed_prev) + log_prior(proposed_prev)
            acceptance = proposed_likelihood - current_likelihood

            # decide acceptance
            accepted = np.log(rng.random()) < acceptance
            if accepted:
                current_prev = proposed_prev
                current_likelihood = proposed_likelihood

            samples.append(current_prev)
            acceptance_history.append(1. if accepted else 0.)

            # every 10 warmup iterations, adapt the step size based on the
            # acceptance rate over the last 100 iterations
            if i < self.num_warmup and i % 10 == 0 and len(acceptance_history) >= 100:
                recent_accept_rate = np.mean(acceptance_history[-100:])
                step_size *= np.exp(adapt_rate * (recent_accept_rate - target_acceptance))

        # discard the initial "warmup" iterations
        samples = np.asarray(samples[self.num_warmup:])
        return samples
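

# ----------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the class). The dataset loader
# and the attribute names used below (`train_test`, `Xy`, `X`, `prevalence()`)
# are assumptions about the surrounding quapy API and may differ across versions.
if __name__ == '__main__':
    import quapy as qp
    from sklearn.linear_model import LogisticRegression

    # any labelled training/test split would do; this loader is an assumption
    train, test = qp.datasets.fetch_UCIMulticlassDataset('dry-bean').train_test

    quantifier = BayesianKDEy(LogisticRegression(), val_split=5, verbose=True)
    quantifier.fit(*train.Xy)

    # point estimate of the class prevalence, plus a confidence region around it
    point_estimate, region = quantifier.predict_conf(test.X)
    print(f'estimated prevalence: {F.strprev(point_estimate)}')
    print(f'true prevalence:      {F.strprev(test.prevalence())}')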