# QuaPy/KDEy/kdey_devel.py

from typing import Callable

import numpy as np
from scipy import optimize
from sklearn.base import BaseEstimator

import quapy as qp
import quapy.functional as F
from quapy.data import LabelledCollection
from quapy.method._kdey import KDEBase
from quapy.method.aggregative import KDEyML
from quapy.protocol import UPP

epsilon = 1e-10
BANDWIDTH_RANGE = (0.001, 0.2)


class KDEyMLauto(KDEyML):
    """Transductive variant of KDEy-ML that also selects the KDE bandwidth at inference time."""

    def __init__(self, classifier: BaseEstimator = None, val_split=5, random_state=None, optim='two_steps'):
        self.classifier = qp._get_classifier(classifier)
        self.val_split = val_split
        self.bandwidth = None
        self.random_state = random_state
        self.optim = optim
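
    # Supported values for `optim` (as dispatched in `transduce`):
    #   'two_steps':       alternate between minimizing the negative log-likelihood w.r.t. the
    #                      prevalence (bandwidth fixed) and w.r.t. the bandwidth (prevalence fixed)
    #   'both':            jointly optimize prevalence and a single shared bandwidth with SLSQP
    #   'both_fine':       jointly optimize prevalence and one bandwidth per class with SLSQP
    #   'max_likelihood':  pick the bandwidth maximizing the test likelihood via grid search
    #   'max_likelihood2': same, but via a 1-D SLSQP search instead of a grid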

    def choose_bandwidth(self, train, test_instances):
        classif_predictions = self.classifier_fit_predict(train, fit_classifier=True, predict_on=self.val_split)
        te_posteriors = self.classify(test_instances)
        return self.transduce(classif_predictions, te_posteriors)

    def transduce(self, classif_predictions, te_posteriors):
        tr_posteriors, tr_y = classif_predictions.Xy
        classes = classif_predictions.classes_
        n_classes = len(classes)

        current_bandwidth = 0.05
        if self.optim == 'both_fine':
            current_bandwidth = np.full(fill_value=current_bandwidth, shape=(n_classes,))
        current_prevalence = F.uniform_prevalence(n_classes=n_classes)

        if self.optim == 'max_likelihood':
            current_prevalence, current_bandwidth = self.optim_minimize_like(tr_posteriors, tr_y, te_posteriors, classes, grid=True)
        elif self.optim == 'max_likelihood2':
            current_prevalence, current_bandwidth = self.optim_minimize_like(tr_posteriors, tr_y, te_posteriors, classes, grid=False)
        else:
            iterations = 0
            convergence = False
            with qp.util.temp_seed(self.random_state):
                while not convergence:
                    previous_bandwidth = current_bandwidth
                    previous_prevalence = current_prevalence
                    iterations += 1
                    print(f'{iterations}:')

                    if self.optim == 'two_steps':
                        current_prevalence = self.optim_minimize_prevalence(current_bandwidth, current_prevalence, tr_posteriors, tr_y, te_posteriors, classes)
                        print(f'\testim-prev={F.strprev(current_prevalence)}')
                        current_bandwidth = self.optim_minimize_bandwidth(current_bandwidth, current_prevalence, tr_posteriors, tr_y, te_posteriors, classes)
                        print(f'\tbandwidth={current_bandwidth}')
                    elif self.optim == 'both':
                        current_prevalence, current_bandwidth = self.optim_minimize_both(current_bandwidth, current_prevalence, tr_posteriors, tr_y, te_posteriors, classes)
                    elif self.optim == 'both_fine':
                        current_prevalence, current_bandwidth = self.optim_minimize_both_fine(current_bandwidth, current_prevalence, tr_posteriors, tr_y, te_posteriors, classes)

                    # check convergence
                    prev_convergence = all(np.isclose(previous_prevalence, current_prevalence, atol=0.01))
                    if isinstance(current_bandwidth, float):
                        band_convergence = np.isclose(previous_bandwidth, current_bandwidth, atol=0.001)
                    else:
                        band_convergence = all(np.isclose(previous_bandwidth, current_bandwidth, atol=0.001))
                    convergence = band_convergence and prev_convergence

        self.bandwidth = current_bandwidth
        print('bandwidth=', current_bandwidth)
        print('prevalence=', current_prevalence)
        return current_prevalence

    def optim_minimize_prevalence(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, current_bandwidth)
        test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]

        def neg_loglikelihood_prev(prev):
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
            return -np.sum(test_loglikelihood)

        return optim_minimize(neg_loglikelihood_prev, current_prev)

    def optim_minimize_bandwidth(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        def neg_loglikelihood_bandwidth(bandwidth):
            mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth)
            test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(current_prev, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
            return -np.sum(test_loglikelihood)

        # method='bounded' is required for the bounds (and the 'xatol' option) to take effect
        r = optimize.minimize_scalar(neg_loglikelihood_bandwidth, bounds=(0.0001, 0.2), method='bounded', options={'xatol': 0.005})
        # assert r.success, f'Process did not converge! {r.message}'
        return r.x

    def optim_minimize_both(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        n_classes = len(current_prev)

        def neg_loglikelihood_bandwidth(prevalence_bandwidth):
            bandwidth = prevalence_bandwidth[-1]
            prevalence = prevalence_bandwidth[:-1]
            mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth)
            test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prevalence, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
            return -np.sum(test_loglikelihood)

        bounds = [(0, 1) for _ in range(n_classes)] + [(0.00001, 0.2)]
        constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x[:n_classes])})
        prevalence_bandwidth = np.append(current_prev, current_bandwidth)
        r = optimize.minimize(neg_loglikelihood_bandwidth, x0=prevalence_bandwidth, method='SLSQP', bounds=bounds, constraints=constraints)
        print(f'iterations-both={r.nit}')
        # assert r.success, 'Process did not converge!'
        prev_band = r.x
        current_prevalence = prev_band[:-1]
        current_bandwidth = prev_band[-1]
        return current_prevalence, current_bandwidth

    def optim_minimize_both_fine(self, current_bandwidth, current_prev, tr_posteriors, tr_y, te_posteriors, classes):
        # in this mode, current_bandwidth is a vector with one bandwidth per class
        n_classes = len(current_bandwidth)

        def neg_loglikelihood_bandwidth(prevalence_bandwidth):
            prevalence = prevalence_bandwidth[:n_classes]
            bandwidth = prevalence_bandwidth[n_classes:]
            mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth)
            test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prevalence, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
            return -np.sum(test_loglikelihood)

        bounds = [(0, 1) for _ in range(n_classes)] + [(0.0001, 0.2) for _ in range(n_classes)]
        constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x[:n_classes])})
        prevalence_bandwidth = np.concatenate((current_prev, current_bandwidth))
        r = optimize.minimize(neg_loglikelihood_bandwidth, x0=prevalence_bandwidth, method='SLSQP', bounds=bounds, constraints=constraints)
        print(f'iterations-both-fine={r.nit}')
        # assert r.success, 'Process did not converge!'
        prev_band = r.x
        current_prevalence = prev_band[:n_classes]
        current_bandwidth = prev_band[n_classes:]
        return current_prevalence, current_bandwidth

    def optim_minimize_like(self, tr_posteriors, tr_y, te_posteriors, classes, reduction=100, grid=True):
        n_classes = len(classes)

        # reduce the number of samples to speed up the computation
        posteriors_subsample = LabelledCollection(tr_posteriors, tr_y)
        posteriors_subsample = posteriors_subsample.sampling(reduction * n_classes)

        n_test = te_posteriors.shape[0]
        subsample_index = np.random.choice(np.arange(n_test), size=reduction)
        te_posterior_subsample = te_posteriors[subsample_index]

        if grid:
            _, best_band = self.choose_bandwidth_maxlikelihood_grid(*posteriors_subsample.Xy, te_posterior_subsample, classes)
        else:
            best_band = self.choose_bandwidth_maxlikelihood_search(*posteriors_subsample.Xy, te_posterior_subsample, classes)

        mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, best_band)
        test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]

        def neg_loglikelihood_prev(prev):
            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
            return -np.sum(test_loglikelihood)

        init_prev = np.full(fill_value=1 / n_classes, shape=(n_classes,))
        pred_prev, neglikelihood = optim_minimize(neg_loglikelihood_prev, init_prev, return_loss=True)
        return pred_prev, best_band
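
    # Note: this quantifier is transductive. `aggregation_fit` only caches the validation
    # posteriors; the actual optimization is deferred to `aggregate`, which re-runs
    # `transduce` on every batch of test posteriors it receives.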
    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
        self.classif_predictions = classif_predictions
        return self

    def aggregate(self, posteriors: np.ndarray):
        return self.transduce(self.classif_predictions, posteriors)

    def choose_bandwidth_maxlikelihood_grid(self, tr_posteriors, tr_y, te_posteriors, classes):
        n_classes = len(classes)
        best_band = None
        best_like = None
        best_prev = None
        init_prev = np.full(fill_value=1 / n_classes, shape=(n_classes,))
        for bandwidth in np.logspace(-4, np.log10(0.2), 50):
            mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth)
            test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]

            def neg_loglikelihood_prev(prev):
                test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
                test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
                return -np.sum(test_loglikelihood)

            pred_prev, neglikelihood = optim_minimize(neg_loglikelihood_prev, init_prev, return_loss=True)
            if best_like is None or neglikelihood < best_like:
                best_like = neglikelihood
                best_band = bandwidth
                best_prev = pred_prev

        print(f'best-like={best_like:.4f}')
        print(f'best-band={best_band:.4f}')
        return best_prev, best_band

    def choose_bandwidth_maxlikelihood_search(self, tr_posteriors, tr_y, te_posteriors, classes):
        n_classes = len(classes)
        init_prev = F.uniform_prevalence(n_classes)

        def neglikelihood_band(bandwidth):
            mix_densities = self.get_mixture_components(tr_posteriors, tr_y, classes, bandwidth[0])
            test_densities = [self.pdf(kde_i, te_posteriors) for kde_i in mix_densities]

            def neg_loglikelihood_prev(prev):
                test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
                test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
                return -np.sum(test_loglikelihood)

            pred_prev, neglikelihood = optim_minimize(neg_loglikelihood_prev, init_prev, return_loss=True)
            return neglikelihood

        bounds = [(0.0001, 0.2)]
        r = optimize.minimize(neglikelihood_band, x0=[0.001], method='SLSQP', bounds=bounds)
        best_band = r.x[0]
        # assert r.success, 'Process did not converge!'
        print(f'solved in nit={r.nit}')
        return best_band


def optim_minimize(loss: Callable, init_prev: np.ndarray, return_loss=False):
    """
    Searches for the optimal prevalence values, i.e., an `n_classes`-dimensional vector of the (`n_classes`-1)-simplex
    that yields the smallest loss. This optimization is carried out by means of a constrained search using scipy's
    SLSQP routine.

    :param loss: (callable) the function to minimize
    :param init_prev: (ndarray) the initial prevalence vector from which the search starts
    :param return_loss: (bool) whether to also return the loss value attained at the optimum
    :return: (ndarray) the best prevalence vector found, or a tuple (vector, loss) if `return_loss=True`
    """
    n_classes = len(init_prev)
    # solutions are bounded to those contained in the unit-simplex
    bounds = tuple((0, 1) for _ in range(n_classes))  # values in [0,1]
    constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})  # values summing up to 1
    r = optimize.minimize(loss, x0=init_prev, method='SLSQP', bounds=bounds, constraints=constraints)
    # assert r.success, 'Process did not converge!'
    if return_loss:
        return r.x, r.fun
    else:
        return r.x
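
# A minimal usage sketch for optim_minimize (the target vector below is made up for
# illustration): SLSQP should recover the minimizer of a convex loss within the simplex.
#
#   >>> target = np.asarray([0.2, 0.5, 0.3])
#   >>> optim_minimize(lambda prev: ((prev - target) ** 2).sum(), init_prev=np.full(3, 1/3))
#   array([0.2, 0.5, 0.3])  # up to numerical tolerance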


class KDEyMLauto2(KDEyML):
    def __init__(self, classifier: BaseEstimator = None, val_split=5, bandwidth=0.1, random_state=None, reduction=100, max_reduced=500, target='likelihood', search='grid'):
        """
        :param bandwidth: a float, or 'auto' to tune the bandwidth on held-out validation samples
        :param reduction: number of examples per class for automatically setting the bandwidth
        :param max_reduced: maximum total number of training examples retained after the reduction
        :param target: 'likelihood', or any quantification error name in qp.error.QUANTIFICATION_ERROR_NAMES
        :param search: 'grid' or 'optim', the strategy used to explore candidate bandwidths
        """
        self.classifier = qp._get_classifier(classifier)
        self.val_split = val_split
        if bandwidth == 'auto':
            self.bandwidth = bandwidth
        else:
            self.bandwidth = KDEBase._check_bandwidth(bandwidth)
        self.reduction = reduction
        self.max_reduced = max_reduced
        self.random_state = random_state
        assert target == 'likelihood' or target in qp.error.QUANTIFICATION_ERROR_NAMES, 'unknown target for auto'
        assert search in ['grid', 'optim'], 'unknown value for search'
        self.target = target
        self.search = search

    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
        if self.bandwidth == 'auto':
            self.auto_bandwidth_likelihood(classif_predictions)
        else:
            self.bandwidth_ = self.bandwidth
        self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.classes_, self.bandwidth_)
        return self
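
    # `aggregate` is inherited from KDEyML and uses the mixture components fitted above,
    # so once the bandwidth has been selected this quantifier behaves inductively.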

    def auto_bandwidth_likelihood(self, classif_predictions: LabelledCollection):
        n_classes = classif_predictions.n_classes

        train, val = classif_predictions.split_stratified(train_prop=0.5, random_state=self.random_state)

        if self.reduction is not None:
            # reduce the number of training samples to speed up the computation
            tr_length = min(self.reduction * n_classes, self.max_reduced)
            if len(train) > tr_length:
                train = train.sampling(tr_length)

        init_prev = F.uniform_prevalence(n_classes=n_classes)
        repeats = 25
        prot = UPP(val, sample_size=self.reduction, repeats=repeats, random_state=self.random_state)

        def eval_bandwidth(bandwidth):
            mix_densities = self.get_mixture_components(*train.Xy, train.classes_, bandwidth)
            loss_accum = 0
            for (sample, prevtrue) in prot():
                test_densities = [self.pdf(kde_i, sample) for kde_i in mix_densities]

                def neg_loglikelihood_prev(prev):
                    test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
                    test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
                    return -np.sum(test_loglikelihood)

                if self.target == 'likelihood':
                    loss_fn = neg_loglikelihood_prev
                else:
                    loss_fn = lambda prev_hat: qp.error.from_name(self.target)(prevtrue, prev_hat)

                pred_prev, loss_val = optim_minimize(loss_fn, init_prev, return_loss=True)
                loss_accum += loss_val
            return loss_accum

        if self.search == 'optim':
            # method='bounded' is required for the bounds (and the 'xatol' option) to take effect
            r = optimize.minimize_scalar(eval_bandwidth, bounds=(0.0001, 0.2), method='bounded', options={'xatol': 0.005})
            best_band = r.x
            best_loss_value = r.fun
            nit = r.nit
        elif self.search == 'grid':
            nit = 20
            band_evals = [(band, eval_bandwidth(band)) for band in np.logspace(-4, np.log10(0.2), num=nit)]
            best_band, best_loss_value = sorted(band_evals, key=lambda x: x[1])[0]

        print(f'found bandwidth={best_band:.8f} after {nit=} iterations, loss_val={best_loss_value:.5f}')
        self.bandwidth_ = best_band
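
# A hedged usage sketch for KDEyMLauto2 (the dataset fetcher and dataset name below are
# assumptions chosen for illustration; any QuaPy training/test LabelledCollection pair works):
#
#   >>> import quapy as qp
#   >>> from sklearn.linear_model import LogisticRegression
#   >>> train, test = qp.datasets.fetch_UCIMulticlassDataset('dry-bean').train_test  # hypothetical choice
#   >>> quant = KDEyMLauto2(LogisticRegression(), bandwidth='auto', target='likelihood', search='grid')
#   >>> quant.fit(train)
#   >>> estim_prev = quant.quantify(test.X)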


# class KDEyMLred(KDEyML):
#     def __init__(self, classifier: BaseEstimator=None, val_split=5, bandwidth=0.1, random_state=None, reduction=100, max_reduced=500):
#         self.classifier = qp._get_classifier(classifier)
#         self.val_split = val_split
#         self.bandwidth = KDEBase._check_bandwidth(bandwidth)
#         self.reduction = reduction
#         self.max_reduced = max_reduced
#         self.random_state = random_state
#
#     def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
#         n_classes = classif_predictions.n_classes
#         tr_length = min(self.reduction * n_classes, self.max_reduced)
#         if len(classif_predictions) > tr_length:
#             classif_predictions = classif_predictions.sampling(tr_length)
#         self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.classes_, self.bandwidth)
#         return self