QuaPy/experimental_non_aggregative/method_dxs.py

209 lines
7.6 KiB
Python

from scipy.sparse import issparse
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
import quapy as qp
from data import LabelledCollection
import numpy as np
from experimental_non_aggregative.custom_vectorizers import *
from method._kdey import KDEBase
from protocol import APP
from quapy.method.aggregative import HDy, DistributionMatchingY
from quapy.method.base import BaseQuantifier
from scipy import optimize
import pandas as pd
import quapy.functional as F
# TODO: explore the bernoulli (term presence/absence) variant
# TODO: explore the multinomial (term frequency) variant
# TODO: explore the multinomial + length normalization variant
# TODO: consolidate the TSR-variant (e.g., using information gain) variant;
# - works better with the idf?
# - works better with length normalization?
# - etc
class DxS(BaseQuantifier):
def __init__(self, vectorizer=None, divergence='topsoe'):
self.vectorizer = vectorizer
self.divergence = divergence
# def __as_distribution(self, instances):
# return np.asarray(instances.sum(axis=0) / instances.sum()).flatten()
def __as_distribution(self, instances):
dist = instances.mean(axis=0)
return np.asarray(dist).flatten()
def fit(self, text_instances, labels):
classes = np.unique(labels)
if self.vectorizer is not None:
text_instances = self.vectorizer.fit_transform(text_instances, y=labels)
distributions = []
for class_i in classes:
distributions.append(self.__as_distribution(text_instances[labels == class_i]))
self.validation_distribution = np.asarray(distributions)
return self
def predict(self, text_instances):
if self.vectorizer is not None:
text_instances = self.vectorizer.transform(text_instances)
test_distribution = self.__as_distribution(text_instances)
divergence = qp.functional.get_divergence(self.divergence)
n_classes, n_feats = self.validation_distribution.shape
def match(prev):
prev = np.expand_dims(prev, axis=0)
mixture_distribution = (prev @ self.validation_distribution).flatten()
return divergence(test_distribution, mixture_distribution)
# the initial point is set as the uniform distribution
uniform_distribution = np.full(fill_value=1 / n_classes, shape=(n_classes,))
# solutions are bounded to those contained in the unit-simplex
bounds = tuple((0, 1) for x in range(n_classes)) # values in [0,1]
constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)}) # values summing up to 1
r = optimize.minimize(match, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints)
return r.x
class KDExML(BaseQuantifier, KDEBase):
def __init__(self, bandwidth=0.1, standardize=False):
self._check_bandwidth(bandwidth)
self.bandwidth = bandwidth
self.standardize = standardize
def fit(self, X, y):
classes = sorted(np.unique(y))
if self.standardize:
self.scaler = StandardScaler()
X = self.scaler.fit_transform(X)
if issparse(X):
X = X.toarray()
self.mix_densities = self.get_mixture_components(X, y, classes, self.bandwidth)
return self
def predict(self, X):
"""
Searches for the mixture model parameter (the sought prevalence values) that maximizes the likelihood
of the data (i.e., that minimizes the negative log-likelihood)
:param X: instances in the sample
:return: a vector of class prevalence estimates
"""
epsilon = 1e-10
if issparse(X):
X = X.toarray()
n_classes = len(self.mix_densities)
if self.standardize:
X = self.scaler.transform(X)
test_densities = [self.pdf(kde_i, X) for kde_i in self.mix_densities]
def neg_loglikelihood(prev):
test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip (prev, test_densities))
test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
return -np.sum(test_loglikelihood)
return F.optim_minimize(neg_loglikelihood, n_classes)
if __name__ == '__main__':
qp.environ['SAMPLE_SIZE'] = 250
qp.environ['N_JOBS'] = -1
min_df = 10
# dataset = 'imdb'
repeats = 10
error = 'mae'
div = 'topsoe'
# generates tuples (dataset, method, method_name)
# (the dataset is needed for methods that process the dataset differently)
def gen_methods():
for dataset in qp.datasets.REVIEWS_SENTIMENT_DATASETS:
data = qp.datasets.fetch_reviews(dataset, tfidf=False)
# bernoulli_vectorizer = CountVectorizer(min_df=min_df, binary=True)
# dxs = DxS(divergence=div, vectorizer=bernoulli_vectorizer)
# yield data, dxs, 'DxS-Bernoulli'
#
# multinomial_vectorizer = CountVectorizer(min_df=min_df, binary=False)
# dxs = DxS(divergence=div, vectorizer=multinomial_vectorizer)
# yield data, dxs, 'DxS-multinomial'
#
# tf_vectorizer = TfidfVectorizer(sublinear_tf=False, use_idf=False, min_df=min_df, norm=None)
# dxs = DxS(divergence=div, vectorizer=tf_vectorizer)
# yield data, dxs, 'DxS-TF'
#
# logtf_vectorizer = TfidfVectorizer(sublinear_tf=True, use_idf=False, min_df=min_df, norm=None)
# dxs = DxS(divergence=div, vectorizer=logtf_vectorizer)
# yield data, dxs, 'DxS-logTF'
#
# tfidf_vectorizer = TfidfVectorizer(use_idf=True, min_df=min_df, norm=None)
# dxs = DxS(divergence=div, vectorizer=tfidf_vectorizer)
# yield data, dxs, 'DxS-TFIDF'
#
# tfidf_vectorizer = TfidfVectorizer(use_idf=True, min_df=min_df, norm='l2')
# dxs = DxS(divergence=div, vectorizer=tfidf_vectorizer)
# yield data, dxs, 'DxS-TFIDF-l2'
tsr_vectorizer = TSRweighting(tsr_function=information_gain, min_df=min_df, norm='l2')
dxs = DxS(divergence=div, vectorizer=tsr_vectorizer)
yield data, dxs, 'DxS-TFTSR-l2'
data = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=min_df)
kdex = KDExML()
reduction = TruncatedSVD(n_components=100, random_state=0)
red_data = qp.data.preprocessing.instance_transformation(data, transformer=reduction, inplace=False)
yield red_data, kdex, 'KDEx'
hdy = HDy(LogisticRegression())
yield data, hdy, 'HDy'
# dm = DistributionMatchingY(LogisticRegression(), divergence=div, nbins=5)
# yield data, dm, 'DM-5b'
#
# dm = DistributionMatchingY(LogisticRegression(), divergence=div, nbins=10)
# yield data, dm, 'DM-10b'
result_path = 'results.csv'
with open(result_path, 'wt') as csv:
csv.write(f'Method\tDataset\tMAE\tMRAE\n')
for data, quantifier, quant_name in gen_methods():
quantifier.fit(*data.training.Xy)
report = qp.evaluation.evaluation_report(quantifier, APP(data.test, repeats=repeats), error_metrics=['mae','mrae'], verbose=True)
means = report.mean(numeric_only=True)
csv.write(f'{quant_name}\t{data.name}\t{means["mae"]:.5f}\t{means["mrae"]:.5f}\n')
df = pd.read_csv(result_path, sep='\t')
# print(df)
pv = df.pivot_table(index='Method', columns="Dataset", values=["MAE", "MRAE"])
print(pv)