1
0
Fork 0

parallel functionality added to quapy in order to allow for multiprocess parallelization (and not threading) handling quapy's environment variables

This commit is contained in:
Alejandro Moreo Fernandez 2021-01-27 09:54:41 +01:00
parent 301e8b9088
commit e609c262b4
14 changed files with 420 additions and 231 deletions

View File

@ -17,6 +17,8 @@ import torch
import shutil
qp.environ['SAMPLE_SIZE'] = settings.SAMPLE_SIZE
def newLR():
return LogisticRegression(max_iter=1000, solver='lbfgs', n_jobs=-1)
@ -69,8 +71,8 @@ def quantification_ensembles():
hyper_none = None
yield 'epaccmaeptr', EPACC(newLR(), optim='mae', policy='ptr', **common), hyper_none
yield 'epaccmaemae', EPACC(newLR(), optim='mae', policy='mae', **common), hyper_none
#yield 'esldmaeptr', EEMQ(newLR(), optim='mae', policy='ptr', **common), hyper_none
#yield 'esldmaemae', EEMQ(newLR(), optim='mae', policy='mae', **common), hyper_none
# yield 'esldmaeptr', EEMQ(newLR(), optim='mae', policy='ptr', **common), hyper_none
# yield 'esldmaemae', EEMQ(newLR(), optim='mae', policy='mae', **common), hyper_none
yield 'epaccmraeptr', EPACC(newLR(), optim='mrae', policy='ptr', **common), hyper_none
yield 'epaccmraemrae', EPACC(newLR(), optim='mrae', policy='mrae', **common), hyper_none
@ -115,8 +117,6 @@ def save_results(dataset_name, model_name, optim_loss, *results):
def run(experiment):
qp.environ['SAMPLE_SIZE'] = settings.SAMPLE_SIZE
optim_loss, dataset_name, (model_name, model, hyperparams) = experiment
if is_already_computed(dataset_name, model_name, optim_loss=optim_loss):
@ -198,19 +198,16 @@ if __name__ == '__main__':
datasets = qp.datasets.TWITTER_SENTIMENT_DATASETS_TRAIN
models = quantification_models()
Parallel(n_jobs=settings.N_JOBS)(
delayed(run)(experiment) for experiment in itertools.product(optim_losses, datasets, models)
)
qp.util.parallel(run, itertools.product(optim_losses, datasets, models), n_jobs=settings.N_JOBS)
models = quantification_cuda_models()
Parallel(n_jobs=settings.CUDA_N_JOBS)(
delayed(run)(experiment) for experiment in itertools.product(optim_losses, datasets, models)
)
qp.util.parallel(run, itertools.product(optim_losses, datasets, models), n_jobs=settings.CUDA_N_JOBS)
models = quantification_ensembles()
Parallel(n_jobs=1)(
delayed(run)(experiment) for experiment in itertools.product(optim_losses, datasets, models)
)
qp.util.parallel(run, itertools.product(optim_losses, datasets, models), n_jobs=1)
# Parallel(n_jobs=1)(
# delayed(run)(experiment) for experiment in itertools.product(optim_losses, datasets, models)
# )
#shutil.rmtree(args.checkpointdir, ignore_errors=True)

View File

@ -0,0 +1,94 @@
import quapy as qp
import settings
import os
import pathlib
import pickle
from glob import glob
import sys
from TweetSentQuant.util import nicename
from os.path import join
qp.environ['SAMPLE_SIZE'] = settings.SAMPLE_SIZE
resultdir = './results'
plotdir = './plots'
os.makedirs(plotdir, exist_ok=True)
def gather_results(methods, error_name):
method_names, true_prevs, estim_prevs, tr_prevs = [], [], [], []
for method in methods:
for experiment in glob(f'{resultdir}/*-{method}-m{error_name}.pkl'):
true_prevalences, estim_prevalences, tr_prev, te_prev, te_prev_estim, best_params = pickle.load(open(experiment, 'rb'))
method_names.append(nicename(method))
true_prevs.append(true_prevalences)
estim_prevs.append(estim_prevalences)
tr_prevs.append(tr_prev)
return method_names, true_prevs, estim_prevs, tr_prevs
def plot_error_by_drift(methods, error_name, logscale=False, path=None):
print('plotting error by drift')
if path is not None:
path = join(path, f'error_by_drift_{error_name}.pdf')
method_names, true_prevs, estim_prevs, tr_prevs = gather_results(methods, error_name)
qp.plot.error_by_drift(
method_names,
true_prevs,
estim_prevs,
tr_prevs,
n_bins=20,
error_name=error_name,
show_std=False,
logscale=logscale,
title=f'Quantification error as a function of distribution shift',
savepath=path
)
def diagonal_plot(methods, error_name, path=None):
print('plotting diagonal plots')
if path is not None:
path = join(path, f'diag_{error_name}')
method_names, true_prevs, estim_prevs, tr_prevs = gather_results(methods, error_name)
qp.plot.binary_diagonal(method_names, true_prevs, estim_prevs, pos_class=0, title='Negative', legend=False, show_std=False, savepath=path+'_neg.pdf')
qp.plot.binary_diagonal(method_names, true_prevs, estim_prevs, pos_class=1, title='Neutral', legend=False, show_std=False, savepath=path+'_neu.pdf')
qp.plot.binary_diagonal(method_names, true_prevs, estim_prevs, pos_class=2, title='Positive', legend=True, show_std=False, savepath=path+'_pos.pdf')
def binary_bias_global(methods, error_name, path=None):
print('plotting bias global')
if path is not None:
path = join(path, f'globalbias_{error_name}')
method_names, true_prevs, estim_prevs, tr_prevs = gather_results(methods, error_name)
qp.plot.binary_bias_global(method_names, true_prevs, estim_prevs, pos_class=0, title='Negative', savepath=path+'_neg.pdf')
qp.plot.binary_bias_global(method_names, true_prevs, estim_prevs, pos_class=1, title='Neutral', savepath=path+'_neu.pdf')
qp.plot.binary_bias_global(method_names, true_prevs, estim_prevs, pos_class=2, title='Positive', savepath=path+'_pos.pdf')
def binary_bias_bins(methods, error_name, path=None):
print('plotting bias local')
if path is not None:
path = join(path, f'localbias_{error_name}')
method_names, true_prevs, estim_prevs, tr_prevs = gather_results(methods, error_name)
qp.plot.binary_bias_bins(method_names, true_prevs, estim_prevs, pos_class=0, title='Negative', legend=False, savepath=path+'_neg.pdf')
qp.plot.binary_bias_bins(method_names, true_prevs, estim_prevs, pos_class=1, title='Neutral', legend=False, savepath=path+'_neu.pdf')
qp.plot.binary_bias_bins(method_names, true_prevs, estim_prevs, pos_class=2, title='Positive', legend=True, savepath=path+'_pos.pdf')
gao_seb_methods = ['cc', 'acc', 'pcc', 'pacc', 'sld', 'svmq', 'svmkld', 'svmnkld']
new_methods_ae = ['svmmae' , 'epaccmaeptr', 'epaccmaemae', 'hdy', 'quanet']
new_methods_rae = ['svmmrae' , 'epaccmraeptr', 'epaccmraemrae', 'hdy', 'quanet']
# plot_error_by_drift(gao_seb_methods+new_methods_ae, error_name='ae', path=plotdir)
# plot_error_by_drift(gao_seb_methods+new_methods_rae, error_name='rae', logscale=True, path=plotdir)
# diagonal_plot(gao_seb_methods+new_methods_ae, error_name='ae', path=plotdir)
# diagonal_plot(gao_seb_methods+new_methods_rae, error_name='rae', path=plotdir)
binary_bias_global(gao_seb_methods+new_methods_ae, error_name='ae', path=plotdir)
binary_bias_global(gao_seb_methods+new_methods_rae, error_name='rae', path=plotdir)
# binary_bias_bins(gao_seb_methods+new_methods_ae, error_name='ae', path=plotdir)
# binary_bias_bins(gao_seb_methods+new_methods_rae, error_name='rae', path=plotdir)

