from sklearn.linear_model import LogisticRegression import quapy as qp from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot from method.confidence import ConfidenceIntervals from quapy.functional import strprev from quapy.method.aggregative import KDEyML from quapy.protocol import UPP import quapy.functional as F import numpy as np from tqdm import tqdm from scipy.stats import dirichlet def bayesian(kdey, data, probabilistic_classifier, init=None, MAX_ITER=100_000, warmup=3_000): """ Bayes: P(prev|data) = P(data|prev) P(prev) / P(data) i.e., posterior = likelihood * prior / evidence we assume the likelihood be: prev @ [kde_i_likelihood(data) 1..i..n] prior be uniform in simplex """ def pdf(kde, X): # todo: remove exp, since we are then doing the log every time? (not sure) return np.exp(kde.score_samples(X)) X = probabilistic_classifier.predict_proba(data) kdes = kdey.mix_densities test_densities = np.asarray([pdf(kde_i, X) for kde_i in kdes]) def log_likelihood(prev, epsilon=1e-10): test_likelihoods = prev @ test_densities test_loglikelihood = np.log(test_likelihoods + epsilon) return np.sum(test_loglikelihood) # def log_prior(prev): # todo: adapt to arbitrary prior knowledge (e.g., something around training prevalence) # return 1/np.sum((prev-init)**2) # it is not 1 but we assume uniform, son anyway is an useless constant # def log_prior(prev, alpha_scale=1000): # alpha = np.array(init) * alpha_scale # return dirichlet.logpdf(prev, alpha) def log_prior(prev): return 0 def sample_neighbour(prev, step_size=0.05): # random-walk Metropolis-Hastings dir_noise = np.random.normal(scale=step_size, size=len(prev)) neighbour = F.normalize_prevalence(prev + dir_noise, method='mapsimplex') return neighbour n_classes = len(probabilistic_classifier.classes_) current_prev = F.uniform_prevalence(n_classes) if init is None else init current_likelihood = log_likelihood(current_prev) + log_prior(current_prev) # Metropolis-Hastings with adaptive rate step_size = 0.05 target_acceptance = 0.3 adapt_rate = 0.01 acceptance_history = [] samples = [] for i in tqdm(range(MAX_ITER), total=MAX_ITER): proposed_prev = sample_neighbour(current_prev, step_size) # probability of acceptance proposed_likelihood = log_likelihood(proposed_prev) + log_prior(proposed_prev) acceptance = proposed_likelihood - current_likelihood # decide acceptance accepted = np.log(np.random.rand()) < acceptance if accepted: current_prev = proposed_prev current_likelihood = proposed_likelihood samples.append(current_prev) acceptance_history.append(1. if accepted else 0.) if i < warmup and i%10==0 and len(acceptance_history)>=100: recent_accept_rate = np.mean(acceptance_history[-100:]) # print(f'{i=} recent_accept_rate={recent_accept_rate:.4f} (current step_size={step_size:.4f})') step_size *= np.exp(adapt_rate * (recent_accept_rate - target_acceptance)) # remove "warmup" initial iterations samples = np.asarray(samples[warmup:]) return samples if __name__ == '__main__': qp.environ["SAMPLE_SIZE"] = 500 cls = LogisticRegression() kdey = KDEyML(cls) train, test = qp.datasets.fetch_UCIMulticlassDataset('academic-success', standardize=True).train_test with qp.util.temp_seed(2): print('fitting KDEy') kdey.fit(*train.Xy) # shifted = test.sampling(500, *[0.7, 0.1, 0.2]) # shifted = test.sampling(500, *test.prevalence()[::-1]) shifted = test.sampling(500, *F.uniform_prevalence_sampling(train.n_classes)) prev_hat = kdey.predict(shifted.X) mae = qp.error.mae(shifted.prevalence(), prev_hat) print(f'true_prev={strprev(shifted.prevalence())}') print(f'prev_hat={strprev(prev_hat)}, {mae=:.4f}') h = kdey.classifier samples = bayesian(kdey, shifted.X, h, init=None, MAX_ITER=5_000, warmup=3_000) conf_interval = ConfidenceIntervals(samples, confidence_level=0.95) mae = qp.error.mae(shifted.prevalence(), conf_interval.point_estimate()) print(f'mean posterior {strprev(samples.mean(axis=0))}, {mae=:.4f}') print(f'CI={conf_interval}') print(f'\tcontains true={conf_interval.coverage(true_value=shifted.prevalence())==1}') print(f'\tamplitude={conf_interval.montecarlo_proportion(50_000)*100.:.20f}%') if train.n_classes == 3: plot_prev_points(samples, true_prev=shifted.prevalence(), point_estim=prev_hat, train_prev=train.prevalence()) # plot_prev_points_matplot(samples) # report = qp.evaluation.evaluation_report(kdey, protocol=UPP(test), verbose=True) # print(report.mean(numeric_only=True))