From d9ceffb2eb2a7cb95c597f3d9aa2115ba70efe1e Mon Sep 17 00:00:00 2001
From: Lorenzo Volpi
Date: Wed, 6 Dec 2023 10:00:58 +0100
Subject: [PATCH] search algorithms added

---
 quacc/evaluation/method.py      |  31 +++-
 quacc/method/model_selection.py | 318 ++++++++++++++++++++++----------
 2 files changed, 250 insertions(+), 99 deletions(-)

diff --git a/quacc/evaluation/method.py b/quacc/evaluation/method.py
index 892b7d4..58e3fe3 100644
--- a/quacc/evaluation/method.py
+++ b/quacc/evaluation/method.py
@@ -11,7 +11,12 @@ import quacc as qc
 from quacc.environment import env
 from quacc.evaluation.report import EvaluationReport
 from quacc.method.base import BQAE, MCAE, BaseAccuracyEstimator
-from quacc.method.model_selection import GridSearchAE
+from quacc.method.model_selection import (
+    GridSearchAE,
+    HalvingSearchAE,
+    RandomizedSearchAE,
+    SpiderSearchAE,
+)
 from quacc.quantification import KDEy
 
 _param_grid = {
@@ -19,6 +24,7 @@ _param_grid = {
         "q__classifier__C": np.logspace(-3, 3, 7),
         "q__classifier__class_weight": [None, "balanced"],
         "q__recalib": [None, "bcts"],
+        # "q__recalib": [None],
         "confidence": [None, ["isoft"], ["max_conf", "entropy"]],
     },
     "pacc": {
@@ -29,8 +35,10 @@ _param_grid = {
         "q__classifier__C": np.logspace(-3, 3, 7),
         "q__classifier__class_weight": [None, "balanced"],
     },
     "kde": {
         "q__classifier__C": np.logspace(-3, 3, 7),
         "q__classifier__class_weight": [None, "balanced"],
-        "q__bandwidth": np.linspace(0.01, 0.2, 5),
+        # "q__classifier__class_weight": [None],
+        "q__bandwidth": np.linspace(0.01, 0.2, 20),
         "confidence": [None, ["isoft"]],
+        # "confidence": [None],
     },
 }
@@ -96,11 +104,22 @@ class EvaluationMethod:
 @dataclass(frozen=True)
 class EvaluationMethodGridSearch(EvaluationMethod):
     pg: str = "sld"
+    search: str = "grid"
+
+    def get_search(self):
+        match self.search:
+            case "grid":
+                return GridSearchAE
+            case "spider":
+                return SpiderSearchAE
+            case _:
+                return GridSearchAE
 
     def __call__(self, c_model, validation, protocol) -> EvaluationReport:
         v_train, v_val = validation.split_stratified(0.6, random_state=env._R_SEED)
         __grid = _param_grid.get(self.pg, {})
-        est = GridSearchAE(
+        _search_class = self.get_search()
+        est = _search_class(
             model=self.get_est(c_model),
             param_grid=__grid,
             refit=False,
@@ -182,9 +201,9 @@ __methods_set = [
     M("mulis_kde", __kde_lr(), "mul", conf="isoft", ),
     M("m3wis_kde", __kde_lr(), "mul", conf="isoft", cf=True),
     # gs kde
-    G("bin_kde_gs", __kde_lr(), "bin", pg="kde", ),
-    G("mul_kde_gs", __kde_lr(), "mul", pg="kde", ),
-    G("m3w_kde_gs", __kde_lr(), "mul", pg="kde", cf=True),
+    G("bin_kde_gs", __kde_lr(), "bin", pg="kde", search="spider" ),
+    G("mul_kde_gs", __kde_lr(), "mul", pg="kde", search="spider" ),
+    G("m3w_kde_gs", __kde_lr(), "mul", pg="kde", search="spider", cf=True),
 ]
 # fmt: on
 
diff --git a/quacc/method/model_selection.py b/quacc/method/model_selection.py
index cd8d35b..501510b 100644
--- a/quacc/method/model_selection.py
+++ b/quacc/method/model_selection.py
@@ -1,25 +1,25 @@
 import itertools
+import math
+import os
 from copy import deepcopy
 from time import time
 from typing import Callable, Union
 
 import numpy as np
-import quapy as qp
+from joblib import Parallel
 from quapy.data import LabelledCollection
-from quapy.model_selection import GridSearchQ
-from quapy.protocol import UPP, AbstractProtocol, OnLabelledCollectionProtocol
-from sklearn.base import BaseEstimator
+from quapy.protocol import (
+    AbstractProtocol,
+    OnLabelledCollectionProtocol,
+)
 
 import quacc as qc
 import quacc.error
-from quacc.data import ExtendedCollection, ExtendedData
-from quacc.environment import env
+from quacc.data import ExtendedCollection
 from quacc.evaluation import evaluate
-from quacc.logger import Logger, SubLogger
+from quacc.logger import logger
 from quacc.method.base import (
     BaseAccuracyEstimator,
-    BinaryQuantifierAccuracyEstimator,
-    MultiClassAccuracyEstimator,
 )
 
 
@@ -96,12 +96,7 @@ class GridSearchAE(BaseAccuracyEstimator):
 
         # self._sout("starting model selection")
        # scores = [self.__params_eval((params, training)) for params in hyper]
-        scores = qc.utils.parallel(
-            self._params_eval,
-            ((params, training) for params in hyper),
-            seed=env._R_SEED,
-            n_jobs=self.n_jobs,
-        )
+        scores = self._select_scores(hyper, training)
 
         for params, score, model in scores:
             if score is not None:
@@ -124,7 +119,8 @@
             level=1,
         )
 
-        log = Logger.logger()
+        # log = Logger.logger()
+        log = logger()
         log.debug(
             f"[{self.model.__class__.__name__}] "
             f"optimization finished: best params {self.best_params_} (score={self.best_score_:.5f}) "
@@ -143,9 +139,16 @@
 
         return self
 
-    def _params_eval(self, args):
-        params, training = args
-        protocol = self.protocol
+    def _select_scores(self, hyper, training):
+        return qc.utils.parallel(
+            self._params_eval,
+            [(params, training) for params in hyper],
+            n_jobs=self.n_jobs,
+            verbose=1,
+        )
+
+    def _params_eval(self, params, training, protocol=None):
+        protocol = self.protocol if protocol is None else protocol
         error = self.error
 
         # if self.timeout > 0:
@@ -191,6 +194,7 @@
                 f"\tException: {e}",
                 level=1,
             )
+            raise e
             score = None
 
         return params, score, model
@@ -237,92 +241,220 @@
         raise ValueError("best_model called before fit")
 
 
-class MCAEgsq(MultiClassAccuracyEstimator):
-    def __init__(
-        self,
-        classifier: BaseEstimator,
-        quantifier: BaseAccuracyEstimator,
-        param_grid: dict,
-        error: Union[Callable, str] = qp.error.mae,
-        refit=True,
-        timeout=-1,
-        n_jobs=None,
-        verbose=False,
-    ):
-        self.param_grid = param_grid
-        self.refit = refit
-        self.timeout = timeout
-        self.n_jobs = n_jobs
-        self.verbose = verbose
-        self.error = error
-        super().__init__(classifier, quantifier)
+class RandomizedSearchAE(GridSearchAE):
+    ERR_THRESHOLD = 1e-4
+    MAX_ITER_IMPROV = 3
 
-    def fit(self, train: LabelledCollection):
-        self.e_train = self.extend(train)
-        t_train, t_val = self.e_train.split_stratified(0.6, random_state=env._R_SEED)
-        self.quantifier = GridSearchQ(
-            deepcopy(self.quantifier),
-            param_grid=self.param_grid,
-            protocol=UPP(t_val, repeats=100),
-            error=self.error,
-            refit=self.refit,
-            timeout=self.timeout,
-            n_jobs=self.n_jobs,
-            verbose=self.verbose,
-        ).fit(self.e_train)
-
-        return self
-
-    def estimate(self, instances) -> np.ndarray:
-        e_inst = instances
-        if not isinstance(e_inst, ExtendedData):
-            e_inst = self._extend_instances(instances)
-
-        estim_prev = self.quantifier.quantify(e_inst.X)
-        return self._check_prevalence_classes(
-            estim_prev, self.quantifier.best_model().classes_
+    def _select_scores(self, hyper, training: LabelledCollection):
+        log = logger()
+        hyper = np.array(hyper)
+        rand_index = np.random.choice(
+            np.arange(len(hyper)), size=len(hyper), replace=False
         )
+        _n_jobs = os.cpu_count() + 1 + self.n_jobs if self.n_jobs < 0 else self.n_jobs
+        batch_size = _n_jobs
+
+        log.debug(f"{batch_size = }")
+        rand_index = list(
+            rand_index[: (len(hyper) // batch_size) * batch_size].reshape(
+                (len(hyper) // batch_size, batch_size)
+            )
+        ) + [rand_index[(len(hyper) // batch_size) * batch_size :]]
+        scores = []
+        best_score, iter_from_improv = np.inf, 0
+        with Parallel(n_jobs=self.n_jobs) as parallel:
+            for i, ri in enumerate(rand_index):
+                tstart = time()
+                _iter_scores = qc.utils.parallel(
+                    self._params_eval,
+                    [(params, training) for params in hyper[ri]],
+                    parallel=parallel,
+                )
+                _best_iter_score = np.min(
+                    [s for _, s, _ in _iter_scores if s is not None]
+                )
+
+                log.debug(
+                    f"[iter {i}] best score = {_best_iter_score:.8f} [took {time() - tstart:.3f}s]"
+                )
+                scores += _iter_scores
+
+                _check, best_score, iter_from_improv = self.__stop_condition(
+                    _best_iter_score, best_score, iter_from_improv
+                )
+                if _check:
+                    break
+
+        return scores
+
+    def __stop_condition(self, best_iter_score, best_score, iter_from_improv):
+        if best_iter_score < best_score:
+            _improv = best_score - best_iter_score
+            best_score = best_iter_score
+        else:
+            _improv = 0
+
+        if _improv > self.ERR_THRESHOLD:
+            iter_from_improv = 0
+        else:
+            iter_from_improv += 1
+
+        return iter_from_improv > self.MAX_ITER_IMPROV, best_score, iter_from_improv
 
 
-class BQAEgsq(BinaryQuantifierAccuracyEstimator):
+class HalvingSearchAE(GridSearchAE):
+    def _select_scores(self, hyper, training: LabelledCollection):
+        log = logger()
+        hyper = np.array(hyper)
+
+        threshold = 22
+        factor = 3
+        n_steps = math.ceil(math.log(len(hyper) / threshold, factor))
+        steps = np.logspace(n_steps, 0, base=1.0 / factor, num=n_steps + 1)
+        with Parallel(n_jobs=self.n_jobs, verbose=1) as parallel:
+            for _step in steps:
+                tstart = time()
+                _training, _ = (
+                    training.split_stratified(train_prop=_step)
+                    if _step < 1.0
+                    else (training, None)
+                )
+
+                results = qc.utils.parallel(
+                    self._params_eval,
+                    [(params, _training) for params in hyper],
+                    parallel=parallel,
+                )
+                scores = [(1.0 if s is None else s) for _, s, _ in results]
+                res_hyper = np.array([h for h, _, _ in results], dtype="object")
+                sorted_scores_idx = np.argsort(scores)
+                best_score = scores[sorted_scores_idx[0]]
+                hyper = res_hyper[
+                    sorted_scores_idx[: round(len(res_hyper) * (1.0 / factor))]
+                ]
+
+                log.debug(
+                    f"[step {_step}] best score = {best_score:.8f} [took {time() - tstart:.3f}s]"
+                )
+
+        return results
+
+
+class SpiderSearchAE(GridSearchAE):
     def __init__(
         self,
-        classifier: BaseEstimator,
-        quantifier: BaseAccuracyEstimator,
+        model: BaseAccuracyEstimator,
         param_grid: dict,
-        error: Union[Callable, str] = qp.error.mae,
+        protocol: AbstractProtocol,
+        error: Union[Callable, str] = qc.error.maccd,
         refit=True,
-        timeout=-1,
         n_jobs=None,
         verbose=False,
+        err_threshold=1e-4,
+        max_iter_improv=0,
+        pd_th_min=1,
+        best_width=2,
     ):
-        self.param_grid = param_grid
-        self.refit = refit
-        self.timeout = timeout
-        self.n_jobs = n_jobs
-        self.verbose = verbose
-        self.error = error
-        super().__init__(classifier=classifier, quantifier=quantifier)
+        super().__init__(
+            model=model,
+            param_grid=param_grid,
+            protocol=protocol,
+            error=error,
+            refit=refit,
+            n_jobs=n_jobs,
+            verbose=verbose,
+        )
+        self.err_threshold = err_threshold
+        self.max_iter_improv = max_iter_improv
+        self.pd_th_min = pd_th_min
+        self.best_width = best_width
 
-    def fit(self, train: LabelledCollection):
-        self.e_train = self.extend(train)
+    def _select_scores(self, hyper, training: LabelledCollection):
+        log = logger()
+        hyper = np.array(hyper)
+        _n_jobs = os.cpu_count() + 1 + self.n_jobs if self.n_jobs < 0 else self.n_jobs
+        batch_size = _n_jobs
 
-        self.n_classes = self.e_train.n_classes
-        self.e_trains = self.e_train.split_by_pred()
+        rand_index = np.arange(len(hyper))
+        np.random.shuffle(rand_index)
+        rand_index = rand_index[:batch_size]
+        remaining_index = np.setdiff1d(np.arange(len(hyper)), rand_index)
+        _hyper, _hyper_remaining = hyper[rand_index], hyper[remaining_index]
 
-        self.quantifiers = []
-        for e_train in self.e_trains:
-            t_train, t_val = e_train.split_stratified(0.6, random_state=env._R_SEED)
-            quantifier = GridSearchQ(
-                model=deepcopy(self.quantifier),
-                param_grid=self.param_grid,
-                protocol=UPP(t_val, repeats=100),
-                error=self.error,
-                refit=self.refit,
-                timeout=self.timeout,
-                n_jobs=self.n_jobs,
-                verbose=self.verbose,
-            ).fit(t_train)
-            self.quantifiers.append(quantifier)
+        scores = []
+        best_score, last_best, iter_from_improv = np.inf, np.inf, 0
+        with Parallel(n_jobs=self.n_jobs, verbose=1) as parallel:
+            while len(_hyper) > 0:
+                # log.debug(f"{len(_hyper_remaining)=}")
+                tstart = time()
+                _iter_scores = qc.utils.parallel(
+                    self._params_eval,
+                    [(params, training) for params in _hyper],
+                    parallel=parallel,
+                )
+                _sorted_idx = np.argsort(
+                    [1.0 if s is None else s for _, s, _ in _iter_scores]
+                )
+                _sorted_scores = np.array(_iter_scores, dtype="object")[_sorted_idx]
+                _best_iter_params = np.array(
+                    [p for p, _, _ in _sorted_scores], dtype="object"
+                )
+                _best_iter_scores = np.array(
+                    [s for _, s, _ in _sorted_scores], dtype="object"
+                )
 
-        return self
+                for i, (_score, _param) in enumerate(
+                    zip(
+                        _best_iter_scores[: self.best_width],
+                        _best_iter_params[: self.best_width],
+                    )
+                ):
+                    log.debug(
+                        f"[size={len(_hyper)},place={i+1}] best score = {_score:.8f}; "
+                        f"best param = {_param} [took {time() - tstart:.3f}s]"
+                    )
+                scores += _iter_scores
+
+                _improv = best_score - _best_iter_scores[0]
+                _improv_last = last_best - _best_iter_scores[0]
+                if _improv > self.err_threshold:
+                    iter_from_improv = 0
+                    best_score = _best_iter_scores[0]
+                elif _improv_last < 0:
+                    iter_from_improv += 1
+
+                last_best = _best_iter_scores[0]
+
+                if iter_from_improv > self.max_iter_improv:
+                    break
+
+                _new_hyper = np.array([], dtype="object")
+                for _base_param in _best_iter_params[: self.best_width]:
+                    _rem_pds = np.array(
+                        [
+                            self.__param_distance(_base_param, h)
+                            for h in _hyper_remaining
+                        ]
+                    )
+                    _rem_pd_sort_idx = np.argsort(_rem_pds)
+                    # _min_pd = np.min(_rem_pds)
+                    _min_pd_len = (_rem_pds <= self.pd_th_min).nonzero()[0].shape[0]
+                    _new_hyper_idx = _rem_pd_sort_idx[:_min_pd_len]
+                    _hyper_rem_idx = np.setdiff1d(
+                        np.arange(len(_hyper_remaining)), _new_hyper_idx
+                    )
+                    _new_hyper = np.concatenate(
+                        [_new_hyper, _hyper_remaining[_new_hyper_idx]]
+                    )
+                    _hyper_remaining = _hyper_remaining[_hyper_rem_idx]
+                _hyper = _new_hyper
+
+        return scores
+
+    def __param_distance(self, param1, param2):
+        score = 0
+        for k, v in param1.items():
+            if param2[k] != v:
+                score += 1
+
+        return score
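
Usage sketch (illustrative only, not part of the patch): the snippet below shows how the new search classes could be driven, following the construction pattern that EvaluationMethodGridSearch.__call__ uses above. The validation split ratio, the UPP protocol and the final fit(...) call are assumptions made for the example; the actual wiring lives in quacc/evaluation/method.py. RandomizedSearchAE and HalvingSearchAE can be swapped in the same way, since all of these classes only override GridSearchAE._select_scores.

    # Hypothetical driver: assumes GridSearchAE-style estimators expose
    # fit(LabelledCollection) and accept a quapy protocol built over a
    # held-out split as their `protocol` argument.
    from quapy.protocol import UPP

    from quacc.method.model_selection import GridSearchAE, SpiderSearchAE


    def select_accuracy_estimator(base_estimator, validation, param_grid, search="spider"):
        # mirror EvaluationMethodGridSearch.get_search: unknown names fall back to grid search
        search_class = {"grid": GridSearchAE, "spider": SpiderSearchAE}.get(
            search, GridSearchAE
        )
        v_train, v_val = validation.split_stratified(0.6)  # assumed split ratio
        est = search_class(
            model=base_estimator,              # a BaseAccuracyEstimator
            param_grid=param_grid,             # e.g. _param_grid["kde"]
            protocol=UPP(v_val, repeats=100),  # assumed evaluation protocol
            refit=False,
            n_jobs=-1,
            verbose=True,
        )
        return est.fit(v_train)  # assumed: fit() runs _select_scores and keeps the best model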