# QuaPy/quapy/benchmarking/_base.py

import itertools
import os
from copy import deepcopy
from os.path import join
from dataclasses import dataclass
from typing import List, Union, Callable
from abc import ABC, abstractmethod
import numpy as np
import pandas as pd
import pickle
from sklearn.linear_model import LogisticRegression
import quapy as qp
from quapy.data import LabelledCollection
from quapy.method.aggregative import PACC
from quapy.protocol import APP, UPP, AbstractProtocol
from quapy.model_selection import GridSearchQ
from quapy.method.base import BaseQuantifier
from result_table.src.table import Table
def makedirs(path):
    """Creates the given directory (and any missing parents), logging the creation."""
    print('creating', path)
    os.makedirs(path, exist_ok=True)
@dataclass
class MethodDescriptor:
    """Describes a quantification method to benchmark: a unique id, a display name,
    the quantifier instance, and the grid of hyperparameters to explore."""
    id: str
    name: str
    instance: BaseQuantifier
    hyperparams: dict
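
# Illustrative example of a MethodDescriptor for PACC over a logistic-regression
# classifier; the hyperparameter names below are placeholders following scikit-learn's
# nested-parameter convention and are not prescribed by this module:
#
# pacc_lr = MethodDescriptor(
#     id='PACC',
#     name='PACC(LR)',
#     instance=PACC(LogisticRegression()),
#     hyperparams={'classifier__C': [0.01, 0.1, 1.0, 10.0, 100.0]}
# )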
class Benchmark(ABC):
    """Base class for benchmarks: runs a set of quantification methods on a set of
    datasets, caches the results, and generates summary tables and plots."""

    ID_SEPARATOR = '__'  # used to separate components in a run-ID, cannot be used within the component IDs

    def __init__(self, home_dir, n_jobs=3):
        self.home_dir = home_dir
        self.n_jobs = n_jobs
        assert n_jobs != -1, ('Setting n_jobs=-1 will probably blow your memory. '
                              'Specify a positive number.')
        # output layout under home_dir: results/ (cached reports), params/ (model selection
        # outcomes), tables/ (LaTeX/PDF tables), plots/ (error-by-drift plots)
        makedirs(home_dir)
        makedirs(join(home_dir, 'results'))
        makedirs(join(home_dir, 'params'))
        makedirs(join(home_dir, 'tables'))
        makedirs(join(home_dir, 'plots'))
        self.train_prevalence = {}
    def _run_id(self, method: MethodDescriptor, dataset: str):
        sep = Benchmark.ID_SEPARATOR
        assert sep not in method.id, \
            (f'separator {sep} cannot be used in method ID ({method.id}), '
             f'please change the method ID or redefine {Benchmark.ID_SEPARATOR=}')
        assert sep not in dataset, \
            (f'separator {sep} cannot be used in dataset name ({dataset}), '
             f'please redefine {Benchmark.ID_SEPARATOR=}')
        return sep.join([method.id, dataset])
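    # e.g., a method with id 'PACC' run on a dataset named 'yeast' (both placeholders)
    # yields the run-ID 'PACC__yeast', which names the cached result and parameter files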
    def _result_path(self, method: MethodDescriptor, dataset: str):
        id = self._run_id(method, dataset)
        return join(self.home_dir, 'results', id + '.pkl')

    def _params_path(self, method: MethodDescriptor, dataset: str):
        # two pickles per run: the chosen hyperparameter configuration and the full grid of scores
        id = self._run_id(method, dataset)
        chosen = join(self.home_dir, 'params', id + 'chosen.pkl')
        scores = join(self.home_dir, 'params', id + 'scores.pkl')
        return chosen, scores
    def _exist_run(self, method: MethodDescriptor, dataset: str):
        return os.path.exists(self._result_path(method, dataset))

    def _open_method_dataset_result(self, method: MethodDescriptor, dataset: str):
        if not self._exist_run(method, dataset):
            raise ValueError(f'cannot open result for method={method.id} and {dataset=}')
        return pd.read_pickle(self._result_path(method, dataset))
    def check_dataset(self, dataset: str):
        assert dataset in self.list_datasets(), f'unknown dataset {dataset}'

    @abstractmethod
    def list_datasets(self) -> List[str]:
        ...

    @abstractmethod
    def run_method_dataset(self, method: MethodDescriptor, dataset: str, random_state=0) -> pd.DataFrame:
        ...
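    # In gen_tables and gen_plots below, `results` is a list of (method, dataset, report)
    # triples, where `method` is a MethodDescriptor, `dataset` a dataset name, and `report`
    # the per-sample evaluation DataFrame produced by run_method_dataset; run() assembles it.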
    def gen_tables(self, results, metrics=None):
        if metrics is None:
            metrics = ['mae', 'mrae', 'mkld', 'mnkld']
        tables = {}
        for (method, dataset, result) in results:
            # the first two columns of the report hold the true and estimated prevalences;
            # the remaining columns are error metrics
            col_metrics = result.columns.values[2:]
            for metric in metrics:
                if metric not in col_metrics:
                    print(f'error; requested {metric=} not found among the columns in the dataframe')
                    continue
                if metric not in tables:
                    tables[metric] = Table(name=metric)
                table = tables[metric]
                table.add(dataset, method.name, result[metric].values)
        Table.LatexPDF(join(self.home_dir, 'tables', 'results.pdf'), list(tables.values()))
    def gen_plots(self, results, metrics=None):
        import matplotlib.pyplot as plt
        plt.rcParams.update({'font.size': 11})
        if metrics is None:
            metrics = ['ae']
        for metric in metrics:
            # gather, for each run, the true and estimated prevalences of every test sample,
            # together with the training prevalence of the corresponding dataset
            method_names, true_prevs, estim_prevs, train_prevs = [], [], [], []
            for (method, dataset, result) in results:
                method_names.append(method.name)
                true_prevs.append(np.vstack(result['true-prev'].values))
                estim_prevs.append(np.vstack(result['estim-prev'].values))
                train_prevs.append(self.get_training_prevalence(dataset))
            # plot the error as a function of the amount of prior probability shift
            path = join(self.home_dir, 'plots', f'err_by_drift_{metric}.pdf')
            qp.plot.error_by_drift(method_names, true_prevs, estim_prevs, train_prevs, error_name=metric, n_bins=20, savepath=path)
    def _show_report(self, method, dataset, report: pd.DataFrame):
        # prints a one-line summary (mean +- std of MAE and MRAE) for a (method, dataset) run
        id = method.id
        MAE = report['mae'].mean()
        mae_std = report['mae'].std()
        MRAE = report['mrae'].mean()
        mrae_std = report['mrae'].std()
        print(f'{id}\t{dataset}:\t{MAE=:.4f}+-{mae_std:.4f}\t{MRAE=:.4f}+-{mrae_std:.4f}')
    def run(self,
            methods: Union[List[MethodDescriptor], MethodDescriptor],
            datasets: Union[List[str], str] = None,
            force=False):
        if not isinstance(methods, list):
            methods = [methods]
        if datasets is None:
            datasets = self.list_datasets()
        elif not isinstance(datasets, list):
            datasets = [datasets]
        results = []
        pending_job_args = []
        for method, dataset in itertools.product(methods, datasets):
            self.check_dataset(dataset)
            if not force and self._exist_run(method, dataset):
                # reuse the cached result for this (method, dataset) pair
                result = pd.read_pickle(self._result_path(method, dataset))
                results.append((method, dataset, result))
            else:
                pending_job_args.append((method, dataset))
        if len(pending_job_args) > 0:
            # run the missing (method, dataset) pairs in parallel
            remaining_results = qp.util.parallel_unpack(
                func=self.run_method_dataset,
                args=pending_job_args,
                n_jobs=self.n_jobs,
                seed=0,
                asarray=False
            )
            results += [
                (method, dataset, result) for (method, dataset), result in zip(pending_job_args, remaining_results)
            ]
        # print results
        for method, dataset, result in results:
            self._show_report(method, dataset, result)
        self.gen_tables(results)
        self.gen_plots(results)
    @abstractmethod
    def get_training_prevalence(self, dataset: str):
        ...

    def __add__(self, other: 'Benchmark'):
        return CombinedBenchmark(self, other, self.n_jobs)
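
# Two benchmarks can be combined with '+' into a single run; a sketch (the paths and the
# pacc_lr descriptor are illustrative placeholders):
#
# bench = UCIBinaryBenchmark('../../Benchmarks/UCIbinary') + UCIMultiBenchmark('../../Benchmarks/UCIMulti')
# bench.run(methods=[pacc_lr])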
class CombinedBenchmark(Benchmark):
    """Routes each dataset to the benchmark it originally belongs to, so that the union
    of two benchmarks can be run as a single one."""

    def __init__(self, benchmark_a: Benchmark, benchmark_b: Benchmark, n_jobs=-1):
        self.router = {
            **{dataset: benchmark_a for dataset in benchmark_a.list_datasets()},
            **{dataset: benchmark_b for dataset in benchmark_b.list_datasets()}
        }
        self.datasets = benchmark_a.list_datasets() + benchmark_b.list_datasets()
        # tables and plots of the combined benchmark are written to the home_dir of the first benchmark
        self.home_dir = benchmark_a.home_dir
        self.n_jobs = n_jobs

    def list_datasets(self) -> List[str]:
        return self.datasets

    def run_method_dataset(self, method: MethodDescriptor, dataset: str, random_state=0) -> pd.DataFrame:
        return self.router[dataset].run_method_dataset(method, dataset, random_state)

    def get_training_prevalence(self, dataset: str):
        return self.router[dataset].get_training_prevalence(dataset)

    def _exist_run(self, method: MethodDescriptor, dataset: str):
        return self.router[dataset]._exist_run(method, dataset)
class TypicalBenchmark(Benchmark):
    """A Benchmark with the typical workflow: model selection on a validation protocol,
    refit on the full training set, and evaluation on a test protocol."""

    @abstractmethod
    def get_sample_size(self) -> int:
        ...

    @abstractmethod
    def get_training(self, dataset: str) -> LabelledCollection:
        ...

    @abstractmethod
    def get_trModsel_valprotModsel_trEval_teprotEval(self, dataset: str) -> \
            (LabelledCollection, AbstractProtocol, LabelledCollection, AbstractProtocol):
        ...

    @abstractmethod
    def get_target_error_for_modsel(self) -> Union[str, Callable]:
        ...
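    # The 4-tuple above provides: (i) the training set used during model selection,
    # (ii) the validation protocol that generates the model-selection samples,
    # (iii) the full training set on which the selected model is refit for evaluation,
    # and (iv) the test protocol that generates the evaluation samples.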
    def run_method_dataset(self, method: MethodDescriptor, dataset: str, random_state=0) -> pd.DataFrame:
        print(f'Running method={method.id} in {dataset=}')
        sample_size = self.get_sample_size()
        qp.environ['SAMPLE_SIZE'] = sample_size
        q = deepcopy(method.instance)
        optim_for = self.get_target_error_for_modsel()
        with qp.util.temp_seed(random_state):
            # data split
            trModSel, valprotModSel, trEval, teprotEval = self.get_trModsel_valprotModsel_trEval_teprotEval(dataset)
            self.train_prevalence[dataset] = trEval.prevalence()

            # model selection: grid search over the method's hyperparameters
            modsel = GridSearchQ(
                model=q,
                param_grid=method.hyperparams,
                protocol=valprotModSel,
                error=optim_for,
                refit=False,
                n_jobs=-1,
                raise_errors=True,
                verbose=True
            ).fit(trModSel)

            # fit the selected configuration on the whole training data
            optimized_model = modsel.best_model_
            optimized_model.fit(trEval)

            # evaluation: one row per test sample, with true/estimated prevalences and all error metrics
            report = qp.evaluation.evaluation_report(
                model=optimized_model,
                protocol=teprotEval,
                error_metrics=qp.error.QUANTIFICATION_ERROR_NAMES
            )

            # data persistence: chosen hyperparameters, the full grid of scores, and the evaluation report
            chosen_path, scores_path = self._params_path(method, dataset)
            pickle.dump(modsel.best_params_, open(chosen_path, 'wb'), pickle.HIGHEST_PROTOCOL)
            pickle.dump(modsel.param_scores_, open(scores_path, 'wb'), pickle.HIGHEST_PROTOCOL)

            result_path = self._result_path(method, dataset)
            report.to_pickle(result_path)

        return report
    def get_training_prevalence(self, dataset: str):
        # lazily computed and cached; run_method_dataset also stores it as a side effect
        if dataset not in self.train_prevalence:
            training = self.get_training(dataset)
            self.train_prevalence[dataset] = training.prevalence()
        return self.train_prevalence[dataset]
class UCIBinaryBenchmark(TypicalBenchmark):

    def get_trModsel_valprotModsel_trEval_teprotEval(self, dataset: str) -> \
            (LabelledCollection, AbstractProtocol, LabelledCollection, AbstractProtocol):
        data = qp.datasets.fetch_UCIBinaryDataset(dataset)
        trEval, teEval = data.train_test
        trModsel, vaModsel = trEval.split_stratified()
        # artificial-prevalence protocol (APP): samples drawn at prevalences on a grid of 21 points
        valprotModsel = APP(vaModsel, n_prevalences=21, repeats=25)
        teprotEval = APP(teEval, n_prevalences=21, repeats=100)
        return trModsel, valprotModsel, trEval, teprotEval

    def get_training(self, dataset: str) -> LabelledCollection:
        return qp.datasets.fetch_UCIBinaryDataset(dataset).training

    def get_sample_size(self) -> int:
        return 100

    def get_target_error_for_modsel(self) -> Union[str, Callable]:
        return 'mae'

    def list_datasets(self) -> List[str]:
        ignore = ['acute.a', 'acute.b', 'balance.2']
        return [d for d in qp.datasets.UCI_BINARY_DATASETS if d not in ignore]
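
# Protocol choice: the binary benchmark evaluates with the artificial-prevalence protocol (APP),
# which varies test prevalences over a regular grid, whereas the multiclass benchmark below uses
# the uniform-prevalence protocol (UPP), which draws prevalence vectors uniformly at random.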
class UCIMultiBenchmark(TypicalBenchmark):

    def list_datasets(self) -> List[str]:
        return qp.datasets.UCI_MULTICLASS_DATASETS

    def get_trModsel_valprotModsel_trEval_teprotEval(self, dataset: str) -> \
            (LabelledCollection, AbstractProtocol, LabelledCollection, AbstractProtocol):
        data = qp.datasets.fetch_UCIMulticlassDataset(dataset)
        trEval, teEval = data.train_test
        trModsel, vaModsel = trEval.split_stratified()
        # uniform-prevalence protocol (UPP): prevalence vectors drawn uniformly from the simplex
        valprotModsel = UPP(vaModsel, repeats=250)
        teprotEval = UPP(teEval, repeats=1000)
        return trModsel, valprotModsel, trEval, teprotEval

    def get_training(self, dataset: str) -> LabelledCollection:
        return qp.datasets.fetch_UCIMulticlassDataset(dataset).training

    def get_sample_size(self) -> int:
        return 500

    def get_target_error_for_modsel(self) -> Union[str, Callable]:
        return 'mae'
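
# A minimal sketch of how a new benchmark could be defined on top of TypicalBenchmark;
# the dataset names, the load_my_dataset helper, and the protocol settings below are
# hypothetical placeholders:
#
# class MyBenchmark(TypicalBenchmark):
#     def list_datasets(self) -> List[str]:
#         return ['my-dataset-a', 'my-dataset-b']
#
#     def get_training(self, dataset: str) -> LabelledCollection:
#         return load_my_dataset(dataset).training
#
#     def get_trModsel_valprotModsel_trEval_teprotEval(self, dataset: str):
#         data = load_my_dataset(dataset)
#         trEval, teEval = data.train_test
#         trModsel, vaModsel = trEval.split_stratified()
#         return trModsel, UPP(vaModsel, repeats=100), trEval, UPP(teEval, repeats=500)
#
#     def get_sample_size(self) -> int:
#         return 250
#
#     def get_target_error_for_modsel(self) -> Union[str, Callable]:
#         return 'mae'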
if __name__ == '__main__':
    from quapy.benchmarking.typical import *

    # from quapy.method.aggregative import BayesianCC
    # bayes = MethodDescriptor(
    #     id='Bayesian',
    #     name='Bayesian(LR)',
    #     instance=BayesianCC(LogisticRegression()),
    #     hyperparams=wrap_cls_params(lr_hyper)
    # )

    # bench_bin = UCIBinaryBenchmark('../../Benchmarks/UCIbinary')
    bench_multi = UCIMultiBenchmark('../../Benchmarks/UCIMulti')
    # bench = bench_bin + bench_multi
    bench = bench_multi

    bench.run(methods=[cc, pcc, acc, pacc, sld, sld_bcts])
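    # to re-run a single method on a single dataset bypassing the cached results, something
    # like the following could be used (the dataset name is an illustrative placeholder):
    # bench.run(methods=pacc, datasets='letter', force=True)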