# BayesianKDEy: MCMC posterior sampling of class prevalences with KDEy mixtures
import numpy as np
from scipy.stats import dirichlet
from sklearn.linear_model import LogisticRegression
from tqdm import tqdm

import quapy as qp
import quapy.functional as F
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML
from quapy.protocol import UPP

from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from method.confidence import ConfidenceIntervals


def bayesian(kdes, data, probabilistic_classifier, init=None, MAX_ITER=100_000, warmup=3_000):
    """
    Draws samples from the posterior distribution of class prevalence values via
    Metropolis-Hastings, using KDEy's per-class mixture components as the likelihood model.

    Bayes rule:

        P(prev|data) = P(data|prev) * P(prev) / P(data)

    i.e., posterior = likelihood * prior / evidence, where we assume:

    - likelihood(data|prev) = prod_x sum_i prev_i * kde_i(x), with kde_i the
      class-conditional KDE evaluated on the classifier's posterior probabilities
      (i.e., ``prev @ [kde_i_likelihood(data) for i in 1..n]`` per test point)
    - prior: uniform over the simplex (constant, so it cancels in the MH ratio)

    :param kdes: list of fitted KDE objects (one per class) exposing `score_samples`
    :param data: test covariates X
    :param probabilistic_classifier: fitted classifier exposing `predict_proba` and `classes_`
    :param init: optional starting prevalence vector; if None, the uniform prevalence is used
    :param MAX_ITER: total number of Metropolis-Hastings iterations
    :param warmup: number of initial (burn-in) samples to discard
    :return: np.ndarray of shape (MAX_ITER - warmup, n_classes) with the retained samples
    """
    def pdf(kde, X):
        # todo: remove exp, since we are then doing the log every time? (not sure)
        return np.exp(kde.score_samples(X))

    # map the test data onto the probability simplex (KDEy models densities there)
    X = probabilistic_classifier.predict_proba(data)

    # precompute the class-conditional density of every test point once;
    # shape: (n_classes, n_test_instances)
    test_densities = np.asarray([pdf(kde_i, X) for kde_i in kdes])

    def log_likelihood(prev, epsilon=1e-10):
        # mixture likelihood of each test point under mixture weights `prev`;
        # epsilon guards against log(0) when a point has ~zero density everywhere
        test_likelihoods = prev @ test_densities
        test_loglikelihood = np.log(test_likelihoods + epsilon)
        return np.sum(test_loglikelihood)

    def log_prior(prev):
        # uniform prior over the simplex: a constant, hence 0 in log-space
        # todo: adapt to arbitrary prior knowledge (e.g., a Dirichlet centred on the
        #   training prevalence via scipy.stats.dirichlet.logpdf(prev, init*alpha_scale))
        return 0

    def sample_neighbour(prev):
        # random-walk proposal: Gaussian perturbation mapped back onto the simplex
        dir_noise = np.random.normal(scale=0.05, size=len(prev))
        neighbour = F.normalize_prevalence(prev + dir_noise, method='mapsimplex')
        return neighbour

    n_classes = len(probabilistic_classifier.classes_)
    # coerce `init` so that lists / integer arrays behave like float prevalence vectors
    current_prev = F.uniform_prevalence(n_classes) if init is None else np.asarray(init, dtype=float)
    current_posterior = log_likelihood(current_prev) + log_prior(current_prev)

    # Metropolis-Hastings
    samples = []
    for _ in tqdm(range(MAX_ITER), total=MAX_ITER):
        proposed_prev = sample_neighbour(current_prev)

        # log acceptance ratio (the evidence P(data) cancels out)
        proposed_posterior = log_likelihood(proposed_prev) + log_prior(proposed_prev)
        acceptance = proposed_posterior - current_posterior

        # accept with probability min(1, exp(acceptance))
        if np.log(np.random.rand()) < acceptance:
            current_prev = proposed_prev
            current_posterior = proposed_posterior

        samples.append(current_prev)

    # remove "warmup" (burn-in) initial iterations
    samples = np.asarray(samples[warmup:])
    return samples
if __name__ == '__main__':
    qp.environ["SAMPLE_SIZE"] = 500

    # KDEy quantifier on top of a logistic-regression classifier
    classifier = LogisticRegression()
    quantifier = KDEyML(classifier)

    training, testing = qp.datasets.fetch_UCIMulticlassDataset('dry-bean', standardize=True).train_test

    with qp.util.temp_seed(2):
        print('fitting KDEy')
        quantifier.fit(*training.Xy)

        # draw a test sample whose prevalence is the reversed test prevalence (prior shift)
        # shifted = test.sampling(500, *[0.7, 0.1, 0.2])
        shifted = testing.sampling(500, *testing.prevalence()[::-1])
        point_prev = quantifier.predict(shifted.X)
        err = qp.error.mae(shifted.prevalence(), point_prev)
        print(f'true_prev={strprev(shifted.prevalence())}, prev_hat={strprev(point_prev)}, mae={err:.4f}')

        # reuse KDEy's fitted mixture densities and classifier for the Bayesian posterior
        mixture_densities = quantifier.mix_densities
        h = quantifier.classifier
        posterior_samples = bayesian(mixture_densities, shifted.X, h, init=None, MAX_ITER=5_000, warmup=1_000)

        print(f'mean posterior {strprev(posterior_samples.mean(axis=0))}')
        conf_interval = ConfidenceIntervals(posterior_samples, confidence_level=0.95)
        print()

        # the simplex plot is only meaningful for 3-class problems
        if training.n_classes == 3:
            plot_prev_points(posterior_samples, true_prev=shifted.prevalence(), point_estim=point_prev, train_prev=training.prevalence())
            # plot_prev_points_matplot(samples)

    # report = qp.evaluation.evaluation_report(kdey, protocol=UPP(test), verbose=True)
    # print(report.mean(numeric_only=True))