model selection with error handling

This commit is contained in:
Alejandro Moreo Fernandez 2023-11-16 19:56:30 +01:00
parent 513c78f1f3
commit f785a4eeef
2 changed files with 146 additions and 179 deletions

View File

@ -1,4 +1,5 @@
import quapy as qp
from quapy.method.non_aggregative import DMx
from quapy.protocol import APP
from quapy.method.aggregative import DMy
from sklearn.linear_model import LogisticRegression
@ -38,7 +39,7 @@ with qp.util.temp_seed(0):
param_grid = {
'classifier__C': np.logspace(-3,3,7),
'classifier__class_weight': ['balanced', None],
'nbins': [8, 16, 32, 64],
'nbins': [8, 16, 32, 64, 'poooo'],
}
tinit = time()

View File

@ -3,6 +3,7 @@ import signal
from copy import deepcopy
from enum import Enum
from typing import Union, Callable
from functools import wraps
import numpy as np
from sklearn import clone
@ -21,6 +22,56 @@ class Status(Enum):
INVALID = 3
ERROR = 4
def check_status(func):
@wraps(func)
def wrapper(*args, **kwargs):
obj = args[0]
tinit = time()
job_descriptor = dict(args[1])
params = {**job_descriptor.get('cls-params', {}), **job_descriptor.get('q-params', {})}
if obj.timeout > 0:
def handler(signum, frame):
raise TimeoutError()
signal.signal(signal.SIGALRM, handler)
signal.alarm(obj.timeout)
try:
job_descriptor = func(*args, **kwargs)
ttime = time() - tinit
score = job_descriptor.get('score', None)
if score is not None:
obj._sout(f'hyperparams=[{params}]\t got {obj.error.__name__} = {score:.5f} [took {ttime:.4f}s]')
if obj.timeout > 0:
signal.alarm(0)
exit_status = Status.SUCCESS
except TimeoutError:
obj._sout(f'timeout ({obj.timeout}s) reached for config {params}')
exit_status = Status.TIMEOUT
except ValueError as e:
obj._sout(f'the combination of hyperparameters {params} is invalid')
obj._sout(f'\tException: {e}')
exit_status = Status.INVALID
except Exception as e:
obj._sout(f'something went wrong for config {params}; skipping:')
obj._sout(f'\tException: {e}')
exit_status = Status.ERROR
job_descriptor['status'] = exit_status
job_descriptor['params'] = params
return job_descriptor
return wrapper
class GridSearchQ(BaseQuantifier):
"""Grid Search optimization targeting a quantification-oriented metric.
@ -76,184 +127,97 @@ class GridSearchQ(BaseQuantifier):
raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n'
f'the name of an error function in {qp.error.QUANTIFICATION_ERROR_NAMES}')
def _fit_nonaggregative(self, training):
@check_status
def _prepare_classifier(self, args):
cls_params = args['cls-params']
training = args['training']
model = deepcopy(self.model)
model.set_params(**cls_params)
predictions = model.classifier_fit_predict(training)
return {'model': model, 'predictions': predictions, 'cls-params': cls_params}
@check_status
def _prepare_aggregation(self, args):
# (partial_setup, q_params), training = args
model = args['model']
predictions = args['predictions']
cls_params = args['cls-params']
q_params = args['q-params']
training = args['training']
params = {**cls_params, **q_params}
model = deepcopy(model)
# overrides default parameters with the parameters being explored at this iteration
model.set_params(**q_params)
model.aggregation_fit(predictions, training)
score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
return {'model': model, 'cls-params':cls_params, 'q-params': q_params, 'params': params, 'score': score}
@check_status
def _prepare_model(self, args):
params, training = args
model = deepcopy(self.model)
# overrides default parameters with the parameters being explored at this iteration
model.set_params(**params)
model.fit(training)
score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
return {'model': model, 'params': params, 'score': score}
def _compute_scores_aggregative(self, training):
# break down the set of hyperparameters into two: classifier-specific, quantifier-specific
cls_configs, q_configs = group_params(self.param_grid)
# train all classifiers and get the predictions
partial_setups = qp.util.parallel(
self._prepare_classifier,
({'cls-params':params, 'training':training} for params in cls_configs),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs,
asarray=False,
)
# filter out classifier configurations that yield any error
for setup in partial_setups:
if setup['status'] != Status.SUCCESS:
self._sout(f'-> classifier hyperparemters {setup["params"]} caused '
f'error {setup["status"]} and will be ignored')
partial_setups = [setup for setup in partial_setups if setup['status']==Status.SUCCESS]
if len(partial_setups) == 0:
raise ValueError('No valid configuration found for the classifier.')
# explore the quantifier-specific hyperparameters for each training configuration
scores = qp.util.parallel(
self._prepare_aggregation,
({'q-params': setup[1], 'training': training, **setup[0]} for setup in itertools.product(partial_setups, q_configs)),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs
)
return scores
def _compute_scores_nonaggregative(self, training):
configs = expand_grid(self.param_grid)
self._sout(f'starting model selection with {self.n_jobs =}')
#pass a seed to parallel so it is set in child processes
# pass a seed to parallel, so it is set in child processes
scores = qp.util.parallel(
self._delayed_eval,
self._prepare_model,
((params, training) for params in configs),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs
)
return scores
def _delayed_fit_classifier(self, args):
cls_params, training = args
model = deepcopy(self.model)
model.set_params(**cls_params)
predictions = model.classifier_fit_predict(training)
return (model, predictions, cls_params)
def _eval_aggregative(self, args):
((model, predictions, cls_params), q_params), training = args
model = deepcopy(model)
# overrides default parameters with the parameters being explored at this iteration
model.set_params(**q_params)
model.aggregation_fit(predictions, training)
params = {**cls_params, **q_params}
return model, params
def _delayed_evaluation__(self, args):
exit_status = Status.SUCCESS
tinit = time()
if self.timeout > 0:
def handler(signum, frame):
raise TimeoutError()
signal.signal(signal.SIGALRM, handler)
signal.alarm(self.timeout)
try:
model, params = self._eval_aggregative(args)
score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
ttime = time() - tinit
self._sout(f'hyperparams=[{params}]\t got {self.error.__name__} score {score:.5f} [took {ttime:.4f}s]')
if self.timeout > 0:
signal.alarm(0)
except TimeoutError:
self._sout(f'timeout ({self.timeout}s) reached for config {params}')
score = None
exit_status = Status.TIMEOUT
except ValueError as e:
self._sout(f'the combination of hyperparameters {params} is invalid')
score = None
exit_status = Status.INVALID
except Exception as e:
self._sout(f'something went wrong for config {params}; skipping:')
self._sout(f'\tException: {e}')
score = None
exit_status = Status.ERROR
return params, score, model, exit_status
# def _delayed_fit_aggregation_and_eval(self, args):
#
# ((model, predictions, cls_params), q_params), training = args
# exit_status = Status.SUCCESS
#
# tinit = time()
# if self.timeout > 0:
# def handler(signum, frame):
# raise TimeoutError()
# signal.signal(signal.SIGALRM, handler)
# signal.alarm(self.timeout)
#
# try:
# model = deepcopy(model)
# # overrides default parameters with the parameters being explored at this iteration
# model.set_params(**q_params)
# model.aggregation_fit(predictions, training)
# score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
#
# ttime = time() - tinit
# self._sout(f'hyperparams=[cls:{cls_params}, q:{q_params}]\t got {self.error.__name__} score {score:.5f} [took {ttime:.4f}s]')
#
# if self.timeout > 0:
# signal.alarm(0)
# except TimeoutError:
# self._sout(f'timeout ({self.timeout}s) reached for config {q_params}')
# score = None
# exit_status = Status.TIMEOUT
# except ValueError as e:
# self._sout(f'the combination of hyperparameters {q_params} is invalid')
# score = None
# exit_status = Status.INVALID
# except Exception as e:
# self._sout(f'something went wrong for config {q_params}; skipping:')
# self._sout(f'\tException: {e}')
# score = None
# exit_status = Status.ERROR
#
# params = {**cls_params, **q_params}
# return params, score, model, exit_status
def _delayed_eval(self, args):
params, training = args
protocol = self.protocol
error = self.error
if self.timeout > 0:
def handler(signum, frame):
raise TimeoutError()
signal.signal(signal.SIGALRM, handler)
tinit = time()
if self.timeout > 0:
signal.alarm(self.timeout)
try:
model = deepcopy(self.model)
# overrides default parameters with the parameters being explored at this iteration
model.set_params(**params)
model.fit(training)
score = evaluation.evaluate(model, protocol=protocol, error_metric=error)
ttime = time()-tinit
self._sout(f'hyperparams={params}\t got {error.__name__} score {score:.5f} [took {ttime:.4f}s]')
if self.timeout > 0:
signal.alarm(0)
except TimeoutError:
self._sout(f'timeout ({self.timeout}s) reached for config {params}')
score = None
except ValueError as e:
self._sout(f'the combination of hyperparameters {params} is invalid')
raise e
except Exception as e:
self._sout(f'something went wrong for config {params}; skipping:')
self._sout(f'\tException: {e}')
score = None
return params, score, model, status
def _fit_aggregative(self, training):
# break down the set of hyperparameters into two: classifier-specific, quantifier-specific
cls_configs, q_configs = group_params(self.param_grid)
# train all classifiers and get the predictions
models_preds_clsconfigs = qp.util.parallel(
self._delayed_fit_classifier,
((params, training) for params in cls_configs),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs,
asarray=False,
)
# explore the quantifier-specific hyperparameters for each training configuration
scores = qp.util.parallel(
self._delayed_fit_aggregation_and_eval,
((setup, training) for setup in itertools.product(models_preds_clsconfigs, q_configs)),
seed=qp.environ.get('_R_SEED', None),
n_jobs=self.n_jobs
)
return scores
def _compute_scores(self, training):
if isinstance(self.model, AggregativeQuantifier):
return self._compute_scores_aggregative(training)
else:
return self._compute_scores_nonaggregative(training)
def fit(self, training: LabelledCollection):
""" Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing
@ -264,27 +228,29 @@ class GridSearchQ(BaseQuantifier):
"""
if self.refit and not isinstance(self.protocol, OnLabelledCollectionProtocol):
raise RuntimeWarning(f'"refit" was requested, but the protocol does not '
f'implement the {OnLabelledCollectionProtocol.__name__} interface')
raise RuntimeWarning(
f'"refit" was requested, but the protocol does not implement '
f'the {OnLabelledCollectionProtocol.__name__} interface'
)
tinit = time()
if isinstance(self.model, AggregativeQuantifier):
self.results = self._fit_aggregative(training)
else:
self.results = self._fit_nonaggregative(training)
self._sout(f'starting model selection with n_jobs={self.n_jobs}')
results = self._compute_scores(training)
self.param_scores_ = {}
self.best_score_ = None
for params, score, model in self.results:
for job_result in results:
score = job_result.get('score', None)
params = job_result['params']
if score is not None:
if self.best_score_ is None or score < self.best_score_:
self.best_score_ = score
self.best_params_ = params
self.best_model_ = model
self.best_model_ = job_result['model']
self.param_scores_[str(params)] = score
else:
self.param_scores_[str(params)] = 'timeout'
self.param_scores_[str(params)] = job_result['status']
tend = time()-tinit