Compare commits
10 Commits
29db15ae25 ... 6663b4c91d
Author | SHA1 | Date
---|---|---
Alejandro Moreo Fernandez | 6663b4c91d |
Alejandro Moreo Fernandez | f785a4eeef |
Alejandro Moreo Fernandez | 513c78f1f3 |
Alejandro Moreo Fernandez | e870d798b7 |
Alejandro Moreo Fernandez | 173db83c28 |
Andrea Esuli | c2544b50ce |
Alejandro Moreo Fernandez | c9c4511c0d |
Alejandro Moreo Fernandez | 44bfc7921f |
Alejandro Moreo Fernandez | 0a6185d908 |
Alejandro Moreo Fernandez | 25f1cc29a3 |
@@ -2,7 +2,7 @@ import quapy as qp
 from quapy.data import LabelledCollection
 from quapy.method.base import BinaryQuantifier
 from quapy.model_selection import GridSearchQ
-from quapy.method.aggregative import AggregativeProbabilisticQuantifier
+from quapy.method.aggregative import AggregativeSoftQuantifier
 from quapy.protocol import APP
 import numpy as np
 from sklearn.linear_model import LogisticRegression
@@ -15,7 +15,7 @@ from sklearn.linear_model import LogisticRegression
 # internal hyperparameter (let say, alpha) which is the decision threshold. Let's also assume the quantifier
 # is binary, for simplicity.

-class MyQuantifier(AggregativeProbabilisticQuantifier, BinaryQuantifier):
+class MyQuantifier(AggregativeSoftQuantifier, BinaryQuantifier):
     def __init__(self, classifier, alpha=0.5):
         self.alpha = alpha
         # aggregative quantifiers have an internal self.classifier attribute
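For context on the hunk above: `alpha` acts as a decision threshold on the posterior of the positive class. A minimal, self-contained sketch of that aggregation step (illustrative only, not code from the diff; the function name and numbers are made up):

```python
import numpy as np

def threshold_prevalence(posteriors, alpha=0.5):
    # fraction of documents whose positive-class posterior exceeds the threshold
    positive_prev = float(np.mean(posteriors[:, 1] > alpha))
    return np.asarray([1. - positive_prev, positive_prev])

# three documents with positive posteriors 0.9, 0.4, 0.7 -> prevalence [1/3, 2/3]
print(threshold_prevalence(np.array([[0.1, 0.9], [0.6, 0.4], [0.3, 0.7]])))
```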
@@ -1,8 +1,11 @@
import quapy as qp
from quapy.method.non_aggregative import DMx
from quapy.protocol import APP
from quapy.method.aggregative import DMy
from sklearn.linear_model import LogisticRegression
from examples.comparing_gridsearch import OLD_GridSearchQ
import numpy as np
from time import time

"""
In this example, we show how to perform model selection on a DistributionMatching quantifier.
@@ -15,35 +18,44 @@ qp.environ['N_JOBS'] = -1

 training, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5).train_test

-# The model will be returned by the fit method of GridSearchQ.
-# Every combination of hyper-parameters will be evaluated by confronting the
-# quantifier thus configured against a series of samples generated by means
-# of a sample generation protocol. For this example, we will use the
-# artificial-prevalence protocol (APP), that generates samples with prevalence
-# values in the entire range of values from a grid (e.g., [0, 0.1, 0.2, ..., 1]).
-# We devote 30% of the dataset for this exploration.
-training, validation = training.split_stratified(train_prop=0.7)
-protocol = APP(validation)
+with qp.util.temp_seed(0):

-# We will explore a classification-dependent hyper-parameter (e.g., the 'C'
-# hyper-parameter of LogisticRegression) and a quantification-dependent hyper-parameter
-# (e.g., the number of bins in a DistributionMatching quantifier.
-# Classifier-dependent hyper-parameters have to be marked with a prefix "classifier__"
-# in order to let the quantifier know this hyper-parameter belongs to its underlying
-# classifier.
-param_grid = {
-    'classifier__C': np.logspace(-3,3,7),
-    'nbins': [8, 16, 32, 64],
-}
+    # The model will be returned by the fit method of GridSearchQ.
+    # Every combination of hyper-parameters will be evaluated by confronting the
+    # quantifier thus configured against a series of samples generated by means
+    # of a sample generation protocol. For this example, we will use the
+    # artificial-prevalence protocol (APP), that generates samples with prevalence
+    # values in the entire range of values from a grid (e.g., [0, 0.1, 0.2, ..., 1]).
+    # We devote 30% of the dataset for this exploration.
+    training, validation = training.split_stratified(train_prop=0.7)
+    protocol = APP(validation)

-model = qp.model_selection.GridSearchQ(
-    model=model,
-    param_grid=param_grid,
-    protocol=protocol,
-    error='mae', # the error to optimize is the MAE (a quantification-oriented loss)
-    refit=True, # retrain on the whole labelled set once done
-    verbose=True # show information as the process goes on
-).fit(training)
+    # We will explore a classification-dependent hyper-parameter (e.g., the 'C'
+    # hyper-parameter of LogisticRegression) and a quantification-dependent hyper-parameter
+    # (e.g., the number of bins in a DistributionMatching quantifier.
+    # Classifier-dependent hyper-parameters have to be marked with a prefix "classifier__"
+    # in order to let the quantifier know this hyper-parameter belongs to its underlying
+    # classifier.
+    param_grid = {
+        'classifier__C': np.logspace(-3,3,7),
+        'classifier__class_weight': ['balanced', None],
+        'nbins': [8, 16, 32, 64, 'poooo'],
+    }

+    tinit = time()

+    # model = OLD_GridSearchQ(
+    model = qp.model_selection.GridSearchQ(
+        model=model,
+        param_grid=param_grid,
+        protocol=protocol,
+        error='mae', # the error to optimize is the MAE (a quantification-oriented loss)
+        refit=False, # retrain on the whole labelled set once done
+        verbose=True # show information as the process goes on
+    ).fit(training)

+    tend = time()

 print(f'model selection ended: best hyper-parameters={model.best_params_}')
 model = model.best_model_
@@ -53,5 +65,5 @@ model = model.best_model_
mae_score = qp.evaluation.evaluate(model, protocol=APP(test), error_metric='mae')

print(f'MAE={mae_score:.5f}')

print(f'model selection took {tend-tinit}s')
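For reference, the metric being optimized and reported above (`'mae'`) is the mean absolute error between true and estimated prevalence vectors. A standalone sketch of the arithmetic (not quapy's implementation):

```python
import numpy as np

def mean_absolute_error(true_prev, estim_prev):
    # average absolute difference between the two prevalence vectors
    return float(np.abs(np.asarray(true_prev) - np.asarray(estim_prev)).mean())

print(mean_absolute_error([0.3, 0.7], [0.25, 0.75]))  # -> 0.05
```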
@@ -24,7 +24,8 @@ class RecalibratedProbabilisticClassifier:
 class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabilisticClassifier):
     """
     Applies a (re)calibration method from `abstention.calibration`, as defined in
-    `Alexandari et al. paper <http://proceedings.mlr.press/v119/alexandari20a.html>`_:
+    `Alexandari et al. paper <http://proceedings.mlr.press/v119/alexandari20a.html>`_.
+

     :param classifier: a scikit-learn probabilistic classifier
     :param calibrator: the calibration object (an instance of abstention.calibration.CalibratorFactory)
@@ -59,7 +60,7 @@ class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabi
         elif isinstance(k, float):
             if not (0 < k < 1):
                 raise ValueError('wrong value for val_split: the proportion of validation documents must be in (0,1)')
-            return self.fit_cv(X, y)
+            return self.fit_tr_val(X, y)

     def fit_cv(self, X, y):
         """
@@ -94,7 +95,7 @@ class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabi
         self.classifier.fit(Xtr, ytr)
         posteriors = self.classifier.predict_proba(Xva)
         nclasses = len(np.unique(yva))
-        self.calibrator = self.calibrator(posteriors, np.eye(nclasses)[yva], posterior_supplied=True)
+        self.calibration_function = self.calibrator(posteriors, np.eye(nclasses)[yva], posterior_supplied=True)
         return self

     def predict(self, X):
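The `np.eye(nclasses)[yva]` expression in the hunk above one-hot encodes the validation labels, so the calibrator receives an (n_samples, n_classes) target matrix. In isolation:

```python
import numpy as np

yva = np.array([0, 2, 1, 2])   # hypothetical validation labels over 3 classes
print(np.eye(3)[yva])
# [[1. 0. 0.]
#  [0. 0. 1.]
#  [0. 1. 0.]
#  [0. 0. 1.]]
```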
(File diff suppressed because it is too large.)
@@ -63,7 +63,7 @@ def newOneVsAll(binary_quantifier, n_jobs=None):
     return OneVsAllGeneric(binary_quantifier, n_jobs)


-class OneVsAllGeneric(OneVsAll,BaseQuantifier):
+class OneVsAllGeneric(OneVsAll, BaseQuantifier):
     """
     Allows any binary quantifier to perform quantification on single-label datasets. The method maintains one binary
     quantifier for each class, and then l1-normalizes the outputs so that the class prevelence values sum up to 1.
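The l1-normalization mentioned in the docstring simply rescales the per-class outputs of the binary quantifiers so they form a valid prevalence vector. A toy sketch with made-up values:

```python
import numpy as np

per_class_estimates = np.asarray([0.2, 0.5, 0.4])  # hypothetical outputs of 3 binary quantifiers
prevalences = per_class_estimates / per_class_estimates.sum()
print(prevalences)  # -> [0.18181818 0.45454545 0.36363636]
```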
@@ -12,7 +12,7 @@ from quapy import functional as F
 from quapy.data import LabelledCollection
 from quapy.model_selection import GridSearchQ
 from quapy.method.base import BaseQuantifier, BinaryQuantifier
-from quapy.method.aggregative import CC, ACC, PACC, HDy, EMQ
+from quapy.method.aggregative import CC, ACC, PACC, HDy, EMQ, AggregativeQuantifier

 try:
     from . import neural
@@ -26,6 +26,65 @@ else:
     QuaNet = "QuaNet is not available due to missing torch package"


+class MedianEstimator2(BinaryQuantifier):
+    """
+    This method is a meta-quantifier that returns, as the estimated class prevalence values, the median of the
+    estimation returned by differently (hyper)parameterized base quantifiers.
+    The median of unit-vectors is only guaranteed to be a unit-vector for n=2 dimensions,
+    i.e., in cases of binary quantification.
+
+    :param base_quantifier: the base, binary quantifier
+    :param random_state: a seed to be set before fitting any base quantifier (default None)
+    :param param_grid: the grid or parameters towards which the median will be computed
+    :param n_jobs: number of parllel workes
+    """
+    def __init__(self, base_quantifier: BinaryQuantifier, param_grid: dict, random_state=None, n_jobs=None):
+        self.base_quantifier = base_quantifier
+        self.param_grid = param_grid
+        self.random_state = random_state
+        self.n_jobs = qp._get_njobs(n_jobs)
+
+    def get_params(self, deep=True):
+        return self.base_quantifier.get_params(deep)
+
+    def set_params(self, **params):
+        self.base_quantifier.set_params(**params)
+
+    def _delayed_fit(self, args):
+        with qp.util.temp_seed(self.random_state):
+            params, training = args
+            model = deepcopy(self.base_quantifier)
+            model.set_params(**params)
+            model.fit(training)
+            return model
+
+    def fit(self, training: LabelledCollection):
+        self._check_binary(training, self.__class__.__name__)
+
+        configs = qp.model_selection.expand_grid(self.param_grid)
+        self.models = qp.util.parallel(
+            self._delayed_fit,
+            ((params, training) for params in configs),
+            seed=qp.environ.get('_R_SEED', None),
+            n_jobs=self.n_jobs
+        )
+        return self
+
+    def _delayed_predict(self, args):
+        model, instances = args
+        return model.quantify(instances)
+
+    def quantify(self, instances):
+        prev_preds = qp.util.parallel(
+            self._delayed_predict,
+            ((model, instances) for model in self.models),
+            seed=qp.environ.get('_R_SEED', None),
+            n_jobs=self.n_jobs
+        )
+        prev_preds = np.asarray(prev_preds)
+        return np.median(prev_preds, axis=0)
+
+
 class MedianEstimator(BinaryQuantifier):
     """
     This method is a meta-quantifier that returns, as the estimated class prevalence values, the median of the
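The aggregation performed by `MedianEstimator2.quantify`, shown in isolation: the element-wise median of the prevalence vectors returned by the differently parameterized base quantifiers (the values below are hypothetical). In the binary case each row sums to 1, so the median vector does too, as the docstring notes.

```python
import numpy as np

prev_preds = np.asarray([
    [0.30, 0.70],
    [0.25, 0.75],
    [0.40, 0.60],
])
print(np.median(prev_preds, axis=0))  # -> [0.3 0.7]
```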
@@ -58,17 +117,64 @@ class MedianEstimator(BinaryQuantifier):
             model.fit(training)
             return model

+    def _delayed_fit_classifier(self, args):
+        with qp.util.temp_seed(self.random_state):
+            print('enter job')
+            cls_params, training = args
+            model = deepcopy(self.base_quantifier)
+            model.set_params(**cls_params)
+            predictions = model.classifier_fit_predict(training, predict_on=model.val_split)
+            print('exit job')
+            return (model, predictions)
+
+    def _delayed_fit_aggregation(self, args):
+        with qp.util.temp_seed(self.random_state):
+            print('\tenter job')
+            ((model, predictions), q_params), training = args
+            model = deepcopy(model)
+            model.set_params(**q_params)
+            model.aggregation_fit(predictions, training)
+            print('\texit job')
+            return model
+
+
     def fit(self, training: LabelledCollection):
         self._check_binary(training, self.__class__.__name__)
-        params_keys = list(self.param_grid.keys())
-        params_values = list(self.param_grid.values())
-        hyper = [dict({k: val[i] for i, k in enumerate(params_keys)}) for val in itertools.product(*params_values)]
-        self.models = qp.util.parallel(
-            self._delayed_fit,
-            ((params, training) for params in hyper),
-            seed=qp.environ.get('_R_SEED', None),
-            n_jobs=self.n_jobs
-        )
+
+        if isinstance(self.base_quantifier, AggregativeQuantifier):
+            cls_configs, q_configs = qp.model_selection.group_params(self.param_grid)
+
+            if len(cls_configs) > 1:
+                models_preds = qp.util.parallel(
+                    self._delayed_fit_classifier,
+                    ((params, training) for params in cls_configs),
+                    seed=qp.environ.get('_R_SEED', None),
+                    n_jobs=self.n_jobs,
+                    asarray=False
+                )
+            else:
+                print('only 1')
+                model = self.base_quantifier
+                model.set_params(**cls_configs[0])
+                predictions = model.classifier_fit_predict(training, predict_on=model.val_split)
+                models_preds = [(model, predictions)]
+
+            self.models = qp.util.parallel(
+                self._delayed_fit_aggregation,
+                ((setup, training) for setup in itertools.product(models_preds, q_configs)),
+                seed=qp.environ.get('_R_SEED', None),
+                n_jobs=self.n_jobs,
+                asarray=False
+            )
+        else:
+            configs = qp.model_selection.expand_grid(self.param_grid)
+            self.models = qp.util.parallel(
+                self._delayed_fit,
+                ((params, training) for params in configs),
+                seed=qp.environ.get('_R_SEED', None),
+                n_jobs=self.n_jobs,
+                asarray=False
+            )
         return self

     def _delayed_predict(self, args):
@@ -80,13 +186,13 @@ class MedianEstimator(BinaryQuantifier):
             self._delayed_predict,
             ((model, instances) for model in self.models),
             seed=qp.environ.get('_R_SEED', None),
-            n_jobs=self.n_jobs
+            n_jobs=self.n_jobs,
+            asarray=False
         )
         prev_preds = np.asarray(prev_preds)
         return np.median(prev_preds, axis=0)


 class Ensemble(BaseQuantifier):
     VALID_POLICIES = {'ave', 'ptr', 'ds'} | qp.error.QUANTIFICATION_ERROR_NAMES
@@ -194,7 +194,7 @@ class QuaNetTrainer(BaseQuantifier):
         label_predictions = np.argmax(posteriors, axis=-1)
         prevs_estim = []
         for quantifier in self.quantifiers.values():
-            predictions = posteriors if isinstance(quantifier, AggregativeProbabilisticQuantifier) else label_predictions
+            predictions = posteriors if isinstance(quantifier, AggregativeSoftQuantifier) else label_predictions
             prevs_estim.extend(quantifier.aggregate(predictions))

         # there is no real need for adding static estims like the TPR or FPR from training since those are constant
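The dispatch in the changed line above: soft (probabilistic) quantifiers aggregate the posterior probabilities directly, while the remaining aggregative quantifiers aggregate crisp label predictions obtained via argmax. A small illustration of the argmax step with hypothetical posteriors:

```python
import numpy as np

posteriors = np.array([[0.2, 0.8], [0.9, 0.1]])
label_predictions = np.argmax(posteriors, axis=-1)
print(label_predictions)  # -> [1 0]
```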
@@ -1,7 +1,7 @@
 from typing import Union, Callable
 import numpy as np

-from functional import get_divergence
+from quapy.functional import get_divergence
 from quapy.data import LabelledCollection
 from quapy.method.base import BaseQuantifier, BinaryQuantifier
 import quapy.functional as F
@@ -1,7 +1,9 @@
 import itertools
 import signal
 from copy import deepcopy
+from enum import Enum
 from typing import Union, Callable
+from functools import wraps

 import numpy as np
 from sklearn import clone
@@ -10,10 +12,67 @@ import quapy as qp
 from quapy import evaluation
 from quapy.protocol import AbstractProtocol, OnLabelledCollectionProtocol
 from quapy.data.base import LabelledCollection
-from quapy.method.aggregative import BaseQuantifier
+from quapy.method.aggregative import BaseQuantifier, AggregativeQuantifier
+from quapy.util import timeout
 from time import time


+class Status(Enum):
+    SUCCESS = 1
+    TIMEOUT = 2
+    INVALID = 3
+    ERROR = 4
+
+
+def check_status(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        obj = args[0]
+        tinit = time()
+
+        job_descriptor = dict(args[1])
+        params = {**job_descriptor.get('cls-params', {}), **job_descriptor.get('q-params', {})}
+
+        if obj.timeout > 0:
+            def handler(signum, frame):
+                raise TimeoutError()
+
+            signal.signal(signal.SIGALRM, handler)
+            signal.alarm(obj.timeout)
+
+        try:
+            job_descriptor = func(*args, **kwargs)
+
+            ttime = time() - tinit
+
+            score = job_descriptor.get('score', None)
+            if score is not None:
+                obj._sout(f'hyperparams=[{params}]\t got {obj.error.__name__} = {score:.5f} [took {ttime:.4f}s]')
+
+            if obj.timeout > 0:
+                signal.alarm(0)
+
+            exit_status = Status.SUCCESS
+
+        except TimeoutError:
+            obj._sout(f'timeout ({obj.timeout}s) reached for config {params}')
+            exit_status = Status.TIMEOUT
+
+        except ValueError as e:
+            obj._sout(f'the combination of hyperparameters {params} is invalid')
+            obj._sout(f'\tException: {e}')
+            exit_status = Status.INVALID
+
+        except Exception as e:
+            obj._sout(f'something went wrong for config {params}; skipping:')
+            obj._sout(f'\tException: {e}')
+            exit_status = Status.ERROR
+
+        job_descriptor['status'] = exit_status
+        job_descriptor['params'] = params
+        return job_descriptor
+    return wrapper
+
+
 class GridSearchQ(BaseQuantifier):
     """Grid Search optimization targeting a quantification-oriented metric.

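A toy illustration of how jobs tagged with `Status` are meant to be consumed downstream (the dictionaries and scores are hypothetical; a local stand-in enum is defined so the snippet runs on its own):

```python
from enum import Enum

class Status(Enum):  # minimal stand-in mirroring the enum added above
    SUCCESS = 1
    TIMEOUT = 2

results = [
    {'params': {'classifier__C': 0.1}, 'status': Status.SUCCESS, 'score': 0.08},
    {'params': {'classifier__C': 10.0}, 'status': Status.TIMEOUT},
]
kept = [r for r in results if r['status'] == Status.SUCCESS]
print(kept)  # only the successful configuration survives
```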
@@ -69,6 +128,113 @@ class GridSearchQ(BaseQuantifier):
             raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n'
                              f'the name of an error function in {qp.error.QUANTIFICATION_ERROR_NAMES}')

+    def _prepare_classifier(self, args):
+        cls_params = args['cls-params']
+        training = args['training']
+        model = deepcopy(self.model)
+        model.set_params(**cls_params)
+        predictions = model.classifier_fit_predict(training)
+        return {'model': model, 'predictions': predictions, 'cls-params': cls_params}
+
+    def _prepare_aggregation(self, args):
+
+        model = args['model']
+        predictions = args['predictions']
+        cls_params = args['cls-params']
+        q_params = args['q-params']
+        training = args['training']
+
+        params = {**cls_params, **q_params}
+
+        def job(model):
+            tinit = time()
+            model = deepcopy(model)
+            # overrides default parameters with the parameters being explored at this iteration
+            model.set_params(**q_params)
+            model.aggregation_fit(predictions, training)
+            score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
+            ttime = time()-tinit
+
+            return {
+                'model': model,
+                'cls-params':cls_params,
+                'q-params': q_params,
+                'params': params,
+                'score': score,
+                'ttime':ttime
+            }
+
+        out, status = self._error_handler(job, args)
+        if status == Status.SUCCESS:
+            self._sout(f'hyperparams=[{params}]\t got {self.error.__name__} = {out["score"]:.5f} [took {out["time"]:.4f}s]')
+        elif status == Status.INVALID:
+            self._sout(f'the combination of hyperparameters {params} is invalid')
+        elif status == Status.
+
+
+    def _prepare_model(self, args):
+        params, training = args
+        model = deepcopy(self.model)
+        # overrides default parameters with the parameters being explored at this iteration
+        model.set_params(**params)
+        model.fit(training)
+        score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
+        return {'model': model, 'params': params, 'score': score}
+
+
+    def _compute_scores_aggregative(self, training):
+
+        # break down the set of hyperparameters into two: classifier-specific, quantifier-specific
+        cls_configs, q_configs = group_params(self.param_grid)
+
+        # train all classifiers and get the predictions
+        partial_setups = qp.util.parallel(
+            self._prepare_classifier,
+            ({'cls-params':params, 'training':training} for params in cls_configs),
+            seed=qp.environ.get('_R_SEED', None),
+            n_jobs=self.n_jobs,
+            asarray=False,
+        )
+
+        # filter out classifier configurations that yield any error
+        for setup in partial_setups:
+            if setup['status'] != Status.SUCCESS:
+                self._sout(f'-> classifier hyperparemters {setup["params"]} caused '
+                           f'error {setup["status"]} and will be ignored')
+
+        partial_setups = [setup for setup in partial_setups if setup['status']==Status.SUCCESS]
+
+        if len(partial_setups) == 0:
+            raise ValueError('No valid configuration found for the classifier.')
+
+        # explore the quantifier-specific hyperparameters for each training configuration
+        scores = qp.util.parallel(
+            self._prepare_aggregation,
+            ({'q-params': setup[1], 'training': training, **setup[0]} for setup in itertools.product(partial_setups, q_configs)),
+            seed=qp.environ.get('_R_SEED', None),
+            n_jobs=self.n_jobs
+        )
+
+        return scores
+
+    def _compute_scores_nonaggregative(self, training):
+        configs = expand_grid(self.param_grid)
+
+        # pass a seed to parallel, so it is set in child processes
+        scores = qp.util.parallel(
+            self._prepare_model,
+            ((params, training) for params in configs),
+            seed=qp.environ.get('_R_SEED', None),
+            n_jobs=self.n_jobs
+        )
+        return scores
+
+    def _compute_scores(self, training):
+        if isinstance(self.model, AggregativeQuantifier):
+            return self._compute_scores_aggregative(training)
+        else:
+            return self._compute_scores_nonaggregative(training)
+
     def fit(self, training: LabelledCollection):
         """ Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing
         the error metric.
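The point of splitting the grid into classifier-specific and quantifier-specific configurations, as `_compute_scores_aggregative` does above, is that each classifier is fit once and then reused across every quantifier configuration. A self-contained sketch of the bookkeeping (hypothetical grids and placeholder predictions):

```python
import itertools

cls_configs = [{'classifier__C': 0.1}, {'classifier__C': 1.0}]
q_configs = [{'nbins': 8}, {'nbins': 16}, {'nbins': 32}]

# one (expensive) classifier fit per classifier configuration
trained = [(cfg, f'predictions_of_clf_{i}') for i, cfg in enumerate(cls_configs)]

# every quantifier configuration is then evaluated on top of each trained classifier
setups = list(itertools.product(trained, q_configs))
print(f'{len(trained)} classifier fits instead of {len(setups)} for the naive grid search')
```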
@@ -76,35 +242,31 @@ class GridSearchQ(BaseQuantifier):
         :param training: the training set on which to optimize the hyperparameters
         :return: self
         """
-        params_keys = list(self.param_grid.keys())
-        params_values = list(self.param_grid.values())

-        protocol = self.protocol

-        self.param_scores_ = {}
-        self.best_score_ = None
+        if self.refit and not isinstance(self.protocol, OnLabelledCollectionProtocol):
+            raise RuntimeWarning(
+                f'"refit" was requested, but the protocol does not implement '
+                f'the {OnLabelledCollectionProtocol.__name__} interface'
+            )

         tinit = time()

-        hyper = [dict({k: val[i] for i, k in enumerate(params_keys)}) for val in itertools.product(*params_values)]
-        self._sout(f'starting model selection with {self.n_jobs =}')
-        #pass a seed to parallel so it is set in clild processes
-        scores = qp.util.parallel(
-            self._delayed_eval,
-            ((params, training) for params in hyper),
-            seed=qp.environ.get('_R_SEED', None),
-            n_jobs=self.n_jobs
-        )
+        self._sout(f'starting model selection with n_jobs={self.n_jobs}')
+        results = self._compute_scores(training)

-        for params, score, model in scores:
+        self.param_scores_ = {}
+        self.best_score_ = None
+        for job_result in results:
+            score = job_result.get('score', None)
+            params = job_result['params']
             if score is not None:
                 if self.best_score_ is None or score < self.best_score_:
                     self.best_score_ = score
                     self.best_params_ = params
-                    self.best_model_ = model
+                    self.best_model_ = job_result['model']
                 self.param_scores_[str(params)] = score
             else:
-                self.param_scores_[str(params)] = 'timeout'
+                self.param_scores_[str(params)] = job_result['status']

         tend = time()-tinit
@@ -115,58 +277,17 @@ class GridSearchQ(BaseQuantifier):
                    f'[took {tend:.4f}s]')

         if self.refit:
-            if isinstance(protocol, OnLabelledCollectionProtocol):
+            if isinstance(self.protocol, OnLabelledCollectionProtocol):
                 tinit = time()
                 self._sout(f'refitting on the whole development set')
-                self.best_model_.fit(training + protocol.get_labelled_collection())
+                self.best_model_.fit(training + self.protocol.get_labelled_collection())
                 tend = time() - tinit
                 self.refit_time_ = tend
             else:
-                raise RuntimeWarning(f'"refit" was requested, but the protocol does not '
-                                     f'implement the {OnLabelledCollectionProtocol.__name__} interface')
+                raise RuntimeWarning(f'the model cannot be refit on the whole dataset')

         return self

-    def _delayed_eval(self, args):
-        params, training = args
-
-        protocol = self.protocol
-        error = self.error
-
-        if self.timeout > 0:
-            def handler(signum, frame):
-                raise TimeoutError()
-
-            signal.signal(signal.SIGALRM, handler)
-
-        tinit = time()
-
-        if self.timeout > 0:
-            signal.alarm(self.timeout)
-
-        try:
-            model = deepcopy(self.model)
-            # overrides default parameters with the parameters being explored at this iteration
-            model.set_params(**params)
-            model.fit(training)
-            score = evaluation.evaluate(model, protocol=protocol, error_metric=error)
-
-            ttime = time()-tinit
-            self._sout(f'hyperparams={params}\t got {error.__name__} score {score:.5f} [took {ttime:.4f}s]')
-
-            if self.timeout > 0:
-                signal.alarm(0)
-        except TimeoutError:
-            self._sout(f'timeout ({self.timeout}s) reached for config {params}')
-            score = None
-        except ValueError as e:
-            self._sout(f'the combination of hyperparameters {params} is invalid')
-            raise e
-        except Exception as e:
-            self._sout(f'something went wrong for config {params}; skipping:')
-            self._sout(f'\tException: {e}')
-            score = None
-
-        return params, score, model
-
-
     def quantify(self, instances):
         """Estimate class prevalence values using the best model found after calling the :meth:`fit` method.

@@ -204,6 +325,22 @@ class GridSearchQ(BaseQuantifier):
             raise ValueError('best_model called before fit')


+    def _error_handler(self, func, *args, **kwargs):
+
+        try:
+            with timeout(self.timeout):
+                output = func(*args, **kwargs)
+                return output, Status.SUCCESS
+
+        except TimeoutError:
+            return None, Status.TIMEOUT
+
+        except ValueError:
+            return None, Status.INVALID
+
+        except Exception:
+            return None, Status.ERROR
+
+
 def cross_val_predict(quantifier: BaseQuantifier, data: LabelledCollection, nfolds=3, random_state=0):
@@ -229,3 +366,43 @@ def cross_val_predict(quantifier: BaseQuantifier, data: LabelledCollection, nfol
     return total_prev


+def expand_grid(param_grid: dict):
+    """
+    Expands a param_grid dictionary as a list of configurations.
+    Example:
+
+    >>> combinations = expand_grid({'A': [1, 10, 100], 'B': [True, False]})
+    >>> print(combinations)
+    >>> [{'A': 1, 'B': True}, {'A': 1, 'B': False}, {'A': 10, 'B': True}, {'A': 10, 'B': False}, {'A': 100, 'B': True}, {'A': 100, 'B': False}]
+
+    :param param_grid: dictionary with keys representing hyper-parameter names, and values representing the range
+        to explore for that hyper-parameter
+    :return: a list of configurations, i.e., combinations of hyper-parameter assignments in the grid.
+    """
+    params_keys = list(param_grid.keys())
+    params_values = list(param_grid.values())
+    configs = [{k: combs[i] for i, k in enumerate(params_keys)} for combs in itertools.product(*params_values)]
+    return configs
+
+
+def group_params(param_grid: dict):
+    """
+    Partitions a param_grid dictionary as two lists of configurations, one for the classifier-specific
+    hyper-parameters, and another for que quantifier-specific hyper-parameters
+
+    :param param_grid: dictionary with keys representing hyper-parameter names, and values representing the range
+        to explore for that hyper-parameter
+    :return: two expanded grids of configurations, one for the classifier, another for the quantifier
+    """
+    classifier_params, quantifier_params = {}, {}
+    for key, values in param_grid.items():
+        if key.startswith('classifier__') or key == 'val_split':
+            classifier_params[key] = values
+        else:
+            quantifier_params[key] = values
+
+    classifier_configs = expand_grid(classifier_params)
+    quantifier_configs = expand_grid(quantifier_params)
+
+    return classifier_configs, quantifier_configs
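A self-contained illustration of the prefix rule `group_params` applies (hypothetical grid; the split logic is re-stated inline so the snippet runs without the new function):

```python
param_grid = {
    'classifier__C': [0.1, 1.0],
    'classifier__class_weight': ['balanced', None],
    'nbins': [8, 16],
}
cls_part = {k: v for k, v in param_grid.items() if k.startswith('classifier__') or k == 'val_split'}
q_part = {k: v for k, v in param_grid.items() if k not in cls_part}
print(cls_part)  # {'classifier__C': [0.1, 1.0], 'classifier__class_weight': ['balanced', None]}
print(q_part)    # {'nbins': [8, 16]}
```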
@@ -22,9 +22,9 @@ class HierarchyTestCase(unittest.TestCase):
     def test_probabilistic(self):
         lr = LogisticRegression()
         for m in [CC(lr), ACC(lr)]:
-            self.assertEqual(isinstance(m, AggregativeProbabilisticQuantifier), False)
+            self.assertEqual(isinstance(m, AggregativeSoftQuantifier), False)
         for m in [PCC(lr), PACC(lr)]:
-            self.assertEqual(isinstance(m, AggregativeProbabilisticQuantifier), True)
+            self.assertEqual(isinstance(m, AggregativeSoftQuantifier), True)


 if __name__ == '__main__':
@@ -10,6 +10,8 @@ import quapy as qp

 import numpy as np
 from joblib import Parallel, delayed
+from time import time
+import signal


 def _get_parallel_slices(n_tasks, n_jobs):
@@ -38,7 +40,7 @@ def map_parallel(func, args, n_jobs):
     return list(itertools.chain.from_iterable(results))


-def parallel(func, args, n_jobs, seed=None):
+def parallel(func, args, n_jobs, seed=None, asarray=True, backend='loky'):
     """
     A wrapper of multiprocessing:

@@ -58,9 +60,12 @@ def parallel(func, args, n_jobs, seed=None):
                 stack.enter_context(qp.util.temp_seed(seed))
             return func(*args)

-    return Parallel(n_jobs=n_jobs)(
+    out = Parallel(n_jobs=n_jobs, backend=backend)(
         delayed(func_dec)(qp.environ, None if seed is None else seed+i, args_i) for i, args_i in enumerate(args)
     )
+    if asarray:
+        out = np.asarray(out)
+    return out


 @contextlib.contextmanager
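A hedged usage sketch of the extended `qp.util.parallel` signature shown above (`asarray` and `backend` are the new arguments; this assumes a quapy version that already includes this change, and that each element of `args` is passed to `func` as a single argument, as the `func(*args)` delegation above suggests):

```python
import quapy as qp

def square(x):
    return x * x

out = qp.util.parallel(square, [1, 2, 3], n_jobs=2, asarray=False)
print(out)  # -> [1, 4, 9]
```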
@@ -254,3 +259,35 @@ class EarlyStop:
             if self.patience <= 0:
                 self.STOP = True

+
+@contextlib.contextmanager
+def timeout(seconds):
+    """
+    Opens a context that will launch an exception if not closed after a given number of seconds
+
+    >>> def func(start_msg, end_msg):
+    >>>     print(start_msg)
+    >>>     sleep(2)
+    >>>     print(end_msg)
+    >>>
+    >>> with timeout(1):
+    >>>     func('begin function', 'end function')
+    >>> Out[]
+    >>> begin function
+    >>> TimeoutError
+
+
+    :param seconds: number of seconds, set to <=0 to ignore the timer
+    """
+    if seconds > 0:
+        def handler(signum, frame):
+            raise TimeoutError()
+
+        signal.signal(signal.SIGALRM, handler)
+        signal.alarm(seconds)
+
+    yield
+
+    if seconds > 0:
+        signal.alarm(0)
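A quick usage sketch of the new `timeout` context manager: abort a long-running call once the budget expires (POSIX-only, since it relies on SIGALRM; assumes a quapy version that ships this helper):

```python
from time import sleep
from quapy.util import timeout

try:
    with timeout(1):
        sleep(2)  # stand-in for an expensive model.fit(...)
except TimeoutError:
    print('aborted after 1 second')
```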