View File

@ -4,7 +4,7 @@ from os import makedirs
import sys, os
import pickle
import argparse
from TweetSentQuant.util import nicename, get_ranks_from_Gao_Sebastiani
import settings
from experiments import result_path
from tabular import Table
@ -17,84 +17,6 @@ makedirs(tables_path, exist_ok=True)
qp.environ['SAMPLE_SIZE'] = settings.SAMPLE_SIZE
nice = {
'mae':'AE',
'mrae':'RAE',
'ae':'AE',
'rae':'RAE',
'svmkld': 'SVM(KLD)',
'svmnkld': 'SVM(NKLD)',
'svmq': 'SVM(Q)',
'svmae': 'SVM(AE)',
'svmnae': 'SVM(NAE)',
'svmmae': 'SVM(AE)',
'svmmrae': 'SVM(RAE)',
'quanet': 'QuaNet',
'hdy': 'HDy',
'dys': 'DyS',
'epaccmaeptr': 'E(PACC)$_\mathrm{Ptr}$',
'epaccmaemae': 'E(PACC)$_\mathrm{AE}$',
'epaccmraeptr': 'E(PACC)$_\mathrm{Ptr}$',
'epaccmraemrae': 'E(PACC)$_\mathrm{RAE}$',
'svmperf':'',
'sanders': 'Sanders',
'semeval13': 'SemEval13',
'semeval14': 'SemEval14',
'semeval15': 'SemEval15',
'semeval16': 'SemEval16',
'Average': 'Average'
}
def nicerm(key):
return '\mathrm{'+nice[key]+'}'
def load_Gao_Sebastiani_previous_results():
def rename(method):
old2new = {
'kld': 'svmkld',
'nkld': 'svmnkld',
'qbeta2': 'svmq',
'em': 'sld'
}
return old2new.get(method, method)
gao_seb_results = {}
with open('./Gao_Sebastiani_results.txt', 'rt') as fin:
lines = fin.readlines()
for line in lines[1:]:
line = line.strip()
parts = line.lower().split()
if len(parts) == 4:
dataset, method, ae, rae = parts
else:
method, ae, rae = parts
learner, method = method.split('-')
method = rename(method)
gao_seb_results[f'{dataset}-{method}-ae'] = float(ae)
gao_seb_results[f'{dataset}-{method}-rae'] = float(rae)
return gao_seb_results
def get_ranks_from_Gao_Sebastiani():
gao_seb_results = load_Gao_Sebastiani_previous_results()
datasets = set([key.split('-')[0] for key in gao_seb_results.keys()])
methods = np.sort(np.unique([key.split('-')[1] for key in gao_seb_results.keys()]))
ranks = {}
for metric in ['ae', 'rae']:
for dataset in datasets:
scores = [gao_seb_results[f'{dataset}-{method}-{metric}'] for method in methods]
order = np.argsort(scores)
sorted_methods = methods[order]
for i, method in enumerate(sorted_methods):
ranks[f'{dataset}-{method}-{metric}'] = i+1
for method in methods:
rankave = np.mean([ranks[f'{dataset}-{method}-{metric}'] for dataset in datasets])
ranks[f'Average-{method}-{metric}'] = rankave
return ranks, gao_seb_results
def save_table(path, table):
print(f'saving results in {path}')
with open(path, 'wt') as foo:
@ -111,15 +33,6 @@ def experiment_errors(path, dataset, method, loss):
return None
def nicename(method, eval_name=None, side=False):
m = nice.get(method, method.upper())
if eval_name is not None:
o = '$^{' + nicerm(eval_name) + '}$'
m = (m+o).replace('$$','')
if side:
m = '\side{'+m+'}'
return m
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Generate tables for Tweeter Sentiment Quantification')
@ -146,7 +59,7 @@ if __name__ == '__main__':
nnew_methods = len(added_methods)
# fill data table
table = Table(rows=datasets, cols=methods)
table = Table(benchmarks=datasets, methods=methods)
for dataset in datasets:
for method in methods:
table.add(dataset, method, experiment_errors(args.results, dataset, method, eval_name))
@ -166,7 +79,7 @@ if __name__ == '__main__':
rowreplace={dataset: nicename(dataset) for dataset in datasets}
colreplace={method: nicename(method, eval_name, side=True) for method in methods}
tabular += table.latexTabular(rowreplace=rowreplace, colreplace=colreplace)
tabular += table.latexTabular(benchmark_replace=rowreplace, method_replace=colreplace)
tabular += """
\end{tabular}%
}
@ -178,8 +91,10 @@ if __name__ == '__main__':
# ----------------------------------------------------
methods = gao_seb_methods
table.dropMethods(added_methods)
# fill the data table
ranktable = Table(rows=datasets, cols=methods, missing='--')
ranktable = Table(benchmarks=datasets, methods=methods, missing='--')
for dataset in datasets:
for method in methods:
ranktable.add(dataset, method, values=table.get(dataset, method, 'rank'))

View File

@ -6,15 +6,15 @@ from scipy.stats import ttest_ind_from_stats, wilcoxon
class Table:
VALID_TESTS = [None, "wilcoxon", "ttest"]
def __init__(self, rows, cols, lower_is_better=True, ttest='ttest', prec_mean=3,
def __init__(self, benchmarks, methods, lower_is_better=True, ttest='ttest', prec_mean=3,
clean_zero=False, show_std=False, prec_std=3, average=True, missing=None, missing_str='--', color=True):
assert ttest in self.VALID_TESTS, f'unknown test, valid are {self.VALID_TESTS}'
self.rows = np.asarray(rows)
self.row_index = {row:i for i, row in enumerate(rows)}
self.benchmarks = np.asarray(benchmarks)
self.benchmark_index = {row:i for i, row in enumerate(benchmarks)}
self.cols = np.asarray(cols)
self.col_index = {col:j for j, col in enumerate(cols)}
self.methods = np.asarray(methods)
self.method_index = {col:j for j, col in enumerate(methods)}
self.map = {} # keyed (#rows,#cols)-ndarrays holding computations from self.map['values']
self._addmap('values', dtype=object)
@ -31,12 +31,12 @@ class Table:
self.touch()
@property
def nrows(self):
return len(self.rows)
def nbenchmarks(self):
return len(self.benchmarks)
@property
def ncols(self):
return len(self.cols)
def nmethods(self):
return len(self.methods)
def touch(self):
self.modif = True
@ -53,10 +53,10 @@ class Table:
return self.map['values']
def _indexes(self):
return itertools.product(range(self.nrows), range(self.ncols))
return itertools.product(range(self.nbenchmarks), range(self.nmethods))
def _addmap(self, map, dtype, func=None):
self.map[map] = np.empty((self.nrows, self.ncols), dtype=dtype)
self.map[map] = np.empty((self.nbenchmarks, self.nmethods), dtype=dtype)
if func is None:
return
m = self.map[map]
@ -68,7 +68,7 @@ class Table:
m[i, j] = f(self.values[i, j])
def _addrank(self):
for i in range(self.nrows):
for i in range(self.nbenchmarks):
filled_cols_idx = np.argwhere(self.map['fill'][i]).flatten()
col_means = [self.map['mean'][i,j] for j in filled_cols_idx]
ranked_cols_idx = filled_cols_idx[np.argsort(col_means)]
@ -77,7 +77,7 @@ class Table:
self.map['rank'][i, ranked_cols_idx] = np.arange(1, len(filled_cols_idx)+1)
def _addcolor(self):
for i in range(self.nrows):
for i in range(self.nbenchmarks):
filled_cols_idx = np.argwhere(self.map['fill'][i]).flatten()
if filled_cols_idx.size==0:
continue
@ -115,8 +115,8 @@ class Table:
def _addttest(self):
if self.ttest is None:
return
self.some_similar = [False]*self.ncols
for i in range(self.nrows):
self.some_similar = [False]*self.nmethods
for i in range(self.nbenchmarks):
filled_cols_idx = np.argwhere(self.map['fill'][i]).flatten()
if len(filled_cols_idx) <= 1:
continue
@ -153,34 +153,34 @@ class Table:
self.modif = False
def _is_column_full(self, col):
return all(self.map['fill'][:, self.col_index[col]])
return all(self.map['fill'][:, self.method_index[col]])
def _addave(self):
ave = Table(['ave'], self.cols, lower_is_better=self.lower_is_better, ttest=self.ttest, average=False,
ave = Table(['ave'], self.methods, lower_is_better=self.lower_is_better, ttest=self.ttest, average=False,
missing=self.missing, missing_str=self.missing_str)
for col in self.cols:
for col in self.methods:
values = None
if self._is_column_full(col):
if self.ttest == 'ttest':
values = np.asarray(self.map['mean'][:, self.col_index[col]])
values = np.asarray(self.map['mean'][:, self.method_index[col]])
else: # wilcoxon
values = np.concatenate(self.values[:, self.col_index[col]])
values = np.concatenate(self.values[:, self.method_index[col]])
ave.add('ave', col, values)
self.average = ave
def add(self, row, col, values):
def add(self, benchmark, method, values):
if values is not None:
values = np.asarray(values)
if values.ndim==0:
values = values.flatten()
rid, cid = self._coordinates(row, col)
rid, cid = self._coordinates(benchmark, method)
self.map['values'][rid, cid] = values
self.touch()
def get(self, row, col, attr='mean'):
def get(self, benchmark, method, attr='mean'):
self.update()
assert attr in self.map, f'unknwon attribute {attr}'
rid, cid = self._coordinates(row, col)
rid, cid = self._coordinates(benchmark, method)
if self.map['fill'][rid, cid]:
v = self.map[attr][rid, cid]
if v is None or (isinstance(v,float) and np.isnan(v)):
@ -190,27 +190,27 @@ class Table:
return self.missing
def _coordinates(self, row, col):
assert row in self.row_index, f'row {row} out of range'
assert col in self.col_index, f'col {col} out of range'
rid = self.row_index[row]
cid = self.col_index[col]
assert row in self.benchmark_index, f'row {row} out of range'
assert col in self.method_index, f'col {col} out of range'
rid = self.benchmark_index[row]
cid = self.method_index[col]
return rid, cid
def get_average(self, col, attr='mean'):
def get_average(self, method, attr='mean'):
self.update()
if self.add_average:
return self.average.get('ave', col, attr=attr)
return self.average.get('ave', method, attr=attr)
return None
def get_color(self, row, col):
color = self.get(row, col, attr='color')
def get_color(self, benchmark, method):
color = self.get(benchmark, method, attr='color')
if color is None:
return ''
return color
def latex(self, row, col):
def latex(self, benchmark, method):
self.update()
i,j = self._coordinates(row, col)
i,j = self._coordinates(benchmark, method)
if self.map['fill'][i,j] == False:
return self.missing_str
@ -249,12 +249,12 @@ class Table:
return l
def latexTabular(self, rowreplace={}, colreplace={}, average=True):
def latexTabular(self, benchmark_replace={}, method_replace={}, average=True):
tab = ' & '
tab += ' & '.join([colreplace.get(col, col) for col in self.cols])
tab += ' & '.join([method_replace.get(col, col) for col in self.methods])
tab += ' \\\\\hline\n'
for row in self.rows:
rowname = rowreplace.get(row, row)
for row in self.benchmarks:
rowname = benchmark_replace.get(row, row)
tab += rowname + ' & '
tab += self.latexRow(row)
@ -264,8 +264,8 @@ class Table:
tab += self.latexAverage()
return tab
def latexRow(self, row, endl='\\\\\hline\n'):
s = [self.latex(row, col) for col in self.cols]
def latexRow(self, benchmark, endl='\\\\\hline\n'):
s = [self.latex(benchmark, col) for col in self.methods]
s = ' & '.join(s)
s += ' ' + endl
return s
@ -275,14 +275,28 @@ class Table:
return self.average.latexRow('ave', endl=endl)
def getRankTable(self):
t = Table(rows=self.rows, cols=self.cols, prec_mean=0, average=True)
t = Table(benchmarks=self.benchmarks, methods=self.methods, prec_mean=0, average=True)
for rid, cid in self._getfilled():
row = self.rows[rid]
col = self.cols[cid]
row = self.benchmarks[rid]
col = self.methods[cid]
t.add(row, col, self.get(row, col, 'rank'))
t.compute()
return t
def dropMethods(self, methods):
drop_index = [self.method_index[m] for m in methods]
new_methods = np.delete(self.methods, drop_index)
new_index = {col:j for j, col in enumerate(new_methods)}
self.map['values'] = self.values[:,np.asarray([self.method_index[m] for m in new_methods], dtype=int)]
self.methods = new_methods
self.method_index = new_index
self.touch()
def pval_interpretation(p_val):
if 0.005 >= p_val:
return 'Diff'

87
TweetSentQuant/util.py Normal file
View File

@ -0,0 +1,87 @@
nice = {
'mae':'AE',
'mrae':'RAE',
'ae':'AE',
'rae':'RAE',
'svmkld': 'SVM(KLD)',
'svmnkld': 'SVM(NKLD)',
'svmq': 'SVM(Q)',
'svmae': 'SVM(AE)',
'svmnae': 'SVM(NAE)',
'svmmae': 'SVM(AE)',
'svmmrae': 'SVM(RAE)',
'quanet': 'QuaNet',
'hdy': 'HDy',
'dys': 'DyS',
'epaccmaeptr': 'E(PACC)$_\mathrm{Ptr}$',
'epaccmaemae': 'E(PACC)$_\mathrm{AE}$',
'epaccmraeptr': 'E(PACC)$_\mathrm{Ptr}$',
'epaccmraemrae': 'E(PACC)$_\mathrm{RAE}$',
'svmperf':'',
'sanders': 'Sanders',
'semeval13': 'SemEval13',
'semeval14': 'SemEval14',
'semeval15': 'SemEval15',
'semeval16': 'SemEval16',
'Average': 'Average'
}
def nicerm(key):
return '\mathrm{'+nice[key]+'}'
def nicename(method, eval_name=None, side=False):
m = nice.get(method, method.upper())
if eval_name is not None:
o = '$^{' + nicerm(eval_name) + '}$'
m = (m+o).replace('$$','')
if side:
m = '\side{'+m+'}'
return m
def load_Gao_Sebastiani_previous_results():
def rename(method):
old2new = {
'kld': 'svmkld',
'nkld': 'svmnkld',
'qbeta2': 'svmq',
'em': 'sld'
}
return old2new.get(method, method)
gao_seb_results = {}
with open('./Gao_Sebastiani_results.txt', 'rt') as fin:
lines = fin.readlines()
for line in lines[1:]:
line = line.strip()
parts = line.lower().split()
if len(parts) == 4:
dataset, method, ae, rae = parts
else:
method, ae, rae = parts
learner, method = method.split('-')
method = rename(method)
gao_seb_results[f'{dataset}-{method}-ae'] = float(ae)
gao_seb_results[f'{dataset}-{method}-rae'] = float(rae)
return gao_seb_results
def get_ranks_from_Gao_Sebastiani():
gao_seb_results = load_Gao_Sebastiani_previous_results()
datasets = set([key.split('-')[0] for key in gao_seb_results.keys()])
methods = np.sort(np.unique([key.split('-')[1] for key in gao_seb_results.keys()]))
ranks = {}
for metric in ['ae', 'rae']:
for dataset in datasets:
scores = [gao_seb_results[f'{dataset}-{method}-{metric}'] for method in methods]
order = np.argsort(scores)
sorted_methods = methods[order]
for i, method in enumerate(sorted_methods):
ranks[f'{dataset}-{method}-{metric}'] = i+1
for method in methods:
rankave = np.mean([ranks[f'{dataset}-{method}-{metric}'] for dataset in datasets])
ranks[f'Average-{method}-{metric}'] = rankave
return ranks, gao_seb_results

View File

@ -5,7 +5,7 @@ from tqdm import tqdm
import quapy as qp
from quapy.data.base import Dataset
from quapy.util import parallelize
from quapy.util import map_parallel
from .base import LabelledCollection
@ -131,9 +131,9 @@ class IndexTransformer:
return self
def transform(self, X, n_jobs=-1):
# given the number of tasks and the number of jobs, generates the slices for the parallel threads
# given the number of tasks and the number of jobs, generates the slices for the parallel processes
assert self.unk != -1, 'transform called before fit'
indexed = parallelize(func=self.index, args=X, n_jobs=n_jobs)
indexed = map_parallel(func=self.index, args=X, n_jobs=n_jobs)
return np.asarray(indexed)
def index(self, documents):

View File

@ -2,6 +2,17 @@ import numpy as np
from sklearn.metrics import f1_score
def from_name(err_name):
assert err_name in ERROR_NAMES, f'unknown error {err_name}'
callable_error = globals()[err_name]
if err_name in QUANTIFICATION_ERROR_SMOOTH_NAMES:
eps = __check_eps()
def bound_callable_error(y_true, y_pred):
return callable_error(y_true, y_pred, eps)
return bound_callable_error
return callable_error
def f1e(y_true, y_pred):
return 1. - f1_score(y_true, y_pred, average='macro')
@ -27,8 +38,8 @@ def se(p, p_hat):
return ((p_hat-p)**2).mean(axis=-1)
def mkld(prevs, prevs_hat):
return kld(prevs, prevs_hat).mean()
def mkld(prevs, prevs_hat, eps=None):
return kld(prevs, prevs_hat, eps).mean()
def kld(p, p_hat, eps=None):
@ -38,8 +49,8 @@ def kld(p, p_hat, eps=None):
return (sp*np.log(sp/sp_hat)).sum(axis=-1)
def mnkld(prevs, prevs_hat):
return nkld(prevs, prevs_hat).mean()
def mnkld(prevs, prevs_hat, eps=None):
return nkld(prevs, prevs_hat, eps).mean()
def nkld(p, p_hat, eps=None):
@ -63,10 +74,10 @@ def smooth(p, eps):
return (p+eps)/(eps*n_classes + 1)
def __check_eps(eps):
import quapy as qp
sample_size = qp.environ['SAMPLE_SIZE']
def __check_eps(eps=None):
if eps is None:
import quapy as qp
sample_size = qp.environ['SAMPLE_SIZE']
if sample_size is None:
raise ValueError('eps was not defined, and qp.environ["SAMPLE_SIZE"] was not set')
else:
@ -76,8 +87,10 @@ def __check_eps(eps):
CLASSIFICATION_ERROR = {f1e, acce}
QUANTIFICATION_ERROR = {mae, mrae, mse, mkld, mnkld}
QUANTIFICATION_ERROR_SMOOTH = {kld, nkld, rae, mkld, mnkld, mrae}
CLASSIFICATION_ERROR_NAMES = {func.__name__ for func in CLASSIFICATION_ERROR}
QUANTIFICATION_ERROR_NAMES = {func.__name__ for func in QUANTIFICATION_ERROR}
QUANTIFICATION_ERROR_SMOOTH_NAMES = {func.__name__ for func in QUANTIFICATION_ERROR_SMOOTH}
ERROR_NAMES = CLASSIFICATION_ERROR_NAMES | QUANTIFICATION_ERROR_NAMES
f1_error = f1e

View File

@ -61,9 +61,10 @@ def artificial_sampling_prediction(
return true_prevalence, estim_prevalence
pbar = tqdm(indexes, desc='[artificial sampling protocol] predicting') if verbose else indexes
results = Parallel(n_jobs=n_jobs)(
delayed(_predict_prevalences)(index) for index in pbar
)
results = qp.util.parallel(_predict_prevalences, pbar, n_jobs=n_jobs)
# results = Parallel(n_jobs=n_jobs)(
# delayed(_predict_prevalences)(index) for index in pbar
# )
true_prevalences, estim_prevalences = zip(*results)
true_prevalences = np.asarray(true_prevalences)
@ -74,16 +75,16 @@ def artificial_sampling_prediction(
def evaluate(model: BaseQuantifier, test_samples:Iterable[LabelledCollection], err:Union[str, Callable], n_jobs:int=-1):
if isinstance(err, str):
err = getattr(qp.error, err)
assert err.__name__ in qp.error.QUANTIFICATION_ERROR_NAMES, \
f'error={err} does not seem to be a quantification error'
scores = Parallel(n_jobs=n_jobs)(
delayed(_delayed_eval)(model, Ti, err) for Ti in test_samples
)
err = qp.error.from_name(err)
scores = qp.util.parallel(_delayed_eval, ((model, Ti, err) for Ti in test_samples), n_jobs=n_jobs)
# scores = Parallel(n_jobs=n_jobs)(
# delayed(_delayed_eval)(model, Ti, err) for Ti in test_samples
# )
return np.mean(scores)
def _delayed_eval(model:BaseQuantifier, test:LabelledCollection, error:Callable):
def _delayed_eval(args):
model, test, error = args
prev_estim = model.quantify(test.instances)
prev_true = test.prevalence()
return error(prev_true, prev_estim)

View File

@ -559,7 +559,7 @@ class OneVsAll(AggregativeQuantifier):
def __parallel(self, func, *args, **kwargs):
return np.asarray(
Parallel(n_jobs=self.n_jobs, backend='threading')(
Parallel(n_jobs=self.n_jobs)(
delayed(func)(c, *args, **kwargs) for c in self.classes
)
)

View File

@ -82,12 +82,21 @@ class Ensemble(BaseQuantifier):
is_static_policy = (self.policy in qp.error.QUANTIFICATION_ERROR_NAMES)
self.ensemble = Parallel(n_jobs=self.n_jobs, backend="threading")(
delayed(_delayed_new_instance)(
self.base_quantifier, data, val_split, prev, posteriors, keep_samples=is_static_policy,
verbose=self.verbose, sample_size=sample_size
) for prev in tqdm(prevs, desc='fitting ensamble')
args = (
(self.base_quantifier, data, val_split, prev, posteriors, is_static_policy, self.verbose, sample_size)
for prev in prevs
)
self.ensemble = qp.util.parallel(
_delayed_new_instance,
tqdm(args, desc='fitting ensamble', total=self.size),
n_jobs=self.n_jobs)
# self.ensemble = Parallel(n_jobs=self.n_jobs)(
# delayed(_delayed_new_instance)(
# self.base_quantifier, data, val_split, prev, posteriors, keep_samples=is_static_policy,
# verbose=self.verbose, sample_size=sample_size
# ) for prev in tqdm(prevs, desc='fitting ensamble')
# )
# static selection policy (the name of a quantification-oriented error function to minimize)
if self.policy in qp.error.QUANTIFICATION_ERROR_NAMES:
@ -97,9 +106,12 @@ class Ensemble(BaseQuantifier):
return self
def quantify(self, instances):
predictions = np.asarray(Parallel(n_jobs=self.n_jobs, backend="threading")(
delayed(_delayed_quantify)(Qi, instances) for Qi in self.ensemble
))
predictions = np.asarray(
qp.util.parallel(_delayed_quantify, ((Qi, instances) for Qi in self.ensemble), n_jobs=self.n_jobs)
)
# predictions = np.asarray(Parallel(n_jobs=self.n_jobs)(
# delayed(_delayed_quantify)(Qi, instances) for Qi in self.ensemble
# ))
if self.policy == 'ptr':
predictions = self.ptr_policy(predictions)
@ -124,7 +136,7 @@ class Ensemble(BaseQuantifier):
For each model in the ensemble, the performance is measured in terms of _error_name_ on the quantification of
the samples used for training the rest of the models in the ensemble.
"""
error = getattr(qp.error, error_name)
error = qp.error.from_name(error_name)
tests = [m[3] for m in self.ensemble]
scores = []
for i, model in enumerate(self.ensemble):
@ -209,14 +221,8 @@ def select_k(elements, order, k):
return [elements[idx] for idx in order[:k]]
def _delayed_new_instance(base_quantifier,
data: LabelledCollection,
val_split: Union[LabelledCollection, float],
prev,
posteriors,
keep_samples,
verbose,
sample_size):
def _delayed_new_instance(args):
base_quantifier, data, val_split, prev, posteriors, keep_samples, verbose, sample_size = args
if verbose:
print(f'\tfit-start for prev {F.strprev(prev)}, sample_size={sample_size}')
model = deepcopy(base_quantifier)
@ -241,7 +247,8 @@ def _delayed_new_instance(base_quantifier,
return (model, tr_prevalence, tr_distribution, sample if keep_samples else None)
def _delayed_quantify(quantifier, instances):
def _delayed_quantify(args):
quantifier, instances = args
return quantifier[0].quantify(instances)
@ -275,13 +282,11 @@ def _instantiate_ensemble(learner, base_quantifier_class, param_grid, optim, par
elif optim in qp.error.CLASSIFICATION_ERROR:
learner = GridSearchCV(learner, param_grid)
base_quantifier = base_quantifier_class(learner)
elif optim in qp.error.QUANTIFICATION_ERROR:
else:
base_quantifier = GridSearchQ(base_quantifier_class(learner),
param_grid=param_grid,
**param_model_sel,
error=optim)
else:
raise ValueError(f'value optim={optim} not understood')
return Ensemble(base_quantifier, **kwargs)
@ -292,9 +297,7 @@ def _check_error(error):
if error in qp.error.QUANTIFICATION_ERROR or error in qp.error.CLASSIFICATION_ERROR:
return error
elif isinstance(error, str):
assert error in qp.error.ERROR_NAMES, \
f'unknown error name; valid ones are {qp.error.ERROR_NAMES}'
return getattr(qp.error, error)
return qp.error.from_name(error)
else:
raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n'
f'the name of an error function in {qp.error.ERROR_NAMES}')

View File

@ -113,9 +113,9 @@ class GridSearchQ(BaseQuantifier):
if error in qp.error.QUANTIFICATION_ERROR:
self.error = error
elif isinstance(error, str):
assert error in qp.error.QUANTIFICATION_ERROR_NAMES, \
f'unknown error name; valid ones are {qp.error.QUANTIFICATION_ERROR_NAMES}'
self.error = getattr(qp.error, error)
self.error = qp.error.from_name(error)
elif hasattr(error, '__call__'):
self.error = error
else:
raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n'
f'the name of an error function in {qp.error.QUANTIFICATION_ERROR_NAMES}')

