Compare commits

...

10 Commits

11 changed files with 964 additions and 382 deletions

View File

@ -2,7 +2,7 @@ import quapy as qp
from quapy.data import LabelledCollection
from quapy.method.base import BinaryQuantifier
from quapy.model_selection import GridSearchQ
from quapy.method.aggregative import AggregativeProbabilisticQuantifier
from quapy.method.aggregative import AggregativeSoftQuantifier
from quapy.protocol import APP
import numpy as np
from sklearn.linear_model import LogisticRegression
@ -15,7 +15,7 @@ from sklearn.linear_model import LogisticRegression
# internal hyperparameter (let say, alpha) which is the decision threshold. Let's also assume the quantifier
# is binary, for simplicity.
class MyQuantifier(AggregativeProbabilisticQuantifier, BinaryQuantifier):
class MyQuantifier(AggregativeSoftQuantifier, BinaryQuantifier):
def __init__(self, classifier, alpha=0.5):
self.alpha = alpha
# aggregative quantifiers have an internal self.classifier attribute

View File

@ -1,8 +1,11 @@
import quapy as qp
from quapy.method.non_aggregative import DMx
from quapy.protocol import APP
from quapy.method.aggregative import DMy
from sklearn.linear_model import LogisticRegression
from examples.comparing_gridsearch import OLD_GridSearchQ
import numpy as np
from time import time
"""
In this example, we show how to perform model selection on a DistributionMatching quantifier.
@ -15,6 +18,8 @@ qp.environ['N_JOBS'] = -1
training, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5).train_test
with qp.util.temp_seed(0):
# The model will be returned by the fit method of GridSearchQ.
# Every combination of hyper-parameters will be evaluated by confronting the
# quantifier thus configured against a series of samples generated by means
@ -33,18 +38,25 @@ protocol = APP(validation)
# classifier.
param_grid = {
'classifier__C': np.logspace(-3,3,7),
'nbins': [8, 16, 32, 64],
'classifier__class_weight': ['balanced', None],
'nbins': [8, 16, 32, 64, 'poooo'],
}
tinit = time()
# model = OLD_GridSearchQ(
model = qp.model_selection.GridSearchQ(
model=model,
param_grid=param_grid,
protocol=protocol,
error='mae', # the error to optimize is the MAE (a quantification-oriented loss)
refit=True, # retrain on the whole labelled set once done
refit=False, # retrain on the whole labelled set once done
verbose=True # show information as the process goes on
).fit(training)
tend = time()
print(f'model selection ended: best hyper-parameters={model.best_params_}')
model = model.best_model_
@ -53,5 +65,5 @@ model = model.best_model_
mae_score = qp.evaluation.evaluate(model, protocol=APP(test), error_metric='mae')
print(f'MAE={mae_score:.5f}')
print(f'model selection took {tend-tinit}s')

View File

@ -24,7 +24,8 @@ class RecalibratedProbabilisticClassifier:
class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabilisticClassifier):
"""
Applies a (re)calibration method from `abstention.calibration`, as defined in
`Alexandari et al. paper <http://proceedings.mlr.press/v119/alexandari20a.html>`_:
`Alexandari et al. paper <http://proceedings.mlr.press/v119/alexandari20a.html>`_.
:param classifier: a scikit-learn probabilistic classifier
:param calibrator: the calibration object (an instance of abstention.calibration.CalibratorFactory)
@ -59,7 +60,7 @@ class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabi
elif isinstance(k, float):
if not (0 < k < 1):
raise ValueError('wrong value for val_split: the proportion of validation documents must be in (0,1)')
return self.fit_cv(X, y)
return self.fit_tr_val(X, y)
def fit_cv(self, X, y):
"""
@ -94,7 +95,7 @@ class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabi
self.classifier.fit(Xtr, ytr)
posteriors = self.classifier.predict_proba(Xva)
nclasses = len(np.unique(yva))
self.calibrator = self.calibrator(posteriors, np.eye(nclasses)[yva], posterior_supplied=True)
self.calibration_function = self.calibrator(posteriors, np.eye(nclasses)[yva], posterior_supplied=True)
return self
def predict(self, X):

File diff suppressed because it is too large Load Diff

View File

@ -12,7 +12,7 @@ from quapy import functional as F
from quapy.data import LabelledCollection
from quapy.model_selection import GridSearchQ
from quapy.method.base import BaseQuantifier, BinaryQuantifier
from quapy.method.aggregative import CC, ACC, PACC, HDy, EMQ
from quapy.method.aggregative import CC, ACC, PACC, HDy, EMQ, AggregativeQuantifier
try:
from . import neural
@ -26,6 +26,65 @@ else:
QuaNet = "QuaNet is not available due to missing torch package"
class MedianEstimator2(BinaryQuantifier):
"""
This method is a meta-quantifier that returns, as the estimated class prevalence values, the median of the
estimation returned by differently (hyper)parameterized base quantifiers.
The median of unit-vectors is only guaranteed to be a unit-vector for n=2 dimensions,
i.e., in cases of binary quantification.
:param base_quantifier: the base, binary quantifier
:param random_state: a seed to be set before fitting any base quantifier (default None)
:param param_grid: the grid or parameters towards which the median will be computed
:param n_jobs: number of parllel workes
"""
def __init__(self, base_quantifier: BinaryQuantifier, param_grid: dict, random_state=None, n_jobs=None):
self.base_quantifier = base_quantifier
self.param_grid = param_grid
self.random_state = random_state
self.n_jobs = qp._get_njobs(n_jobs)
def get_params(self, deep=True):
return self.base_quantifier.get_params(deep)
def set_params(self, **params):
self.base_quantifier.set_params(**params)
def _delayed_fit(self, args):
with qp.util.temp_seed(self.random_state):
params, training = args
model = deepcopy(self.base_quantifier)
model.set_params(**params)
model.fit(training)
return model
def fit(self, training: LabelledCollection):
self._check_binary(training, self.__class__.__name__)
configs = qp.model_selection.expand_grid(self.param_grid)
self.models = qp.util.parallel(
self._delayed_fit,
((params, training) for params in configs),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs
)
return self
def _delayed_predict(self, args):
model, instances = args
return model.quantify(instances)
def quantify(self, instances):
prev_preds = qp.util.parallel(
self._delayed_predict,
((model, instances) for model in self.models),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs
)
prev_preds = np.asarray(prev_preds)
return np.median(prev_preds, axis=0)
class MedianEstimator(BinaryQuantifier):
"""
This method is a meta-quantifier that returns, as the estimated class prevalence values, the median of the
@ -58,16 +117,63 @@ class MedianEstimator(BinaryQuantifier):
model.fit(training)
return model
def _delayed_fit_classifier(self, args):
with qp.util.temp_seed(self.random_state):
print('enter job')
cls_params, training = args
model = deepcopy(self.base_quantifier)
model.set_params(**cls_params)
predictions = model.classifier_fit_predict(training, predict_on=model.val_split)
print('exit job')
return (model, predictions)
def _delayed_fit_aggregation(self, args):
with qp.util.temp_seed(self.random_state):
print('\tenter job')
((model, predictions), q_params), training = args
model = deepcopy(model)
model.set_params(**q_params)
model.aggregation_fit(predictions, training)
print('\texit job')
return model
def fit(self, training: LabelledCollection):
self._check_binary(training, self.__class__.__name__)
params_keys = list(self.param_grid.keys())
params_values = list(self.param_grid.values())
hyper = [dict({k: val[i] for i, k in enumerate(params_keys)}) for val in itertools.product(*params_values)]
if isinstance(self.base_quantifier, AggregativeQuantifier):
cls_configs, q_configs = qp.model_selection.group_params(self.param_grid)
if len(cls_configs) > 1:
models_preds = qp.util.parallel(
self._delayed_fit_classifier,
((params, training) for params in cls_configs),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs,
asarray=False
)
else:
print('only 1')
model = self.base_quantifier
model.set_params(**cls_configs[0])
predictions = model.classifier_fit_predict(training, predict_on=model.val_split)
models_preds = [(model, predictions)]
self.models = qp.util.parallel(
self._delayed_fit_aggregation,
((setup, training) for setup in itertools.product(models_preds, q_configs)),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs,
asarray=False
)
else:
configs = qp.model_selection.expand_grid(self.param_grid)
self.models = qp.util.parallel(
self._delayed_fit,
((params, training) for params in hyper),
((params, training) for params in configs),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs
n_jobs=self.n_jobs,
asarray=False
)
return self
@ -80,13 +186,13 @@ class MedianEstimator(BinaryQuantifier):
self._delayed_predict,
((model, instances) for model in self.models),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs
n_jobs=self.n_jobs,
asarray=False
)
prev_preds = np.asarray(prev_preds)
return np.median(prev_preds, axis=0)
class Ensemble(BaseQuantifier):
VALID_POLICIES = {'ave', 'ptr', 'ds'} | qp.error.QUANTIFICATION_ERROR_NAMES

