search algorithms added

Lorenzo Volpi 2023-12-06 10:00:58 +01:00
parent 06761da870
commit d9ceffb2eb
2 changed files with 250 additions and 99 deletions

View File

@@ -11,7 +11,12 @@ import quacc as qc
from quacc.environment import env
from quacc.evaluation.report import EvaluationReport
from quacc.method.base import BQAE, MCAE, BaseAccuracyEstimator
from quacc.method.model_selection import GridSearchAE
from quacc.method.model_selection import (
GridSearchAE,
HalvingSearchAE,
RandomizedSearchAE,
SpiderSearchAE,
)
from quacc.quantification import KDEy
_param_grid = {
@@ -19,6 +24,7 @@ _param_grid = {
"q__classifier__C": np.logspace(-3, 3, 7),
"q__classifier__class_weight": [None, "balanced"],
"q__recalib": [None, "bcts"],
# "q__recalib": [None],
"confidence": [None, ["isoft"], ["max_conf", "entropy"]],
},
"pacc": {
@@ -29,8 +35,10 @@ _param_grid = {
"kde": {
"q__classifier__C": np.logspace(-3, 3, 7),
"q__classifier__class_weight": [None, "balanced"],
"q__bandwidth": np.linspace(0.01, 0.2, 5),
# "q__classifier__class_weight": [None],
"q__bandwidth": np.linspace(0.01, 0.2, 20),
"confidence": [None, ["isoft"]],
# "confidence": [None],
},
}
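The grids above expand into the Cartesian product of their value lists, so the kde grid now covers 7 C values x 2 class weights x 20 bandwidths x 2 confidence settings = 560 candidate configurations (widening the bandwidth grid from 5 to 20 values is what grows it). A minimal, self-contained sketch of that expansion (plain itertools, not the project's own enumeration helper):

from itertools import product

grid = {
    "q__classifier__C": [0.001, 1.0, 1000.0],
    "q__bandwidth": [0.01, 0.1, 0.2],
}
# one dict per combination, in the spirit of what the search classes iterate
hyper = [dict(zip(grid.keys(), vals)) for vals in product(*grid.values())]
print(len(hyper))  # 3 * 3 = 9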
@@ -96,11 +104,22 @@ class EvaluationMethod:
@dataclass(frozen=True)
class EvaluationMethodGridSearch(EvaluationMethod):
pg: str = "sld"
search: str = "grid"
def get_search(self):
match self.search:
case "grid":
return GridSearchAE
case "randomized":
return RandomizedSearchAE
case "halving":
return HalvingSearchAE
case "spider":
return SpiderSearchAE
case _:
return GridSearchAE
def __call__(self, c_model, validation, protocol) -> EvaluationReport:
v_train, v_val = validation.split_stratified(0.6, random_state=env._R_SEED)
__grid = _param_grid.get(self.pg, {})
est = GridSearchAE(
_search_class = self.get_search()
est = _search_class(
model=self.get_est(c_model),
param_grid=__grid,
refit=False,
@@ -182,9 +201,9 @@ __methods_set = [
M("mulis_kde", __kde_lr(), "mul", conf="isoft", ),
M("m3wis_kde", __kde_lr(), "mul", conf="isoft", cf=True),
# gs kde
G("bin_kde_gs", __kde_lr(), "bin", pg="kde", ),
G("mul_kde_gs", __kde_lr(), "mul", pg="kde", ),
G("m3w_kde_gs", __kde_lr(), "mul", pg="kde", cf=True),
G("bin_kde_gs", __kde_lr(), "bin", pg="kde", search="spider" ),
G("mul_kde_gs", __kde_lr(), "mul", pg="kde", search="spider" ),
G("m3w_kde_gs", __kde_lr(), "mul", pg="kde", search="spider", cf=True),
]
# fmt: on
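The three gs kde entries opt into the new spider search purely via the search keyword added to EvaluationMethodGridSearch. A minimal sketch of how such a frozen-dataclass entry carries the choice; G here is a local stand-in, not the project's actual alias definition:

from dataclasses import dataclass

@dataclass(frozen=True)
class G:  # stand-in for the grid-search method entry aliased as G above
    name: str
    pg: str = "sld"
    search: str = "grid"

entry = G("m3w_kde_gs", pg="kde", search="spider")
print(entry.search)  # "spider" -> resolved to SpiderSearchAE by get_search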

View File

@@ -1,25 +1,25 @@
import itertools
import math
import os
from copy import deepcopy
from time import time
from typing import Callable, Union
import numpy as np
import quapy as qp
from joblib import Parallel
from quapy.data import LabelledCollection
from quapy.model_selection import GridSearchQ
from quapy.protocol import UPP, AbstractProtocol, OnLabelledCollectionProtocol
from sklearn.base import BaseEstimator
from quapy.protocol import (
AbstractProtocol,
OnLabelledCollectionProtocol,
)
import quacc as qc
import quacc.error
from quacc.data import ExtendedCollection, ExtendedData
from quacc.environment import env
from quacc.data import ExtendedCollection
from quacc.evaluation import evaluate
from quacc.logger import Logger, SubLogger
from quacc.logger import logger
from quacc.method.base import (
BaseAccuracyEstimator,
BinaryQuantifierAccuracyEstimator,
MultiClassAccuracyEstimator,
)
@@ -96,12 +96,7 @@ class GridSearchAE(BaseAccuracyEstimator):
# self._sout("starting model selection")
# scores = [self.__params_eval((params, training)) for params in hyper]
scores = qc.utils.parallel(
self._params_eval,
((params, training) for params in hyper),
seed=env._R_SEED,
n_jobs=self.n_jobs,
)
scores = self._select_scores(hyper, training)
for params, score, model in scores:
if score is not None:
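With this refactor, fit delegates candidate evaluation to _select_scores, so each search strategy below only overrides how candidates are chosen while GridSearchAE keeps the scoring and best-model bookkeeping. The batched strategies also hand an open joblib pool to qc.utils.parallel via its parallel argument; the helper's real signature is not shown in this diff, but a sketch with that shape could look like:

from joblib import Parallel, delayed

def parallel_map(func, args_list, n_jobs=-1, verbose=0, parallel=None):
    # each argument tuple is unpacked into positional arguments of func;
    # passing an already-open Parallel pool avoids re-spawning workers on
    # every batch of a randomized/halving/spider search
    jobs = (delayed(func)(*args) for args in args_list)
    if parallel is None:
        return Parallel(n_jobs=n_jobs, verbose=verbose)(jobs)
    return parallel(jobs)

# usage: scores = parallel_map(eval_one, [(p, train) for p in hyper], n_jobs=4)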
@@ -124,7 +119,8 @@ class GridSearchAE(BaseAccuracyEstimator):
level=1,
)
log = Logger.logger()
# log = Logger.logger()
log = logger()
log.debug(
f"[{self.model.__class__.__name__}] "
f"optimization finished: best params {self.best_params_} (score={self.best_score_:.5f}) "
@@ -143,9 +139,16 @@ class GridSearchAE(BaseAccuracyEstimator):
return self
def _params_eval(self, args):
params, training = args
protocol = self.protocol
def _select_scores(self, hyper, training):
return qc.utils.parallel(
self._params_eval,
[(params, training) for params in hyper],
n_jobs=self.n_jobs,
verbose=1,
)
def _params_eval(self, params, training, protocol=None):
protocol = self.protocol if protocol is None else protocol
error = self.error
# if self.timeout > 0:
@@ -191,6 +194,7 @@ class GridSearchAE(BaseAccuracyEstimator):
f"\tException: {e}",
level=1,
)
raise e
score = None
return params, score, model
@@ -237,92 +241,220 @@
raise ValueError("best_model called before fit")
class MCAEgsq(MultiClassAccuracyEstimator):
def __init__(
self,
classifier: BaseEstimator,
quantifier: BaseAccuracyEstimator,
param_grid: dict,
error: Union[Callable, str] = qp.error.mae,
refit=True,
timeout=-1,
n_jobs=None,
verbose=False,
):
self.param_grid = param_grid
self.refit = refit
self.timeout = timeout
self.n_jobs = n_jobs
self.verbose = verbose
self.error = error
super().__init__(classifier, quantifier)
class RandomizedSearchAE(GridSearchAE):
ERR_THRESHOLD = 1e-4
MAX_ITER_IMPROV = 3
def fit(self, train: LabelledCollection):
self.e_train = self.extend(train)
t_train, t_val = self.e_train.split_stratified(0.6, random_state=env._R_SEED)
self.quantifier = GridSearchQ(
deepcopy(self.quantifier),
param_grid=self.param_grid,
protocol=UPP(t_val, repeats=100),
error=self.error,
refit=self.refit,
timeout=self.timeout,
n_jobs=self.n_jobs,
verbose=self.verbose,
).fit(self.e_train)
return self
def estimate(self, instances) -> np.ndarray:
e_inst = instances
if not isinstance(e_inst, ExtendedData):
e_inst = self._extend_instances(instances)
estim_prev = self.quantifier.quantify(e_inst.X)
return self._check_prevalence_classes(
estim_prev, self.quantifier.best_model().classes_
)
def _select_scores(self, hyper, training: LabelledCollection):
log = logger()
hyper = np.array(hyper)
rand_index = np.random.choice(
np.arange(len(hyper)), size=len(hyper), replace=False
)
_n_jobs = os.cpu_count() + 1 + self.n_jobs if self.n_jobs < 0 else self.n_jobs
batch_size = _n_jobs
log.debug(f"{batch_size = }")
rand_index = list(
rand_index[: (len(hyper) // batch_size) * batch_size].reshape(
(len(hyper) // batch_size, batch_size)
)
) + [rand_index[(len(hyper) // batch_size) * batch_size :]]
scores = []
best_score, iter_from_improv = np.inf, 0
with Parallel(n_jobs=self.n_jobs) as parallel:
for i, ri in enumerate(rand_index):
tstart = time()
_iter_scores = qc.utils.parallel(
self._params_eval,
[(params, training) for params in hyper[ri]],
parallel=parallel,
)
_best_iter_score = np.min(
[s for _, s, _ in _iter_scores if s is not None]
)
log.debug(
f"[iter {i}] best score = {_best_iter_score:.8f} [took {time() - tstart:.3f}s]"
)
scores += _iter_scores
_check, best_score, iter_from_improv = self.__stop_condition(
_best_iter_score, best_score, iter_from_improv
)
if _check:
break
return scores
def __stop_condition(self, best_iter_score, best_score, iter_from_improv):
if best_iter_score < best_score:
_improv = best_score - best_iter_score
best_score = best_iter_score
else:
_improv = 0
if _improv > self.ERR_THRESHOLD:
iter_from_improv = 0
else:
iter_from_improv += 1
return iter_from_improv > self.MAX_ITER_IMPROV, best_score, iter_from_improv
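RandomizedSearchAE, in short: shuffle the full grid, evaluate it in batches sized to the worker count, and stop once the best score has failed to improve by more than ERR_THRESHOLD for more than MAX_ITER_IMPROV consecutive batches. A self-contained sketch of that loop, with a stand-in scorer in place of _params_eval:

import numpy as np

rng = np.random.default_rng(0)
hyper_idx = rng.permutation(10)          # shuffled configuration indices
batch_size = 4                           # sized to the worker count
n_full = (len(hyper_idx) // batch_size) * batch_size
batches = list(hyper_idx[:n_full].reshape(-1, batch_size)) + [hyper_idx[n_full:]]

ERR_THRESHOLD, MAX_ITER_IMPROV = 1e-4, 3
best_score, iters_no_improv = np.inf, 0
for batch in batches:
    batch_best = float(rng.uniform(0.1, 0.3))   # stand-in for real evaluation
    improv = best_score - batch_best if batch_best < best_score else 0.0
    best_score = min(best_score, batch_best)
    iters_no_improv = 0 if improv > ERR_THRESHOLD else iters_no_improv + 1
    if iters_no_improv > MAX_ITER_IMPROV:        # early stop: no recent gains
        break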
class BQAEgsq(BinaryQuantifierAccuracyEstimator):
class HalvingSearchAE(GridSearchAE):
def _select_scores(self, hyper, training: LabelledCollection):
log = logger()
hyper = np.array(hyper)
threshold = 22
factor = 3
n_steps = math.ceil(math.log(len(hyper) / threshold, factor))
steps = np.logspace(n_steps, 0, base=1.0 / factor, num=n_steps + 1)
with Parallel(n_jobs=self.n_jobs, verbose=1) as parallel:
for _step in steps:
tstart = time()
_training, _ = (
training.split_stratified(train_prop=_step)
if _step < 1.0
else (training, None)
)
results = qc.utils.parallel(
self._params_eval,
[(params, _training) for params in hyper],
parallel=parallel,
)
scores = [(1.0 if s is None else s) for _, s, _ in results]
res_hyper = np.array([h for h, _, _ in results], dtype="object")
sorted_scores_idx = np.argsort(scores)
best_score = scores[sorted_scores_idx[0]]
hyper = res_hyper[
sorted_scores_idx[: round(len(res_hyper) * (1.0 / factor))]
]
log.debug(
f"[step {_step}] best score = {best_score:.8f} [took {time() - tstart:.3f}s]"
)
return results
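HalvingSearchAE implements successive halving: n_steps is the number of times the pool must shrink by factor to fall to the hard-coded threshold of 22 configurations, and np.logspace produces the growing training fractions used at each step while only the best third of configurations survives. A worked example with the 560-configuration kde grid from the first file:

import math
import numpy as np

n_hyper, threshold, factor = 560, 22, 3          # e.g. the kde grid above
n_steps = math.ceil(math.log(n_hyper / threshold, factor))     # -> 3
fracs = np.logspace(n_steps, 0, base=1.0 / factor, num=n_steps + 1)
print(np.round(fracs, 3))                        # [0.037 0.111 0.333 1.   ]

survivors = n_hyper
for frac in fracs:
    print(f"train on {frac:.1%} of the data, {survivors} configs in play")
    survivors = round(survivors / factor)        # 560 -> 187 -> 62 -> 21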
class SpiderSearchAE(GridSearchAE):
def __init__(
self,
classifier: BaseEstimator,
quantifier: BaseAccuracyEstimator,
model: BaseAccuracyEstimator,
param_grid: dict,
error: Union[Callable, str] = qp.error.mae,
protocol: AbstractProtocol,
error: Union[Callable, str] = qc.error.maccd,
refit=True,
timeout=-1,
n_jobs=None,
verbose=False,
err_threshold=1e-4,
max_iter_improv=0,
pd_th_min=1,
best_width=2,
):
self.param_grid = param_grid
self.refit = refit
self.timeout = timeout
self.n_jobs = n_jobs
self.verbose = verbose
self.error = error
super().__init__(classifier=classifier, quantifier=quantifier)
super().__init__(
model=model,
param_grid=param_grid,
protocol=protocol,
error=error,
refit=refit,
n_jobs=n_jobs,
verbose=verbose,
)
self.err_threshold = err_threshold
self.max_iter_improv = max_iter_improv
self.pd_th_min = pd_th_min
self.best_width = best_width
def fit(self, train: LabelledCollection):
self.e_train = self.extend(train)
def _select_scores(self, hyper, training: LabelledCollection):
log = logger()
hyper = np.array(hyper)
_n_jobs = os.cpu_count() + 1 + self.n_jobs if self.n_jobs < 0 else self.n_jobs
batch_size = _n_jobs
self.n_classes = self.e_train.n_classes
self.e_trains = self.e_train.split_by_pred()
rand_index = np.arange(len(hyper))
np.random.shuffle(rand_index)
rand_index = rand_index[:batch_size]
remaining_index = np.setdiff1d(np.arange(len(hyper)), rand_index)
_hyper, _hyper_remaining = hyper[rand_index], hyper[remaining_index]
self.quantifiers = []
for e_train in self.e_trains:
t_train, t_val = e_train.split_stratified(0.6, random_state=env._R_SEED)
quantifier = GridSearchQ(
model=deepcopy(self.quantifier),
param_grid=self.param_grid,
protocol=UPP(t_val, repeats=100),
error=self.error,
refit=self.refit,
timeout=self.timeout,
n_jobs=self.n_jobs,
verbose=self.verbose,
).fit(t_train)
self.quantifiers.append(quantifier)
scores = []
best_score, last_best, iter_from_improv = np.inf, np.inf, 0
with Parallel(n_jobs=self.n_jobs, verbose=1) as parallel:
while len(_hyper) > 0:
# log.debug(f"{len(_hyper_remaining)=}")
tstart = time()
_iter_scores = qc.utils.parallel(
self._params_eval,
[(params, training) for params in _hyper],
parallel=parallel,
)
_sorted_idx = np.argsort(
[1.0 if s is None else s for _, s, _ in _iter_scores]
)
_sorted_scores = np.array(_iter_scores, dtype="object")[_sorted_idx]
_best_iter_params = np.array(
[p for p, _, _ in _sorted_scores], dtype="object"
)
_best_iter_scores = np.array(
[s for _, s, _ in _sorted_scores], dtype="object"
)
return self
for i, (_score, _param) in enumerate(
zip(
_best_iter_scores[: self.best_width],
_best_iter_params[: self.best_width],
)
):
log.debug(
f"[size={len(_hyper)},place={i+1}] best score = {_score:.8f}; "
f"best param = {_param} [took {time() - tstart:.3f}s]"
)
scores += _iter_scores
_improv = best_score - _best_iter_scores[0]
_improv_last = last_best - _best_iter_scores[0]
if _improv > self.err_threshold:
iter_from_improv = 0
best_score = _best_iter_scores[0]
elif _improv_last < 0:
iter_from_improv += 1
last_best = _best_iter_scores[0]
if iter_from_improv > self.max_iter_improv:
break
_new_hyper = np.array([], dtype="object")
for _base_param in _best_iter_params[: self.best_width]:
_rem_pds = np.array(
[
self.__param_distance(_base_param, h)
for h in _hyper_remaining
]
)
_rem_pd_sort_idx = np.argsort(_rem_pds)
# _min_pd = np.min(_rem_pds)
_min_pd_len = (_rem_pds <= self.pd_th_min).nonzero()[0].shape[0]
_new_hyper_idx = _rem_pd_sort_idx[:_min_pd_len]
_hyper_rem_idx = np.setdiff1d(
np.arange(len(_hyper_remaining)), _new_hyper_idx
)
_new_hyper = np.concatenate(
[_new_hyper, _hyper_remaining[_new_hyper_idx]]
)
_hyper_remaining = _hyper_remaining[_hyper_rem_idx]
_hyper = _new_hyper
return scores
def __param_distance(self, param1, param2):
score = 0
for k, v in param1.items():
if param2[k] != v:
score += 1
return score
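SpiderSearchAE starts from a random batch of batch_size configurations and then repeatedly expands around the best_width best results of the last iteration: any not-yet-evaluated configuration within pd_th_min of a best configuration (measured by __param_distance, a count of differing hyperparameter values) joins the next batch, and the search stops when no neighbors remain or the best score stalls for more than max_iter_improv iterations. A standalone illustration of one expansion step:

import numpy as np

def param_distance(p1, p2):
    # Hamming-style distance: number of hyperparameters whose values differ,
    # mirroring __param_distance above
    return sum(1 for k, v in p1.items() if p2[k] != v)

remaining = np.array([
    {"C": 1.0, "bw": 0.01}, {"C": 1.0, "bw": 0.05},
    {"C": 10.0, "bw": 0.05}, {"C": 100.0, "bw": 0.2},
], dtype="object")
best = {"C": 1.0, "bw": 0.01}                    # best config of this iteration
pd_th_min = 1
dists = np.array([param_distance(best, h) for h in remaining])
order = np.argsort(dists)
n_close = int((dists <= pd_th_min).sum())        # neighbors within the threshold
next_batch = remaining[order[:n_close]]          # evaluated in the next round
remaining = remaining[order[n_close:]]           # kept for later expansions
print(len(next_batch), len(remaining))           # 2 2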