from sklearn.base import BaseEstimator
import numpy as np
from quapy.method._kdey import KDEBase
from quapy.method.confidence import WithConfidenceABC, ConfidenceRegionABC, CLRtransformation
from quapy.method.aggregative import AggregativeSoftQuantifier
from tqdm import tqdm
import quapy.functional as F


class BayesianKDEy(AggregativeSoftQuantifier, KDEBase, WithConfidenceABC):
    """
    Bayesian version of KDEy.

    :param classifier: a scikit-learn `BaseEstimator`, or None, in which case the classifier is taken to be
        the one indicated in `qp.environ['DEFAULT_CLS']`
    :param val_split: specifies the data used for generating classifier predictions. This specification
        can be made as a float in (0, 1), indicating the proportion of a stratified held-out validation set
        to be extracted from the training set; as an integer (default 5), indicating that the predictions
        are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
        of `k`); or as a tuple `(X, y)` defining the specific set of data to use for validation. Set it to
        None when the method does not require any validation data, in order to avoid wasting a portion of
        the training data.
    :param num_warmup: number of warmup iterations for the MCMC sampler (default 500)
    :param num_samples: number of samples to draw from the posterior (default 1000)
    :param mcmc_seed: random seed for the MCMC sampler (default 0)
    :param confidence_level: float in [0,1] used to construct a confidence region around the point estimate
        (default 0.95)
    :param region: string, set to `intervals` for constructing confidence intervals (default), to
        `ellipse` for constructing an ellipse in the probability simplex, or to `ellipse-clr` for
        constructing an ellipse in the Centered-Log Ratio (CLR) unconstrained space.
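    :param fit_classifier: bool (default True), whether to train the classifier or assume it has already
        been trained
    :param kernel: the kernel function to use for KDE (default 'gaussian')
    :param bandwidth: the bandwidth of the kernel (default 0.1)
    :param explore_CLR: bool (default False), if True, the random-walk proposals are drawn in the
        Centered-Log Ratio (CLR) transformed space rather than directly on the probability simplex
    :param step_size: float (default 0.05), initial step size of the random-walk Metropolis-Hastings
        proposal; it is adapted during the warmup phase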
    :param verbose: bool, whether to display a progress bar
    """

    def __init__(self,
                 classifier: BaseEstimator = None,
                 fit_classifier=True,
                 val_split: int = 5,
                 kernel='gaussian',
                 bandwidth=0.1,
                 num_warmup: int = 500,
                 num_samples: int = 1_000,
                 mcmc_seed: int = 0,
                 confidence_level: float = 0.95,
                 region: str = 'intervals',
                 explore_CLR=False,
                 step_size=0.05,
                 verbose: bool = False):

        if num_warmup <= 0:
            raise ValueError(f'parameter {num_warmup=} must be a positive integer')
        if num_samples <= 0:
            raise ValueError(f'parameter {num_samples=} must be a positive integer')

        super().__init__(classifier, fit_classifier, val_split)
        self.bandwidth = KDEBase._check_bandwidth(bandwidth, kernel)
        self.kernel = self._check_kernel(kernel)
        self.num_warmup = num_warmup
        self.num_samples = num_samples
        self.mcmc_seed = mcmc_seed
        self.confidence_level = confidence_level
        self.region = region
        self.explore_CLR = explore_CLR
        self.step_size = step_size
        self.verbose = verbose

    def aggregation_fit(self, classif_predictions, labels):
        self.mix_densities = self.get_mixture_components(
            classif_predictions, labels, self.classes_, self.bandwidth, self.kernel)
        return self

    def aggregate(self, classif_predictions):
        self.prevalence_samples = self._bayesian_kde(classif_predictions, init=None, verbose=self.verbose)
        return self.prevalence_samples.mean(axis=0)

    def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
        if confidence_level is None:
            confidence_level = self.confidence_level
        classif_predictions = self.classify(instances)
        point_estimate = self.aggregate(classif_predictions)
        samples = self.prevalence_samples  # available after calling the "aggregate" function
        region = WithConfidenceABC.construct_region(samples, confidence_level=confidence_level, method=self.region)
        return point_estimate, region

    def _bayesian_kde(self, X_probs, init=None, verbose=False):
        """
        Bayes' rule: P(prev|data) = P(data|prev) * P(prev) / P(data),
        i.e., posterior = likelihood * prior / evidence.
        We assume the likelihood of each datapoint to be prev @ [kde_i(datapoint) for i in 1..n],
        and the prior to be uniform over the simplex.
        """
        rng = np.random.default_rng(self.mcmc_seed)
        kdes = self.mix_densities
        test_densities = np.asarray([self.pdf(kde_i, X_probs, self.kernel) for kde_i in kdes])

        def log_likelihood(prev, epsilon=1e-10):
            test_likelihoods = prev @ test_densities
            test_loglikelihood = np.log(test_likelihoods + epsilon)
            return np.sum(test_loglikelihood)

        # def log_prior(prev):
        #     # todo: adapt to arbitrary prior knowledge (e.g., something around the training prevalence)
        #     return 1/np.sum((prev-init)**2)

        # def log_prior(prev, alpha_scale=1000):
        #     alpha = np.array(init) * alpha_scale
        #     return dirichlet.logpdf(prev, alpha)

        def log_prior(prev):
            # the uniform prior is not really 1 (nor its log 0), but it is constant, and constant
            # terms are irrelevant for the acceptance decision
            return 0

        def sample_neighbour(prev, step_size):
            # random-walk Metropolis-Hastings proposal
            d = len(prev)
            if not self.explore_CLR:
                dir_noise = rng.normal(scale=step_size/np.sqrt(d), size=d)
                neighbour = F.normalize_prevalence(prev + dir_noise, method='mapsimplex')
            else:
                clr = CLRtransformation()
                clr_point = clr(prev)
                dir_noise = rng.normal(scale=step_size, size=d)
                clr_neighbour = clr_point + dir_noise
                neighbour = clr.inverse(clr_neighbour)
                assert in_simplex(neighbour), 'wrong CLR transformation'
            return neighbour

        n_classes = X_probs.shape[1]
        current_prev = F.uniform_prevalence(n_classes) if init is None else init
        current_likelihood = log_likelihood(current_prev) + log_prior(current_prev)

        # Metropolis-Hastings with adaptive step size
        step_size = self.step_size
        target_acceptance = 0.3
        adapt_rate = 0.05
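        # adaptive random-walk Metropolis-Hastings: during warmup, the proposal step size is rescaled
        # multiplicatively so that the recent acceptance rate approaches the target (~30% is a common
        # heuristic for random-walk samplers); acceptance is decided on the log scale, i.e., a move is
        # accepted when log(u) < log_posterior(proposed) - log_posterior(current), with u ~ U(0,1)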
        acceptance_history = []
        samples = []
        total_steps = self.num_samples + self.num_warmup
        for i in tqdm(range(total_steps), total=total_steps, disable=not verbose):
            proposed_prev = sample_neighbour(current_prev, step_size)

            # log-probability of acceptance
            proposed_likelihood = log_likelihood(proposed_prev) + log_prior(proposed_prev)
            acceptance = proposed_likelihood - current_likelihood

            # decide acceptance
            accepted = np.log(rng.random()) < acceptance
            if accepted:
                current_prev = proposed_prev
                current_likelihood = proposed_likelihood

            samples.append(current_prev)
            acceptance_history.append(1. if accepted else 0.)

            if i < self.num_warmup and i % 10 == 0 and len(acceptance_history) >= 100:
                recent_accept_rate = np.mean(acceptance_history[-100:])
                step_size *= np.exp(adapt_rate * (recent_accept_rate - target_acceptance))
                # step_size = float(np.clip(step_size, min_step, max_step))
                if verbose:
                    print(f'acceptance-rate={recent_accept_rate*100:.3f}%, step-size={step_size:.5f}')

        # remove the initial "warmup" iterations
        samples = np.asarray(samples[self.num_warmup:])

        return samples


def in_simplex(x):
    return np.all(x >= 0) and np.isclose(x.sum(), 1)
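

# A minimal usage sketch (illustrative only, not part of the module): it assumes QuaPy's aggregative
# workflow, in which `fit` receives the training covariates and labels, and uses synthetic sklearn
# data in place of a real quantification dataset.
if __name__ == '__main__':
    from sklearn.linear_model import LogisticRegression
    from sklearn.datasets import make_classification

    # synthetic 3-class data; the last 500 instances act as the unlabelled test sample
    X, y = make_classification(n_samples=2000, n_classes=3, n_informative=5, random_state=0)
    Xtr, ytr, Xte = X[:1500], y[:1500], X[1500:]

    quantifier = BayesianKDEy(classifier=LogisticRegression(), verbose=True)
    quantifier.fit(Xtr, ytr)

    # point estimate of the class prevalences, plus a confidence region built from the posterior samples
    point_estimate, region = quantifier.predict_conf(Xte)
    print(f'estimated prevalence: {F.strprev(point_estimate)}')
    # `region` is a ConfidenceRegionABC describing the confidence region around the point estimate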