adding kdex

Alejandro Moreo Fernandez 2025-10-23 14:12:39 +02:00
parent 41baeb78ca
commit f227ed2f60
2 changed files with 89 additions and 30 deletions


@@ -1,16 +1,21 @@
+from scipy.sparse import issparse
+from sklearn.decomposition import TruncatedSVD
 from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
 from sklearn.linear_model import LogisticRegression
+from sklearn.preprocessing import StandardScaler
 import quapy as qp
 from data import LabelledCollection
 import numpy as np
 from experimental_non_aggregative.custom_vectorizers import *
+from method._kdey import KDEBase
 from protocol import APP
 from quapy.method.aggregative import HDy, DistributionMatchingY
 from quapy.method.base import BaseQuantifier
 from scipy import optimize
 import pandas as pd
+import quapy.functional as F

 # TODO: explore the bernoulli (term presence/absence) variant
@@ -72,6 +77,51 @@ class DxS(BaseQuantifier):

+class KDExML(BaseQuantifier, KDEBase):
+
+    def __init__(self, bandwidth=0.1, standardize=False):
+        self._check_bandwidth(bandwidth)
+        self.bandwidth = bandwidth
+        self.standardize = standardize
+
+    def fit(self, X, y):
+        classes = sorted(np.unique(y))
+        if issparse(X):
+            # KDE (and StandardScaler with mean-centering) require dense input
+            X = X.toarray()
+        if self.standardize:
+            self.scaler = StandardScaler()
+            X = self.scaler.fit_transform(X)
+        # fit one kernel density estimator per class
+        self.mix_densities = self.get_mixture_components(X, y, classes, self.bandwidth)
+        return self
+
+    def predict(self, X):
+        """
+        Searches for the mixture model parameters (the sought prevalence values) that maximize the likelihood
+        of the data (i.e., that minimize the negative log-likelihood).
+
+        :param X: instances in the sample
+        :return: a vector of class prevalence estimates
+        """
+        epsilon = 1e-10
+        if issparse(X):
+            X = X.toarray()
+        n_classes = len(self.mix_densities)
+        if self.standardize:
+            X = self.scaler.transform(X)
+        # per-class densities of the test instances under each fitted KDE
+        test_densities = [self.pdf(kde_i, X) for kde_i in self.mix_densities]
+
+        def neg_loglikelihood(prev):
+            test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip(prev, test_densities))
+            test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
+            return -np.sum(test_loglikelihood)
+
+        # search the probability simplex for the prevalence vector minimizing the objective
+        return F.optim_minimize(neg_loglikelihood, n_classes)
+

 if __name__ == '__main__':

     qp.environ['SAMPLE_SIZE'] = 250
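Note: KDExML.predict performs a maximum-likelihood mixture fit: it searches the probability simplex for the prevalence vector under which the test sample is most likely, given one KDE per class fitted at training time. Below is a minimal self-contained sketch of the same idea, with scikit-learn's KernelDensity and a constrained scipy.optimize.minimize standing in for quapy's F.optim_minimize (the toy data and all variable names are illustrative, not part of this commit):

import numpy as np
from scipy.optimize import minimize
from sklearn.neighbors import KernelDensity

rng = np.random.default_rng(0)

# toy 1-d training data: two classes with shifted distributions
Xtr = [rng.normal(0.0, 1.0, (500, 1)), rng.normal(2.0, 1.0, (500, 1))]
class_kdes = [KernelDensity(bandwidth=0.2).fit(X_c) for X_c in Xtr]  # one KDE per class

# a test sample whose true prevalence is (0.3, 0.7)
Xte = np.vstack([rng.normal(0.0, 1.0, (75, 1)), rng.normal(2.0, 1.0, (175, 1))])
test_densities = [np.exp(kde.score_samples(Xte)) for kde in class_kdes]

def neg_loglikelihood(prev):
    mixture = sum(p * d for p, d in zip(prev, test_densities))
    return -np.sum(np.log(mixture + 1e-10))

n = len(class_kdes)
sol = minimize(neg_loglikelihood, x0=np.full(n, 1/n), bounds=[(0, 1)]*n,
               constraints={'type': 'eq', 'fun': lambda p: p.sum() - 1})
print(sol.x)  # close to [0.3, 0.7]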
@@ -91,43 +141,51 @@ if __name__ == '__main__':

     data = qp.datasets.fetch_reviews(dataset, tfidf=False)

-    bernoulli_vectorizer = CountVectorizer(min_df=min_df, binary=True)
-    dxs = DxS(divergence=div, vectorizer=bernoulli_vectorizer)
-    yield data, dxs, 'DxS-Bernoulli'
+    # bernoulli_vectorizer = CountVectorizer(min_df=min_df, binary=True)
+    # dxs = DxS(divergence=div, vectorizer=bernoulli_vectorizer)
+    # yield data, dxs, 'DxS-Bernoulli'
+    #
-    multinomial_vectorizer = CountVectorizer(min_df=min_df, binary=False)
-    dxs = DxS(divergence=div, vectorizer=multinomial_vectorizer)
-    yield data, dxs, 'DxS-multinomial'
+    # multinomial_vectorizer = CountVectorizer(min_df=min_df, binary=False)
+    # dxs = DxS(divergence=div, vectorizer=multinomial_vectorizer)
+    # yield data, dxs, 'DxS-multinomial'
+    #
-    tf_vectorizer = TfidfVectorizer(sublinear_tf=False, use_idf=False, min_df=min_df, norm=None)
-    dxs = DxS(divergence=div, vectorizer=tf_vectorizer)
-    yield data, dxs, 'DxS-TF'
+    # tf_vectorizer = TfidfVectorizer(sublinear_tf=False, use_idf=False, min_df=min_df, norm=None)
+    # dxs = DxS(divergence=div, vectorizer=tf_vectorizer)
+    # yield data, dxs, 'DxS-TF'
+    #
-    logtf_vectorizer = TfidfVectorizer(sublinear_tf=True, use_idf=False, min_df=min_df, norm=None)
-    dxs = DxS(divergence=div, vectorizer=logtf_vectorizer)
-    yield data, dxs, 'DxS-logTF'
+    # logtf_vectorizer = TfidfVectorizer(sublinear_tf=True, use_idf=False, min_df=min_df, norm=None)
+    # dxs = DxS(divergence=div, vectorizer=logtf_vectorizer)
+    # yield data, dxs, 'DxS-logTF'
+    #
-    tfidf_vectorizer = TfidfVectorizer(use_idf=True, min_df=min_df, norm=None)
-    dxs = DxS(divergence=div, vectorizer=tfidf_vectorizer)
-    yield data, dxs, 'DxS-TFIDF'
+    # tfidf_vectorizer = TfidfVectorizer(use_idf=True, min_df=min_df, norm=None)
+    # dxs = DxS(divergence=div, vectorizer=tfidf_vectorizer)
+    # yield data, dxs, 'DxS-TFIDF'
+    #
-    tfidf_vectorizer = TfidfVectorizer(use_idf=True, min_df=min_df, norm='l2')
-    dxs = DxS(divergence=div, vectorizer=tfidf_vectorizer)
-    yield data, dxs, 'DxS-TFIDF-l2'
+    # tfidf_vectorizer = TfidfVectorizer(use_idf=True, min_df=min_df, norm='l2')
+    # dxs = DxS(divergence=div, vectorizer=tfidf_vectorizer)
+    # yield data, dxs, 'DxS-TFIDF-l2'

     tsr_vectorizer = TSRweighting(tsr_function=information_gain, min_df=min_df, norm='l2')
     dxs = DxS(divergence=div, vectorizer=tsr_vectorizer)
     yield data, dxs, 'DxS-TFTSR-l2'

     data = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=min_df)

+    kdex = KDExML()
+    reduction = TruncatedSVD(n_components=100, random_state=0)
+    red_data = qp.data.preprocessing.instance_transformation(data, transformer=reduction, inplace=False)
+    yield red_data, kdex, 'KDEx'
+
     hdy = HDy(LogisticRegression())
     yield data, hdy, 'HDy'

-    dm = DistributionMatchingY(LogisticRegression(), divergence=div, nbins=5)
-    yield data, dm, 'DM-5b'
-
-    dm = DistributionMatchingY(LogisticRegression(), divergence=div, nbins=10)
-    yield data, dm, 'DM-10b'
+    # dm = DistributionMatchingY(LogisticRegression(), divergence=div, nbins=5)
+    # yield data, dm, 'DM-5b'
+    #
+    # dm = DistributionMatchingY(LogisticRegression(), divergence=div, nbins=10)
+    # yield data, dm, 'DM-10b'

     result_path = 'results.csv'
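Note: the new KDEx entry first projects the sparse tf-idf matrix onto 100 dense SVD components, since kernel density estimation behaves poorly on high-dimensional sparse vectors. A hedged sketch of using the two pieces together outside the generator (the attribute accesses on the quapy dataset are assumptions about its public API):

from sklearn.decomposition import TruncatedSVD

# `data` is assumed to be the tf-idf Dataset fetched above; KDExML is the class added by this commit
svd = TruncatedSVD(n_components=100, random_state=0)
Xtr = svd.fit_transform(data.training.X)       # dense (n_train, 100) matrix
kdex = KDExML(bandwidth=0.1, standardize=True).fit(Xtr, data.training.labels)
Xte = svd.transform(data.test.X)
prev_estim = kdex.predict(Xte)                 # vector of estimated class prevalences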


@@ -24,6 +24,7 @@ def instance_transformation(dataset:Dataset, transformer, inplace=False):
     """
     training_transformed = transformer.fit_transform(*dataset.training.Xy)
     test_transformed = transformer.transform(dataset.test.X)
+    orig_name = dataset.name

     if inplace:
         dataset.training = LabelledCollection(training_transformed, dataset.training.labels, dataset.classes_)
@@ -34,10 +35,10 @@ def instance_transformation(dataset:Dataset, transformer, inplace=False):
     else:
         training = LabelledCollection(training_transformed, dataset.training.labels.copy(), dataset.classes_)
         test = LabelledCollection(test_transformed, dataset.test.labels.copy(), dataset.classes_)
+        vocab = None
         if hasattr(transformer, 'vocabulary_'):
-            return Dataset(training, test, transformer.vocabulary_)
-        else:
-            return Dataset(training, test)
+            vocab = transformer.vocabulary_
+        return Dataset(training, test, vocabulary=vocab, name=orig_name)


def text2tfidf(dataset:Dataset, min_df=3, sublinear_tf=True, inplace=False, **kwargs):
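Note: after this change the non-inplace branch funnels through a single return that forwards the vocabulary only when the transformer defines one, and preserves the original dataset name (previously dropped). A small usage sketch, assuming Dataset exposes the vocabulary and name it was constructed with:

from sklearn.decomposition import TruncatedSVD
import quapy as qp

data = qp.datasets.fetch_reviews('hp', tfidf=True, min_df=10)
red = qp.data.preprocessing.instance_transformation(
    data, transformer=TruncatedSVD(n_components=100, random_state=0), inplace=False)

assert red.name == data.name       # the dataset name now survives the transformation
assert red.vocabulary is None      # TruncatedSVD defines no vocabulary_ (assumed Dataset attribute)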