import numpy as np
from sklearn.linear_model import LogisticRegression
import quapy as qp
from sklearn.model_selection import GridSearchCV, cross_val_predict
from quapy.model_selection import GridSearchQ
from .base import BaseQuantifier, BinaryQuantifier
from joblib import Parallel, delayed
from copy import deepcopy
from quapy.data import LabelledCollection
from quapy import functional as F
from . import neural
from quapy.evaluation import evaluate
QuaNet = neural.QuaNetTrainer
class Ensemble(BaseQuantifier):
    """
    Methods from the articles:

    Pérez-Gállego, P., Quevedo, J. R., & del Coz, J. J. (2017).
    Using ensembles for problems with characterizable changes in data distribution: A case study on quantification.
    Information Fusion, 34, 87-100.

    and

    Pérez-Gállego, P., Castano, A., Quevedo, J. R., & del Coz, J. J. (2019).
    Dynamic ensemble selection for quantification tasks.
    Information Fusion, 45, 1-15.
    """

    VALID_POLICIES = {'ave', 'ptr', 'ds'} | qp.error.QUANTIFICATION_ERROR_NAMES
def __init__(self, quantifier: BaseQuantifier, size=50, min_pos=1, red_size=25, policy='ave', n_jobs=1):
assert policy in Ensemble.VALID_POLICIES, f'unknown policy={policy}; valid are {Ensemble.VALID_POLICIES}'
self.base_quantifier = quantifier
self.size = size
self.min_pos = min_pos
self.red_size = red_size
self.policy = policy
self.n_jobs = n_jobs
self.post_proba_fn = None
def fit(self, data: LabelledCollection):
        if self.policy == 'ds' and not data.binary:
            raise ValueError('ds policy is only defined for binary quantification, but this dataset is not binary')
        # randomly chooses the prevalences for each member of the ensemble (preventing any class from having fewer
        # than min_pos examples)
prevs = [_draw_simplex(ndim=data.n_classes, min_val=self.min_pos / len(data)) for _ in range(self.size)]
posteriors = None
if self.policy == 'ds':
# precompute the training posterior probabilities
posteriors, self.post_proba_fn = self.ds_policy_get_posteriors(data)
is_static_policy = (self.policy in qp.error.QUANTIFICATION_ERROR_NAMES)
self.ensemble = Parallel(n_jobs=self.n_jobs)(
delayed(_delayed_new_instance)(
self.base_quantifier, data, prev, posteriors, keep_samples=is_static_policy
) for prev in prevs
)
# static selection policy (the name of a quantification-oriented error function to minimize)
if self.policy in qp.error.QUANTIFICATION_ERROR_NAMES:
self.accuracy_policy(error_name=self.policy)
return self
def quantify(self, instances):
predictions = np.asarray(Parallel(n_jobs=self.n_jobs)(
delayed(_delayed_quantify)(Qi, instances) for Qi in self.ensemble
))
if self.policy == 'ptr':
predictions = self.ptr_policy(predictions)
elif self.policy == 'ds':
predictions = self.ds_policy(predictions, instances)
predictions = np.mean(predictions, axis=0)
return F.normalize_prevalence(predictions)
def set_params(self, **parameters):
        raise NotImplementedError(f'{self.__class__.__name__} should not be used within GridSearchQ; '
                                  f'instead, use GridSearchQ within Ensemble, or GridSearchCV within the '
                                  f'base quantifier if it is an aggregative one.')
def get_params(self, deep=True):
raise NotImplementedError()
def accuracy_policy(self, error_name):
"""
Selects the red_size best performant quantifiers in a static way (i.e., dropping all non-selected instances).
For each model in the ensemble, the performance is measured in terms of _error_name_ on the quantification of
the samples used for training the rest of the models in the ensemble.
"""
error = getattr(qp.error, error_name)
tests = [m[3] for m in self.ensemble]
scores = []
for i, model in enumerate(self.ensemble):
scores.append(evaluate(model[0], tests[:i] + tests[i+1:], error, self.n_jobs))
order = np.argsort(scores)
self.ensemble = select_k(self.ensemble, order, k=self.red_size)
def ptr_policy(self, predictions):
"""
Selects the predictions made by models that have been trained on samples with a prevalence that is most similar
to a first approximation of the test prevalence as made by all models in the ensemble.
"""
test_prev_estim = predictions.mean(axis=0)
tr_prevs = [m[1] for m in self.ensemble]
ptr_differences = [qp.error.mse(ptr_i, test_prev_estim) for ptr_i in tr_prevs]
order = np.argsort(ptr_differences)
return select_k(predictions, order, k=self.red_size)
def ds_policy_get_posteriors(self, data: LabelledCollection):
"""
In the original article, this procedure is not described in a sufficient level of detail. The paper only says
that the distribution of posterior probabilities from training and test examples is compared by means of the
Hellinger Distance. However, how these posterior probabilities are generated is not specified. In the article,
a Logistic Regressor (LR) is used as the classifier device and that could be used for this purpose. However, in
general, a Quantifier is not necessarily an instance of Aggreggative Probabilistic Quantifiers, and so that the
quantifier builds on top of a probabilistic classifier cannot be given for granted. Additionally, it would not
be correct to generate the posterior probabilities for training documents that have concurred in training the
classifier that generates them.
This function thus generates the posterior probabilities for all training documents in a cross-validation way,
using a LR with hyperparameters that have previously been optimized via grid search in 5FCV.
:return P,f, where P is a ndarray containing the posterior probabilities of the training data, generated via
cross-validation and using an optimized LR, and the function to be used in order to generate posterior
probabilities for test instances.
"""
X, y = data.Xy
lr_base = LogisticRegression(class_weight='balanced', max_iter=1000)
optim = GridSearchCV(
lr_base, param_grid={'C': np.logspace(-4,4,9)}, cv=5, n_jobs=self.n_jobs, refit=True
).fit(X, y)
posteriors = cross_val_predict(
optim.best_estimator_, X, y, cv=5, n_jobs=self.n_jobs, method='predict_proba'
)
posteriors_generator = optim.best_estimator_.predict_proba
return posteriors, posteriors_generator
def ds_policy(self, predictions, test):
test_posteriors = self.post_proba_fn(test)
test_distribution = get_probability_distribution(test_posteriors)
tr_distributions = [m[2] for m in self.ensemble]
dist = [F.HellingerDistance(tr_dist_i, test_distribution) for tr_dist_i in tr_distributions]
order = np.argsort(dist)
return select_k(predictions, order, k=self.red_size)
def get_probability_distribution(posterior_probabilities, bins=8):
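    """
    Estimates the distribution of the positive-class posterior probabilities by means of a density histogram with
    `bins` equally-sized bins in [0, 1] (only defined for binary problems).
    """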
assert posterior_probabilities.shape[1]==2, 'the posterior probabilities do not seem to be for a binary problem'
posterior_probabilities = posterior_probabilities[:,1] # take the positive posteriors only
distribution, _ = np.histogram(posterior_probabilities, bins=bins, range=(0, 1), density=True)
return distribution
def select_k(elements, order, k):
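    """ Returns the k elements of `elements` that are ranked first according to `order` (a permutation of indices). """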
return [elements[idx] for idx in order[:k]]
def _delayed_new_instance(base_quantifier, data:LabelledCollection, prev, posteriors, keep_samples):
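    """
    Trains a copy of the base quantifier on a sample of `data` drawn at the requested prevalence `prev`, and returns
    the tuple (fitted model, sample prevalence, histogram of the sample's posteriors or None, sample or None).
    """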
model = deepcopy(base_quantifier)
sample_index = data.sampling_index(len(data), *prev)
sample = data.sampling_from_index(sample_index)
model.fit(sample)
tr_prevalence = sample.prevalence()
tr_distribution = get_probability_distribution(posteriors[sample_index]) if (posteriors is not None) else None
return (model, tr_prevalence, tr_distribution, sample if keep_samples else None)
def _delayed_fit(quantifier, data):
quantifier.fit(data)
def _delayed_quantify(quantifier, instances):
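    """ Quantifies `instances` with the model stored in the first position of the ensemble-member tuple. """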
return quantifier[0].quantify(instances)
def _draw_simplex(ndim, min_val, max_trials=100):
"""
    Returns a uniform sample from the ndim-dimensional simplex, with the guarantee that all dimensions
    are >= min_val (for min_val > 0, this makes the sampling not truly uniform).

    :param ndim: number of dimensions of the simplex
    :param min_val: minimum class prevalence allowed. If >= 1/ndim, a ValueError is raised, since no (non-degenerate)
        solution exists.
    :param max_trials: maximum number of rejection-sampling trials before giving up
    :return: a sample from the ndim-dimensional simplex that is uniform in S(ndim)-R, where S(ndim) is the simplex
        and R is the subset of the simplex containing dimensions lower than min_val
"""
if min_val >= 1/ndim:
        raise ValueError(f'no sample can be drawn from the {ndim}-dimensional simplex so that '
                         f'all its values are >= {min_val} (try with a smaller value for min_pos)')
trials = 0
while True:
u = F.uniform_simplex_sampling(ndim)
if all(u >= min_val):
return u
trials += 1
if trials >= max_trials:
            raise ValueError(f'it looks like finding a random simplex with all its dimensions being '
                             f'>= {min_val} is unlikely (it failed after {max_trials} trials)')
def _instantiate_ensemble(learner, base_quantifier_class, param_grid, optim, sample_size, eval_budget, **kwargs):
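    """
    Instantiates an Ensemble of `base_quantifier_class` quantifiers built on top of `learner`. If `optim` is a
    classification error, model selection for the learner is carried out with scikit-learn's GridSearchCV; if it is a
    quantification error, model selection for the base quantifier is carried out with GridSearchQ; if it is None, no
    model selection is performed.
    """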
if optim is None:
base_quantifier = base_quantifier_class(learner)
elif optim in qp.error.CLASSIFICATION_ERROR:
learner = GridSearchCV(learner, param_grid)
base_quantifier = base_quantifier_class(learner)
elif optim in qp.error.QUANTIFICATION_ERROR:
base_quantifier = GridSearchQ(base_quantifier_class(learner),
param_grid=param_grid,
sample_size=sample_size,
eval_budget=eval_budget,
error=optim)
else:
raise ValueError(f'value optim={optim} not understood')
return Ensemble(base_quantifier, **kwargs)
class EnsembleFactory(BaseQuantifier):
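    """
    Convenience class for instantiating an Ensemble of quantifiers of a given class, built on top of a given learner,
    optionally performing model selection (`optim`) in terms of a classification or a quantification error (see
    ECC, EACC, EHDy, and EEMQ below).
    """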
def __init__(self, learner, base_quantifier_class, param_grid=None, optim=None, sample_size=None, eval_budget=None,
size=50, min_pos=1, red_size=25, policy='ave', n_jobs=1):
if param_grid is None and optim is not None:
raise ValueError(f'param_grid is None but optim was requested.')
error = self._check_error(optim)
self.model = _instantiate_ensemble(learner, base_quantifier_class, param_grid, error, sample_size,
eval_budget, size=size, min_pos=min_pos, red_size=red_size,
policy=policy, n_jobs=n_jobs)
def fit(self, data):
return self.model.fit(data)
def quantify(self, instances):
return self.model.quantify(instances)
def set_params(self, **parameters):
return self.model.set_params(**parameters)
def get_params(self, deep=True):
return self.model.get_params(deep)
def _check_error(self, error):
if error is None:
return None
if error in qp.error.QUANTIFICATION_ERROR or error in qp.error.CLASSIFICATION_ERROR:
return error
elif isinstance(error, str):
assert error in qp.error.ERROR_NAMES, \
f'unknown error name; valid ones are {qp.error.ERROR_NAMES}'
return getattr(qp.error, error)
else:
raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n'
f'the name of an error function in {qp.error.ERROR_NAMES}')
class ECC(EnsembleFactory):
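    """ Ensemble of Classify & Count (CC) quantifiers. """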
def __init__(self, learner, param_grid=None, optim=None, sample_size=None, eval_budget=None,
size=50, min_pos=1, red_size=25, policy='ave', n_jobs=1):
super().__init__(
learner, qp.method.aggregative.CC, param_grid, optim, sample_size, eval_budget, size, min_pos,
red_size, policy, n_jobs
)
class EACC(EnsembleFactory):
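    """ Ensemble of Adjusted Classify & Count (ACC) quantifiers. """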
def __init__(self, learner, param_grid=None, optim=None, sample_size=None, eval_budget=None,
size=50, min_pos=1, red_size=25, policy='ave', n_jobs=1):
super().__init__(
learner, qp.method.aggregative.ACC, param_grid, optim, sample_size, eval_budget, size, min_pos,
red_size, policy, n_jobs
)
class EHDy(EnsembleFactory):
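    """ Ensemble of HDy (Hellinger-distance-based) quantifiers. """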
def __init__(self, learner, param_grid=None, optim=None, sample_size=None, eval_budget=None,
size=50, min_pos=1, red_size=25, policy='ave', n_jobs=1):
super().__init__(
learner, qp.method.aggregative.HDy, param_grid, optim, sample_size, eval_budget, size, min_pos,
red_size, policy, n_jobs
)
class EEMQ(EnsembleFactory):
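    """ Ensemble of Expectation Maximization (EMQ) quantifiers. """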
def __init__(self, learner, param_grid=None, optim=None, sample_size=None, eval_budget=None,
size=50, min_pos=1, red_size=25, policy='ave', n_jobs=1):
super().__init__(
learner, qp.method.aggregative.EMQ, param_grid, optim, sample_size, eval_budget, size, min_pos,
red_size, policy, n_jobs
)