View File

@ -194,7 +194,7 @@ class QuaNetTrainer(BaseQuantifier):
label_predictions = np.argmax(posteriors, axis=-1)
prevs_estim = []
for quantifier in self.quantifiers.values():
predictions = posteriors if isinstance(quantifier, AggregativeProbabilisticQuantifier) else label_predictions
predictions = posteriors if isinstance(quantifier, AggregativeSoftQuantifier) else label_predictions
prevs_estim.extend(quantifier.aggregate(predictions))
# there is no real need for adding static estims like the TPR or FPR from training since those are constant

View File

@ -1,7 +1,7 @@
from typing import Union, Callable
import numpy as np
from functional import get_divergence
from quapy.functional import get_divergence
from quapy.data import LabelledCollection
from quapy.method.base import BaseQuantifier, BinaryQuantifier
import quapy.functional as F

View File

@ -1,7 +1,9 @@
import itertools
import signal
from copy import deepcopy
from enum import Enum
from typing import Union, Callable
from functools import wraps
import numpy as np
from sklearn import clone
@ -10,10 +12,67 @@ import quapy as qp
from quapy import evaluation
from quapy.protocol import AbstractProtocol, OnLabelledCollectionProtocol
from quapy.data.base import LabelledCollection
from quapy.method.aggregative import BaseQuantifier
from quapy.method.aggregative import BaseQuantifier, AggregativeQuantifier
from quapy.util import timeout
from time import time
class Status(Enum):
SUCCESS = 1
TIMEOUT = 2
INVALID = 3
ERROR = 4
def check_status(func):
@wraps(func)
def wrapper(*args, **kwargs):
obj = args[0]
tinit = time()
job_descriptor = dict(args[1])
params = {**job_descriptor.get('cls-params', {}), **job_descriptor.get('q-params', {})}
if obj.timeout > 0:
def handler(signum, frame):
raise TimeoutError()
signal.signal(signal.SIGALRM, handler)
signal.alarm(obj.timeout)
try:
job_descriptor = func(*args, **kwargs)
ttime = time() - tinit
score = job_descriptor.get('score', None)
if score is not None:
obj._sout(f'hyperparams=[{params}]\t got {obj.error.__name__} = {score:.5f} [took {ttime:.4f}s]')
if obj.timeout > 0:
signal.alarm(0)
exit_status = Status.SUCCESS
except TimeoutError:
obj._sout(f'timeout ({obj.timeout}s) reached for config {params}')
exit_status = Status.TIMEOUT
except ValueError as e:
obj._sout(f'the combination of hyperparameters {params} is invalid')
obj._sout(f'\tException: {e}')
exit_status = Status.INVALID
except Exception as e:
obj._sout(f'something went wrong for config {params}; skipping:')
obj._sout(f'\tException: {e}')
exit_status = Status.ERROR
job_descriptor['status'] = exit_status
job_descriptor['params'] = params
return job_descriptor
return wrapper
class GridSearchQ(BaseQuantifier):
"""Grid Search optimization targeting a quantification-oriented metric.
@ -69,6 +128,113 @@ class GridSearchQ(BaseQuantifier):
raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n'
f'the name of an error function in {qp.error.QUANTIFICATION_ERROR_NAMES}')
def _prepare_classifier(self, args):
cls_params = args['cls-params']
training = args['training']
model = deepcopy(self.model)
model.set_params(**cls_params)
predictions = model.classifier_fit_predict(training)
return {'model': model, 'predictions': predictions, 'cls-params': cls_params}
def _prepare_aggregation(self, args):
model = args['model']
predictions = args['predictions']
cls_params = args['cls-params']
q_params = args['q-params']
training = args['training']
params = {**cls_params, **q_params}
def job(model):
tinit = time()
model = deepcopy(model)
# overrides default parameters with the parameters being explored at this iteration
model.set_params(**q_params)
model.aggregation_fit(predictions, training)
score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
ttime = time()-tinit
return {
'model': model,
'cls-params':cls_params,
'q-params': q_params,
'params': params,
'score': score,
'ttime':ttime
}
out, status = self._error_handler(job, args)
if status == Status.SUCCESS:
self._sout(f'hyperparams=[{params}]\t got {self.error.__name__} = {out["score"]:.5f} [took {out["time"]:.4f}s]')
elif status == Status.INVALID:
self._sout(f'the combination of hyperparameters {params} is invalid')
elif status == Status.
def _prepare_model(self, args):
params, training = args
model = deepcopy(self.model)
# overrides default parameters with the parameters being explored at this iteration
model.set_params(**params)
model.fit(training)
score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
return {'model': model, 'params': params, 'score': score}
def _compute_scores_aggregative(self, training):
# break down the set of hyperparameters into two: classifier-specific, quantifier-specific
cls_configs, q_configs = group_params(self.param_grid)
# train all classifiers and get the predictions
partial_setups = qp.util.parallel(
self._prepare_classifier,
({'cls-params':params, 'training':training} for params in cls_configs),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs,
asarray=False,
)
# filter out classifier configurations that yield any error
for setup in partial_setups:
if setup['status'] != Status.SUCCESS:
self._sout(f'-> classifier hyperparemters {setup["params"]} caused '
f'error {setup["status"]} and will be ignored')
partial_setups = [setup for setup in partial_setups if setup['status']==Status.SUCCESS]
if len(partial_setups) == 0:
raise ValueError('No valid configuration found for the classifier.')
# explore the quantifier-specific hyperparameters for each training configuration
scores = qp.util.parallel(
self._prepare_aggregation,
({'q-params': setup[1], 'training': training, **setup[0]} for setup in itertools.product(partial_setups, q_configs)),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs
)
return scores
def _compute_scores_nonaggregative(self, training):
configs = expand_grid(self.param_grid)
# pass a seed to parallel, so it is set in child processes
scores = qp.util.parallel(
self._prepare_model,
((params, training) for params in configs),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs
)
return scores
def _compute_scores(self, training):
if isinstance(self.model, AggregativeQuantifier):
return self._compute_scores_aggregative(training)
else:
return self._compute_scores_nonaggregative(training)
def fit(self, training: LabelledCollection):
""" Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing
the error metric.
@ -76,35 +242,31 @@ class GridSearchQ(BaseQuantifier):
:param training: the training set on which to optimize the hyperparameters
:return: self
"""
params_keys = list(self.param_grid.keys())
params_values = list(self.param_grid.values())
protocol = self.protocol
self.param_scores_ = {}
self.best_score_ = None
if self.refit and not isinstance(self.protocol, OnLabelledCollectionProtocol):
raise RuntimeWarning(
f'"refit" was requested, but the protocol does not implement '
f'the {OnLabelledCollectionProtocol.__name__} interface'
)
tinit = time()
hyper = [dict({k: val[i] for i, k in enumerate(params_keys)}) for val in itertools.product(*params_values)]
self._sout(f'starting model selection with {self.n_jobs =}')
#pass a seed to parallel so it is set in clild processes
scores = qp.util.parallel(
self._delayed_eval,
((params, training) for params in hyper),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs
)
self._sout(f'starting model selection with n_jobs={self.n_jobs}')
results = self._compute_scores(training)
for params, score, model in scores:
self.param_scores_ = {}
self.best_score_ = None
for job_result in results:
score = job_result.get('score', None)
params = job_result['params']
if score is not None:
if self.best_score_ is None or score < self.best_score_:
self.best_score_ = score
self.best_params_ = params
self.best_model_ = model
self.best_model_ = job_result['model']
self.param_scores_[str(params)] = score
else:
self.param_scores_[str(params)] = 'timeout'
self.param_scores_[str(params)] = job_result['status']
tend = time()-tinit
@ -115,58 +277,17 @@ class GridSearchQ(BaseQuantifier):
f'[took {tend:.4f}s]')
if self.refit:
if isinstance(protocol, OnLabelledCollectionProtocol):
if isinstance(self.protocol, OnLabelledCollectionProtocol):
tinit = time()
self._sout(f'refitting on the whole development set')
self.best_model_.fit(training + protocol.get_labelled_collection())
self.best_model_.fit(training + self.protocol.get_labelled_collection())
tend = time() - tinit
self.refit_time_ = tend
else:
raise RuntimeWarning(f'"refit" was requested, but the protocol does not '
f'implement the {OnLabelledCollectionProtocol.__name__} interface')
raise RuntimeWarning(f'the model cannot be refit on the whole dataset')
return self
def _delayed_eval(self, args):
params, training = args
protocol = self.protocol
error = self.error
if self.timeout > 0:
def handler(signum, frame):
raise TimeoutError()
signal.signal(signal.SIGALRM, handler)
tinit = time()
if self.timeout > 0:
signal.alarm(self.timeout)
try:
model = deepcopy(self.model)
# overrides default parameters with the parameters being explored at this iteration
model.set_params(**params)
model.fit(training)
score = evaluation.evaluate(model, protocol=protocol, error_metric=error)
ttime = time()-tinit
self._sout(f'hyperparams={params}\t got {error.__name__} score {score:.5f} [took {ttime:.4f}s]')
if self.timeout > 0:
signal.alarm(0)
except TimeoutError:
self._sout(f'timeout ({self.timeout}s) reached for config {params}')
score = None
except ValueError as e:
self._sout(f'the combination of hyperparameters {params} is invalid')
raise e
except Exception as e:
self._sout(f'something went wrong for config {params}; skipping:')
self._sout(f'\tException: {e}')
score = None
return params, score, model
def quantify(self, instances):
"""Estimate class prevalence values using the best model found after calling the :meth:`fit` method.
@ -204,6 +325,22 @@ class GridSearchQ(BaseQuantifier):
raise ValueError('best_model called before fit')
def _error_handler(self, func, *args, **kwargs):
try:
with timeout(self.timeout):
output = func(*args, **kwargs)
return output, Status.SUCCESS
except TimeoutError:
return None, Status.TIMEOUT
except ValueError:
return None, Status.INVALID
except Exception:
return None, Status.ERROR
def cross_val_predict(quantifier: BaseQuantifier, data: LabelledCollection, nfolds=3, random_state=0):
@ -229,3 +366,43 @@ def cross_val_predict(quantifier: BaseQuantifier, data: LabelledCollection, nfol
return total_prev
def expand_grid(param_grid: dict):
"""
Expands a param_grid dictionary as a list of configurations.
Example:
>>> combinations = expand_grid({'A': [1, 10, 100], 'B': [True, False]})
>>> print(combinations)
>>> [{'A': 1, 'B': True}, {'A': 1, 'B': False}, {'A': 10, 'B': True}, {'A': 10, 'B': False}, {'A': 100, 'B': True}, {'A': 100, 'B': False}]
:param param_grid: dictionary with keys representing hyper-parameter names, and values representing the range
to explore for that hyper-parameter
:return: a list of configurations, i.e., combinations of hyper-parameter assignments in the grid.
"""
params_keys = list(param_grid.keys())
params_values = list(param_grid.values())
configs = [{k: combs[i] for i, k in enumerate(params_keys)} for combs in itertools.product(*params_values)]
return configs
def group_params(param_grid: dict):
"""
Partitions a param_grid dictionary as two lists of configurations, one for the classifier-specific
hyper-parameters, and another for que quantifier-specific hyper-parameters
:param param_grid: dictionary with keys representing hyper-parameter names, and values representing the range
to explore for that hyper-parameter
:return: two expanded grids of configurations, one for the classifier, another for the quantifier
"""
classifier_params, quantifier_params = {}, {}
for key, values in param_grid.items():
if key.startswith('classifier__') or key == 'val_split':
classifier_params[key] = values
else:
quantifier_params[key] = values
classifier_configs = expand_grid(classifier_params)
quantifier_configs = expand_grid(quantifier_params)
return classifier_configs, quantifier_configs

View File

@ -22,9 +22,9 @@ class HierarchyTestCase(unittest.TestCase):
def test_probabilistic(self):
lr = LogisticRegression()
for m in [CC(lr), ACC(lr)]:
self.assertEqual(isinstance(m, AggregativeProbabilisticQuantifier), False)
self.assertEqual(isinstance(m, AggregativeSoftQuantifier), False)
for m in [PCC(lr), PACC(lr)]:
self.assertEqual(isinstance(m, AggregativeProbabilisticQuantifier), True)
self.assertEqual(isinstance(m, AggregativeSoftQuantifier), True)
if __name__ == '__main__':

View File

@ -10,6 +10,8 @@ import quapy as qp
import numpy as np
from joblib import Parallel, delayed
from time import time
import signal
def _get_parallel_slices(n_tasks, n_jobs):
@ -38,7 +40,7 @@ def map_parallel(func, args, n_jobs):
return list(itertools.chain.from_iterable(results))
def parallel(func, args, n_jobs, seed=None):
def parallel(func, args, n_jobs, seed=None, asarray=True, backend='loky'):
"""
A wrapper of multiprocessing:
@ -58,9 +60,12 @@ def parallel(func, args, n_jobs, seed=None):
stack.enter_context(qp.util.temp_seed(seed))
return func(*args)
return Parallel(n_jobs=n_jobs)(
out = Parallel(n_jobs=n_jobs, backend=backend)(
delayed(func_dec)(qp.environ, None if seed is None else seed+i, args_i) for i, args_i in enumerate(args)
)
if asarray:
out = np.asarray(out)
return out
@contextlib.contextmanager
@ -254,3 +259,35 @@ class EarlyStop:
if self.patience <= 0:
self.STOP = True
@contextlib.contextmanager
def timeout(seconds):
"""
Opens a context that will launch an exception if not closed after a given number of seconds
>>> def func(start_msg, end_msg):
>>> print(start_msg)
>>> sleep(2)
>>> print(end_msg)
>>>
>>> with timeout(1):
>>> func('begin function', 'end function')
>>> Out[]
>>> begin function
>>> TimeoutError
:param seconds: number of seconds, set to <=0 to ignore the timer
"""
if seconds > 0:
def handler(signum, frame):
raise TimeoutError()
signal.signal(signal.SIGALRM, handler)
signal.alarm(seconds)
yield
if seconds > 0:
signal.alarm(0)