scripts for understanding the divergence approximations

This commit is contained in:
Alejandro Moreo Fernandez 2023-11-20 15:28:56 +01:00
parent d060ffa591
commit 61399d66f6
8 changed files with 573 additions and 0 deletions


@@ -0,0 +1,82 @@
import numpy as np
from scipy.stats import norm, uniform, expon
from scipy.special import rel_entr


def kld_approx(p, q, resolution=100):
    # midpoint-rule approximation of KL(p||q) on the interval [-5, 5]
    steps = np.linspace(-5, 5, resolution)
    integral = 0
    for step_i1, step_i2 in zip(steps[:-1], steps[1:]):
        base = step_i2 - step_i1
        middle = (step_i1 + step_i2) / 2
        pi = p.pdf(middle)
        qi = q.pdf(middle)
        integral += (base * pi * np.log(pi / qi))
    return integral


def integrate(f, resolution=10000):
    # generic midpoint-rule integration of f on the interval [-10, 10]
    steps = np.linspace(-10, 10, resolution)
    integral = 0
    for step_i1, step_i2 in zip(steps[:-1], steps[1:]):
        base = step_i2 - step_i1
        middle = (step_i1 + step_i2) / 2
        integral += (base * f(middle))
    return integral


def kl_analytic(m1, s1, m2, s2):
    # closed-form KL divergence between the Gaussians N(m1, s1^2) and N(m2, s2^2)
    return np.log(s2/s1) + (s1*s1 + (m1-m2)**2) / (2*s2*s2) - 0.5


def montecarlo(p, q, trials=1000000):
    # Monte Carlo estimate of KL(p||q) = E_{x~p}[log p(x)/q(x)], sampling from p
    xs = p.rvs(trials)
    ps = p.pdf(xs)
    qs = q.pdf(xs)
    return np.mean(np.log(ps/qs))


def montecarlo2(p, q, trials=100000):
    # importance-sampling estimate of KL(p||q), drawing the samples from a reference distribution r
    # r = norm(-3, scale=5)
    r = uniform(-10, 20)
    # r = expon(loc=-6)
    # r = p
    xs = r.rvs(trials)
    rs = r.pdf(xs)
    print(rs)
    ps = p.pdf(xs) + 0.0000001
    qs = q.pdf(xs) + 0.0000001
    return np.mean((ps/rs) * np.log(ps/qs))


def wrong(p, q, trials=100000):
    # deliberately wrong estimate: averages p(x)*log(p(x)/q(x)) over uniform samples
    # without correcting for the density of the sampling distribution
    xs = np.random.uniform(-10, 10, trials)
    ps = p.pdf(xs)
    qs = q.pdf(xs)
    return np.mean(ps * np.log(ps/qs))


p = norm(loc=0, scale=1)
q = norm(loc=1, scale=1)

integral_approx = kld_approx(p, q)
analytic_solution = kl_analytic(p.kwds['loc'], p.kwds['scale'], q.kwds['loc'], q.kwds['scale'])
montecarlo_approx = montecarlo(p, q)
montecarlo_approx2 = montecarlo2(p, q)
wrong_approx = wrong(p, q)

print(f'{analytic_solution=:.10f}')
print()
print(f'{integral_approx=:.10f}')
print(f'integral error = {np.abs(analytic_solution-integral_approx):.10f}')
print()
print(f'{montecarlo_approx=:.10f}')
print(f'montecarlo error = {np.abs(analytic_solution-montecarlo_approx):.10f}')
print()
print(f'{montecarlo_approx2=:.10f}')
print(f'montecarlo2 error = {np.abs(analytic_solution-montecarlo_approx2):.10f}')
print()
print(f'{wrong_approx=:.10f}')
print(f'wrong error = {np.abs(analytic_solution-wrong_approx):.10f}')
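
The `rel_entr` helper imported above is never used; a minimal sketch of how the same midpoint-rule estimate could be written with it, assuming the same `p`, `q` and interval as in `kld_approx` (the name `kld_approx_rel_entr` is just illustrative; `rel_entr(a, b)` computes `a*log(a/b)` elementwise):

import numpy as np
from scipy.special import rel_entr

def kld_approx_rel_entr(p, q, resolution=100):
    # Riemann sum of rel_entr(p, q) = p*log(p/q) over the midpoints of a grid on [-5, 5]
    steps = np.linspace(-5, 5, resolution)
    middles = (steps[:-1] + steps[1:]) / 2
    base = steps[1] - steps[0]
    return np.sum(base * rel_entr(p.pdf(middles), q.pdf(middles)))

For p = N(0,1) and q = N(1,1) as above, this should land close to the analytic value 0.5.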


@@ -0,0 +1,113 @@
import numpy as np
from sklearn.neighbors import KernelDensity
from scipy.stats import norm, multivariate_normal, Covariance
from sklearn.metrics.pairwise import rbf_kernel
from sklearn.neighbors import KDTree

# def CauchySchwarzDivGaussMix(pi_i, mu_i, Lambda_i, tau_i, nu_i, Omega_i):
#     Lambda_i_inv =
#     def z()

nD = 2
mu = [[0.1, 0.1], [0.2, 0.3], [1, 1]]  # centers
bandwidth = 0.10  # bandwidth and scale (standard deviation)
standard_deviation = bandwidth
variance = standard_deviation**2

mus = np.asarray(mu).reshape(-1, nD)
kde = KernelDensity(bandwidth=bandwidth).fit(mus)
x = [0.5, 0.2]

print('with scikit-learn KDE')
prob = np.exp(kde.score_samples(np.asarray(x).reshape(-1, nD)))
print(prob)

# univariate
# N = norm(loc=mu, scale=scale)
# prob = N.pdf(x)
# print(prob)

# multivariate
print('with scipy multivariate normal')
npoints = mus.shape[0]
probs = sum(multivariate_normal(mean=mu_i, cov=variance).pdf(x) for mu_i in mu)
probs /= npoints
print(probs)

print('with scikit-learn rbf_kernel')
x = np.asarray(x).reshape(-1, nD)
gamma = 1/(2*variance)
gram = rbf_kernel(mus, x, gamma=gamma)
const = 1/np.sqrt(((2*np.pi)**nD) * (variance**(nD)))
gram *= const
print(gram)
print(np.sum(gram)/npoints)

print('with stackoverflow answer')
from scipy.spatial.distance import pdist, cdist, squareform
import scipy

# mus is an NxD matrix, where N is the number of items and D its dimensionality
pairwise_sq_dists = cdist(mus, x, 'sqeuclidean')
print(pairwise_sq_dists)
K = np.exp(-pairwise_sq_dists / variance)
# note: this lacks the 1/2 factor in the exponent and the normalizing constant,
# so it will not coincide exactly with the KDE values computed above
print(K)
print(np.sum(K)/npoints)

print("trying with scipy multivariate on more than one instance")
probs = sum(multivariate_normal(mean=x.reshape(-1), cov=variance).pdf(mus))
probs /= npoints
print(probs)

import sys
sys.exit(0)  # the rest of the script is not executed

# N1 = multivariate_normal(mean=mu[0], cov=variance)
# prob1 = N1.pdf(x)
# N2 = multivariate_normal(mean=mu[1], cov=variance)
# prob2 = N2.pdf(x)
# print(prob1+prob2)

cov = Covariance.from_diagonal([variance, variance])
print(cov.covariance)
precision_matrix = np.asarray([[1/variance, 0], [0, 1/variance]])
print(Covariance.from_precision(precision_matrix).covariance)
print(np.linalg.inv(precision_matrix))
print(np.linalg.inv(cov.covariance))

print('-'*100)

nD = 2
mu = np.asarray([[0.1, 0.5]])  # centers
bandwidth = 0.10  # bandwidth and scale (standard deviation)
standard_deviation = bandwidth
variance = standard_deviation**2

mus = np.asarray([mu]).reshape(-1, nD)
kde = KernelDensity(bandwidth=bandwidth).fit(mus)
x = np.asarray([0.5, 0.2])
prob = np.exp(kde.score_samples(np.asarray(x).reshape(-1, nD)))
print(prob)

# several formulations are tried; only the last assignment is actually used
probs = sum(multivariate_normal(mean=mu_i, cov=variance).pdf(x) for mu_i in mu) / len(mu)
probs = sum(multivariate_normal(mean=[0, 0], cov=1).pdf((x-mu_i)/bandwidth) for mu_i in mu) / len(mu)
probs = sum(multivariate_normal(mean=[0, 0], cov=variance).pdf(x-mu_i) for mu_i in mu) / len(mu)
print(probs)

# N1 = multivariate_normal(mean=mu[0], cov=variance)
# prob1 = N1.pdf(x)
# N2 = multivariate_normal(mean=mu[1], cov=variance)
# prob2 = N2.pdf(x)
# print(prob1+prob2)

h = 0.1
D = 4
print(np.sqrt((4**2) * (5**2)))
print(4*5)
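
All of the variants above evaluate the same isotropic Gaussian KDE, whose density at a point x is (1/N) * sum_i (2*pi*h^2)^(-D/2) * exp(-||x - mu_i||^2 / (2*h^2)). A small self-contained sketch of that formula, usable as a reference value (the name `gaussian_kde_pdf` is just illustrative):

import numpy as np

def gaussian_kde_pdf(x, mus, h):
    # isotropic Gaussian KDE with bandwidth h, evaluated at a single point x
    x = np.atleast_1d(x).astype(float)       # (D,)
    mus = np.atleast_2d(mus).astype(float)   # (N, D)
    N, D = mus.shape
    sq_dists = np.sum((mus - x)**2, axis=1)  # squared distance to each center
    norm_const = (2*np.pi*h*h)**(-D/2)
    return np.mean(norm_const * np.exp(-sq_dists / (2*h*h)))

For instance, gaussian_kde_pdf([0.5, 0.2], [[0.1, 0.1], [0.2, 0.3], [1, 1]], h=0.10) should agree with the scikit-learn and scipy values printed at the top of the script.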


@@ -0,0 +1,153 @@
import numpy as np
from scipy.stats import multivariate_normal
from scipy import optimize


def cauchy_schwarz_divergence_kde(L: list, Xte: np.ndarray, bandwidth: float, alpha: np.ndarray):
    """
    :param L: a list of np.ndarray (instances x dimensions), where L[i] contains the instances of class i
    :param Xte: an np.ndarray (instances x dimensions)
    :param bandwidth: the bandwidth of the kernel
    :param alpha: the mixture parameter
    :return: the Cauchy-Schwarz divergence between the validation KDE mixture distribution (with mixture parameter
        alpha) and the test KDE distribution
    """

    n = len(L)  # number of classes
    K, D = Xte.shape  # number of test instances, and number of dimensions
    Kinv = 1/K

    # the lengths of each block
    l = np.asarray([len(Li) for Li in L])

    # contains the a_i / l_i
    alpha_r = alpha / l
    alpha2_r_sum = np.sum(alpha * alpha_r)  # contains the sum_i a_i**2 / l_i

    h = bandwidth

    # we only work with Gaussians whose covariance matrix is the scalar matrix h**2
    cov_mix_scalar = 2*h*h  # corresponds to a bandwidth of sqrt(2)*h

    # constant
    C = ((2*np.pi)**(-D/2))*h**(-D)

    Kernel = multivariate_normal(mean=np.zeros(D), cov=cov_mix_scalar)
    K0 = Kernel.pdf(np.zeros(D))

    def compute_block_E():
        kernel_block_E = []
        for i, Li in enumerate(L):
            acc = 0
            for x_ji in Li:  # optimize...
                for x_k in Xte:  # optimize...
                    acc += Kernel.pdf(x_ji - x_k)  # optimize...
            kernel_block_E.append(acc)
        return np.asarray(kernel_block_E)

    def compute_block_F_hash():
        # this can be computed entirely at training time
        Khash = {}
        for a in range(n):
            for b in range(l[a]):
                for i in range(n):
                    for j in range(l[i]):  # this loop (and the index j) could be suppressed by storing the sum across j
                        Khash[(a, b, i, j)] = Kernel.pdf(L[i][j] - L[a][b])
        return Khash

    def compute_block_Ktest():
        # this can be optimized in several ways, e.g., by computing only the lower triangle
        # and then removing the K0 term, which is no longer needed after that
        acc = 0
        for x_i in Xte:
            for x_j in Xte:
                acc += Kernel.pdf(x_i - x_j)
        return acc

    def compute_block_F():
        F = 0
        for a in range(n):
            tmp_b = 0
            for b in range(l[a]):
                tmp_i = 0
                for i in range(n):
                    tmp_j = 0
                    for j in range(l[i]):
                        tmp_j += Fh[(a, b, i, j)]
                    tmp_i += (alpha_r[i] * tmp_j)
                tmp_b += tmp_i
            F += (alpha_r[a] * tmp_b)
        return F

    E = compute_block_E()
    Fh = compute_block_F_hash()
    # Ktest = compute_block_Ktest()
    F = compute_block_F()

    C1 = K*Kinv*Kinv*C
    C2 = 2 * np.sum([Kernel.pdf(Xte[k] - Xte[k_p]) for k in range(K) for k_p in range(k)])

    partA = -np.log(Kinv * (alpha_r @ E))
    partB = 0.5*np.log(C*alpha2_r_sum + F - (K0*alpha2_r_sum))
    # partC = 0.5*np.log(Kinv) + 0.5*np.log(C + Kinv*Ktest - K0)
    partC = 0.5*np.log(C1 + C2)

    Dcs = partA + partB + partC

    return Dcs


L = [
    np.asarray([
        [-1, -1, -1]
    ]),
    np.asarray([
        [0, 0, 0],
    ]),
    np.asarray([
        [0, 0, 0.1],
        [1, 1, 1],
        [3, 3, 1],
    ]),
    np.asarray([
        [1, 0, 0]
    ]),
    np.asarray([
        [0, 1, 0]
    ])
]

Xte = np.asarray(
    [[0, 0, 0],
     [0, 0, 0],
     [1, 0, 0],
     [0, 1, 0]]
)

bandwidth = 0.01
alpha = np.asarray([0, 2/4, 0, 1/4, 1/4])

div = cauchy_schwarz_divergence_kde(L, Xte, bandwidth, alpha)
print(div)


def divergence(alpha):
    return cauchy_schwarz_divergence_kde(L, Xte, bandwidth, alpha)


# the initial point is set to the uniform distribution
n_classes = len(L)
uniform_distribution = np.full(fill_value=1/n_classes, shape=(n_classes,))

# solutions are bounded to those contained in the unit simplex
bounds = tuple((0, 1) for _ in range(n_classes))  # values in [0,1]
constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})  # values summing up to 1

# print('searching for alpha')
r = optimize.minimize(divergence, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints)
sol = r.x
for x in sol:
    print(f'{x:.4f}')
print(cauchy_schwarz_divergence_kde(L, Xte, bandwidth, sol))
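
For reference, the quantity implemented in closed form above is the Cauchy-Schwarz divergence D_CS(p, q) = -log( integral(p*q) / sqrt( integral(p^2) * integral(q^2) ) ); partA corresponds to the (negative log) cross term and partB/partC to the two self-similarity terms. A generic brute-force sketch of that definition via importance sampling, useful only as a low-dimensional sanity check and not tied to the KDE closed form above (`p_pdf`, `q_pdf` and `sampler` are hypothetical stand-ins for vectorized densities and a proposal distribution such as a frozen scipy multivariate_normal):

import numpy as np

def cs_divergence_montecarlo(p_pdf, q_pdf, sampler, trials=100000):
    # estimates integral(p*q), integral(p^2) and integral(q^2) by importance sampling from `sampler`
    xs = sampler.rvs(trials)
    rs = sampler.pdf(xs)
    ps, qs = p_pdf(xs), q_pdf(xs)
    cross = np.mean(ps * qs / rs)
    p2 = np.mean(ps**2 / rs)
    q2 = np.mean(qs**2 / rs)
    return -np.log(cross / np.sqrt(p2 * q2))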


@@ -0,0 +1,17 @@
import pickle

dataset = 'lequa/T1A'

for metric in ['mae', 'mrae']:

    method1 = 'KDEy-closed++'
    # method2 = 'KDEy-DMhd3+'
    # method1 = 'KDEy-ML'
    # method2 = 'KDEy-ML+'

    path = f'../results/{dataset}/{metric}/{method1}.hyper.pkl'
    obj = pickle.load(open(path, 'rb'))
    print(method1, obj)

    # path = f'../results/{dataset}/{metric}/{method2}.hyper.pkl'
    # obj = pickle.load(open(path, 'rb'))
    # print(method2, obj)


@@ -0,0 +1,27 @@
from time import time
from sklearn.metrics.pairwise import rbf_kernel
import torch
import numpy as np


def rbf(X, Y):
    # note: just a matrix product, not an actual RBF kernel
    return X @ Y.T


@torch.compile
def rbf_comp(X, Y):
    # the same operation, compiled with torch.compile
    return X @ Y.T


def measure_time(X, Y, func):
    tinit = time()
    func(X, Y)
    tend = time()
    print(f'took {tend-tinit:.3}s')


X = np.random.rand(1000, 100)
Y = np.random.rand(1000, 100)

measure_time(X, Y, rbf)
measure_time(X, Y, rbf_comp)
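
Note that `rbf` and `rbf_comp` above only time a plain matrix product (the `rbf_kernel` import goes unused). In case an actual RBF kernel is what should be benchmarked, a hedged sketch of one possible torch version (the name `rbf_torch` is just illustrative; it relies on `torch.cdist`):

import torch

def rbf_torch(X, Y, gamma=1.0):
    # exp(-gamma * ||x - y||^2) for every pair of rows of X and Y
    X = torch.as_tensor(X)
    Y = torch.as_tensor(Y)
    sq_dists = torch.cdist(X, Y) ** 2
    return torch.exp(-gamma * sq_dists)

It can be timed with the same harness, e.g. measure_time(X, Y, rbf_torch) or measure_time(X, Y, torch.compile(rbf_torch)).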


@@ -0,0 +1,20 @@
import pickle
import os

import quapy as qp
from quapy.model_selection import GridSearchQ
from quapy.protocol import UPP

toprint = []

for dataset in qp.datasets.UCI_MULTICLASS_DATASETS:

    data = qp.datasets.fetch_UCIMulticlassDataset(dataset)

    # model selection
    train, test = data.train_test

    toprint.append(f'{dataset}\t{len(train)}\t{len(test)}\t{data.n_classes}')

print()
for pr in toprint:
    print(pr)


@@ -0,0 +1,6 @@
import pandas as pd
report = pd.read_csv('../results_lequa_mrae/KDEy-MLE.dataframe')
means = report.mean()
print(f'KDEy-MLE\tLeQua-T1B\t{means["mae"]:.5f}\t{means["mrae"]:.5f}\t{means["kld"]:.5f}\n')


@@ -0,0 +1,155 @@
import numpy as np
from scipy.stats import norm

EPS = 1e-8
LARGE_NUM = 1000000
TRIALS = LARGE_NUM

# we compute: D(p||q)
# we assume:
# q = the reference distribution (in our case: the mixture of KDEs from training)
# p = the distribution (in our case: the KDE of the test set)
# N = TRIALS, the number of Monte Carlo trials
# n = the number of classes; in this example, n=3


# example of an f-generator function: squared Hellinger distance
def hd2(u):
    v = (np.sqrt(u)-1)
    return v*v


# this function estimates the divergence between two (univariate) mixtures by numerical integration
def integrate(p, q, f=hd2, resolution=TRIALS, epsilon=EPS):
    xs = np.linspace(-50, 50, resolution)
    ps = p.pdf(xs)+epsilon
    qs = q.pdf(xs)+epsilon
    rs = ps/qs
    base = xs[1]-xs[0]
    return np.sum(base * f(rs) * qs)


# this is the Monte Carlo approximation as it appears in the paper, i.e., sampling
# from q and averaging (1/N). In the paper, N points are actually "pre-sampled" for
# each class and then the ones corresponding to each class are selected. Here we
# directly sample only the ones that correspond to each class.
def montecarlo(p, q, f=hd2, trials=TRIALS, epsilon=EPS):
    xs = q.rvs(trials)
    ps = p.pdf(xs)+epsilon
    qs = q.pdf(xs)+epsilon
    N = trials
    return (1/N)*np.sum(f(ps/qs))


# this is the Monte Carlo approximation in which we keep all the N*n points (i.e., as if
# we had sampled from a "q" in which all classes were equally likely, 1/3) and then rescale
# the weights via "importance weighting" <-- this is not in the paper, but I like it better
# than what is there, because the step of "selecting the points that correspond to each
# class" disappears
def montecarlo_importancesampling(p, q, r, f=hd2, trials=TRIALS, epsilon=EPS):
    xs = r.rvs(trials)  # <- reference distribution (here: uniform class weights)
    rs = r.pdf(xs)
    ps = p.pdf(xs)+epsilon
    qs = q.pdf(xs)+epsilon
    N = trials
    importance_weight = qs/rs
    return (1/N)*np.sum(f(ps/qs)*(importance_weight))


# I have tried to implement the variant that Juanjo proposes, but I think I have not
# understood it well. Maybe the weight has to be rescaled by 1/3 or something like that,
# but I cannot figure it out...
def montecarlo_classweight(p, q, f=hd2, trials=TRIALS, epsilon=EPS):
    xs_1 = q.rvs_fromclass(0, trials)
    xs_2 = q.rvs_fromclass(1, trials)
    xs_3 = q.rvs_fromclass(2, trials)
    xs = np.concatenate([xs_1, xs_2, xs_3])
    weights = np.asarray([[alpha_i]*trials for alpha_i in q.alphas]).flatten()
    ps = p.pdf(xs)+epsilon
    qs = q.pdf(xs)+epsilon
    N = trials
    n = q.n
    return (1/(N*n))*np.sum(weights*f(ps/qs))


class Q:
    """
    This class is a mixture of Gaussians.

    :param locs: means, one per class
    :param scales: standard deviations, one per class
    :param alphas: weights, one per class
    """
    def __init__(self, locs, scales, alphas):
        self.qs = []
        for loc, scale in zip(locs, scales):
            self.qs.append(norm(loc=loc, scale=scale))
        assert np.isclose(np.sum(alphas), 1), 'alphas do not sum up to 1!'
        self.alphas = np.asarray(alphas) / np.sum(alphas)

    def pdf(self, xs):
        q_xs = np.vstack([
            q_i.pdf(xs) * alpha_i for q_i, alpha_i in zip(self.qs, self.alphas)
        ])
        v = q_xs.sum(axis=0)
        if len(v) == 1:
            v = v[0]
        return v

    @property
    def n(self):
        return len(self.alphas)

    def rvs_fromclass(self, inclass, trials):
        return self.qs[inclass].rvs(trials)

    def rvs(self, trials):
        variates = []
        added = 0
        for i, (q_i, alpha_i) in enumerate(zip(self.qs, self.alphas)):
            trials_i = int(trials*alpha_i) if (i < self.n-1) else trials-added
            variates_i = q_i.rvs(trials_i)
            variates.append(variates_i)
            added += len(variates_i)
        return np.concatenate(variates)

    @property
    def locs(self):
        return np.asarray([x.kwds['loc'] for x in self.qs])

    @property
    def scales(self):
        return np.asarray([x.kwds['scale'] for x in self.qs])

    @classmethod
    def change_priors(cls, q, alphas):
        assert len(alphas) == len(q.alphas)
        return Q(locs=q.locs, scales=q.scales, alphas=alphas)


# test distribution
p = Q(locs=[1, 1.5], scales=[1, 1], alphas=[0.8, 0.2])

# training distribution (a mixture with one component per class)
q = Q(locs=[0, 0.5, 1.2], scales=[1, 1, 1.5], alphas=[0.1, 0.2, 0.7])

# reference distribution (a copy of the mixture q, with uniform class weights)
r = Q.change_priors(q, alphas=[1/3, 1/3, 1/3])

integral_approx = integrate(p, q)
montecarlo_approx = montecarlo(p, q)
montecarlo_approx2 = montecarlo_importancesampling(p, q, r)
montecarlo_approx3 = montecarlo_classweight(p, q)

print(f'{integral_approx=:.10f}')
print()
print(f'{montecarlo_approx=:.10f}, error={np.abs(integral_approx-montecarlo_approx):.10f}')
print()
print(f'{montecarlo_approx2=:.10f}, error={np.abs(integral_approx-montecarlo_approx2):.10f}')
print()
print(f'{montecarlo_approx3=:.10f}, error={np.abs(integral_approx-montecarlo_approx3):.10f}')
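
Since all the estimators above take the f-generator as a parameter, the same machinery recovers other f-divergences as well; for instance, f(u) = u*log(u) turns D_f(p||q) = integral( q * f(p/q) ) into KL(p||q), which gives another way of comparing the three approximations. A minimal sketch, reusing the p, q, r and functions defined above (`kl_generator` is just an illustrative name):

def kl_generator(u):
    # f(u) = u*log(u): with this generator, integral( q * f(p/q) ) = integral( p * log(p/q) ) = KL(p||q)
    return u * np.log(u)

print(integrate(p, q, f=kl_generator))
print(montecarlo(p, q, f=kl_generator))
print(montecarlo_importancesampling(p, q, r, f=kl_generator))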