From eb9a3dde2a77f280eb95a1c07c9503660971cb64 Mon Sep 17 00:00:00 2001
From: Alex Moreo
Date: Tue, 21 Nov 2023 18:59:36 +0100
Subject: [PATCH] grid search almost complete

---
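Note (below the '---' marker, so it is ignored by `git am`): the sketch that follows
shows how the new `raise_errors` flag introduced by this patch is meant to be used.
It is a minimal, hypothetical example: the dataset, the sample size, and the
deliberately invalid 'bad' value for `nbins` are illustrative choices, not part of
the patch itself.

    import quapy as qp
    from quapy.method.aggregative import DMy
    from quapy.protocol import APP
    from sklearn.linear_model import LogisticRegression

    qp.environ['SAMPLE_SIZE'] = 100

    # any labelled dataset works; the IMDb reviews dataset is just an illustration
    training, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5).train_test
    training, validation = training.split_stratified(train_prop=0.6)

    model = qp.model_selection.GridSearchQ(
        model=DMy(LogisticRegression()),
        param_grid={
            'classifier__C': [0.1, 1.0],
            'nbins': [8, 16, 'bad'],  # 'bad' is invalid on purpose
        },
        protocol=APP(validation),  # validation samples drawn via the artificial prevalence protocol
        error='mae',
        refit=False,
        raise_errors=False,  # the invalid config is collected and reported, not raised
        verbose=True
    ).fit(training)

    print(f'best params: {model.best_params_} (mae={model.best_score_:.5f})')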
 examples/model_selection.py |  12 +-
 quapy/model_selection.py    | 260 +++++++++++++++++-------------------
 2 files changed, 131 insertions(+), 141 deletions(-)

diff --git a/examples/model_selection.py b/examples/model_selection.py
index 141cf91..559de27 100644
--- a/examples/model_selection.py
+++ b/examples/model_selection.py
@@ -3,7 +3,7 @@ from quapy.method.non_aggregative import DMx
 from quapy.protocol import APP
 from quapy.method.aggregative import DMy
 from sklearn.linear_model import LogisticRegression
-from examples.comparing_gridsearch import OLD_GridSearchQ
+#from examples.comparing_gridsearch import OLD_GridSearchQ
 import numpy as np
 from time import time
 
@@ -37,14 +37,13 @@ with qp.util.temp_seed(0):
     # in order to let the quantifier know this hyper-parameter belongs to its underlying
     # classifier.
     param_grid = {
-        'classifier__C': np.logspace(-3,3,7),
-        'classifier__class_weight': ['balanced', None],
-        'nbins': [8, 16, 32, 64, 'poooo'],
+        'classifier__C': np.logspace(-2, 2, 5),
+        'classifier__class_weight': ['balanced', None, 'ch'],  # 'ch' is invalid; it exercises the error handling
+        'nbins': [8, 16, 32, 64, 'po'],  # 'po' is invalid too
     }
 
     tinit = time()
 
-    # model = OLD_GridSearchQ(
     model = qp.model_selection.GridSearchQ(
         model=model,
@@ -52,6 +51,7 @@ with qp.util.temp_seed(0):
         protocol=protocol,
         error='mae',  # the error to optimize is the MAE (a quantification-oriented loss)
         refit=False,  # retrain on the whole labelled set once done
+        raise_errors=False,  # do not stop the search when a configuration fails; report errors at the end
         verbose=True  # show information as the process goes on
     ).fit(training)
 
@@ -65,5 +65,5 @@ model = model.best_model_
 mae_score = qp.evaluation.evaluate(model, protocol=APP(test), error_metric='mae')
 
 print(f'MAE={mae_score:.5f}')
-print(f'model selection took {tend-tinit}s')
+print(f'model selection took {tend-tinit:.1f}s')
 
diff --git a/quapy/model_selection.py b/quapy/model_selection.py
index 6637d62..9017b99 100644
--- a/quapy/model_selection.py
+++ b/quapy/model_selection.py
@@ -23,54 +23,24 @@ class Status(Enum):
     INVALID = 3
     ERROR = 4
 
-def check_status(func):
-    @wraps(func)
-    def wrapper(*args, **kwargs):
-        obj = args[0]
-        tinit = time()
-
-        job_descriptor = dict(args[1])
-        params = {**job_descriptor.get('cls-params', {}), **job_descriptor.get('q-params', {})}
+class ConfigStatus:
+
+    def __init__(self, params, status, msg=''):
+        self.params = params
+        self.status = status
+        self.msg = msg
 
-        if obj.timeout > 0:
-            def handler(signum, frame):
-                raise TimeoutError()
+    def __str__(self):
+        return f':params:{self.params} :status:{self.status} ' + self.msg
 
-            signal.signal(signal.SIGALRM, handler)
-            signal.alarm(obj.timeout)
+    def __repr__(self):
+        return str(self)
 
-        try:
-            job_descriptor = func(*args, **kwargs)
+    def success(self):
+        return self.status == Status.SUCCESS
 
-            ttime = time() - tinit
-
-            score = job_descriptor.get('score', None)
-            if score is not None:
-                obj._sout(f'hyperparams=[{params}]\t got {obj.error.__name__} = {score:.5f} [took {ttime:.4f}s]')
-
-            if obj.timeout > 0:
-                signal.alarm(0)
-
-            exit_status = Status.SUCCESS
-
-        except TimeoutError:
-            obj._sout(f'timeout ({obj.timeout}s) reached for config {params}')
-            exit_status = Status.TIMEOUT
-
-        except ValueError as e:
-            obj._sout(f'the combination of hyperparameters {params} is invalid')
-            obj._sout(f'\tException: {e}')
-            exit_status = Status.INVALID
-
-        except Exception as e:
-            obj._sout(f'something went wrong for config {params}; skipping:')
-            obj._sout(f'\tException: {e}')
-            exit_status = Status.ERROR
-
-        job_descriptor['status'] = exit_status
-        job_descriptor['params'] = params
-        return job_descriptor
-    return wrapper
+    def failed(self):
+        return self.status != Status.SUCCESS
 
 
 class GridSearchQ(BaseQuantifier):
@@ -85,11 +55,14 @@ class GridSearchQ(BaseQuantifier):
     :param protocol: a sample generation protocol, an instance of :class:`quapy.protocol.AbstractProtocol`
     :param error: an error function (callable) or a string indicating the name of an error function (valid ones
         are those in :class:`quapy.error.QUANTIFICATION_ERROR`
-    :param refit: whether or not to refit the model on the whole labelled collection (training+validation) with
+    :param refit: whether to refit the model on the whole labelled collection (training+validation) with
         the best chosen hyperparameter combination. Ignored if protocol='gen'
     :param timeout: establishes a timer (in seconds) for each of the hyperparameters configurations being tested.
         Whenever a run takes longer than this timer, that configuration will be ignored. If all configurations end up
         being ignored, a TimeoutError exception is raised. If -1 (default) then no time bound is set.
+    :param raise_errors: boolean; if True, an exception is raised as soon as a param combination yields any error;
+        if False (default), the erroneous combination is marked with an error status and the process goes on.
+        However, if no configuration yields a valid model, then a ValueError exception will be raised.
     :param verbose: set to True to get information through the stdout
     """
 
@@ -101,6 +74,7 @@ class GridSearchQ(BaseQuantifier):
             refit=True,
             timeout=-1,
             n_jobs=None,
+            raise_errors=False,
             verbose=False):
 
         self.model = model
@@ -109,6 +83,7 @@ class GridSearchQ(BaseQuantifier):
         self.refit = refit
         self.timeout = timeout
         self.n_jobs = qp._get_njobs(n_jobs)
+        self.raise_errors = raise_errors
         self.verbose = verbose
         self.__check_error(error)
         assert isinstance(protocol, AbstractProtocol), 'unknown protocol'
@@ -128,112 +103,97 @@ class GridSearchQ(BaseQuantifier):
             raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n'
                              f'the name of an error function in {qp.error.QUANTIFICATION_ERROR_NAMES}')
 
-    def _prepare_classifier(self, args):
-        cls_params = args['cls-params']
-        training = args['training']
+    def _prepare_classifier(self, cls_params):
         model = deepcopy(self.model)
-        model.set_params(**cls_params)
-        predictions = model.classifier_fit_predict(training)
-        return {'model': model, 'predictions': predictions, 'cls-params': cls_params}
+
+        def job(cls_params):
+            model.set_params(**cls_params)
+            predictions = model.classifier_fit_predict(self._training)
+            return predictions
+
+        predictions, status, took = self._error_handler(job, cls_params)
+        self._sout(f'[classifier fit] hyperparams={cls_params} status={status} [took {took:.3f}s]')
+        return model, predictions, status, took
 
     def _prepare_aggregation(self, args):
-
-        model = args['model']
-        predictions = args['predictions']
-        cls_params = args['cls-params']
-        q_params = args['q-params']
-        training = args['training']
-
+        model, predictions, cls_took, cls_params, q_params = args
+        model = deepcopy(model)
         params = {**cls_params, **q_params}
 
-        def job(model):
-            tinit = time()
-            model = deepcopy(model)
-            # overrides default parameters with the parameters being explored at this iteration
+        def job(q_params):
             model.set_params(**q_params)
-            model.aggregation_fit(predictions, training)
+            model.aggregation_fit(predictions, self._training)
             score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
-            ttime = time()-tinit
+            return score
 
-            return {
-                'model': model,
-                'cls-params':cls_params,
-                'q-params': q_params,
-                'params': params,
-                'score': score,
-                'ttime':ttime
-            }
+        score, status, aggr_took = self._error_handler(job, q_params)
+        self._print_status(params, score, status, aggr_took)
+        return model, params, score, status, (cls_took + aggr_took)
 
-        out, status = self._error_handler(job, args)
-        if status == Status.SUCCESS:
-            self._sout(f'hyperparams=[{params}]\t got {self.error.__name__} = {out["score"]:.5f} [took {out["time"]:.4f}s]')
-        elif status == Status.INVALID:
-            self._sout(f'the combination of hyperparameters {params} is invalid')
-        elif status == Status.
-
-
-    def _prepare_model(self, args):
-        params, training = args
+    def _prepare_nonaggr_model(self, params):
         model = deepcopy(self.model)
-        # overrides default parameters with the parameters being explored at this iteration
-        model.set_params(**params)
-        model.fit(training)
-        score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
-        return {'model': model, 'params': params, 'score': score}
 
+        def job(params):
+            model.set_params(**params)
+            model.fit(self._training)
+            score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
+            return score
+
+        score, status, took = self._error_handler(job, params)
+        self._print_status(params, score, status, took)
+        return model, params, score, status, took
 
     def _compute_scores_aggregative(self, training):
-
         # break down the set of hyperparameters into two: classifier-specific, quantifier-specific
         cls_configs, q_configs = group_params(self.param_grid)
 
         # train all classifiers and get the predictions
-        partial_setups = qp.util.parallel(
+        self._training = training
+        cls_outs = qp.util.parallel(
             self._prepare_classifier,
-            ({'cls-params':params, 'training':training} for params in cls_configs),
-            seed=qp.environ.get('_R_SEED', None),
-            n_jobs=self.n_jobs,
-            asarray=False,
-        )
-
-        # filter out classifier configurations that yield any error
-        for setup in partial_setups:
-            if setup['status'] != Status.SUCCESS:
-                self._sout(f'-> classifier hyperparemters {setup["params"]} caused '
-                           f'error {setup["status"]} and will be ignored')
-
-        partial_setups = [setup for setup in partial_setups if setup['status']==Status.SUCCESS]
-
-        if len(partial_setups) == 0:
-            raise ValueError('No valid configuration found for the classifier.')
-
-        # explore the quantifier-specific hyperparameters for each training configuration
-        scores = qp.util.parallel(
-            self._prepare_aggregation,
-            ({'q-params': setup[1], 'training': training, **setup[0]} for setup in itertools.product(partial_setups, q_configs)),
+            cls_configs,
             seed=qp.environ.get('_R_SEED', None),
             n_jobs=self.n_jobs
         )
 
-        return scores
+        # filter out classifier configurations that yielded any error
+        success_outs = []
+        for (model, predictions, status, took), cls_config in zip(cls_outs, cls_configs):
+            if status.success():
+                success_outs.append((model, predictions, took, cls_config))
+            else:
+                self.error_collector.append(status)
+
+        if len(success_outs) == 0:
+            raise ValueError('No valid configuration found for the classifier!')
+
+        # explore the quantifier-specific hyperparameters for each valid training configuration
+        aggr_configs = [(*out, q_config) for out, q_config in itertools.product(success_outs, q_configs)]
+        aggr_outs = qp.util.parallel(
+            self._prepare_aggregation,
+            aggr_configs,
+            seed=qp.environ.get('_R_SEED', None),
+            n_jobs=self.n_jobs
+        )
+
+        return aggr_outs
 
     def _compute_scores_nonaggregative(self, training):
         configs = expand_grid(self.param_grid)
-
-        # pass a seed to parallel, so it is set in child processes
+        self._training = training
         scores = qp.util.parallel(
-            self._prepare_model,
-            ((params, training) for params in configs),
+            self._prepare_nonaggr_model,
+            configs,
             seed=qp.environ.get('_R_SEED', None),
            n_jobs=self.n_jobs
        )
        return scores
 
-    def _compute_scores(self, training):
-        if isinstance(self.model, AggregativeQuantifier):
-            return self._compute_scores_aggregative(training)
+    def _print_status(self, params, score, status, took):
+        if status.success():
+            self._sout(f'hyperparams=[{params}]\t got {self.error.__name__} = {score:.5f} [took {took:.3f}s]')
         else:
-            return self._compute_scores_nonaggregative(training)
+            self._sout(f'error={status}')
 
     def fit(self, training: LabelledCollection):
         """ Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing
@@ -251,31 +211,41 @@ class GridSearchQ(BaseQuantifier):
 
         tinit = time()
 
+        self.error_collector = []
+
         self._sout(f'starting model selection with n_jobs={self.n_jobs}')
-        results = self._compute_scores(training)
+        if isinstance(self.model, AggregativeQuantifier):
+            results = self._compute_scores_aggregative(training)
+        else:
+            results = self._compute_scores_nonaggregative(training)
 
         self.param_scores_ = {}
         self.best_score_ = None
-        for job_result in results:
-            score = job_result.get('score', None)
-            params = job_result['params']
-            if score is not None:
+        for model, params, score, status, took in results:
+            if status.success():
                 if self.best_score_ is None or score < self.best_score_:
                     self.best_score_ = score
                     self.best_params_ = params
-                    self.best_model_ = job_result['model']
+                    self.best_model_ = model
                 self.param_scores_[str(params)] = score
             else:
-                self.param_scores_[str(params)] = job_result['status']
+                self.param_scores_[str(params)] = status.status
+                self.error_collector.append(status)
 
         tend = time()-tinit
 
         if self.best_score_ is None:
-            raise TimeoutError('no combination of hyperparameters seem to work')
+            raise ValueError('no combination of hyperparameters seemed to work')
 
         self._sout(f'optimization finished: best params {self.best_params_} (score={self.best_score_:.5f}) '
                    f'[took {tend:.4f}s]')
 
+        n_errors = len(self.error_collector)
+        if n_errors > 0:
+            self._sout(f'warning: {n_errors} errors found')
+            for err in self.error_collector:
+                self._sout(f'\t{str(err)}')
+
         if self.refit:
             if isinstance(self.protocol, OnLabelledCollectionProtocol):
                 tinit = time()
@@ -284,6 +254,7 @@ class GridSearchQ(BaseQuantifier):
                 tend = time() - tinit
                 self.refit_time_ = tend
             else:
+                # already checked
                 raise RuntimeWarning(f'the model cannot be refit on the whole dataset')
 
         return self
@@ -324,23 +295,42 @@ class GridSearchQ(BaseQuantifier):
             return self.best_model_
         raise ValueError('best_model called before fit')
 
+    def _error_handler(self, func, params):
+        """
+        Wraps the execution of `func` so that, besides the function output, the status of
+        the run and the time it took to complete are also returned
 
-    def _error_handler(self, func, *args, **kwargs):
+        :param func: the function to be called
+        :param params: parameters of the function
+        :return: `tuple(out, status, took)` where `out` is the function output,
+            `status` is a `ConfigStatus` instance, and `took` is the time it
+            took to complete the call
+        """
+
+        output = None
+
+        def _handle(status, exception):
+            # with raise_errors=True the original exception is re-raised; otherwise it is
+            # wrapped into a ConfigStatus and reported at the end of the grid search
+            if self.raise_errors:
+                raise exception
+            else:
+                return ConfigStatus(params, status, str(exception))
+
+        tinit = time()  # taken outside the try block, so that 'took' is always defined
 
         try:
             with timeout(self.timeout):
-                output = func(*args, **kwargs)
-                return output, Status.SUCCESS
+                output = func(params)
+                status = ConfigStatus(params, Status.SUCCESS)
 
-        except TimeoutError:
-            return None, Status.TIMEOUT
+        except TimeoutError as e:
+            status = _handle(Status.TIMEOUT, e)
 
-        except ValueError:
-            return None, Status.INVALID
+        except ValueError as e:
+            status = _handle(Status.INVALID, e)
 
-        except Exception:
-            return None, Status.ERROR
+        except Exception as e:
+            status = _handle(Status.ERROR, e)
 
+        took = time() - tinit
+        return output, status, took
 
 
 def cross_val_predict(quantifier: BaseQuantifier, data: LabelledCollection, nfolds=3, random_state=0):
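Note: both the old and the new version of `_error_handler` rely on a `timeout`
context manager that is not visible in these hunks (presumably imported from
quapy.util). For a self-contained picture, a minimal sketch consistent with the
SIGALRM-based logic that the removed `check_status` decorator implemented inline
could look as follows; the actual helper shipped with QuaPy may differ.

    import signal
    from contextlib import contextmanager

    @contextmanager
    def timeout(seconds):
        # arm a SIGALRM-based timer; a non-positive value disables the time bound,
        # mirroring the `timeout > 0` guard of the removed check_status decorator
        if seconds > 0:
            def handler(signum, frame):
                raise TimeoutError()
            signal.signal(signal.SIGALRM, handler)
            signal.alarm(seconds)
        try:
            yield
        finally:
            if seconds > 0:
                signal.alarm(0)  # disarm the timer on both success and failure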