cleaning new methods from master

This commit is contained in:
Alejandro Moreo Fernandez 2021-04-28 11:28:13 +02:00
parent 1d12e96867
commit 3d544135f1
14 changed files with 0 additions and 1539 deletions

View File

@@ -1,35 +0,0 @@
import numpy as np
import quapy as qp
import settings
import os
import pickle
from glob import glob
import itertools
import pathlib
qp.environ['SAMPLE_SIZE'] = settings.SAMPLE_SIZE
resultdir = './results'
methods = ['*']
def evaluate_results(methods, datasets, error_name):
results_str = []
all = []
error = qp.error.from_name(error_name)
for method, dataset in itertools.product(methods, datasets):
for experiment in glob(f'{resultdir}/{dataset}-{method}-{error_name}.pkl'):
true_prevalences, estim_prevalences, tr_prev, te_prev, te_prev_estim, best_params = \
pickle.load(open(experiment, 'rb'))
result = error(true_prevalences, estim_prevalences)
string = f'{pathlib.Path(experiment).name}: {result:.3f}'
results_str.append(string)
all.append(result)
results_str = sorted(results_str)
for r in results_str:
print(r)
print()
print(f'Ave: {np.mean(all):.3f}')
evaluate_results(methods=['*'], datasets=['*'], error_name='mae')

View File

@@ -1,223 +0,0 @@
from sklearn.linear_model import LogisticRegression
import quapy as qp
from NewMethods.fgsld.fgsld_quantifiers import FakeFGLSD
from classification.methods import PCALR
from method.meta import QuaNet
from method.non_aggregative import MaximumLikelihoodPrevalenceEstimation
from methods import *
from quapy.method.aggregative import CC, ACC, PCC, PACC, EMQ, OneVsAll, SVMQ, SVMKLD, SVMNKLD, SVMAE, SVMRAE, HDy
from quapy.method.meta import EPACC, EEMQ
import quapy.functional as F
import numpy as np
import os
import pickle
import itertools
from joblib import Parallel, delayed
import settings
import argparse
import torch
import shutil
qp.environ['SAMPLE_SIZE'] = settings.SAMPLE_SIZE
def newLR():
return LogisticRegression(max_iter=1000, solver='lbfgs', n_jobs=-1)
__C_range = np.logspace(-4, 5, 10)
lr_params = {'C': __C_range, 'class_weight': [None, 'balanced']}
svmperf_params = {'C': __C_range}
def experimental_models():
def newLR():
return LogisticRegression(max_iter=1000, solver='lbfgs', n_jobs=-1)
__C_range = np.logspace(-4, 5, 10)
lr_params = {'C': __C_range, 'class_weight': [None, 'balanced']}
svmperf_params = {'C': __C_range}
#yield 'paccsld', PACCSLD(newLR()), lr_params
# yield 'hdysld', OneVsAll(HDySLD(newLR())), lr_params # <-- promising!
#yield 'PACC(5)', PACC(newLR(), val_split=5), {}
#yield 'PACC(10)', PACC(newLR(), val_split=10), {}
yield 'FGSLD(3)', FakeFGLSD(newLR(), nbins=3, isomerous=False, recompute_bins=True), {}
yield 'FGSLD(5)', FakeFGLSD(newLR(), nbins=5, isomerous=False, recompute_bins=True), {}
def classic_models():
# methods tested in Gao & Sebastiani 2016
yield 'cc', CC(newLR()), lr_params
yield 'acc', ACC(newLR()), lr_params
yield 'pcc', PCC(newLR()), lr_params
yield 'pacc', PACC(newLR()), lr_params
yield 'sld', EMQ(newLR()), lr_params
yield 'svmq', OneVsAll(SVMQ(args.svmperfpath)), svmperf_params
yield 'svmkld', OneVsAll(SVMKLD(args.svmperfpath)), svmperf_params
yield 'svmnkld', OneVsAll(SVMNKLD(args.svmperfpath)), svmperf_params
# methods added
yield 'svmmae', OneVsAll(SVMAE(args.svmperfpath)), svmperf_params
yield 'svmmrae', OneVsAll(SVMRAE(args.svmperfpath)), svmperf_params
yield 'hdy', OneVsAll(HDy(newLR())), lr_params
def cuda_models():
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Running QuaNet in {device}')
learner = PCALR(**newLR().get_params())
yield 'quanet', QuaNet(learner, settings.SAMPLE_SIZE, checkpointdir=args.checkpointdir, device=device), lr_params
def ensembles():
param_mod_sel = {
'sample_size': settings.SAMPLE_SIZE,
'n_prevpoints': 21,
'n_repetitions': 5,
'verbose': False
}
common={
'max_sample_size': 1000,
'n_jobs': settings.ENSEMBLE_N_JOBS,
'param_grid': lr_params,
'param_mod_sel': param_mod_sel,
'val_split': 0.4,
'min_pos': 10
}
# hyperparameters will be evaluated within each quantifier of the ensemble, and so the typical model selection
# will be skipped (by setting hyperparameters to None)
hyper_none = None
#yield 'epaccmaeptr', EPACC(newLR(), optim='mae', policy='ptr', **common), hyper_none
yield 'epaccmaemae1k', EPACC(newLR(), optim='mae', policy='mae', **common), hyper_none
# yield 'esldmaeptr', EEMQ(newLR(), optim='mae', policy='ptr', **common), hyper_none
# yield 'esldmaemae', EEMQ(newLR(), optim='mae', policy='mae', **common), hyper_none
#yield 'epaccmraeptr', EPACC(newLR(), optim='mrae', policy='ptr', **common), hyper_none
#yield 'epaccmraemrae', EPACC(newLR(), optim='mrae', policy='mrae', **common), hyper_none
#yield 'esldmraeptr', EEMQ(newLR(), optim='mrae', policy='ptr', **common), hyper_none
#yield 'esldmraemrae', EEMQ(newLR(), optim='mrae', policy='mrae', **common), hyper_none
def evaluate_experiment(true_prevalences, estim_prevalences):
print('\nEvaluation Metrics:\n'+'='*22)
for eval_measure in [qp.error.mae, qp.error.mrae]:
err = eval_measure(true_prevalences, estim_prevalences)
print(f'\t{eval_measure.__name__}={err:.4f}')
print()
def evaluate_method_point_test(true_prev, estim_prev):
print('\nPoint-Test evaluation:\n' + '=' * 22)
print(f'true-prev={F.strprev(true_prev)}, estim-prev={F.strprev(estim_prev)}')
for eval_measure in [qp.error.mae, qp.error.mrae]:
err = eval_measure(true_prev, estim_prev)
print(f'\t{eval_measure.__name__}={err:.4f}')
def result_path(path, dataset_name, model_name, optim_loss):
return os.path.join(path, f'{dataset_name}-{model_name}-{optim_loss}.pkl')
def is_already_computed(dataset_name, model_name, optim_loss):
if dataset_name=='semeval':
check_datasets = ['semeval13', 'semeval14', 'semeval15']
else:
check_datasets = [dataset_name]
return all(os.path.exists(result_path(args.results, name, model_name, optim_loss)) for name in check_datasets)
def save_results(dataset_name, model_name, optim_loss, *results):
rpath = result_path(args.results, dataset_name, model_name, optim_loss)
qp.util.create_parent_dir(rpath)
with open(rpath, 'wb') as foo:
pickle.dump(tuple(results), foo, pickle.HIGHEST_PROTOCOL)
def run(experiment):
optim_loss, dataset_name, (model_name, model, hyperparams) = experiment
if is_already_computed(dataset_name, model_name, optim_loss=optim_loss):
print(f'result for dataset={dataset_name} model={model_name} loss={optim_loss} already computed.')
return
elif (optim_loss == 'mae' and 'mrae' in model_name) or (optim_loss=='mrae' and 'mae' in model_name):
print(f'skipping model={model_name} for optim_loss={optim_loss}')
return
else:
print(f'running dataset={dataset_name} model={model_name} loss={optim_loss}')
benchmark_devel = qp.datasets.fetch_twitter(dataset_name, for_model_selection=True, min_df=5, pickle=True)
benchmark_devel.stats()
# model selection (hyperparameter optimization for a quantification-oriented loss)
if hyperparams is not None:
model_selection = qp.model_selection.GridSearchQ(
model,
param_grid=hyperparams,
sample_size=settings.SAMPLE_SIZE,
n_prevpoints=21,
n_repetitions=5,
error=optim_loss,
refit=False,
timeout=60*60,
verbose=True
)
model_selection.fit(benchmark_devel.training, benchmark_devel.test)
model = model_selection.best_model()
best_params = model_selection.best_params_
else:
best_params = {}
# model evaluation
test_names = [dataset_name] if dataset_name != 'semeval' else ['semeval13', 'semeval14', 'semeval15']
for test_no, test_name in enumerate(test_names):
benchmark_eval = qp.datasets.fetch_twitter(test_name, for_model_selection=False, min_df=5, pickle=True)
if test_no == 0:
print('fitting the selected model')
# fits the model only the first time
model.fit(benchmark_eval.training)
true_prevalences, estim_prevalences = qp.evaluation.artificial_sampling_prediction(
model,
test=benchmark_eval.test,
sample_size=settings.SAMPLE_SIZE,
n_prevpoints=21,
n_repetitions=25,
n_jobs=-1 if isinstance(model, qp.method.meta.Ensemble) else 1
)
test_estim_prevalence = model.quantify(benchmark_eval.test.instances)
test_true_prevalence = benchmark_eval.test.prevalence()
evaluate_experiment(true_prevalences, estim_prevalences)
evaluate_method_point_test(test_true_prevalence, test_estim_prevalence)
save_results(test_name, model_name, optim_loss,
true_prevalences, estim_prevalences,
benchmark_eval.training.prevalence(), test_true_prevalence, test_estim_prevalence,
best_params)
#if isinstance(model, QuaNet):
#model.clean_checkpoint_dir()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Run experiments for Twitter Sentiment Quantification')
parser.add_argument('results', metavar='RESULT_PATH', type=str,
help='path to the directory where to store the results')
parser.add_argument('--svmperfpath', metavar='SVMPERF_PATH', type=str, default='./svm_perf_quantification',
help='path to the directory with svmperf')
parser.add_argument('--checkpointdir', metavar='PATH', type=str, default='./checkpoint',
help='path to the directory where to dump QuaNet checkpoints')
args = parser.parse_args()
print(f'Result folder: {args.results}')
np.random.seed(0)
optim_losses = ['mae']
datasets = qp.datasets.TWITTER_SENTIMENT_DATASETS_TRAIN
qp.util.parallel(run, itertools.product(optim_losses, datasets, experimental_models()), n_jobs=settings.N_JOBS)
# qp.util.parallel(run, itertools.product(optim_losses, datasets, classic_models()), n_jobs=settings.N_JOBS)
# qp.util.parallel(run, itertools.product(optim_losses, datasets, cuda_models()), n_jobs=settings.CUDA_N_JOBS)
# qp.util.parallel(run, itertools.product(optim_losses, datasets, ensembles()), n_jobs=1)
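Note: each run above persists its results as a pickled tuple; the layout below mirrors the save_results() call and is what the evaluation, table and plot scripts in this commit unpack. A minimal reading sketch (the path is illustrative):
import pickle
# tuple layout: (true_prevalences, estim_prevalences, training_prevalence,
#                test_true_prevalence, test_estim_prevalence, best_params)
with open('./results/semeval13-cc-mae.pkl', 'rb') as fin:
    (true_prevalences, estim_prevalences, train_prev,
     test_true_prev, test_estim_prev, best_params) = pickle.load(fin)
print(true_prevalences.shape, estim_prevalences.shape, best_params)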

View File

@@ -1,116 +0,0 @@
import numpy as np
import logging
from collections import namedtuple
from sklearn.metrics import brier_score_loss
from sklearn.preprocessing import MultiLabelBinarizer
from NewMethods.fgsld.metrics import smoothmacroF1, isometric_brier_decomposition, isomerous_brier_decomposition
History = namedtuple('History', ('posteriors', 'priors', 'y', 'iteration', 'stopping_criterium'))
MeasureSingleHistory = namedtuple('MeasureSingleHistory', (
'soft_acc', 'soft_f1', 'abs_errors', 'test_priors', 'train_priors', 'predict_priors', 'brier',
'isometric_ref_loss', 'isometric_cal_loss', 'isomerous_ref_loss', 'isomerous_cal_loss'
))
def get_measures_single_history(history: History, multi_class) -> MeasureSingleHistory:
y = history.y
y_bin = MultiLabelBinarizer(classes=list(range(history.posteriors.shape[1]))).fit_transform(np.expand_dims(y, 1))
soft_acc = soft_accuracy(y, history.posteriors)
f1 = smoothmacroF1(y_bin, history.posteriors)
if multi_class:
test_priors = np.mean(y_bin, 0)
abs_errors = abs(test_priors - history.priors)
train_priors = history.priors
predict_priors = np.mean(history.posteriors, 0)
brier = 0
else:
test_priors = np.mean(y_bin, 0)[1]
abs_errors = abs(test_priors - history.priors[1])
train_priors = history.priors[1]
predict_priors = np.mean(history.posteriors[:, 1])
brier = brier_score_loss(y, history.posteriors[:, 1])
isometric_cal_loss, isometric_ref_loss = isometric_brier_decomposition(y, history.posteriors)
isomerous_em_cal_loss, isomerous_em_ref_loss = isomerous_brier_decomposition(y, history.posteriors)
return MeasureSingleHistory(
soft_acc, f1, abs_errors, test_priors, train_priors, predict_priors, brier, isometric_ref_loss,
isometric_cal_loss, isomerous_em_ref_loss, isomerous_em_cal_loss
)
def soft_accuracy(y, posteriors):
return sum(posteriors[y == c][:, c].sum() for c in range(posteriors.shape[1])) / posteriors.sum()
def soft_f1(y, posteriors):
cont_matrix = {
'TPM': posteriors[y == 1][:, 1].sum(),
'TNM': posteriors[y == 0][:, 0].sum(),
'FPM': posteriors[y == 0][:, 1].sum(),
'FNM': posteriors[y == 1][:, 0].sum()
}
precision = cont_matrix['TPM'] / (cont_matrix['TPM'] + cont_matrix['FPM'])
recall = cont_matrix['TPM'] / (cont_matrix['TPM'] + cont_matrix['FNM'])
return 2 * (precision * recall / (precision + recall))
def em(y, posteriors_zero, priors_zero, epsilon=1e-6, multi_class=False, return_posteriors_hist=False):
"""
Implements the prior correction method based on EM presented in:
"Adjusting the Outputs of a Classifier to New a Priori Probabilities: A Simple Procedure"
Saerens, Latinne and Decaestecker, 2002
http://www.isys.ucl.ac.be/staff/marco/Publications/Saerens2002a.pdf
:param y: true labels of test items, to measure accuracy, precision and recall.
:param posteriors_zero: posterior probabilities on test items, as returned by a classifier. A 2D-array with shape
(items, classes).
:param priors_zero: prior probabilities measured on training set.
:param epsilon: stopping threshold.
:param multi_class: whether the algorithm is running in a multi-label multi-class context or not.
:param return_posteriors_hist: whether posteriors for each iteration should be returned or not. If true, the returned
posteriors_s will actually be the list of posteriors for every iteration.
:return: posteriors_s, priors_s, history: final adjusted posteriors, final adjusted priors, a list of length s
where each element is a tuple with the step counter, the current priors (as list), the stopping criterion value,
accuracy, precision and recall.
"""
s = 0
priors_s = np.copy(priors_zero)
posteriors_s = np.copy(posteriors_zero)
if return_posteriors_hist:
posteriors_hist = [posteriors_s.copy()]
val = 2 * epsilon
history = list()
history.append(get_measures_single_history(History(posteriors_zero, priors_zero, y, s, 1), multi_class))
while not val < epsilon and s < 999:
# M step
priors_s_minus_one = priors_s.copy()
priors_s = posteriors_s.mean(0)
# E step
ratios = priors_s / priors_zero
denominators = 0
for c in range(priors_zero.shape[0]):
denominators += ratios[c] * posteriors_zero[:, c]
for c in range(priors_zero.shape[0]):
posteriors_s[:, c] = ratios[c] * posteriors_zero[:, c] / denominators
# check for stop
val = 0
for i in range(len(priors_s_minus_one)):
val += abs(priors_s_minus_one[i] - priors_s[i])
logging.debug(f"EM iteration: {s}; Val: {val}")
s += 1
if return_posteriors_hist:
posteriors_hist.append(posteriors_s.copy())
history.append(get_measures_single_history(History(posteriors_s, priors_s, y, s, val), multi_class))
if return_posteriors_hist:
return posteriors_hist, priors_s, history
return posteriors_s, priors_s, history
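Note: em() above interleaves the prior-correction loop with History/metrics bookkeeping. As a reading aid, here is a minimal, self-contained sketch of just the EM adjustment it implements (Saerens et al., 2002); the function name and defaults are illustrative, not part of the removed code:
import numpy as np

def em_prior_adjustment(posteriors_zero, priors_zero, epsilon=1e-6, max_iter=1000):
    # posteriors_zero: (items, classes) posteriors of a classifier trained under priors_zero
    posteriors = np.copy(posteriors_zero)
    priors = np.copy(priors_zero)
    for _ in range(max_iter):
        priors_prev = priors.copy()
        # M-step: re-estimate the priors as the mean of the current posteriors
        priors = posteriors.mean(axis=0)
        # E-step: rescale the original posteriors by the prior ratios and renormalize
        scaled = (priors / priors_zero) * posteriors_zero
        posteriors = scaled / scaled.sum(axis=1, keepdims=True)
        # stop when the priors no longer change (same criterion as em() above)
        if np.abs(priors_prev - priors).sum() < epsilon:
            break
    return posteriors, priors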

View File

@@ -1,51 +0,0 @@
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from fgsld_quantifiers import FakeFGLSD
from method.aggregative import EMQ, CC
import quapy as qp
import numpy as np
qp.environ['SAMPLE_SIZE'] = 500
dataset = qp.datasets.fetch_reviews('hp')
qp.data.preprocessing.text2tfidf(dataset, min_df=5, inplace=True)
training = dataset.training
test = dataset.test
cls = CalibratedClassifierCV(LinearSVC())
#cls = LogisticRegression()
method_names, true_prevs, estim_prevs, tr_prevs = [], [], [], []
for model, model_name in [
(CC(cls), 'CC'),
# (FakeFGLSD(cls, nbins=20, isomerous=False, recompute_bins=True), 'FGSLD-isometric-dyn-20'),
#(FakeFGLSD(cls, nbins=11, isomerous=False, recompute_bins=True), 'FGSLD-isometric-dyn-11'),
#(FakeFGLSD(cls, nbins=8, isomerous=False, recompute_bins=True), 'FGSLD-isometric-dyn-8'),
#(FakeFGLSD(cls, nbins=6, isomerous=False, recompute_bins=True), 'FGSLD-isometric-dyn-6'),
(FakeFGLSD(cls, nbins=5, isomerous=False, recompute_bins=True), 'FGSLD-isometric-dyn-5'),
#(FakeFGLSD(cls, nbins=4, isomerous=False, recompute_bins=True), 'FGSLD-isometric-dyn-4'),
#(FakeFGLSD(cls, nbins=3, isomerous=False, recompute_bins=True), 'FGSLD-isometric-dyn-3'),
(FakeFGLSD(cls, nbins=1, isomerous=False, recompute_bins=True), 'FGSLD-isometric-dyn-1'),
# (FakeFGLSD(cls, nbins=3, isomerous=False, recompute_bins=False), 'FGSLD-isometric-sta-3'),
(EMQ(cls), 'SLD'),
]:
print('running ', model_name)
model.fit(training)
true_prev, estim_prev = qp.evaluation.artificial_sampling_prediction(
model, test, qp.environ['SAMPLE_SIZE'], n_repetitions=5, n_prevpoints=11, n_jobs=-1
)
method_names.append(model_name)
true_prevs.append(true_prev)
estim_prevs.append(estim_prev)
tr_prevs.append(training.prevalence())
#if hasattr(model, 'iterations'):
# print(f'iterations ave={np.mean(model.iterations):.3f}, min={np.min(model.iterations):.3f}, max={np.max(model.iterations):.3f}')
qp.plot.binary_diagonal(method_names, true_prevs, estim_prevs, train_prev=tr_prevs[0], savepath='./plot_fglsd.png')

View File

@@ -1,37 +0,0 @@
from sklearn.calibration import CalibratedClassifierCV
from sklearn.svm import LinearSVC
from NewMethods.fgsld.fine_grained_sld import FineGrainedSLD
from quapy.method.aggregative import EMQ, CC, training_helper
from quapy.data import LabelledCollection
from quapy.method.base import BaseQuantifier
import quapy.functional as F
class FakeFGLSD(BaseQuantifier):
def __init__(self, learner, nbins, isomerous, recompute_bins):
self.learner = learner
self.nbins = nbins
self.isomerous = isomerous
self.recompute_bins = recompute_bins
self.iterations=[]
def fit(self, data: LabelledCollection):
self.Xtr, self.ytr = data.Xy
self.learner.fit(self.Xtr, self.ytr)
return self
def quantify(self, instances):
tr_priors = F.prevalence_from_labels(self.ytr, n_classes=2)
fgsld = FineGrainedSLD(self.Xtr, instances, self.ytr, tr_priors, self.learner, n_bins=self.nbins)
priors, posteriors = fgsld.run(self.isomerous, compute_bins_at_every_iter=self.recompute_bins)
self.iterations.append(fgsld.iterations)
return priors
def get_params(self, deep=True):
pass
def set_params(self, **parameters):
pass

View File

@@ -1,112 +0,0 @@
import numpy as np
from NewMethods.fgsld.metrics import isomerous_bins, isometric_bins
from NewMethods.fgsld.em import History, get_measures_single_history
from sklearn.model_selection import cross_val_predict
import math
from scipy.special import softmax
class FineGrainedSLD:
def __init__(self, x_tr, x_te, y_tr, tr_priors, clf, n_bins=10):
self.y_tr = y_tr
self.clf = clf
self.tr_priors = tr_priors
self.te_preds = clf.predict_proba(x_te)
self.tr_preds = cross_val_predict(clf, x_tr, y_tr, method='predict_proba', n_jobs=10)
self.n_bins = n_bins
self.history: [History] = []
self.multi_class = False
def run(self, isomerous_binning, epsilon=1e-6, compute_bins_at_every_iter=True):
"""
Run the FGSLD algorithm.
:param isomerous_binning: whether to use isomerous or isometric binning.
:param epsilon: stopping condition.
:param compute_bins_at_every_iter: whether FGSLD should recompute the posterior bins at every iteration or not.
:return: a tuple (priors, posteriors), where priors is the final prevalence estimate (the column-wise mean of the adjusted test posteriors) and posteriors is the adjusted posteriors at the last iteration.
"""
smoothing_tr = 1e-9 # 1 / (2 * self.tr_preds.shape[0])
smoothing_te = 1e-9 # 1 / (2 * self.te_preds.shape[0])
s = 0
tr_bin_priors = np.zeros((self.n_bins, self.tr_preds.shape[1]), dtype=float)
te_bin_priors = np.zeros((self.n_bins, self.te_preds.shape[1]), dtype=float)
tr_bins = self.__create_bins(training=True, isomerous_binning=isomerous_binning)
self.__compute_bins_priors(tr_bin_priors, self.tr_preds, tr_bins, smoothing_tr)
te_preds_cp = self.te_preds.copy()
val = 2 * epsilon
while not val < epsilon and s < 1000:
if compute_bins_at_every_iter or s==0:
te_bins = self.__create_bins(training=False, isomerous_binning=isomerous_binning)
if s == 0:
te_bin_priors_prev = tr_bin_priors.copy()
else:
te_bin_priors_prev = te_bin_priors.copy()
self.__compute_bins_priors(te_bin_priors, self.te_preds, te_bins, smoothing_te)
for label_idx, bins in te_bins.items():
for i, bin_ in enumerate(bins):
if bin_.shape[0] == 0:
continue
alpha = 1
beta = 0.1
local_te = te_bin_priors[i][label_idx]
global_te = self.te_preds[:,label_idx].mean()
te = local_te*alpha + global_te*(1-alpha)
local_tr = tr_bin_priors[i][label_idx]
global_tr = self.tr_priors[label_idx]
tr = local_tr*beta + global_tr*(1-beta)
#local_min = (math.floor(tr * self.n_bins) / self.n_bins)
# local_max = local_min + .1
# trans = lambda l: min(max((l - local_min) / 1, 0), 1)
assert not isomerous_binning, 'not tested'
#trans = lambda l: l - local_min
# trans = lambda l: l
# ratio = (trans(te) / trans(tr))
#ratio = np.clip(ratio, 0.1, 2)
#ratio = ratio**3
#self.te_preds[:, label_idx][bin_] = (te_preds_cp[:, label_idx][bin_]) * ratio
old_posterior = te_preds_cp[:, label_idx][bin_]
lr = 1
#self.te_preds[:, label_idx][bin_] = np.clip(old_posterior + (te-tr)*lr, 0, None)
self.te_preds[:, label_idx][bin_] = np.clip(old_posterior + (te - tr) * lr, 0, None)
#self.te_preds[:, label_idx][bin_] = (te_preds_cp[:, label_idx][bin_]) * ratio
# Normalization step
self.te_preds = (self.te_preds / self.te_preds.sum(axis=1, keepdims=True))
#self.te_preds = softmax(self.te_preds, axis=1)
val = np.max(np.abs(te_bin_priors / te_bin_priors_prev) - 1)
s += 1
self.iterations = s
priors = self.te_preds.mean(axis=0)
posteriors = self.te_preds
return priors, posteriors
def __compute_bins_priors(self, bin_priors_placeholder, posteriors, bins, smoothing):
for label_idx, bins in bins.items():
for i, bin_ in enumerate(bins):
if bin_.shape[0] == 0:
bin_priors_placeholder[i, label_idx] = smoothing
continue
numerator = posteriors[bin_, label_idx].mean()
bin_prior = (numerator + smoothing) / (1 + self.n_bins * smoothing) # normalize priors
bin_priors_placeholder[i, label_idx] = bin_prior
def __create_bins(self, training: bool, isomerous_binning: bool):
bins = {}
preds = self.tr_preds if training else self.te_preds
if isomerous_binning:
for label_idx in range(preds.shape[1]):
bins[label_idx] = isomerous_bins(label_idx, preds, self.n_bins)
else:
intervals = np.linspace(0., 1., num=self.n_bins, endpoint=False)
for label_idx in range(preds.shape[1]):
bins_ = isometric_bins(label_idx, preds, intervals)
bins[label_idx] = [bins_[i] for i in intervals]
return bins

View File

@@ -1,271 +0,0 @@
import numpy as np
"""
Scikit learn provides a full set of evaluation metrics, but they treat special cases differently.
I.e., when the number of true positives, false positives, and false negatives amounts to 0, all
affected metrics (precision, recall, and thus f1) output 0 in Scikit learn.
We adhere to the common practice of outputting 1 in this case since the classifier has correctly
classified all examples as negatives.
"""
def isometric_brier_decomposition(true_labels, predicted_labels, bin_intervals=np.arange(0., 1.1, 0.1), step=0.1):
"""
The Isometric Brier decomposition or score is obtained by partitioning U into intervals I_1j,...,I_bj that
have equal length, where U is the total size of our test set (i.e., true_labels.shape[0]). This means that,
if b=10 then I_1j = [0.0,0.1), I_2j = [0.1,0.2),...,I_bj = [0.9,1.0).
bin_intervals is a numpy.array containing the range of the different intervals. Since it is a single dimensional
array, for every interval I_n we take the posterior probabilities Pr_n(x) such that I_n <= Pr_n(x) < I_{n+1} (the last interval is bounded by 1.0).
This variable defaults to np.arange(0., 1.1, 0.1), i.e., an array like [0.0, 0.1, ..., 1.0].
:return: a tuple (calibration score, refinement score)
"""
labels = set(true_labels)
calibration_score, refinement_score = 0.0, 0.0
for i in range(len(labels)):
bins = isometric_bins(i, predicted_labels, bin_intervals)
c_score, r_score = brier_decomposition(bins.values(), true_labels, predicted_labels, class_=i)
calibration_score += c_score
refinement_score += r_score
return calibration_score, refinement_score
def isomerous_brier_decomposition(true_labels, predicted_labels, n=10):
"""
The Isomerous Brier decomposition or score is obtained by partitioning U into intervals I_1j,...,I_bj such that
the corresponding bins B_1j,...,B_bj have equal size, where U is our test set. This means that, for every x' in
B_sj and x'' in B_tj with s < t, it holds that Pr(c_j|x') <= Pr(c_j|x'') and |B_sj| == |B_tj|, for any s,t in
{1,...,b}.
The n variable holds the number of bins we want (defaults to 10). Notice that we perform a numpy.array_split on
the predicted_labels, creating l % n sub-arrays of size l//n + 1 and the rest of size l//n, where l is the length
of the array.
:return: a tuple (calibration score, refinement score)
"""
labels = set(true_labels)
calibration_score, refinement_score = 0.0, 0.0
for i in range(len(labels)):
bins = isomerous_bins(i, predicted_labels, n)
c_score, r_score = brier_decomposition(bins, true_labels, predicted_labels, class_=i)
calibration_score += c_score
refinement_score += r_score
return calibration_score, refinement_score
def brier_decomposition(bins, true_labels, predicted_labels, class_=1):
"""
:param bins: an iterable of arrays of indices (one array per bin)
:return: a tuple (calibration_score, refinement_score)
"""
calibration_score = 0
refinement_score = 0
for bin_ in bins:
if bin_.size <= 0:
continue
v_x = (bin_.shape[0] / true_labels.shape[0])
ro_x = np.mean(true_labels[bin_] == class_)
calibration_score += v_x * (predicted_labels[bin_, class_].mean() - ro_x)**2
refinement_score += (v_x * ro_x) * (1 - ro_x)
labels_len = len(set(true_labels))
return calibration_score / (labels_len * len(bins)), refinement_score / (labels_len * len(bins))
#def isometric_bins(label_index, predicted_labels, bin_intervals, step):
# predicted_class_label = predicted_labels[:, label_index]
# return {interv: np.where(np.logical_and(interv <= predicted_class_label, predicted_class_label < interv + step))[0]
# for interv in bin_intervals}
def isometric_bins(label_index, predicted_labels, bin_intervals):
def next_intv(i):
return bin_intervals[i + 1] if (i + 1) < len(bin_intervals) else 1.
predicted_class_label = predicted_labels[:, label_index]
return {
interv:
np.where(np.logical_and(interv <= predicted_class_label, predicted_class_label < next_intv(i)))[
0]
for i, interv in enumerate(bin_intervals)
}
def isomerous_bins(label_index, predicted_labels, n):
sorted_indices = predicted_labels[:, label_index].argsort()
return np.array_split(sorted_indices, n)
# true_labels and predicted_labels are two matrices in sklearn.preprocessing.MultiLabelBinarizer format
def macroF1(true_labels, predicted_labels):
return macro_average(true_labels, predicted_labels, f1)
# true_labels and predicted_labels are two matrices in sklearn.preprocessing.MultiLabelBinarizer format
def microF1(true_labels, predicted_labels):
return micro_average(true_labels, predicted_labels, f1)
# true_labels and predicted_labels are two matrices in sklearn.preprocessing.MultiLabelBinarizer format
def macroK(true_labels, predicted_labels):
return macro_average(true_labels, predicted_labels, K)
# true_labels and predicted_labels are two matrices in sklearn.preprocessing.MultiLabelBinarizer format
def microK(true_labels, predicted_labels):
return micro_average(true_labels, predicted_labels, K)
# true_labels is a matrix in sklearn.preprocessing.MultiLabelBinarizer format and posterior_probabilities is a matrix
# of the same shape containing real values in [0,1]
def smoothmacroF1(true_labels, posterior_probabilities):
return macro_average(true_labels, posterior_probabilities, f1, metric_statistics=soft_single_metric_statistics)
# true_labels is a matrix in sklearn.preprocessing.MultiLabelBinarizer format and posterior_probabilities is a matrix
# of the same shape containing real values in [0,1]
def smoothmicroF1(true_labels, posterior_probabilities):
return micro_average(true_labels, posterior_probabilities, f1, metric_statistics=soft_single_metric_statistics)
# true_labels is a matrix in sklearn.preprocessing.MultiLabelBinarizer format and posterior_probabilities is a matrix
# of the same shape containing real values in [0,1]
def smoothmacroK(true_labels, posterior_probabilities):
return macro_average(true_labels, posterior_probabilities, K, metric_statistics=soft_single_metric_statistics)
# true_labels is a matrix in sklearn.preprocessing.MultiLabelBinarizer format and posterior_probabilities is a matrix
# of the same shape containing real values in [0,1]
def smoothmicroK(true_labels, posterior_probabilities):
return micro_average(true_labels, posterior_probabilities, K, metric_statistics=soft_single_metric_statistics)
class ContTable:
def __init__(self, tp=0, tn=0, fp=0, fn=0):
self.tp = tp
self.tn = tn
self.fp = fp
self.fn = fn
def get_d(self): return self.tp + self.tn + self.fp + self.fn
def get_c(self): return self.tp + self.fn
def get_not_c(self): return self.tn + self.fp
def get_f(self): return self.tp + self.fp
def get_not_f(self): return self.tn + self.fn
def p_c(self): return (1.0 * self.get_c()) / self.get_d()
def p_not_c(self): return 1.0 - self.p_c()
def p_f(self): return (1.0 * self.get_f()) / self.get_d()
def p_not_f(self): return 1.0 - self.p_f()
def p_tp(self): return (1.0 * self.tp) / self.get_d()
def p_tn(self): return (1.0 * self.tn) / self.get_d()
def p_fp(self): return (1.0 * self.fp) / self.get_d()
def p_fn(self): return (1.0 * self.fn) / self.get_d()
def tpr(self):
c = 1.0 * self.get_c()
return self.tp / c if c > 0.0 else 0.0
def fpr(self):
_c = 1.0 * self.get_not_c()
return self.fp / _c if _c > 0.0 else 0.0
def __add__(self, other):
return ContTable(tp=self.tp + other.tp, tn=self.tn + other.tn, fp=self.fp + other.fp, fn=self.fn + other.fn)
def accuracy(cell):
return (cell.tp + cell.tn) * 1.0 / (cell.tp + cell.fp + cell.fn + cell.tn)
def f1(cell):
num = 2.0 * cell.tp
den = 2.0 * cell.tp + cell.fp + cell.fn
if den > 0: return num / den
# we define f1 to be 1 if den==0 since the classifier has correctly classified all instances as negative
return 1.0
def K(cell):
specificity, recall = 0., 0.
AN = cell.tn + cell.fp
if AN != 0:
specificity = cell.tn * 1. / AN
AP = cell.tp + cell.fn
if AP != 0:
recall = cell.tp * 1. / AP
if AP == 0:
return 2. * specificity - 1.
elif AN == 0:
return 2. * recall - 1.
else:
return specificity + recall - 1.
# computes the (hard) counters tp, fp, fn, and tn from the true and predicted vectors of hard decisions
# true_labels and predicted_labels are two vectors of shape (number_documents,)
def hard_single_metric_statistics(true_labels, predicted_labels):
assert len(true_labels) == len(predicted_labels), "Format not consistent between true and predicted labels."
nd = len(true_labels)
tp = np.sum(predicted_labels[true_labels == 1])
fp = np.sum(predicted_labels[true_labels == 0])
fn = np.sum(true_labels[predicted_labels == 0])
tn = nd - (tp + fp + fn)
return ContTable(tp=tp, tn=tn, fp=fp, fn=fn)
# computes the (soft) contingency table where tp, fp, fn, and tn are the cumulative masses of the posterior
# probabilities with respect to the true binary labels
# true_labels and posterior_probabilities are two vectors of shape (number_documents,)
def soft_single_metric_statistics(true_labels, posterior_probabilities):
assert len(true_labels) == len(posterior_probabilities), "Format not consistent between true and predicted labels."
pos_probs = posterior_probabilities[true_labels == 1]
neg_probs = posterior_probabilities[true_labels == 0]
tp = np.sum(pos_probs)
fn = np.sum(1. - pos_probs)
fp = np.sum(neg_probs)
tn = np.sum(1. - neg_probs)
return ContTable(tp=tp, tn=tn, fp=fp, fn=fn)
# if the classifier is single class, then the prediction is a vector of shape=(nD,) which causes issues when compared
# to the true labels (of shape=(nD,1)). This method increases the dimensions of the predictions.
def __check_consistency_and_adapt(true_labels, predictions):
if predictions.ndim == 1:
return __check_consistency_and_adapt(true_labels, np.expand_dims(predictions, axis=1))
if true_labels.ndim == 1:
return __check_consistency_and_adapt(np.expand_dims(true_labels, axis=1), predictions)
if true_labels.shape != predictions.shape:
raise ValueError("True and predicted label matrices shapes are inconsistent %s %s."
% (true_labels.shape, predictions.shape))
_, nC = true_labels.shape
return true_labels, predictions, nC
def macro_average(true_labels, predicted_labels, metric, metric_statistics=hard_single_metric_statistics):
true_labels, predicted_labels, nC = __check_consistency_and_adapt(true_labels, predicted_labels)
return np.mean([metric(metric_statistics(true_labels[:, c], predicted_labels[:, c])) for c in range(nC)])
def micro_average(true_labels, predicted_labels, metric, metric_statistics=hard_single_metric_statistics):
true_labels, predicted_labels, nC = __check_consistency_and_adapt(true_labels, predicted_labels)
accum = ContTable()
for c in range(nC):
other = metric_statistics(true_labels[:, c], predicted_labels[:, c])
accum = accum + other
return metric(accum)
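Note: a toy usage sketch of the soft contingency-table metrics above (illustrative values, assuming the functions in this module are in scope):
import numpy as np

true_labels = np.array([1, 0, 1, 0])           # binary ground truth
posteriors = np.array([0.9, 0.2, 0.6, 0.4])    # P(y=1|x) for each item
cells = soft_single_metric_statistics(true_labels, posteriors)
# posteriors act as fractional counts: tp=0.9+0.6, fp=0.2+0.4, fn=0.1+0.4, tn=0.8+0.6
print(f'soft F1 = {f1(cells):.3f}')            # ~0.732; hard 0/1 predictions would give the usual F1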

Binary file not shown.


View File

@@ -1,95 +0,0 @@
import quapy as qp
import settings
import os
import pathlib
import pickle
from glob import glob
import sys
from TweetSentQuant.util import nicename
from os.path import join
qp.environ['SAMPLE_SIZE'] = settings.SAMPLE_SIZE
plotext='png'
resultdir = './results'
plotdir = './plots'
os.makedirs(plotdir, exist_ok=True)
def gather_results(methods, error_name):
method_names, true_prevs, estim_prevs, tr_prevs = [], [], [], []
for method in methods:
for experiment in glob(f'{resultdir}/*-{method}-m{error_name}.pkl'):
true_prevalences, estim_prevalences, tr_prev, te_prev, te_prev_estim, best_params = pickle.load(open(experiment, 'rb'))
method_names.append(nicename(method))
true_prevs.append(true_prevalences)
estim_prevs.append(estim_prevalences)
tr_prevs.append(tr_prev)
return method_names, true_prevs, estim_prevs, tr_prevs
def plot_error_by_drift(methods, error_name, logscale=False, path=None):
print('plotting error by drift')
if path is not None:
path = join(path, f'error_by_drift_{error_name}.{plotext}')
method_names, true_prevs, estim_prevs, tr_prevs = gather_results(methods, error_name)
qp.plot.error_by_drift(
method_names,
true_prevs,
estim_prevs,
tr_prevs,
n_bins=20,
error_name=error_name,
show_std=False,
logscale=logscale,
title=f'Quantification error as a function of distribution shift',
savepath=path
)
def diagonal_plot(methods, error_name, path=None):
print('plotting diagonal plots')
if path is not None:
path = join(path, f'diag_{error_name}')
method_names, true_prevs, estim_prevs, tr_prevs = gather_results(methods, error_name)
qp.plot.binary_diagonal(method_names, true_prevs, estim_prevs, pos_class=0, title='Negative', legend=False, show_std=False, savepath=f'{path}_neg.{plotext}')
qp.plot.binary_diagonal(method_names, true_prevs, estim_prevs, pos_class=1, title='Neutral', legend=False, show_std=False, savepath=f'{path}_neu.{plotext}')
qp.plot.binary_diagonal(method_names, true_prevs, estim_prevs, pos_class=2, title='Positive', legend=True, show_std=False, savepath=f'{path}_pos.{plotext}')
def binary_bias_global(methods, error_name, path=None):
print('plotting bias global')
if path is not None:
path = join(path, f'globalbias_{error_name}')
method_names, true_prevs, estim_prevs, tr_prevs = gather_results(methods, error_name)
qp.plot.binary_bias_global(method_names, true_prevs, estim_prevs, pos_class=0, title='Negative', savepath=f'{path}_neg.{plotext}')
qp.plot.binary_bias_global(method_names, true_prevs, estim_prevs, pos_class=1, title='Neutral', savepath=f'{path}_neu.{plotext}')
qp.plot.binary_bias_global(method_names, true_prevs, estim_prevs, pos_class=2, title='Positive', savepath=f'{path}_pos.{plotext}')
def binary_bias_bins(methods, error_name, path=None):
print('plotting bias local')
if path is not None:
path = join(path, f'localbias_{error_name}')
method_names, true_prevs, estim_prevs, tr_prevs = gather_results(methods, error_name)
qp.plot.binary_bias_bins(method_names, true_prevs, estim_prevs, pos_class=0, title='Negative', legend=False, savepath=f'{path}_neg.{plotext}')
qp.plot.binary_bias_bins(method_names, true_prevs, estim_prevs, pos_class=1, title='Neutral', legend=False, savepath=f'{path}_neu.{plotext}')
qp.plot.binary_bias_bins(method_names, true_prevs, estim_prevs, pos_class=2, title='Positive', legend=True, savepath=f'{path}_pos.{plotext}')
gao_seb_methods = ['cc', 'acc', 'pcc', 'pacc', 'sld', 'svmq', 'svmkld', 'svmnkld']
new_methods_ae = ['svmmae' , 'epaccmaeptr', 'epaccmaemae', 'hdy', 'quanet']
new_methods_rae = ['svmmrae' , 'epaccmraeptr', 'epaccmraemrae', 'hdy', 'quanet']
plot_error_by_drift(gao_seb_methods+new_methods_ae, error_name='ae', path=plotdir)
plot_error_by_drift(gao_seb_methods+new_methods_rae, error_name='rae', logscale=True, path=plotdir)
diagonal_plot(gao_seb_methods+new_methods_ae, error_name='ae', path=plotdir)
diagonal_plot(gao_seb_methods+new_methods_rae, error_name='rae', path=plotdir)
binary_bias_global(gao_seb_methods+new_methods_ae, error_name='ae', path=plotdir)
binary_bias_global(gao_seb_methods+new_methods_rae, error_name='rae', path=plotdir)
#binary_bias_bins(gao_seb_methods+new_methods_ae, error_name='ae', path=plotdir)
#binary_bias_bins(gao_seb_methods+new_methods_rae, error_name='rae', path=plotdir)

View File

@@ -1,161 +0,0 @@
import quapy as qp
import numpy as np
from os import makedirs
import sys, os
import pickle
from experiments import result_path
from tabular import Table
import argparse
tables_path = './tables'
MAXTONE = 50 # sets the intensity of the maximum color reached by the worst (red) and best (green) results
makedirs(tables_path, exist_ok=True)
sample_size = 100
qp.environ['SAMPLE_SIZE'] = sample_size
nice = {
'mae':'AE',
'mrae':'RAE',
'ae':'AE',
'rae':'RAE',
'svmkld': 'SVM(KLD)',
'svmnkld': 'SVM(NKLD)',
'svmq': 'SVM(Q)',
'svmae': 'SVM(AE)',
'svmnae': 'SVM(NAE)',
'svmmae': 'SVM(AE)',
'svmmrae': 'SVM(RAE)',
'quanet': 'QuaNet',
'hdy': 'HDy',
'hdysld': 'HDy-SLD',
'dys': 'DyS',
'svmperf':'',
'sanders': 'Sanders',
'semeval13': 'SemEval13',
'semeval14': 'SemEval14',
'semeval15': 'SemEval15',
'semeval16': 'SemEval16',
'Average': 'Average'
}
def save_table(path, table):
print(f'saving results in {path}')
with open(path, 'wt') as foo:
foo.write(table)
def experiment_errors(path, dataset, method, loss):
path = result_path(path, dataset, method, 'm'+loss if not loss.startswith('m') else loss)
if os.path.exists(path):
true_prevs, estim_prevs, _, _, _, _ = pickle.load(open(path, 'rb'))
err_fn = getattr(qp.error, loss)
errors = err_fn(true_prevs, estim_prevs)
return errors
return None
def nicerm(key):
return '\mathrm{'+nice[key]+'}'
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Generate tables for Twitter Sentiment Quantification')
parser.add_argument('results', metavar='RESULT_PATH', type=str,
help='path to the directory containing the results of the methods tested in Gao & Sebastiani')
parser.add_argument('newresults', metavar='RESULT_PATH', type=str,
help='path to the directory containing the results for the experimental methods')
args = parser.parse_args()
datasets = qp.datasets.TWITTER_SENTIMENT_DATASETS_TEST
evaluation_measures = [qp.error.ae, qp.error.rae]
gao_seb_methods = ['cc', 'acc', 'pcc', 'pacc', 'sld', 'svmq', 'svmkld', 'svmnkld']
new_methods = ['hdy'] # methods added to the Gao & Sebastiani methods
experimental_methods = ['hdysld'] # experimental
for i, eval_func in enumerate(evaluation_measures):
# Tables evaluation scores for AE and RAE (two tables)
# ----------------------------------------------------
eval_name = eval_func.__name__
added_methods = ['svmm' + eval_name] + new_methods
methods = gao_seb_methods + added_methods + experimental_methods
nold_methods = len(gao_seb_methods)
nnew_methods = len(added_methods)
nexp_methods = len(experimental_methods)
# fill data table
table = Table(benchmarks=datasets, methods=methods)
for dataset in datasets:
for method in methods:
if method in experimental_methods:
path = args.newresults
else:
path = args.results
table.add(dataset, method, experiment_errors(path, dataset, method, eval_name))
# write the latex table
tabular = """
\\begin{tabularx}{\\textwidth}{|c||""" + ('Y|'*nold_methods) + '|' + ('Y|'*nnew_methods) + '|' + ('Y|'*nexp_methods) + """} \hline
& \multicolumn{"""+str(nold_methods)+"""}{c||}{Methods tested in~\cite{Gao:2016uq}} &
\multicolumn{"""+str(nnew_methods)+"""}{c|}{} &
\multicolumn{"""+str(nexp_methods)+"""}{c|}{}\\\\ \hline
"""
rowreplace={dataset: nice.get(dataset, dataset.upper()) for dataset in datasets}
colreplace={method:'\side{' + nice.get(method, method.upper()) +'$^{' + nicerm(eval_name) + '}$} ' for method in methods}
tabular += table.latexTabular(benchmark_replace=rowreplace, method_replace=colreplace)
tabular += "\n\end{tabularx}"
save_table(f'./tables/tab_results_{eval_name}.new.tex', tabular)
# Tables ranks for AE and RAE (two tables)
# ----------------------------------------------------
# fill the data table
ranktable = Table(benchmarks=datasets, methods=methods, missing='--')
for dataset in datasets:
for method in methods:
ranktable.add(dataset, method, values=table.get(dataset, method, 'rank'))
# write the latex table
tabular = """
\\begin{tabularx}{\\textwidth}{|c||""" + ('Y|'*nold_methods) + '|' + ('Y|'*nnew_methods) + '|' + ('Y|'*nexp_methods) + """} \hline
& \multicolumn{"""+str(nold_methods)+"""}{c||}{Methods tested in~\cite{Gao:2016uq}} &
\multicolumn{"""+str(nnew_methods)+"""}{c|}{} &
\multicolumn{"""+str(nexp_methods)+"""}{c|}{}\\\\ \hline
"""
for method in methods:
tabular += ' & \side{' + nice.get(method, method.upper()) +'$^{' + nicerm(eval_name) + '}$} '
tabular += '\\\\\hline\n'
for dataset in datasets:
tabular += nice.get(dataset, dataset.upper()) + ' '
for method in methods:
newrank = ranktable.get(dataset, method)
if newrank != '--':
newrank = f'{int(newrank)}'
color = ranktable.get_color(dataset, method)
if color == '--':
color = ''
tabular += ' & ' + f'{newrank}' + color
tabular += '\\\\\hline\n'
tabular += '\hline\n'
tabular += 'Average '
for method in methods:
newrank = ranktable.get_average(method)
if newrank != '--':
newrank = f'{newrank:.1f}'
color = ranktable.get_average(method, 'color')
if color == '--':
color = ''
tabular += ' & ' + f'{newrank}' + color
tabular += '\\\\\hline\n'
tabular += "\end{tabularx}"
save_table(f'./tables/tab_rank_{eval_name}.new.tex', tabular)
print("[Done]")

View File

@@ -1,5 +0,0 @@
import multiprocessing
N_JOBS = -2 #multiprocessing.cpu_count()
ENSEMBLE_N_JOBS=1
SAMPLE_SIZE = 100

View File

@@ -1,26 +0,0 @@
from glob import glob
import pickle
import numpy as np
results = './results'
method_choices = {}
for file in glob(f'{results}/*'):
hyper = pickle.load(open(file, 'rb'))[-1]
if hyper:
dataset,method,optim = file.split('/')[-1].split('-')
key = str(hyper)
if method not in method_choices:
method_choices[method] = {}
if key not in method_choices[method]:
method_choices[method][key] = 0
method_choices[method][key] = method_choices[method][key]+1
for method, hyper_count_dict in method_choices.items():
hyper, counts = zip(*list(hyper_count_dict.items()))
order = np.argsort(counts)
counts = np.asarray(counts)[order][::-1]
hyper = np.asarray(hyper)[order][::-1]
print(method)
for hyper_i, count_i in zip(hyper, counts):
print('\t', hyper_i, count_i)

View File

@@ -1,318 +0,0 @@
import numpy as np
import itertools
from scipy.stats import ttest_ind_from_stats, wilcoxon
class Table:
VALID_TESTS = [None, "wilcoxon", "ttest"]
def __init__(self, benchmarks, methods, lower_is_better=True, ttest='ttest', prec_mean=3,
clean_zero=False, show_std=False, prec_std=3, average=True, missing=None, missing_str='--', color=True):
assert ttest in self.VALID_TESTS, f'unknown test, valid are {self.VALID_TESTS}'
self.benchmarks = np.asarray(benchmarks)
self.benchmark_index = {row:i for i, row in enumerate(benchmarks)}
self.methods = np.asarray(methods)
self.method_index = {col:j for j, col in enumerate(methods)}
self.map = {}
# keyed (#rows,#cols)-ndarrays holding computations from self.map['values']
self._addmap('values', dtype=object)
self.lower_is_better = lower_is_better
self.ttest = ttest
self.prec_mean = prec_mean
self.clean_zero = clean_zero
self.show_std = show_std
self.prec_std = prec_std
self.add_average = average
self.missing = missing
self.missing_str = missing_str
self.color = color
self.touch()
@property
def nbenchmarks(self):
return len(self.benchmarks)
@property
def nmethods(self):
return len(self.methods)
def touch(self):
self._modif = True
def update(self):
if self._modif:
self.compute()
def _getfilled(self):
return np.argwhere(self.map['fill'])
@property
def values(self):
return self.map['values']
def _indexes(self):
return itertools.product(range(self.nbenchmarks), range(self.nmethods))
def _addmap(self, map, dtype, func=None):
self.map[map] = np.empty((self.nbenchmarks, self.nmethods), dtype=dtype)
if func is None:
return
m = self.map[map]
f = func
indexes = self._indexes() if map == 'fill' else self._getfilled()
for i, j in indexes:
m[i, j] = f(self.values[i, j])
def _addrank(self):
for i in range(self.nbenchmarks):
filled_cols_idx = np.argwhere(self.map['fill'][i]).flatten()
col_means = [self.map['mean'][i,j] for j in filled_cols_idx]
ranked_cols_idx = filled_cols_idx[np.argsort(col_means)]
if not self.lower_is_better:
ranked_cols_idx = ranked_cols_idx[::-1]
self.map['rank'][i, ranked_cols_idx] = np.arange(1, len(filled_cols_idx)+1)
def _addcolor(self):
for i in range(self.nbenchmarks):
filled_cols_idx = np.argwhere(self.map['fill'][i]).flatten()
if filled_cols_idx.size==0:
continue
col_means = [self.map['mean'][i,j] for j in filled_cols_idx]
minval = min(col_means)
maxval = max(col_means)
for col_idx in filled_cols_idx:
val = self.map['mean'][i,col_idx]
norm = (maxval - minval)
if norm > 0:
normval = (val - minval) / norm
else:
normval = 0.5
if self.lower_is_better:
normval = 1 - normval
self.map['color'][i, col_idx] = color_red2green_01(normval)
def _run_ttest(self, row, col1, col2):
mean1 = self.map['mean'][row, col1]
std1 = self.map['std'][row, col1]
nobs1 = self.map['nobs'][row, col1]
mean2 = self.map['mean'][row, col2]
std2 = self.map['std'][row, col2]
nobs2 = self.map['nobs'][row, col2]
_, p_val = ttest_ind_from_stats(mean1, std1, nobs1, mean2, std2, nobs2)
return p_val
def _run_wilcoxon(self, row, col1, col2):
values1 = self.map['values'][row, col1]
values2 = self.map['values'][row, col2]
_, p_val = wilcoxon(values1, values2)
return p_val
def _add_statistical_test(self):
if self.ttest is None:
return
self.some_similar = [False]*self.nmethods
for i in range(self.nbenchmarks):
filled_cols_idx = np.argwhere(self.map['fill'][i]).flatten()
if len(filled_cols_idx) <= 1:
continue
col_means = [self.map['mean'][i,j] for j in filled_cols_idx]
best_pos = filled_cols_idx[np.argmin(col_means)]
for j in filled_cols_idx:
if j==best_pos:
continue
if self.ttest == 'ttest':
p_val = self._run_ttest(i, best_pos, j)
else:
p_val = self._run_wilcoxon(i, best_pos, j)
pval_outcome = pval_interpretation(p_val)
self.map['ttest'][i, j] = pval_outcome
if pval_outcome != 'Diff':
self.some_similar[j] = True
def compute(self):
self._addmap('fill', dtype=bool, func=lambda x: x is not None)
self._addmap('mean', dtype=float, func=np.mean)
self._addmap('std', dtype=float, func=np.std)
self._addmap('nobs', dtype=float, func=len)
self._addmap('rank', dtype=int, func=None)
self._addmap('color', dtype=object, func=None)
self._addmap('ttest', dtype=object, func=None)
self._addmap('latex', dtype=object, func=None)
self._addrank()
self._addcolor()
self._add_statistical_test()
if self.add_average:
self._addave()
self._modif = False
def _is_column_full(self, col):
return all(self.map['fill'][:, self.method_index[col]])
def _addave(self):
ave = Table(['ave'], self.methods, lower_is_better=self.lower_is_better, ttest=self.ttest, average=False,
missing=self.missing, missing_str=self.missing_str)
for col in self.methods:
values = None
if self._is_column_full(col):
if self.ttest == 'ttest':
values = np.asarray(self.map['mean'][:, self.method_index[col]])
else: # wilcoxon
values = np.concatenate(self.values[:, self.method_index[col]])
ave.add('ave', col, values)
self.average = ave
def add(self, benchmark, method, values):
if values is not None:
values = np.asarray(values)
if values.ndim==0:
values = values.flatten()
rid, cid = self._coordinates(benchmark, method)
self.map['values'][rid, cid] = values
self.touch()
def get(self, benchmark, method, attr='mean'):
self.update()
assert attr in self.map, f'unknown attribute {attr}'
rid, cid = self._coordinates(benchmark, method)
if self.map['fill'][rid, cid]:
v = self.map[attr][rid, cid]
if v is None or (isinstance(v,float) and np.isnan(v)):
return self.missing
return v
else:
return self.missing
def _coordinates(self, benchmark, method):
assert benchmark in self.benchmark_index, f'benchmark {benchmark} out of range'
assert method in self.method_index, f'method {method} out of range'
rid = self.benchmark_index[benchmark]
cid = self.method_index[method]
return rid, cid
def get_average(self, method, attr='mean'):
self.update()
if self.add_average:
return self.average.get('ave', method, attr=attr)
return None
def get_color(self, benchmark, method):
color = self.get(benchmark, method, attr='color')
if color is None:
return ''
return color
def latex(self, benchmark, method):
self.update()
i,j = self._coordinates(benchmark, method)
if self.map['fill'][i,j] == False:
return self.missing_str
mean = self.map['mean'][i,j]
l = f" {mean:.{self.prec_mean}f}"
if self.clean_zero:
l = l.replace(' 0.', '.')
isbest = self.map['rank'][i,j] == 1
if isbest:
l = "\\textbf{"+l.strip()+"}"
stat = ''
if self.ttest is not None and self.some_similar[j]:
test_label = self.map['ttest'][i,j]
if test_label == 'Sim':
stat = '^{\dag\phantom{\dag}}'
elif test_label == 'Same':
stat = '^{\ddag}'
elif isbest or test_label == 'Diff':
stat = '^{\phantom{\ddag}}'
std = ''
if self.show_std:
std = self.map['std'][i,j]
std = f" {std:.{self.prec_std}f}"
if self.clean_zero:
std = std.replace(' 0.', '.')
std = f" \pm {std:{self.prec_std}}"
if stat!='' or std!='':
l = f'{l}${stat}{std}$'
if self.color:
l += ' ' + self.map['color'][i,j]
return l
def latexTabular(self, benchmark_replace={}, method_replace={}, average=True):
tab = ' & '
tab += ' & '.join([method_replace.get(col, col) for col in self.methods])
tab += ' \\\\\hline\n'
for row in self.benchmarks:
rowname = benchmark_replace.get(row, row)
tab += rowname + ' & '
tab += self.latexRow(row)
if average:
tab += '\hline\n'
tab += 'Average & '
tab += self.latexAverage()
return tab
def latexRow(self, benchmark, endl='\\\\\hline\n'):
s = [self.latex(benchmark, col) for col in self.methods]
s = ' & '.join(s)
s += ' ' + endl
return s
def latexAverage(self, endl='\\\\\hline\n'):
if self.add_average:
return self.average.latexRow('ave', endl=endl)
def getRankTable(self):
t = Table(benchmarks=self.benchmarks, methods=self.methods, prec_mean=0, average=True)
for rid, cid in self._getfilled():
row = self.benchmarks[rid]
col = self.methods[cid]
t.add(row, col, self.get(row, col, 'rank'))
t.compute()
return t
def dropMethods(self, methods):
drop_index = [self.method_index[m] for m in methods]
new_methods = np.delete(self.methods, drop_index)
new_index = {col:j for j, col in enumerate(new_methods)}
self.map['values'] = self.values[:,np.asarray([self.method_index[m] for m in new_methods], dtype=int)]
self.methods = new_methods
self.method_index = new_index
self.touch()
def pval_interpretation(p_val):
if 0.005 >= p_val:
return 'Diff'
elif 0.05 >= p_val > 0.005:
return 'Sim'
elif p_val > 0.05:
return 'Same'
def color_red2green_01(val, maxtone=50):
if np.isnan(val): return None
assert 0 <= val <= 1, f'val {val} out of range [0,1]'
# rescale to [-1,1]
val = val * 2 - 1
if val < 0:
color = 'red'
tone = maxtone * (-val)
else:
color = 'green'
tone = maxtone * val
return '\cellcolor{' + color + f'!{int(tone)}' + '}'
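Note: a minimal usage sketch of the Table helper above (toy error values, not actual results; assumes the class is importable):
import numpy as np

table = Table(benchmarks=['semeval13', 'semeval14'], methods=['cc', 'pacc'])
table.add('semeval13', 'cc', np.array([0.12, 0.15, 0.11]))
table.add('semeval13', 'pacc', np.array([0.08, 0.09, 0.07]))
table.add('semeval14', 'cc', np.array([0.20, 0.18, 0.22]))
table.add('semeval14', 'pacc', np.array([0.10, 0.12, 0.11]))
print(table.get('semeval13', 'pacc'))   # mean of the values added to that cell
print(table.latexTabular())             # LaTeX rows with bold best, colors and significance marks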

View File

@@ -1,89 +0,0 @@
import numpy as np
nice = {
'mae':'AE',
'mrae':'RAE',
'ae':'AE',
'rae':'RAE',
'svmkld': 'SVM(KLD)',
'svmnkld': 'SVM(NKLD)',
'svmq': 'SVM(Q)',
'svmae': 'SVM(AE)',
'svmnae': 'SVM(NAE)',
'svmmae': 'SVM(AE)',
'svmmrae': 'SVM(RAE)',
'quanet': 'QuaNet',
'hdy': 'HDy',
'dys': 'DyS',
'epaccmaeptr': 'E(PACC)$_\mathrm{Ptr}$',
'epaccmaemae': 'E(PACC)$_\mathrm{AE}$',
'epaccmraeptr': 'E(PACC)$_\mathrm{Ptr}$',
'epaccmraemrae': 'E(PACC)$_\mathrm{RAE}$',
'svmperf':'',
'sanders': 'Sanders',
'semeval13': 'SemEval13',
'semeval14': 'SemEval14',
'semeval15': 'SemEval15',
'semeval16': 'SemEval16',
'Average': 'Average'
}
def nicerm(key):
return '\mathrm{'+nice[key]+'}'
def nicename(method, eval_name=None, side=False):
m = nice.get(method, method.upper())
if eval_name is not None:
o = '$^{' + nicerm(eval_name) + '}$'
m = (m+o).replace('$$','')
if side:
m = '\side{'+m+'}'
return m
def load_Gao_Sebastiani_previous_results():
def rename(method):
old2new = {
'kld': 'svmkld',
'nkld': 'svmnkld',
'qbeta2': 'svmq',
'em': 'sld'
}
return old2new.get(method, method)
gao_seb_results = {}
with open('./Gao_Sebastiani_results.txt', 'rt') as fin:
lines = fin.readlines()
for line in lines[1:]:
line = line.strip()
parts = line.lower().split()
if len(parts) == 4:
dataset, method, ae, rae = parts
else:
method, ae, rae = parts
learner, method = method.split('-')
method = rename(method)
gao_seb_results[f'{dataset}-{method}-ae'] = float(ae)
gao_seb_results[f'{dataset}-{method}-rae'] = float(rae)
return gao_seb_results
def get_ranks_from_Gao_Sebastiani():
gao_seb_results = load_Gao_Sebastiani_previous_results()
datasets = set([key.split('-')[0] for key in gao_seb_results.keys()])
methods = np.sort(np.unique([key.split('-')[1] for key in gao_seb_results.keys()]))
ranks = {}
for metric in ['ae', 'rae']:
for dataset in datasets:
scores = [gao_seb_results[f'{dataset}-{method}-{metric}'] for method in methods]
order = np.argsort(scores)
sorted_methods = methods[order]
for i, method in enumerate(sorted_methods):
ranks[f'{dataset}-{method}-{metric}'] = i+1
for method in methods:
rankave = np.mean([ranks[f'{dataset}-{method}-{metric}'] for dataset in datasets])
ranks[f'Average-{method}-{metric}'] = rankave
return ranks, gao_seb_results