Compare commits
10 Commits
Author | SHA1 | Date |
Alejandro Moreo Fernandez | 6663b4c91d | |
Alejandro Moreo Fernandez | f785a4eeef | |
Alejandro Moreo Fernandez | 513c78f1f3 | |
Alejandro Moreo Fernandez | e870d798b7 | |
Alejandro Moreo Fernandez | 173db83c28 | |
Andrea Esuli | c2544b50ce | |
Alejandro Moreo Fernandez | c9c4511c0d | |
Alejandro Moreo Fernandez | 44bfc7921f | |
Alejandro Moreo Fernandez | 0a6185d908 | |
Alejandro Moreo Fernandez | 25f1cc29a3 |
@ -2,7 +2,7 @@ import quapy as qp
from import LabelledCollection
from quapy.method.base import BinaryQuantifier
from quapy.model_selection import GridSearchQ
from quapy.method.aggregative import AggregativeProbabilisticQuantifier
from quapy.method.aggregative import AggregativeSoftQuantifier
from quapy.protocol import APP
import numpy as np
from sklearn.linear_model import LogisticRegression
@ -15,7 +15,7 @@ from sklearn.linear_model import LogisticRegression
# internal hyperparameter (let say, alpha) which is the decision threshold. Let's also assume the quantifier
# is binary, for simplicity.
class MyQuantifier(AggregativeProbabilisticQuantifier, BinaryQuantifier):
class MyQuantifier(AggregativeSoftQuantifier, BinaryQuantifier):
def __init__(self, classifier, alpha=0.5):
self.alpha = alpha
# aggregative quantifiers have an internal self.classifier attribute
@ -1,8 +1,11 @@
import quapy as qp
from quapy.method.non_aggregative import DMx
from quapy.protocol import APP
from quapy.method.aggregative import DMy
from sklearn.linear_model import LogisticRegression
from examples.comparing_gridsearch import OLD_GridSearchQ
import numpy as np
from time import time
In this example, we show how to perform model selection on a DistributionMatching quantifier.
@ -15,35 +18,44 @@ qp.environ['N_JOBS'] = -1
training, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5).train_test
# The model will be returned by the fit method of GridSearchQ.
# Every combination of hyper-parameters will be evaluated by confronting the
# quantifier thus configured against a series of samples generated by means
# of a sample generation protocol. For this example, we will use the
# artificial-prevalence protocol (APP), that generates samples with prevalence
# values in the entire range of values from a grid (e.g., [0, 0.1, 0.2, ..., 1]).
# We devote 30% of the dataset for this exploration.
training, validation = training.split_stratified(train_prop=0.7)
protocol = APP(validation)
with qp.util.temp_seed(0):
# We will explore a classification-dependent hyper-parameter (e.g., the 'C'
# hyper-parameter of LogisticRegression) and a quantification-dependent hyper-parameter
# (e.g., the number of bins in a DistributionMatching quantifier.
# Classifier-dependent hyper-parameters have to be marked with a prefix "classifier__"
# in order to let the quantifier know this hyper-parameter belongs to its underlying
# classifier.
param_grid = {
'classifier__C': np.logspace(-3,3,7),
'nbins': [8, 16, 32, 64],
# The model will be returned by the fit method of GridSearchQ.
# Every combination of hyper-parameters will be evaluated by confronting the
# quantifier thus configured against a series of samples generated by means
# of a sample generation protocol. For this example, we will use the
# artificial-prevalence protocol (APP), that generates samples with prevalence
# values in the entire range of values from a grid (e.g., [0, 0.1, 0.2, ..., 1]).
# We devote 30% of the dataset for this exploration.
training, validation = training.split_stratified(train_prop=0.7)
protocol = APP(validation)
model = qp.model_selection.GridSearchQ(
error='mae', # the error to optimize is the MAE (a quantification-oriented loss)
refit=True, # retrain on the whole labelled set once done
verbose=True # show information as the process goes on
# We will explore a classification-dependent hyper-parameter (e.g., the 'C'
# hyper-parameter of LogisticRegression) and a quantification-dependent hyper-parameter
# (e.g., the number of bins in a DistributionMatching quantifier.
# Classifier-dependent hyper-parameters have to be marked with a prefix "classifier__"
# in order to let the quantifier know this hyper-parameter belongs to its underlying
# classifier.
param_grid = {
'classifier__C': np.logspace(-3,3,7),
'classifier__class_weight': ['balanced', None],
'nbins': [8, 16, 32, 64, 'poooo'],
tinit = time()
# model = OLD_GridSearchQ(
model = qp.model_selection.GridSearchQ(
error='mae', # the error to optimize is the MAE (a quantification-oriented loss)
refit=False, # retrain on the whole labelled set once done
verbose=True # show information as the process goes on
tend = time()
print(f'model selection ended: best hyper-parameters={model.best_params_}')
model = model.best_model_
@ -53,5 +65,5 @@ model = model.best_model_
mae_score = qp.evaluation.evaluate(model, protocol=APP(test), error_metric='mae')
print(f'model selection took {tend-tinit}s')
@ -24,7 +24,8 @@ class RecalibratedProbabilisticClassifier:
class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabilisticClassifier):
Applies a (re)calibration method from `abstention.calibration`, as defined in
`Alexandari et al. paper <>`_:
`Alexandari et al. paper <>`_.
:param classifier: a scikit-learn probabilistic classifier
:param calibrator: the calibration object (an instance of abstention.calibration.CalibratorFactory)
@ -59,7 +60,7 @@ class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabi
elif isinstance(k, float):
if not (0 < k < 1):
raise ValueError('wrong value for val_split: the proportion of validation documents must be in (0,1)')
return self.fit_cv(X, y)
return self.fit_tr_val(X, y)
def fit_cv(self, X, y):
@ -94,7 +95,7 @@ class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabi
|||, ytr)
posteriors = self.classifier.predict_proba(Xva)
nclasses = len(np.unique(yva))
self.calibrator = self.calibrator(posteriors, np.eye(nclasses)[yva], posterior_supplied=True)
self.calibration_function = self.calibrator(posteriors, np.eye(nclasses)[yva], posterior_supplied=True)
return self
def predict(self, X):
File diff suppressed because it is too large
Load Diff
@ -63,7 +63,7 @@ def newOneVsAll(binary_quantifier, n_jobs=None):
return OneVsAllGeneric(binary_quantifier, n_jobs)
class OneVsAllGeneric(OneVsAll,BaseQuantifier):
class OneVsAllGeneric(OneVsAll, BaseQuantifier):
Allows any binary quantifier to perform quantification on single-label datasets. The method maintains one binary
quantifier for each class, and then l1-normalizes the outputs so that the class prevelence values sum up to 1.
@ -12,7 +12,7 @@ from quapy import functional as F
from import LabelledCollection
from quapy.model_selection import GridSearchQ
from quapy.method.base import BaseQuantifier, BinaryQuantifier
from quapy.method.aggregative import CC, ACC, PACC, HDy, EMQ
from quapy.method.aggregative import CC, ACC, PACC, HDy, EMQ, AggregativeQuantifier
from . import neural
@ -26,6 +26,65 @@ else:
QuaNet = "QuaNet is not available due to missing torch package"
class MedianEstimator2(BinaryQuantifier):
This method is a meta-quantifier that returns, as the estimated class prevalence values, the median of the
estimation returned by differently (hyper)parameterized base quantifiers.
The median of unit-vectors is only guaranteed to be a unit-vector for n=2 dimensions,
i.e., in cases of binary quantification.
:param base_quantifier: the base, binary quantifier
:param random_state: a seed to be set before fitting any base quantifier (default None)
:param param_grid: the grid or parameters towards which the median will be computed
:param n_jobs: number of parllel workes
def __init__(self, base_quantifier: BinaryQuantifier, param_grid: dict, random_state=None, n_jobs=None):
self.base_quantifier = base_quantifier
self.param_grid = param_grid
self.random_state = random_state
self.n_jobs = qp._get_njobs(n_jobs)
def get_params(self, deep=True):
return self.base_quantifier.get_params(deep)
def set_params(self, **params):
def _delayed_fit(self, args):
with qp.util.temp_seed(self.random_state):
params, training = args
model = deepcopy(self.base_quantifier)
return model
def fit(self, training: LabelledCollection):
self._check_binary(training, self.__class__.__name__)
configs = qp.model_selection.expand_grid(self.param_grid)
self.models = qp.util.parallel(
((params, training) for params in configs),
seed=qp.environ.get('_R_SEED', None),
return self
def _delayed_predict(self, args):
model, instances = args
return model.quantify(instances)
def quantify(self, instances):
prev_preds = qp.util.parallel(
((model, instances) for model in self.models),
seed=qp.environ.get('_R_SEED', None),
prev_preds = np.asarray(prev_preds)
return np.median(prev_preds, axis=0)
class MedianEstimator(BinaryQuantifier):
This method is a meta-quantifier that returns, as the estimated class prevalence values, the median of the
@ -58,17 +117,64 @@ class MedianEstimator(BinaryQuantifier):
return model
def _delayed_fit_classifier(self, args):
with qp.util.temp_seed(self.random_state):
print('enter job')
cls_params, training = args
model = deepcopy(self.base_quantifier)
predictions = model.classifier_fit_predict(training, predict_on=model.val_split)
print('exit job')
return (model, predictions)
def _delayed_fit_aggregation(self, args):
with qp.util.temp_seed(self.random_state):
print('\tenter job')
((model, predictions), q_params), training = args
model = deepcopy(model)
model.aggregation_fit(predictions, training)
print('\texit job')
return model
def fit(self, training: LabelledCollection):
self._check_binary(training, self.__class__.__name__)
params_keys = list(self.param_grid.keys())
params_values = list(self.param_grid.values())
hyper = [dict({k: val[i] for i, k in enumerate(params_keys)}) for val in itertools.product(*params_values)]
self.models = qp.util.parallel(
((params, training) for params in hyper),
seed=qp.environ.get('_R_SEED', None),
if isinstance(self.base_quantifier, AggregativeQuantifier):
cls_configs, q_configs = qp.model_selection.group_params(self.param_grid)
if len(cls_configs) > 1:
models_preds = qp.util.parallel(
((params, training) for params in cls_configs),
seed=qp.environ.get('_R_SEED', None),
print('only 1')
model = self.base_quantifier
predictions = model.classifier_fit_predict(training, predict_on=model.val_split)
models_preds = [(model, predictions)]
self.models = qp.util.parallel(
((setup, training) for setup in itertools.product(models_preds, q_configs)),
seed=qp.environ.get('_R_SEED', None),
configs = qp.model_selection.expand_grid(self.param_grid)
self.models = qp.util.parallel(
((params, training) for params in configs),
seed=qp.environ.get('_R_SEED', None),
return self
def _delayed_predict(self, args):
@ -80,13 +186,13 @@ class MedianEstimator(BinaryQuantifier):
((model, instances) for model in self.models),
seed=qp.environ.get('_R_SEED', None),
prev_preds = np.asarray(prev_preds)
return np.median(prev_preds, axis=0)
class Ensemble(BaseQuantifier):
VALID_POLICIES = {'ave', 'ptr', 'ds'} | qp.error.QUANTIFICATION_ERROR_NAMES
@ -194,7 +194,7 @@ class QuaNetTrainer(BaseQuantifier):
label_predictions = np.argmax(posteriors, axis=-1)
prevs_estim = []
for quantifier in self.quantifiers.values():
predictions = posteriors if isinstance(quantifier, AggregativeProbabilisticQuantifier) else label_predictions
predictions = posteriors if isinstance(quantifier, AggregativeSoftQuantifier) else label_predictions
# there is no real need for adding static estims like the TPR or FPR from training since those are constant
@ -1,7 +1,7 @@
from typing import Union, Callable
import numpy as np
from functional import get_divergence
from quapy.functional import get_divergence
from import LabelledCollection
from quapy.method.base import BaseQuantifier, BinaryQuantifier
import quapy.functional as F
@ -1,7 +1,9 @@
import itertools
import signal
from copy import deepcopy
from enum import Enum
from typing import Union, Callable
from functools import wraps
import numpy as np
from sklearn import clone
@ -10,10 +12,67 @@ import quapy as qp
from quapy import evaluation
from quapy.protocol import AbstractProtocol, OnLabelledCollectionProtocol
from import LabelledCollection
from quapy.method.aggregative import BaseQuantifier
from quapy.method.aggregative import BaseQuantifier, AggregativeQuantifier
from quapy.util import timeout
from time import time
class Status(Enum):
def check_status(func):
def wrapper(*args, **kwargs):
obj = args[0]
tinit = time()
job_descriptor = dict(args[1])
params = {**job_descriptor.get('cls-params', {}), **job_descriptor.get('q-params', {})}
if obj.timeout > 0:
def handler(signum, frame):
raise TimeoutError()
signal.signal(signal.SIGALRM, handler)
job_descriptor = func(*args, **kwargs)
ttime = time() - tinit
score = job_descriptor.get('score', None)
if score is not None:
obj._sout(f'hyperparams=[{params}]\t got {obj.error.__name__} = {score:.5f} [took {ttime:.4f}s]')
if obj.timeout > 0:
exit_status = Status.SUCCESS
except TimeoutError:
obj._sout(f'timeout ({obj.timeout}s) reached for config {params}')
exit_status = Status.TIMEOUT
except ValueError as e:
obj._sout(f'the combination of hyperparameters {params} is invalid')
obj._sout(f'\tException: {e}')
exit_status = Status.INVALID
except Exception as e:
obj._sout(f'something went wrong for config {params}; skipping:')
obj._sout(f'\tException: {e}')
exit_status = Status.ERROR
job_descriptor['status'] = exit_status
job_descriptor['params'] = params
return job_descriptor
return wrapper
class GridSearchQ(BaseQuantifier):
"""Grid Search optimization targeting a quantification-oriented metric.
@ -69,6 +128,113 @@ class GridSearchQ(BaseQuantifier):
raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n'
f'the name of an error function in {qp.error.QUANTIFICATION_ERROR_NAMES}')
def _prepare_classifier(self, args):
cls_params = args['cls-params']
training = args['training']
model = deepcopy(self.model)
predictions = model.classifier_fit_predict(training)
return {'model': model, 'predictions': predictions, 'cls-params': cls_params}
def _prepare_aggregation(self, args):
model = args['model']
predictions = args['predictions']
cls_params = args['cls-params']
q_params = args['q-params']
training = args['training']
params = {**cls_params, **q_params}
def job(model):
tinit = time()
model = deepcopy(model)
# overrides default parameters with the parameters being explored at this iteration
model.aggregation_fit(predictions, training)
score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
ttime = time()-tinit
return {
'model': model,
'q-params': q_params,
'params': params,
'score': score,
out, status = self._error_handler(job, args)
if status == Status.SUCCESS:
self._sout(f'hyperparams=[{params}]\t got {self.error.__name__} = {out["score"]:.5f} [took {out["time"]:.4f}s]')
elif status == Status.INVALID:
self._sout(f'the combination of hyperparameters {params} is invalid')
elif status == Status.
def _prepare_model(self, args):
params, training = args
model = deepcopy(self.model)
# overrides default parameters with the parameters being explored at this iteration
score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
return {'model': model, 'params': params, 'score': score}
def _compute_scores_aggregative(self, training):
# break down the set of hyperparameters into two: classifier-specific, quantifier-specific
cls_configs, q_configs = group_params(self.param_grid)
# train all classifiers and get the predictions
partial_setups = qp.util.parallel(
({'cls-params':params, 'training':training} for params in cls_configs),
seed=qp.environ.get('_R_SEED', None),
# filter out classifier configurations that yield any error
for setup in partial_setups:
if setup['status'] != Status.SUCCESS:
self._sout(f'-> classifier hyperparemters {setup["params"]} caused '
f'error {setup["status"]} and will be ignored')
partial_setups = [setup for setup in partial_setups if setup['status']==Status.SUCCESS]
if len(partial_setups) == 0:
raise ValueError('No valid configuration found for the classifier.')
# explore the quantifier-specific hyperparameters for each training configuration
scores = qp.util.parallel(
({'q-params': setup[1], 'training': training, **setup[0]} for setup in itertools.product(partial_setups, q_configs)),
seed=qp.environ.get('_R_SEED', None),
return scores
def _compute_scores_nonaggregative(self, training):
configs = expand_grid(self.param_grid)
# pass a seed to parallel, so it is set in child processes
scores = qp.util.parallel(
((params, training) for params in configs),
seed=qp.environ.get('_R_SEED', None),
return scores
def _compute_scores(self, training):
if isinstance(self.model, AggregativeQuantifier):
return self._compute_scores_aggregative(training)
return self._compute_scores_nonaggregative(training)
def fit(self, training: LabelledCollection):
""" Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing
the error metric.
@ -76,35 +242,31 @@ class GridSearchQ(BaseQuantifier):
:param training: the training set on which to optimize the hyperparameters
:return: self
params_keys = list(self.param_grid.keys())
params_values = list(self.param_grid.values())
protocol = self.protocol
self.param_scores_ = {}
self.best_score_ = None
if self.refit and not isinstance(self.protocol, OnLabelledCollectionProtocol):
raise RuntimeWarning(
f'"refit" was requested, but the protocol does not implement '
f'the {OnLabelledCollectionProtocol.__name__} interface'
tinit = time()
hyper = [dict({k: val[i] for i, k in enumerate(params_keys)}) for val in itertools.product(*params_values)]
self._sout(f'starting model selection with {self.n_jobs =}')
#pass a seed to parallel so it is set in clild processes
scores = qp.util.parallel(
((params, training) for params in hyper),
seed=qp.environ.get('_R_SEED', None),
self._sout(f'starting model selection with n_jobs={self.n_jobs}')
results = self._compute_scores(training)
for params, score, model in scores:
self.param_scores_ = {}
self.best_score_ = None
for job_result in results:
score = job_result.get('score', None)
params = job_result['params']
if score is not None:
if self.best_score_ is None or score < self.best_score_:
self.best_score_ = score
self.best_params_ = params
self.best_model_ = model
self.best_model_ = job_result['model']
self.param_scores_[str(params)] = score
self.param_scores_[str(params)] = 'timeout'
self.param_scores_[str(params)] = job_result['status']
tend = time()-tinit
@ -115,58 +277,17 @@ class GridSearchQ(BaseQuantifier):
f'[took {tend:.4f}s]')
if self.refit:
if isinstance(protocol, OnLabelledCollectionProtocol):
if isinstance(self.protocol, OnLabelledCollectionProtocol):
tinit = time()
self._sout(f'refitting on the whole development set')
|||| + protocol.get_labelled_collection())
|||| + self.protocol.get_labelled_collection())
tend = time() - tinit
self.refit_time_ = tend
raise RuntimeWarning(f'"refit" was requested, but the protocol does not '
f'implement the {OnLabelledCollectionProtocol.__name__} interface')
raise RuntimeWarning(f'the model cannot be refit on the whole dataset')
return self
def _delayed_eval(self, args):
params, training = args
protocol = self.protocol
error = self.error
if self.timeout > 0:
def handler(signum, frame):
raise TimeoutError()
signal.signal(signal.SIGALRM, handler)
tinit = time()
if self.timeout > 0:
model = deepcopy(self.model)
# overrides default parameters with the parameters being explored at this iteration
score = evaluation.evaluate(model, protocol=protocol, error_metric=error)
ttime = time()-tinit
self._sout(f'hyperparams={params}\t got {error.__name__} score {score:.5f} [took {ttime:.4f}s]')
if self.timeout > 0:
except TimeoutError:
self._sout(f'timeout ({self.timeout}s) reached for config {params}')
score = None
except ValueError as e:
self._sout(f'the combination of hyperparameters {params} is invalid')
raise e
except Exception as e:
self._sout(f'something went wrong for config {params}; skipping:')
self._sout(f'\tException: {e}')
score = None
return params, score, model
def quantify(self, instances):
"""Estimate class prevalence values using the best model found after calling the :meth:`fit` method.
@ -204,6 +325,22 @@ class GridSearchQ(BaseQuantifier):
raise ValueError('best_model called before fit')
def _error_handler(self, func, *args, **kwargs):
with timeout(self.timeout):
output = func(*args, **kwargs)
return output, Status.SUCCESS
except TimeoutError:
return None, Status.TIMEOUT
except ValueError:
return None, Status.INVALID
except Exception:
return None, Status.ERROR
def cross_val_predict(quantifier: BaseQuantifier, data: LabelledCollection, nfolds=3, random_state=0):
@ -229,3 +366,43 @@ def cross_val_predict(quantifier: BaseQuantifier, data: LabelledCollection, nfol
return total_prev
def expand_grid(param_grid: dict):
Expands a param_grid dictionary as a list of configurations.
>>> combinations = expand_grid({'A': [1, 10, 100], 'B': [True, False]})
>>> print(combinations)
>>> [{'A': 1, 'B': True}, {'A': 1, 'B': False}, {'A': 10, 'B': True}, {'A': 10, 'B': False}, {'A': 100, 'B': True}, {'A': 100, 'B': False}]
:param param_grid: dictionary with keys representing hyper-parameter names, and values representing the range
to explore for that hyper-parameter
:return: a list of configurations, i.e., combinations of hyper-parameter assignments in the grid.
params_keys = list(param_grid.keys())
params_values = list(param_grid.values())
configs = [{k: combs[i] for i, k in enumerate(params_keys)} for combs in itertools.product(*params_values)]
return configs
def group_params(param_grid: dict):
Partitions a param_grid dictionary as two lists of configurations, one for the classifier-specific
hyper-parameters, and another for que quantifier-specific hyper-parameters
:param param_grid: dictionary with keys representing hyper-parameter names, and values representing the range
to explore for that hyper-parameter
:return: two expanded grids of configurations, one for the classifier, another for the quantifier
classifier_params, quantifier_params = {}, {}
for key, values in param_grid.items():
if key.startswith('classifier__') or key == 'val_split':
classifier_params[key] = values
quantifier_params[key] = values
classifier_configs = expand_grid(classifier_params)
quantifier_configs = expand_grid(quantifier_params)
return classifier_configs, quantifier_configs
@ -22,9 +22,9 @@ class HierarchyTestCase(unittest.TestCase):
def test_probabilistic(self):
lr = LogisticRegression()
for m in [CC(lr), ACC(lr)]:
self.assertEqual(isinstance(m, AggregativeProbabilisticQuantifier), False)
self.assertEqual(isinstance(m, AggregativeSoftQuantifier), False)
for m in [PCC(lr), PACC(lr)]:
self.assertEqual(isinstance(m, AggregativeProbabilisticQuantifier), True)
self.assertEqual(isinstance(m, AggregativeSoftQuantifier), True)
if __name__ == '__main__':
@ -10,6 +10,8 @@ import quapy as qp
import numpy as np
from joblib import Parallel, delayed
from time import time
import signal
def _get_parallel_slices(n_tasks, n_jobs):
@ -38,7 +40,7 @@ def map_parallel(func, args, n_jobs):
return list(itertools.chain.from_iterable(results))
def parallel(func, args, n_jobs, seed=None):
def parallel(func, args, n_jobs, seed=None, asarray=True, backend='loky'):
A wrapper of multiprocessing:
@ -58,9 +60,12 @@ def parallel(func, args, n_jobs, seed=None):
return func(*args)
return Parallel(n_jobs=n_jobs)(
out = Parallel(n_jobs=n_jobs, backend=backend)(
delayed(func_dec)(qp.environ, None if seed is None else seed+i, args_i) for i, args_i in enumerate(args)
if asarray:
out = np.asarray(out)
return out
@ -254,3 +259,35 @@ class EarlyStop:
if self.patience <= 0:
self.STOP = True
def timeout(seconds):
Opens a context that will launch an exception if not closed after a given number of seconds
>>> def func(start_msg, end_msg):
>>> print(start_msg)
>>> sleep(2)
>>> print(end_msg)
>>> with timeout(1):
>>> func('begin function', 'end function')
>>> Out[]
>>> begin function
>>> TimeoutError
:param seconds: number of seconds, set to <=0 to ignore the timer
if seconds > 0:
def handler(signum, frame):
raise TimeoutError()
signal.signal(signal.SIGALRM, handler)
if seconds > 0:
Reference in New Issue