diff --git a/examples/model_selection.py b/examples/model_selection.py
index 50460fe..3145005 100644
--- a/examples/model_selection.py
+++ b/examples/model_selection.py
@@ -53,6 +53,7 @@ with qp.util.temp_seed(0):
         protocol=protocol,
         error='mae',  # the error to optimize is the MAE (a quantification-oriented loss)
         refit=False,  # retrain on the whole labelled set once done
+        raise_errors=False,  # log and skip failing configurations instead of raising
         verbose=True  # show information as the process goes on
     ).fit(training)
 
@@ -66,5 +67,5 @@ model = model.best_model_
 mae_score = qp.evaluation.evaluate(model, protocol=APP(test), error_metric='mae')
 
 print(f'MAE={mae_score:.5f}')
-print(f'model selection took {tend-tinit}s')
+print(f'model selection took {tend-tinit:.1f}s')

diff --git a/quapy/method/kdey.py b/quapy/method/kdey.py
index c6f9794..b3cdf0e 100644
--- a/quapy/method/kdey.py
+++ b/quapy/method/kdey.py
@@ -5,7 +5,7 @@ from sklearn.neighbors import KernelDensity
 
 import quapy as qp
 from quapy.data import LabelledCollection
-from quapy.method.aggregative import AggregativeProbabilisticQuantifier, cross_generate_predictions
+from quapy.method.aggregative import AggregativeSoftQuantifier
 import quapy.functional as F
 
 from sklearn.metrics.pairwise import rbf_kernel
@@ -33,7 +33,7 @@ class KDEBase:
 
 
-class KDEyML(AggregativeProbabilisticQuantifier, KDEBase):
+class KDEyML(AggregativeSoftQuantifier, KDEBase):
 
     def __init__(self, classifier: BaseEstimator, val_split=10, bandwidth=0.1, n_jobs=None, random_state=0):
         self._check_bandwidth(bandwidth)
@@ -43,16 +43,8 @@ class KDEyML(AggregativeProbabilisticQuantifier, KDEBase):
         self.n_jobs = n_jobs
         self.random_state=random_state
 
-    def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None):
-        if val_split is None:
-            val_split = self.val_split
-
-        self.classifier, y, posteriors, _, _ = cross_generate_predictions(
-            data, self.classifier, val_split, probabilistic=True, fit_classifier=fit_classifier, n_jobs=self.n_jobs
-        )
-
-        self.mix_densities = self.get_mixture_components(posteriors, y, data.n_classes, self.bandwidth)
-
+    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
+        self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.n_classes, self.bandwidth)
         return self
 
     def aggregate(self, posteriors: np.ndarray):
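The kdey.py changes above port KDEyML to the refactored aggregative API: `AggregativeSoftQuantifier` now owns the classifier-training step, and subclasses only implement `aggregation_fit`, which receives the held-out posteriors as a `LabelledCollection`. A minimal sketch of a custom quantifier written against this API (the class `ToySoftQuantifier` and its trivial aggregation rule are illustrative, not part of this patch):

```python
# A sketch, assuming a quapy version that ships AggregativeSoftQuantifier.
import numpy as np
from sklearn.base import BaseEstimator

from quapy.data import LabelledCollection
from quapy.method.aggregative import AggregativeSoftQuantifier


class ToySoftQuantifier(AggregativeSoftQuantifier):
    """The base class fits the classifier and passes the held-out posteriors
    to aggregation_fit; only the aggregation logic is defined here."""

    def __init__(self, classifier: BaseEstimator, val_split=10):
        self.classifier = classifier
        self.val_split = val_split

    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
        # .Xy unpacks to (posteriors, true labels), as in the KDEy variants above
        posteriors, y = classif_predictions.Xy
        # store the mean posterior of each class (a deliberately trivial model)
        self.class_means_ = np.vstack(
            [posteriors[y == c].mean(axis=0) for c in range(data.n_classes)]
        )
        return self

    def aggregate(self, posteriors: np.ndarray):
        # assign each posterior to its nearest class mean and normalize the counts
        dists = ((posteriors[:, None, :] - self.class_means_[None, :, :]) ** 2).sum(axis=-1)
        counts = np.bincount(dists.argmin(axis=1), minlength=len(self.class_means_))
        return counts / counts.sum()
```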
@@ -76,7 +68,7 @@ class KDEyML(AggregativeProbabilisticQuantifier, KDEBase):
         return F.optim_minimize(neg_loglikelihood, n_classes)
 
 
-class KDEyHD(AggregativeProbabilisticQuantifier, KDEBase):
+class KDEyHD(AggregativeSoftQuantifier, KDEBase):
 
     def __init__(self, classifier: BaseEstimator, val_split=10, divergence: str='HD',
                  bandwidth=0.1, n_jobs=None, random_state=0, montecarlo_trials=10000):
@@ -90,15 +82,8 @@ class KDEyHD(AggregativeProbabilisticQuantifier, KDEBase):
         self.random_state=random_state
         self.montecarlo_trials = montecarlo_trials
 
-    def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None):
-        if val_split is None:
-            val_split = self.val_split
-
-        self.classifier, y, posteriors, _, _ = cross_generate_predictions(
-            data, self.classifier, val_split, probabilistic=True, fit_classifier=fit_classifier, n_jobs=self.n_jobs
-        )
-
-        self.mix_densities = self.get_mixture_components(posteriors, y, data.n_classes, self.bandwidth)
+    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
+        self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.n_classes, self.bandwidth)
 
         N = self.montecarlo_trials
         rs = self.random_state
@@ -141,7 +126,7 @@ class KDEyHD(AggregativeProbabilisticQuantifier, KDEBase):
         return F.optim_minimize(divergence, n_classes)
 
 
-class KDEyCS(AggregativeProbabilisticQuantifier):
+class KDEyCS(AggregativeSoftQuantifier):
 
     def __init__(self, classifier: BaseEstimator, val_split=10, bandwidth=0.1, n_jobs=None, random_state=0):
         KDEBase._check_bandwidth(bandwidth)
@@ -163,19 +148,14 @@ class KDEyCS(AggregativeProbabilisticQuantifier):
         gram = norm_factor * rbf_kernel(X, Y, gamma=gamma)
         return gram.sum()
 
-    def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None):
-        if val_split is None:
-            val_split = self.val_split
+    def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
 
-        self.classifier, y, posteriors, _, _ = cross_generate_predictions(
-            data, self.classifier, val_split, probabilistic=True, fit_classifier=fit_classifier, n_jobs=self.n_jobs
-        )
+        P, y = classif_predictions.Xy
+        n = data.n_classes
 
-        assert all(sorted(np.unique(y)) == np.arange(data.n_classes)), \
+        assert all(sorted(np.unique(y)) == np.arange(n)), \
             'label name gaps not allowed in current implementation'
 
-        n = data.n_classes
-        P = posteriors
-
         # counts_inv keeps track of the relative weight of each datapoint within its class
         # (i.e., the weight in its KDE model)
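All three KDEy variants now draw the cross-validated posteriors from `classif_predictions.Xy` and fit one kernel density estimator per class. A hedged sketch of what `get_mixture_components` presumably boils down to (the helper `fit_class_kdes` is invented for illustration; `KDEBase`'s actual implementation may differ):

```python
# Sketch only: one Gaussian KDE per class over the posterior simplex.
import numpy as np
from sklearn.neighbors import KernelDensity


def fit_class_kdes(posteriors: np.ndarray, y: np.ndarray, n_classes: int, bandwidth: float):
    # an independent KDE fitted on the posteriors of each class
    return [
        KernelDensity(bandwidth=bandwidth).fit(posteriors[y == c])
        for c in range(n_classes)
    ]
```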
diff --git a/quapy/model_selection.py b/quapy/model_selection.py
index 6637d62..9017b99 100644
--- a/quapy/model_selection.py
+++ b/quapy/model_selection.py
@@ -23,54 +23,24 @@ class Status(Enum):
     INVALID = 3
     ERROR = 4
 
-def check_status(func):
-    @wraps(func)
-    def wrapper(*args, **kwargs):
-        obj = args[0]
-        tinit = time()
-        job_descriptor = dict(args[1])
-        params = {**job_descriptor.get('cls-params', {}), **job_descriptor.get('q-params', {})}
+class ConfigStatus:
+    def __init__(self, params, status, msg=''):
+        self.params = params
+        self.status = status
+        self.msg = msg
 
-        if obj.timeout > 0:
-            def handler(signum, frame):
-                raise TimeoutError()
+    def __str__(self):
+        return f':params:{self.params} :status:{self.status} ' + self.msg
 
-            signal.signal(signal.SIGALRM, handler)
-            signal.alarm(obj.timeout)
+    def __repr__(self):
+        return str(self)
 
-        try:
-            job_descriptor = func(*args, **kwargs)
+    def success(self):
+        return self.status == Status.SUCCESS
 
-            ttime = time() - tinit
-
-            score = job_descriptor.get('score', None)
-            if score is not None:
-                obj._sout(f'hyperparams=[{params}]\t got {obj.error.__name__} = {score:.5f} [took {ttime:.4f}s]')
-
-            if obj.timeout > 0:
-                signal.alarm(0)
-
-            exit_status = Status.SUCCESS
-
-        except TimeoutError:
-            obj._sout(f'timeout ({obj.timeout}s) reached for config {params}')
-            exit_status = Status.TIMEOUT
-
-        except ValueError as e:
-            obj._sout(f'the combination of hyperparameters {params} is invalid')
-            obj._sout(f'\tException: {e}')
-            exit_status = Status.INVALID
-
-        except Exception as e:
-            obj._sout(f'something went wrong for config {params}; skipping:')
-            obj._sout(f'\tException: {e}')
-            exit_status = Status.ERROR
-
-        job_descriptor['status'] = exit_status
-        job_descriptor['params'] = params
-        return job_descriptor
-    return wrapper
+    def failed(self):
+        return self.status != Status.SUCCESS
 
 
 class GridSearchQ(BaseQuantifier):
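The dict-based job descriptors produced by the removed `check_status` decorator become a small `ConfigStatus` record. A toy usage, importing the two classes defined above:

```python
from quapy.model_selection import ConfigStatus, Status

ok = ConfigStatus(params={'classifier__C': 1.0}, status=Status.SUCCESS)
bad = ConfigStatus(params={'bandwidth': -1}, status=Status.INVALID, msg='negative bandwidth')

print(ok.success())  # True
print(bad.failed())  # True
print(bad)           # :params:{'bandwidth': -1} :status:Status.INVALID negative bandwidth
```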
@@ -85,11 +55,14 @@ class GridSearchQ(BaseQuantifier):
     :param protocol: a sample generation protocol, an instance of :class:`quapy.protocol.AbstractProtocol`
     :param error: an error function (callable) or a string indicating the name of an error function (valid ones
         are those in :class:`quapy.error.QUANTIFICATION_ERROR`
-    :param refit: whether or not to refit the model on the whole labelled collection (training+validation) with
+    :param refit: whether to refit the model on the whole labelled collection (training+validation) with
         the best chosen hyperparameter combination. Ignored if protocol='gen'
     :param timeout: establishes a timer (in seconds) for each of the hyperparameters configurations being tested.
         Whenever a run takes longer than this timer, that configuration will be ignored. If all configurations end up
         being ignored, a TimeoutError exception is raised. If -1 (default) then no time bound is set.
+    :param raise_errors: boolean, if True an exception is raised whenever a hyperparameter combination yields an
+        error; if False (default), the combination is marked with an error status and the process continues.
+        However, if no configuration yields a valid model, a ValueError exception is raised.
     :param verbose: set to True to get information through the stdout
     """
 
@@ -101,6 +74,7 @@ class GridSearchQ(BaseQuantifier):
                  refit=True,
                  timeout=-1,
                  n_jobs=None,
+                 raise_errors=False,
                  verbose=False):
 
         self.model = model
@@ -109,6 +83,7 @@ class GridSearchQ(BaseQuantifier):
         self.refit = refit
         self.timeout = timeout
         self.n_jobs = qp._get_njobs(n_jobs)
+        self.raise_errors = raise_errors
         self.verbose = verbose
         self.__check_error(error)
         assert isinstance(protocol, AbstractProtocol), 'unknown protocol'
@@ -128,112 +103,97 @@ class GridSearchQ(BaseQuantifier):
             raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n'
                              f'the name of an error function in {qp.error.QUANTIFICATION_ERROR_NAMES}')
 
-    def _prepare_classifier(self, args):
-        cls_params = args['cls-params']
-        training = args['training']
+    def _prepare_classifier(self, cls_params):
         model = deepcopy(self.model)
-        model.set_params(**cls_params)
-        predictions = model.classifier_fit_predict(training)
-        return {'model': model, 'predictions': predictions, 'cls-params': cls_params}
+
+        def job(cls_params):
+            model.set_params(**cls_params)
+            predictions = model.classifier_fit_predict(self._training)
+            return predictions
+
+        predictions, status, took = self._error_handler(job, cls_params)
+        self._sout(f'[classifier fit] hyperparams={cls_params} status={status} [took {took:.3f}s]')
+        return model, predictions, status, took
 
     def _prepare_aggregation(self, args):
-
-        model = args['model']
-        predictions = args['predictions']
-        cls_params = args['cls-params']
-        q_params = args['q-params']
-        training = args['training']
-
+        model, predictions, cls_took, cls_params, q_params = args
+        model = deepcopy(model)
         params = {**cls_params, **q_params}
 
-        def job(model):
-            tinit = time()
-            model = deepcopy(model)
-            # overrides default parameters with the parameters being explored at this iteration
+        def job(q_params):
             model.set_params(**q_params)
-            model.aggregation_fit(predictions, training)
+            model.aggregation_fit(predictions, self._training)
             score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
-            ttime = time()-tinit
+            return score
 
-            return {
-                'model': model,
-                'cls-params':cls_params,
-                'q-params': q_params,
-                'params': params,
-                'score': score,
-                'ttime':ttime
-            }
+        score, status, aggr_took = self._error_handler(job, q_params)
+        self._print_status(params, score, status, aggr_took)
+        return model, params, score, status, (cls_took+aggr_took)
 
-        out, status = self._error_handler(job, args)
-        if status == Status.SUCCESS:
-            self._sout(f'hyperparams=[{params}]\t got {self.error.__name__} = {out["score"]:.5f} [took {out["time"]:.4f}s]')
-        elif status == Status.INVALID:
-            self._sout(f'the combination of hyperparameters {params} is invalid')
-        elif status == Status.
-
-
-    def _prepare_model(self, args):
-        params, training = args
+    def _prepare_nonaggr_model(self, params):
         model = deepcopy(self.model)
-        # overrides default parameters with the parameters being explored at this iteration
-        model.set_params(**params)
-        model.fit(training)
-        score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
-        return {'model': model, 'params': params, 'score': score}
+
+        def job(params):
+            model.set_params(**params)
+            model.fit(self._training)
+            score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error)
+            return score
+
+        score, status, took = self._error_handler(job, params)
+        self._print_status(params, score, status, took)
+        return model, params, score, status, took
 
     def _compute_scores_aggregative(self, training):
-
         # break down the set of hyperparameters into two: classifier-specific, quantifier-specific
         cls_configs, q_configs = group_params(self.param_grid)
 
         # train all classifiers and get the predictions
-        partial_setups = qp.util.parallel(
+        self._training = training
+        cls_outs = qp.util.parallel(
            self._prepare_classifier,
-            ({'cls-params':params, 'training':training} for params in cls_configs),
-            seed=qp.environ.get('_R_SEED', None),
-            n_jobs=self.n_jobs,
-            asarray=False,
-        )
-
-        # filter out classifier configurations that yield any error
-        for setup in partial_setups:
-            if setup['status'] != Status.SUCCESS:
-                self._sout(f'-> classifier hyperparemters {setup["params"]} caused '
-                           f'error {setup["status"]} and will be ignored')
-
-        partial_setups = [setup for setup in partial_setups if setup['status']==Status.SUCCESS]
-
-        if len(partial_setups) == 0:
-            raise ValueError('No valid configuration found for the classifier.')
-
-        # explore the quantifier-specific hyperparameters for each training configuration
-        scores = qp.util.parallel(
-            self._prepare_aggregation,
-            ({'q-params': setup[1], 'training': training, **setup[0]} for setup in itertools.product(partial_setups, q_configs)),
+            cls_configs,
             seed=qp.environ.get('_R_SEED', None),
             n_jobs=self.n_jobs
         )
 
-        return scores
+        # filter out classifier configurations that yielded any error
+        success_outs = []
+        for (model, predictions, status, took), cls_config in zip(cls_outs, cls_configs):
+            if status.success():
+                success_outs.append((model, predictions, took, cls_config))
+            else:
+                self.error_collector.append(status)
+
+        if len(success_outs) == 0:
+            raise ValueError('No valid configuration found for the classifier!')
+
+        # explore the quantifier-specific hyperparameters for each valid training configuration
+        aggr_configs = [(*out, q_config) for out, q_config in itertools.product(success_outs, q_configs)]
+        aggr_outs = qp.util.parallel(
+            self._prepare_aggregation,
+            aggr_configs,
+            seed=qp.environ.get('_R_SEED', None),
+            n_jobs=self.n_jobs
+        )
+
+        return aggr_outs
 
     def _compute_scores_nonaggregative(self, training):
         configs = expand_grid(self.param_grid)
-
-        # pass a seed to parallel, so it is set in child processes
+        self._training = training
         scores = qp.util.parallel(
-            self._prepare_model,
-            ((params, training) for params in configs),
+            self._prepare_nonaggr_model,
+            configs,
             seed=qp.environ.get('_R_SEED', None),
             n_jobs=self.n_jobs
         )
         return scores
 
-    def _compute_scores(self, training):
-        if isinstance(self.model, AggregativeQuantifier):
-            return self._compute_scores_aggregative(training)
+    def _print_status(self, params, score, status, took):
+        if status.success():
+            self._sout(f'hyperparams=[{params}]\t got {self.error.__name__} = {score:.5f} [took {took:.3f}s]')
         else:
-            return self._compute_scores_nonaggregative(training)
+            self._sout(f'error={status}')
 
     def fit(self, training: LabelledCollection):
         """ Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing
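This refactor is what enables classifier reuse: `group_params` splits the grid into classifier-specific and quantifier-specific configurations, `_prepare_classifier` fits each classifier configuration exactly once, and `_prepare_aggregation` reuses the cached predictions for every quantifier configuration. A small illustration of the savings, assuming QuaPy's `classifier__` prefix convention for routing hyperparameters to the underlying learner:

```python
import itertools

# 3 classifier configurations and 3 quantifier configurations...
cls_configs = [{'classifier__C': c} for c in (0.1, 1.0, 10.0)]
q_configs = [{'bandwidth': b} for b in (0.05, 0.1, 0.2)]

# ...make 9 combinations, but only 3 (expensive) classifier fits: each pair
# re-runs only the cheap aggregation_fit step on the cached predictions
combinations = list(itertools.product(cls_configs, q_configs))
print(f'{len(cls_configs)} classifier fits, {len(combinations)} evaluations')
```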
@@ -251,31 +211,41 @@ class GridSearchQ(BaseQuantifier):
 
         tinit = time()
 
+        self.error_collector = []
+
         self._sout(f'starting model selection with n_jobs={self.n_jobs}')
-        results = self._compute_scores(training)
+        if isinstance(self.model, AggregativeQuantifier):
+            results = self._compute_scores_aggregative(training)
+        else:
+            results = self._compute_scores_nonaggregative(training)
 
         self.param_scores_ = {}
         self.best_score_ = None
-        for job_result in results:
-            score = job_result.get('score', None)
-            params = job_result['params']
-            if score is not None:
+        for model, params, score, status, took in results:
+            if status.success():
                 if self.best_score_ is None or score < self.best_score_:
                     self.best_score_ = score
                     self.best_params_ = params
-                    self.best_model_ = job_result['model']
+                    self.best_model_ = model
                 self.param_scores_[str(params)] = score
             else:
-                self.param_scores_[str(params)] = job_result['status']
+                self.param_scores_[str(params)] = status.status
+                self.error_collector.append(status)
 
         tend = time()-tinit
 
         if self.best_score_ is None:
-            raise TimeoutError('no combination of hyperparameters seem to work')
+            raise ValueError('no combination of hyperparameters seemed to work')
 
         self._sout(f'optimization finished: best params {self.best_params_} (score={self.best_score_:.5f}) '
                    f'[took {tend:.4f}s]')
 
+        n_errors = len(self.error_collector)
+        if n_errors > 0:
+            self._sout(f'warning: {n_errors} errors found')
+            for err in self.error_collector:
+                self._sout(f'\t{str(err)}')
+
         if self.refit:
             if isinstance(self.protocol, OnLabelledCollectionProtocol):
                 tinit = time()
@@ -284,6 +254,7 @@ class GridSearchQ(BaseQuantifier):
                 tend = time() - tinit
                 self.refit_time_ = tend
             else:
+                # already checked
                 raise RuntimeWarning(f'the model cannot be refit on the whole dataset')
 
         return self
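Putting `fit` together with the new `raise_errors` switch, a usage sketch along the lines of examples/model_selection.py (the dataset, classifier, and grid below are illustrative):

```python
import quapy as qp
from quapy.method.kdey import KDEyML
from quapy.model_selection import GridSearchQ
from quapy.protocol import APP
from sklearn.linear_model import LogisticRegression

# held-out validation split for the model-selection protocol
training, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5).train_test
training, validation = training.split_stratified(train_prop=0.7)

model = GridSearchQ(
    model=KDEyML(LogisticRegression()),
    param_grid={'classifier__C': [0.1, 1.0, 10.0], 'bandwidth': [0.05, 0.1]},
    protocol=APP(validation, sample_size=100),
    error='mae',
    refit=False,
    raise_errors=False,  # failing configurations are logged and skipped
    verbose=True,
).fit(training)

print(f'best params: {model.best_params_} (MAE={model.best_score_:.5f})')
```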
@@ -324,23 +295,42 @@ class GridSearchQ(BaseQuantifier):
             return self.best_model_
         raise ValueError('best_model called before fit')
 
+    def _error_handler(self, func, params):
+        """
+        Wraps the execution of one job, returning its output together with an exit
+        status and the time it took to execute
 
-    def _error_handler(self, func, *args, **kwargs):
+        :param func: the function to be called
+        :param params: parameters of the function
+        :return: a tuple `(out, status, time)` where `out` is the function's output,
+            `status` is a `ConfigStatus` object, and `time` is the time it
+            took to complete the call
+        """
+
+        output = None
+
+        def _handle(status, exception):
+            if self.raise_errors:
+                raise exception
+            else:
+                return ConfigStatus(params, status, str(exception))
+
+        tinit = time()
 
         try:
             with timeout(self.timeout):
-                output = func(*args, **kwargs)
-                return output, Status.SUCCESS
+                output = func(params)
+                status = ConfigStatus(params, Status.SUCCESS)
 
-        except TimeoutError:
-            return None, Status.TIMEOUT
+        except TimeoutError as e:
+            status = _handle(Status.TIMEOUT, e)
 
-        except ValueError:
-            return None, Status.INVALID
+        except ValueError as e:
+            status = _handle(Status.INVALID, e)
 
-        except Exception:
-            return None, Status.ERROR
+        except Exception as e:
+            status = _handle(Status.ERROR, e)
 
+        took = time() - tinit
+        return output, status, took
 
 
 def cross_val_predict(quantifier: BaseQuantifier, data: LabelledCollection, nfolds=3, random_state=0):
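`_error_handler` delegates the time limit to a `timeout` context manager instead of manipulating `SIGALRM` inline as the removed `check_status` did. A minimal sketch of such a context manager under the same POSIX-signal assumption (QuaPy's actual helper is defined elsewhere and may differ):

```python
import signal
from contextlib import contextmanager


@contextmanager
def timeout(seconds: int):
    """Raises TimeoutError in the calling thread after `seconds`; <=0 disables the limit."""
    if seconds > 0:
        def handler(signum, frame):
            raise TimeoutError(f'timeout ({seconds}s) reached')
        previous = signal.signal(signal.SIGALRM, handler)
        signal.alarm(seconds)
    try:
        yield
    finally:
        if seconds > 0:
            signal.alarm(0)  # cancel any pending alarm
            signal.signal(signal.SIGALRM, previous)
```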