View File

@ -10,12 +10,14 @@ plt.rcParams['figure.figsize'] = [12, 8]
plt.rcParams['figure.dpi'] = 200
def binary_diagonal(method_names, true_prevs, estim_prevs, pos_class=1, title=None, savepath=None):
def binary_diagonal(method_names, true_prevs, estim_prevs, pos_class=1, title=None, show_std=True, legend=True, savepath=None):
fig, ax = plt.subplots()
ax.set_aspect('equal')
ax.grid()
ax.plot([0, 1], [0, 1], '--k', label='ideal', zorder=1)
method_names, true_prevs, estim_prevs = _merge(method_names, true_prevs, estim_prevs)
for method, true_prev, estim_prev in zip(method_names, true_prevs, estim_prevs):
true_prev = true_prev[:,pos_class]
estim_prev = estim_prev[:,pos_class]
@ -26,15 +28,17 @@ def binary_diagonal(method_names, true_prevs, estim_prevs, pos_class=1, title=No
y_std = np.asarray([estim_prev[true_prev == x].std() for x in x_ticks])
ax.errorbar(x_ticks, y_ave, fmt='-', marker='o', label=method, markersize=3, zorder=2)
ax.fill_between(x_ticks, y_ave - y_std, y_ave + y_std, alpha=0.25)
if show_std:
ax.fill_between(x_ticks, y_ave - y_std, y_ave + y_std, alpha=0.25)
ax.set(xlabel='true prevalence', ylabel='estimated prevalence', title=title)
ax.set_ylim(0, 1)
ax.set_xlim(0, 1)
box = ax.get_position()
ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
if legend:
box = ax.get_position()
ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
save_or_show(savepath)
@ -43,6 +47,8 @@ def binary_bias_global(method_names, true_prevs, estim_prevs, pos_class=1, title
fig, ax = plt.subplots()
ax.grid()
method_names, true_prevs, estim_prevs = _merge(method_names, true_prevs, estim_prevs)
data, labels = [], []
for method, true_prev, estim_prev in zip(method_names, true_prevs, estim_prevs):
true_prev = true_prev[:,pos_class]
@ -51,18 +57,21 @@ def binary_bias_global(method_names, true_prevs, estim_prevs, pos_class=1, title
labels.append(method)
ax.boxplot(data, labels=labels, patch_artist=False, showmeans=True)
plt.xticks(rotation=45)
ax.set(ylabel='error bias', title=title)
save_or_show(savepath)
def binary_bias_bins(method_names, true_prevs, estim_prevs, pos_class=1, title=None, nbins=5, colormap=cm.tab10,
vertical_xticks=False, savepath=None):
vertical_xticks=False, legend=True, savepath=None):
from pylab import boxplot, plot, setp
fig, ax = plt.subplots()
ax.grid()
method_names, true_prevs, estim_prevs = _merge(method_names, true_prevs, estim_prevs)
bins = np.linspace(0, 1, nbins+1)
binwidth = 1/nbins
data = {}
@ -77,13 +86,13 @@ def binary_bias_bins(method_names, true_prevs, estim_prevs, pos_class=1, title=N
data[method].append(estim_prev[selected] - true_prev[selected])
nmethods = len(method_names)
boxwidth = binwidth/(nmethods+1)
boxwidth = binwidth/(nmethods+4)
for i,bin in enumerate(bins[:-1]):
boxdata = [data[method][i] for method in method_names]
positions = [bin+(i*boxwidth)+boxwidth for i,_ in enumerate(method_names)]
positions = [bin+(i*boxwidth)+2*boxwidth for i,_ in enumerate(method_names)]
box = boxplot(boxdata, showmeans=False, positions=positions, widths = boxwidth, sym='+', patch_artist=True)
for boxid in range(len(method_names)):
c = colormap.colors[boxid]
c = colormap.colors[boxid%len(colormap.colors)]
setp(box['fliers'][boxid], color=c, marker='+', markersize=3., markeredgecolor=c)
setp(box['boxes'][boxid], color=c)
setp(box['medians'][boxid], color='k')
@ -106,18 +115,19 @@ def binary_bias_bins(method_names, true_prevs, estim_prevs, pos_class=1, title=N
# Tweak spacing to prevent clipping of tick-labels
plt.subplots_adjust(bottom=0.15)
# adds the legend to the list hs, initialized with the "ideal" quantifier (one that has 0 bias across all bins. i.e.
# a line from (0,0) to (1,0). The other elements are simply labelled dot-plots that are to be removed (setting
# set_visible to False for all but the first element) after the legend has been placed
hs=[ax.plot([0, 1], [0, 0], '-k', zorder=2)[0]]
for colorid in range(len(method_names)):
h, = plot([0, 0], '-s', markerfacecolor=colormap.colors[colorid], color='k',
mec=colormap.colors[colorid], linewidth=1.)
hs.append(h)
box = ax.get_position()
ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
ax.legend(hs, ['ideal']+method_names, loc='center left', bbox_to_anchor=(1, 0.5))
[h.set_visible(False) for h in hs[1:]]
if legend:
# adds the legend to the list hs, initialized with the "ideal" quantifier (one that has 0 bias across all bins. i.e.
# a line from (0,0) to (1,0). The other elements are simply labelled dot-plots that are to be removed (setting
# set_visible to False for all but the first element) after the legend has been placed
hs=[ax.plot([0, 1], [0, 0], '-k', zorder=2)[0]]
for colorid in range(len(method_names)):
color=colormap.colors[colorid % len(colormap.colors)]
h, = plot([0, 0], '-s', markerfacecolor=color, color='k',mec=color, linewidth=1.)
hs.append(h)
box = ax.get_position()
ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
ax.legend(hs, ['ideal']+method_names, loc='center left', bbox_to_anchor=(1, 0.5))
[h.set_visible(False) for h in hs[1:]]
# x-axis and y-axis labels and limits
ax.set(xlabel='prevalence', ylabel='error bias', title=title)
@ -127,9 +137,24 @@ def binary_bias_bins(method_names, true_prevs, estim_prevs, pos_class=1, title=N
save_or_show(savepath)
def _merge(method_names, true_prevs, estim_prevs):
ndims = true_prevs[0].shape[1]
data = defaultdict(lambda: {'true': np.empty(shape=(0, ndims)), 'estim': np.empty(shape=(0, ndims))})
method_order=[]
for method, true_prev, estim_prev in zip(method_names, true_prevs, estim_prevs):
data[method]['true'] = np.concatenate([data[method]['true'], true_prev])
data[method]['estim'] = np.concatenate([data[method]['estim'], estim_prev])
if method not in method_order:
method_order.append(method)
true_prevs_ = [data[m]['true'] for m in method_order]
estim_prevs_ = [data[m]['estim'] for m in method_order]
return method_order, true_prevs_, estim_prevs_
def error_by_drift(method_names, true_prevs, estim_prevs, tr_prevs, n_bins=20, error_name='ae', show_std=True,
title=f'Quantification error as a function of distribution shift',
savepath=None):
logscale=False,
title=f'Quantification error as a function of distribution shift',
savepath=None):
fig, ax = plt.subplots()
ax.grid()
@ -158,6 +183,8 @@ def error_by_drift(method_names, true_prevs, estim_prevs, tr_prevs, n_bins=20, e
for method in method_order:
tr_test_drifts = data[method]['x']
method_drifts = data[method]['y']
if logscale:
method_drifts=np.log(1+method_drifts)
inds = np.digitize(tr_test_drifts, bins, right=True)
xs, ys, ystds = [], [], []
@ -196,7 +223,7 @@ def save_or_show(savepath):
if savepath is not None:
qp.util.create_parent_dir(savepath)
# plt.tight_layout()
plt.savefig(savepath)
plt.savefig(savepath, bbox_inches='tight')
else:
plt.show()

View File

@ -5,6 +5,7 @@ import os
import pickle
import urllib
from pathlib import Path
import quapy as qp
import numpy as np
from joblib import Parallel, delayed
@ -19,7 +20,11 @@ def get_parallel_slices(n_tasks, n_jobs=-1):
range(n_jobs)]
def parallelize(func, args, n_jobs):
def map_parallel(func, args, n_jobs):
"""
Applies func to n_jobs slices of args. E.g., if args is an array of 99 items and n_jobs=2, then
func is applied in two parallel processes to args[0:50] and to args[50:99]
"""
args = np.asarray(args)
slices = get_parallel_slices(len(args), n_jobs)
results = Parallel(n_jobs=n_jobs)(
@ -28,6 +33,23 @@ def parallelize(func, args, n_jobs):
return list(itertools.chain.from_iterable(results))
def parallel(func, args, n_jobs):
"""
A wrapper of multiprocessing:
Parallel(n_jobs=n_jobs)(
delayed(func)(args_i) for args_i in args
)
that takes the quapy.environ variable as input silently
"""
def func_dec(environ, *args):
qp.environ = environ
return func(*args)
return Parallel(n_jobs=n_jobs)(
delayed(func_dec)(qp.environ, args_i) for args_i in args
)
@contextlib.contextmanager
def temp_seed(seed):
state = np.random.get_state()

38
test.py
View File

@ -9,10 +9,11 @@ import numpy as np
from NewMethods.methods import AveragePoolQuantification
from classification.methods import PCALR
from classification.neural import NeuralClassifierTrainer, CNNnet
from method.meta import EPACC
from quapy.model_selection import GridSearchQ
dataset = qp.datasets.fetch_UCIDataset('sonar', verbose=True)
sys.exit(0)
# dataset = qp.datasets.fetch_UCIDataset('sonar', verbose=True)
# sys.exit(0)
qp.environ['SAMPLE_SIZE'] = 500
@ -56,15 +57,28 @@ print(f'dataset loaded: #training={len(dataset.training)} #test={len(dataset.tes
#learner = LogisticRegression(max_iter=1000)
# model = qp.method.aggregative.ClassifyAndCount(learner)
learner = LogisticRegression(max_iter=1000)
#model = qp.method.aggregative.PACC(learner)
#model = qp.method.aggregative.ACC(learner)
model = qp.method.meta.EPACC(learner, size=10, red_size=5, max_sample_size=500, n_jobs=-1,
param_grid={'C':[1,10,100]},
optim='mae', param_mod_sel={'sample_size':100, 'n_prevpoints':21, 'n_repetitions':5, 'verbose':True},
policy='ptr',
val_split=0.4)
"""
param_mod_sel = {
'sample_size': 100,
'n_prevpoints': 21,
'n_repetitions': 5,
'verbose': False
}
common = {
'max_sample_size': 50,
'n_jobs': -1,
'param_grid': {'C': np.logspace(0,2,2), 'class_weight': ['balanced']},
'param_mod_sel': param_mod_sel,
'val_split': 0.4,
'min_pos': 10,
'size':6,
'red_size':3
}
# hyperparameters will be evaluated within each quantifier of the ensemble, and so the typical model selection
# will be skipped (by setting hyperparameters to None)
model = EPACC(LogisticRegression(max_iter=100), optim='mrae', policy='mrae', **common)
"""
Problemas:
- La interfaz es muy fea, hay que conocer practicamente todos los detalles así que no ahorra nada con respecto a crear
un objeto con otros anidados dentro
@ -108,6 +122,8 @@ if qp.isbinary(model) and not qp.isbinary(dataset):
print(f'fitting model {model.__class__.__name__}')
#train, val = dataset.training.split_stratified(0.6)
#model.fit(train, val_split=val)
qp.SAMPLE=1
qp.environ['SAMPLE_SIZE']=2
model.fit(dataset.training)