# QuaPy/BayesianKDEy/bayesian_kdey.py

from sklearn.linear_model import LogisticRegression
import quapy as qp
from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm


def bayesian(kdes, data, prob_classifier, init=None, MAX_ITER=100_000, warmup=3_000):
    """
    Bayes' rule:
        P(prev|data) = P(data|prev) * P(prev) / P(data)
    i.e.,
        posterior = likelihood * prior / evidence
    We assume the likelihood to be the KDE mixture
        prev @ [kde_i_likelihood(data) for i in 1..n]
    and the prior to be uniform on the simplex.
    """
    def pdf(kde, X):
        # score_samples returns log-densities; todo: avoid the exp->log round trip
        # by working in log space throughout? (not sure)
        return np.exp(kde.score_samples(X))

    X = prob_classifier.predict_proba(data)
    test_densities = [pdf(kde_i, X) for kde_i in kdes]
    def log_likelihood(prev, epsilon=1e-8):
        # mixture density of each test point: sum_i prev_i * dens_i
        # (equivalently: sum(prev_i*dens_i for prev_i, dens_i in zip(prev, test_densities)))
        test_likelihoods = prev @ test_densities
        test_loglikelihood = np.log(test_likelihoods + epsilon)
        return np.sum(test_loglikelihood)
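    # note on the matrix product above: prev has shape (n_classes,) and
    # test_densities stacks to (n_classes, m), so prev @ test_densities yields
    # the mixture density of each of the m test points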
    def log_prior(prev):
        # todo: adapt to arbitrary prior knowledge (e.g., something around the training prevalence)
        # under a uniform prior the log-prior is just a constant, which cancels out
        # in the acceptance ratio, so any constant would do
        return 0
    def sample_neighbour(prev):
        # take a small step from prev towards a uniformly drawn point of the simplex
        rand_prev = F.uniform_prevalence_sampling(n_classes=len(prev), size=1)
        rand_direction = rand_prev - prev
        neighbour = F.normalize_prevalence(prev + rand_direction * 0.05)
        return neighbour
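    # the proposal prev + 0.05*(rand_prev - prev) = 0.95*prev + 0.05*rand_prev is a
    # convex combination of two points of the simplex, so it already lies in the
    # simplex; the normalization is just a numerical safeguard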
    n_classes = len(prob_classifier.classes_)
    current_prev = F.uniform_prevalence(n_classes) if init is None else init
    current_likelihood = log_likelihood(current_prev) + log_prior(current_prev)

    # Metropolis-Hastings
    # (note: the acceptance ratio below omits the proposal-correction term,
    # i.e., the proposal distribution is treated as if it were symmetric)
    samples = []
    for _ in tqdm(range(MAX_ITER), total=MAX_ITER):
        proposed_prev = sample_neighbour(current_prev)
        # log-probability of acceptance
        proposed_likelihood = log_likelihood(proposed_prev) + log_prior(proposed_prev)
        acceptance = proposed_likelihood - current_likelihood
        # decide acceptance
        if np.log(np.random.rand()) < acceptance:
            # accept the move
            current_prev = proposed_prev
            current_likelihood = proposed_likelihood
        # record the current state (whether or not the move was accepted)
        samples.append(current_prev)

    # discard the initial "warmup" iterations (burn-in)
    samples = np.asarray(samples[warmup:])
    return samples
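

# A possible way of summarizing the posterior samples returned by bayesian():
# per-class equal-tailed credible intervals. A minimal sketch using numpy only;
# the helper name and the default alpha are illustrative, not part of QuaPy.
def credible_intervals(samples, alpha=0.05):
    """Returns an (n_classes, 2) array with the (alpha/2, 1-alpha/2) percentile interval per class."""
    lower = np.percentile(samples, 100 * (alpha / 2), axis=0)
    upper = np.percentile(samples, 100 * (1 - alpha / 2), axis=0)
    return np.vstack([lower, upper]).T
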

if __name__ == '__main__':
    qp.environ["SAMPLE_SIZE"] = 500

    cls = LogisticRegression()
    kdey = KDEyML(cls)
    train, test = qp.datasets.fetch_UCIMulticlassDataset('waveform-v1', standardize=True).train_test

    with qp.util.temp_seed(0):
        print('fitting KDEy')
        kdey.fit(*train.Xy)
        shifted = test.sampling(500, *[0.1, 0.1, 0.8])
        prev_hat = kdey.predict(shifted.X)
        mae = qp.error.mae(shifted.prevalence(), prev_hat)
        print(f'true_prev={strprev(shifted.prevalence())}, prev_hat={strprev(prev_hat)}, {mae=:.4f}')
        kdes = kdey.mix_densities
        h = kdey.classifier
        samples = bayesian(kdes, shifted.X, h, init=prev_hat, MAX_ITER=100_000, warmup=0)

        print(f'mean posterior {strprev(samples.mean(axis=0))}')
        plot_prev_points(samples, true_prev=shifted.prevalence())
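
        # e.g., hypothetical usage of the credible_intervals helper sketched above:
        # for c, (low, high) in enumerate(credible_intervals(samples)):
        #     print(f'class {c}: 95% credible interval = [{low:.4f}, {high:.4f}]')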
        # report = qp.evaluation.evaluation_report(kdey, protocol=UPP(test), verbose=True)
        # print(report.mean(numeric_only=True))