diff --git a/.gitignore b/.gitignore index b9703a3..8eaff3e 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,32 @@ dmypy.json .pyre/ *__pycache__* +*.pdf +*.zip +*.png +*.csv +*.pkl +*.dataframe + + +# other projects +LeQua2022 +MultiLabel +NewMethods +Ordinal +Retrieval +eDiscovery +poster-cikm +slides-cikm +slides-short-cikm +quick_experiment +svm_perf_quantification/svm_struct +svm_perf_quantification/svm_light +TweetSentQuant + + + + + + +*.png diff --git a/examples/custom_quantifier.py b/examples/custom_quantifier.py index 31a69cd..fa014de 100644 --- a/examples/custom_quantifier.py +++ b/examples/custom_quantifier.py @@ -2,7 +2,7 @@ import quapy as qp from quapy.data import LabelledCollection from quapy.method.base import BinaryQuantifier from quapy.model_selection import GridSearchQ -from quapy.method.aggregative import AggregativeProbabilisticQuantifier +from quapy.method.aggregative import AggregativeSoftQuantifier from quapy.protocol import APP import numpy as np from sklearn.linear_model import LogisticRegression @@ -15,7 +15,7 @@ from sklearn.linear_model import LogisticRegression # internal hyperparameter (let say, alpha) which is the decision threshold. Let's also assume the quantifier # is binary, for simplicity. -class MyQuantifier(AggregativeProbabilisticQuantifier, BinaryQuantifier): +class MyQuantifier(AggregativeSoftQuantifier, BinaryQuantifier): def __init__(self, classifier, alpha=0.5): self.alpha = alpha # aggregative quantifiers have an internal self.classifier attribute diff --git a/examples/model_selection.py b/examples/model_selection.py index ae7fb6a..4e52784 100644 --- a/examples/model_selection.py +++ b/examples/model_selection.py @@ -1,57 +1,71 @@ import quapy as qp -from quapy.protocol import APP +from method.kdey import KDEyML +from quapy.method.non_aggregative import DMx +from quapy.protocol import APP, UPP from quapy.method.aggregative import DMy from sklearn.linear_model import LogisticRegression +from examples.comparing_gridsearch import OLD_GridSearchQ import numpy as np +from time import time """ In this example, we show how to perform model selection on a DistributionMatching quantifier. """ -model = DMy(LogisticRegression()) +model = KDEyML(LogisticRegression()) qp.environ['SAMPLE_SIZE'] = 100 qp.environ['N_JOBS'] = -1 -training, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5).train_test +# training, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5).train_test +training, test = qp.datasets.fetch_UCIMulticlassDataset('letter').train_test -# The model will be returned by the fit method of GridSearchQ. -# Every combination of hyper-parameters will be evaluated by confronting the -# quantifier thus configured against a series of samples generated by means -# of a sample generation protocol. For this example, we will use the -# artificial-prevalence protocol (APP), that generates samples with prevalence -# values in the entire range of values from a grid (e.g., [0, 0.1, 0.2, ..., 1]). -# We devote 30% of the dataset for this exploration. -training, validation = training.split_stratified(train_prop=0.7) -protocol = APP(validation) +with qp.util.temp_seed(0): -# We will explore a classification-dependent hyper-parameter (e.g., the 'C' -# hyper-parameter of LogisticRegression) and a quantification-dependent hyper-parameter -# (e.g., the number of bins in a DistributionMatching quantifier. -# Classifier-dependent hyper-parameters have to be marked with a prefix "classifier__" -# in order to let the quantifier know this hyper-parameter belongs to its underlying -# classifier. -param_grid = { - 'classifier__C': np.logspace(-3,3,7), - 'nbins': [8, 16, 32, 64], -} + # The model will be returned by the fit method of GridSearchQ. + # Every combination of hyper-parameters will be evaluated by confronting the + # quantifier thus configured against a series of samples generated by means + # of a sample generation protocol. For this example, we will use the + # artificial-prevalence protocol (APP), that generates samples with prevalence + # values in the entire range of values from a grid (e.g., [0, 0.1, 0.2, ..., 1]). + # We devote 30% of the dataset for this exploration. + training, validation = training.split_stratified(train_prop=0.7) + protocol = UPP(validation) -model = qp.model_selection.GridSearchQ( - model=model, - param_grid=param_grid, - protocol=protocol, - error='mae', # the error to optimize is the MAE (a quantification-oriented loss) - refit=True, # retrain on the whole labelled set once done - verbose=True # show information as the process goes on -).fit(training) + # We will explore a classification-dependent hyper-parameter (e.g., the 'C' + # hyper-parameter of LogisticRegression) and a quantification-dependent hyper-parameter + # (e.g., the number of bins in a DistributionMatching quantifier. + # Classifier-dependent hyper-parameters have to be marked with a prefix "classifier__" + # in order to let the quantifier know this hyper-parameter belongs to its underlying + # classifier. + param_grid = { + 'classifier__C': np.logspace(-3,3,7), + 'classifier__class_weight': ['balanced', None], + 'bandwidth': np.linspace(0.01, 0.2, 20), + } + + tinit = time() + + # model = OLD_GridSearchQ( + model = qp.model_selection.GridSearchQ( + model=model, + param_grid=param_grid, + protocol=protocol, + error='mae', # the error to optimize is the MAE (a quantification-oriented loss) + refit=False, # retrain on the whole labelled set once done + # raise_errors=False, + verbose=True # show information as the process goes on + ).fit(training) + +tend = time() print(f'model selection ended: best hyper-parameters={model.best_params_}') model = model.best_model_ # evaluation in terms of MAE # we use the same evaluation protocol (APP) on the test set -mae_score = qp.evaluation.evaluate(model, protocol=APP(test), error_metric='mae') +mae_score = qp.evaluation.evaluate(model, protocol=UPP(test), error_metric='mae') print(f'MAE={mae_score:.5f}') - +print(f'model selection took {tend-tinit:.1f}s') diff --git a/examples/uci_experiments.py b/examples/uci_experiments.py index 2cf5bac..09efe5d 100644 --- a/examples/uci_experiments.py +++ b/examples/uci_experiments.py @@ -104,7 +104,7 @@ def run(experiment): timeout=60*60, verbose=True ) - model_selection.fit(data.training) + model_selection.fit(train) model = model_selection.best_model() best_params = model_selection.best_params_ else: diff --git a/quapy/CHANGE_LOG.txt b/quapy/CHANGE_LOG.txt index 1534038..43f4fdf 100644 --- a/quapy/CHANGE_LOG.txt +++ b/quapy/CHANGE_LOG.txt @@ -1,6 +1,9 @@ Change Log 0.1.8 ---------------- +- Fixed ThresholdOptimization methods (X, T50, MAX, MS and MS2). Thanks to Tobias Schumacher and colleagues for pointing + this out in Appendix A of "Schumacher, T., Strohmaier, M., & Lemmerich, F. (2021). A comparative evaluation of + quantification methods. arXiv:2103.03223v3 [cs.LG]" - Added HDx and DistributionMatchingX to non-aggregative quantifiers (see also the new example "comparing_HDy_HDx.py") - New UCI multiclass datasets added (thanks to Pablo González). The 5 UCI multiclass datasets are those corresponding to the following criteria: diff --git a/quapy/classification/calibration.py b/quapy/classification/calibration.py index a3f1543..0f5e9f7 100644 --- a/quapy/classification/calibration.py +++ b/quapy/classification/calibration.py @@ -24,7 +24,8 @@ class RecalibratedProbabilisticClassifier: class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabilisticClassifier): """ Applies a (re)calibration method from `abstention.calibration`, as defined in - `Alexandari et al. paper `_: + `Alexandari et al. paper `_. + :param classifier: a scikit-learn probabilistic classifier :param calibrator: the calibration object (an instance of abstention.calibration.CalibratorFactory) @@ -59,7 +60,7 @@ class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabi elif isinstance(k, float): if not (0 < k < 1): raise ValueError('wrong value for val_split: the proportion of validation documents must be in (0,1)') - return self.fit_cv(X, y) + return self.fit_tr_val(X, y) def fit_cv(self, X, y): """ @@ -94,7 +95,7 @@ class RecalibratedProbabilisticClassifierBase(BaseEstimator, RecalibratedProbabi self.classifier.fit(Xtr, ytr) posteriors = self.classifier.predict_proba(Xva) nclasses = len(np.unique(yva)) - self.calibrator = self.calibrator(posteriors, np.eye(nclasses)[yva], posterior_supplied=True) + self.calibration_function = self.calibrator(posteriors, np.eye(nclasses)[yva], posterior_supplied=True) return self def predict(self, X): diff --git a/quapy/functional.py b/quapy/functional.py index e29466f..c6dc351 100644 --- a/quapy/functional.py +++ b/quapy/functional.py @@ -66,6 +66,24 @@ def prevalence_from_probabilities(posteriors, binarize: bool = False): return prevalences +def as_binary_prevalence(positive_prevalence: Union[float, np.ndarray], clip_if_necessary=False): + """ + Helper that, given a float representing the prevalence for the positive class, returns a np.ndarray of two + values representing a binary distribution. + + :param positive_prevalence: prevalence for the positive class + :param clip_if_necessary: if True, clips the value in [0,1] in order to guarantee the resulting distribution + is valid. If False, it then checks that the value is in the valid range, and raises an error if not. + :return: np.ndarray of shape `(2,)` + """ + if clip_if_necessary: + positive_prevalence = np.clip(positive_prevalence, 0, 1) + else: + assert 0 <= positive_prevalence <= 1, 'the value provided is not a valid prevalence for the positive class' + return np.asarray([1-positive_prevalence, positive_prevalence]).T + + + def HellingerDistance(P, Q) -> float: """ Computes the Hellingher Distance (HD) between (discretized) distributions `P` and `Q`. diff --git a/quapy/method/aggregative.py b/quapy/method/aggregative.py index 232a92b..b7b7409 100644 --- a/quapy/method/aggregative.py +++ b/quapy/method/aggregative.py @@ -1,15 +1,17 @@ -from abc import abstractmethod +from abc import ABC, abstractmethod from copy import deepcopy from typing import Callable, Union import numpy as np +from abstention.calibration import NoBiasVectorScaling, TempScaling, VectorScaling from scipy import optimize from sklearn.base import BaseEstimator from sklearn.calibration import CalibratedClassifierCV from sklearn.metrics import confusion_matrix from sklearn.model_selection import cross_val_predict + import quapy as qp import quapy.functional as F -from functional import get_divergence +from quapy.functional import get_divergence from quapy.classification.calibration import NBVSCalibration, BCTSCalibration, TSCalibration, VSCalibration from quapy.classification.svmperf import SVMperf from quapy.data import LabelledCollection @@ -19,25 +21,124 @@ from quapy.method.base import BaseQuantifier, BinaryQuantifier, OneVsAllGeneric # Abstract classes # ------------------------------------ -class AggregativeQuantifier(BaseQuantifier): +class AggregativeQuantifier(BaseQuantifier, ABC): """ Abstract class for quantification methods that base their estimations on the aggregation of classification - results. Aggregative Quantifiers thus implement a :meth:`classify` method and maintain a :attr:`classifier` - attribute. Subclasses of this abstract class must implement the method :meth:`aggregate` which computes the - aggregation of label predictions. The method :meth:`quantify` comes with a default implementation based on - :meth:`classify` and :meth:`aggregate`. + results. Aggregative quantifiers implement a pipeline that consists of generating classification predictions + and aggregating them. For this reason, the training phase is implemented by :meth:`classification_fit` followed + by :meth:`aggregation_fit`, while the testing phase is implemented by :meth:`classify` followed by + :meth:`aggregate`. Subclasses of this abstract class must provide implementations for these methods. + Aggregative quantifiers also maintain a :attr:`classifier` attribute. + + The method :meth:`fit` comes with a default implementation based on :meth:`classification_fit` + and :meth:`aggregation_fit`. + + The method :meth:`quantify` comes with a default implementation based on :meth:`classify` + and :meth:`aggregate`. """ - @abstractmethod - def fit(self, data: LabelledCollection, fit_classifier=True): + val_split_ = None + + @property + def val_split(self): + return self.val_split_ + + @val_split.setter + def val_split(self, val_split): + if isinstance(val_split, LabelledCollection): + print('warning: setting val_split with a LabelledCollection will be inefficient in' + 'model selection. Rather pass the LabelledCollection at fit time') + self.val_split_ = val_split + + def fit(self, data: LabelledCollection, fit_classifier=True, val_split=None): """ - Trains the aggregative quantifier + Trains the aggregative quantifier. This comes down to training a classifier and an aggregation function. :param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data - :param fit_classifier: whether or not to train the learner (default is True). Set to False if the + :param fit_classifier: whether to train the learner (default is True). Set to False if the learner has been trained outside the quantifier. :return: self """ + classif_predictions = self.classifier_fit_predict(data, fit_classifier, predict_on=val_split) + self.aggregation_fit(classif_predictions, data) + return self + + def classifier_fit_predict(self, data: LabelledCollection, fit_classifier=True, predict_on=None): + """ + Trains the classifier if requested (`fit_classifier=True`) and generate the necessary predictions to + train the aggregation function. + + :param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data + :param fit_classifier: whether to train the learner (default is True). Set to False if the + learner has been trained outside the quantifier. + :param predict_on: specifies the set on which predictions need to be issued. This parameter can + be specified as None (default) to indicate no prediction is needed; a float in (0, 1) to + indicate the proportion of instances to be used for predictions (the remainder is used for + training); an integer >1 to indicate that the predictions must be generated via k-fold + cross-validation, using this integer as k; or the data sample itself on which to generate + the predictions. + """ + assert isinstance(fit_classifier, bool), 'unexpected type for "fit_classifier", must be boolean' + + self._check_classifier(adapt_if_necessary=(self._classifier_method() == 'predict_proba')) + + if predict_on is None: + predict_on = self.val_split + + if predict_on is None: + if fit_classifier: + self.classifier.fit(*data.Xy) + predictions = None + + elif isinstance(predict_on, float): + if fit_classifier: + if not (0. < predict_on < 1.): + raise ValueError(f'proportion {predict_on=} out of range, must be in (0,1)') + train, val = data.split_stratified(train_prop=(1 - predict_on)) + self.classifier.fit(*train.Xy) + predictions = LabelledCollection(self.classify(val.X), val.y, classes=data.classes_) + else: + raise ValueError(f'wrong type for predict_on: since fit_classifier=False, ' + f'the set on which predictions have to be issued must be ' + f'explicitly indicated') + + elif isinstance(predict_on, LabelledCollection): + if fit_classifier: + self.classifier.fit(*data.Xy) + predictions = LabelledCollection(self.classify(predict_on.X), predict_on.y, classes=predict_on.classes_) + + elif isinstance(predict_on, int): + if fit_classifier: + if predict_on <= 1: + raise ValueError(f'invalid value {predict_on} in fit. ' + f'Specify a integer >1 for kFCV estimation.') + else: + predictions = cross_val_predict( + self.classifier, *data.Xy, cv=predict_on, n_jobs=self.n_jobs, method=self._classifier_method()) + predictions = LabelledCollection(predictions, data.y, classes=data.classes_) + self.classifier.fit(*data.Xy) + else: + raise ValueError(f'wrong type for predict_on: since fit_classifier=False, ' + f'the set on which predictions have to be issued must be ' + f'explicitly indicated') + + else: + raise ValueError( + f'error: param "predict_on" ({type(predict_on)}) not understood; ' + f'use either a float indicating the split proportion, or a ' + f'tuple (X,y) indicating the validation partition') + + return predictions + + @abstractmethod + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + """ + Trains the aggregation function. + + :param classif_predictions: a LabelledCollection containing the label predictions issued + by the classifier + :param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data + """ ... @property @@ -61,13 +162,31 @@ class AggregativeQuantifier(BaseQuantifier): def classify(self, instances): """ Provides the label predictions for the given instances. The predictions should respect the format expected by - :meth:`aggregate`, i.e., posterior probabilities for probabilistic quantifiers, or crisp predictions for - non-probabilistic quantifiers + :meth:`aggregate`, e.g., posterior probabilities for probabilistic quantifiers, or crisp predictions for + non-probabilistic quantifiers. The default one is "decision_function". - :param instances: array-like + :param instances: array-like of shape `(n_instances, n_features,)` :return: np.ndarray of shape `(n_instances,)` with label predictions """ - return self.classifier.predict(instances) + return getattr(self.classifier, self._classifier_method())(instances) + + def _classifier_method(self): + """ + Name of the method that must be used for issuing label predictions. The default one is "decision_function". + + :return: string + """ + return 'decision_function' + + def _check_classifier(self, adapt_if_necessary=False): + """ + Guarantees that the underlying classifier implements the method required for issuing predictions, i.e., + the method indicated by the :meth:`_classifier_method` + + :param adapt_if_necessary: if True, the method will try to comply with the required specifications + """ + assert hasattr(self.classifier, self._classifier_method()), \ + f"the method does not implement the required {self._classifier_method()} method" def quantify(self, instances): """ @@ -101,122 +220,82 @@ class AggregativeQuantifier(BaseQuantifier): return self.classifier.classes_ -class AggregativeProbabilisticQuantifier(AggregativeQuantifier): +class AggregativeCrispQuantifier(AggregativeQuantifier, ABC): """ - Abstract class for quantification methods that base their estimations on the aggregation of posterior probabilities - as returned by a probabilistic classifier. Aggregative Probabilistic Quantifiers thus extend Aggregative - Quantifiers by implementing a _posterior_probabilities_ method returning values in [0,1] -- the posterior - probabilities. + Abstract class for quantification methods that base their estimations on the aggregation of crips decisions + as returned by a hard classifier. Aggregative crisp quantifiers thus extend Aggregative + Quantifiers by implementing specifications about crisp predictions. """ - def classify(self, instances): - return self.classifier.predict_proba(instances) + def _classifier_method(self): + """ + Name of the method that must be used for issuing label predictions. For crisp quantifiers, the method + is 'predict', that returns an array of shape `(n_instances,)` of label predictions. + + :return: the string "predict", i.e., the standard method name for scikit-learn hard predictions + """ + return 'predict' -# Helper -# ------------------------------------ -def _ensure_probabilistic(classifier): - if not hasattr(classifier, 'predict_proba'): - print(f'The learner {classifier.__class__.__name__} does not seem to be probabilistic. ' - f'The learner will be calibrated.') - classifier = CalibratedClassifierCV(classifier, cv=5) - return classifier - - -def _training_helper(classifier, - data: LabelledCollection, - fit_classifier: bool = True, - ensure_probabilistic=False, - val_split: Union[LabelledCollection, float] = None): +class AggregativeSoftQuantifier(AggregativeQuantifier, ABC): """ - Training procedure common to all Aggregative Quantifiers. - - :param classifier: the learner to be fit - :param data: the data on which to fit the learner. If requested, the data will be split before fitting the learner. - :param fit_classifier: whether or not to fit the learner (if False, then bypasses any action) - :param ensure_probabilistic: if True, guarantees that the resulting classifier implements predict_proba (if the - learner is not probabilistic, then a CalibratedCV instance of it is trained) - :param val_split: if specified as a float, indicates the proportion of training instances that will define the - validation split (e.g., 0.3 for using 30% of the training set as validation data); if specified as a - LabelledCollection, represents the validation split itself - :return: the learner trained on the training set, and the unused data (a _LabelledCollection_ if train_val_split>0 - or None otherwise) to be used as a validation set for any subsequent parameter fitting + Abstract class for quantification methods that base their estimations on the aggregation of posterior + probabilities as returned by a probabilistic classifier. + Aggregative soft quantifiers thus extend Aggregative Quantifiers by implementing specifications + about soft predictions. """ - if fit_classifier: - if ensure_probabilistic: - classifier = _ensure_probabilistic(classifier) - if val_split is not None: - if isinstance(val_split, float): - if not (0 < val_split < 1): - raise ValueError(f'train/val split {val_split} out of range, must be in (0,1)') - train, unused = data.split_stratified(train_prop=1 - val_split) - elif isinstance(val_split, LabelledCollection): - train = data - unused = val_split + + def _classifier_method(self): + """ + Name of the method that must be used for issuing label predictions. For probabilistic quantifiers, the method + is 'predict_proba', that returns an array of shape `(n_instances, n_dimensions,)` with posterior + probabilities. + + :return: the string "predict_proba", i.e., the standard method name for scikit-learn soft predictions + """ + return 'predict_proba' + + def _check_classifier(self, adapt_if_necessary=False): + """ + Guarantees that the underlying classifier implements the method indicated by the :meth:`_classifier_method`. + In case it does not, the classifier is calibrated (by means of the Platt's calibration method implemented by + scikit-learn in CalibratedClassifierCV, with cv=5). This calibration is only allowed if `adapt_if_necessary` + is set to True. If otherwise (i.e., the classifier is not probabilistic, and `adapt_if_necessary` is set + to False), an exception will be raised. + + :param adapt_if_necessary: a hard classifier is turned into a soft classifier if `adapt_if_necessary==True` + """ + if not hasattr(self.classifier, self._classifier_method()): + if adapt_if_necessary: + print(f'warning: The learner {self.classifier.__class__.__name__} does not seem to be ' + f'probabilistic. The learner will be calibrated (using CalibratedClassifierCV).') + self.classifier = CalibratedClassifierCV(self.classifier, cv=5) else: - raise ValueError( - f'param "val_split" ({type(val_split)}) not understood; use either a float indicating the split ' - 'proportion, or a LabelledCollection indicating the validation split') - else: - train, unused = data, None - - if isinstance(classifier, BaseQuantifier): - classifier.fit(train) - else: - classifier.fit(*train.Xy) - else: - if ensure_probabilistic: - if not hasattr(classifier, 'predict_proba'): - raise AssertionError('error: the learner cannot be calibrated since fit_classifier is set to False') - unused = None - if isinstance(val_split, LabelledCollection): - unused = val_split - - return classifier, unused + raise AssertionError(f'error: The learner {self.classifier.__class__.__name__} does not ' + f'seem to be probabilistic. The learner cannot be calibrated since ' + f'fit_classifier is set to False') -def cross_generate_predictions( - data, - classifier, - val_split, - probabilistic, - fit_classifier, - n_jobs -): +class BinaryAggregativeQuantifier(AggregativeQuantifier, BinaryQuantifier): + + @property + def pos_label(self): + return self.classifier.classes_[1] - n_jobs = qp._get_njobs(n_jobs) + @property + def neg_label(self): + return self.classifier.classes_[0] - if isinstance(val_split, int): - assert fit_classifier == True, \ - 'the parameters for the adjustment cannot be estimated with kFCV with fit_classifier=False' + def fit(self, data: LabelledCollection, fit_classifier=True, val_split=None): + self._check_binary(data, self.__class__.__name__) + return super().fit(data, fit_classifier, val_split) + - if probabilistic: - classifier = _ensure_probabilistic(classifier) - predict = 'predict_proba' - else: - predict = 'predict' - y_pred = cross_val_predict(classifier, *data.Xy, cv=val_split, n_jobs=n_jobs, method=predict) - class_count = data.counts() - - # fit the learner on all data - classifier.fit(*data.Xy) - y = data.y - classes = data.classes_ - else: - classifier, val_data = _training_helper( - classifier, data, fit_classifier, ensure_probabilistic=probabilistic, val_split=val_split - ) - y_pred = classifier.predict_proba(val_data.instances) if probabilistic else classifier.predict(val_data.instances) - y = val_data.labels - classes = val_data.classes_ - class_count = val_data.counts() - - return classifier, y, y_pred, classes, class_count # Methods # ------------------------------------ -class CC(AggregativeQuantifier): +class CC(AggregativeCrispQuantifier): """ The most basic Quantification method. One that simply classifies all instances and counts how many have been attributed to each of the classes in order to compute class prevalence estimates. @@ -227,17 +306,13 @@ class CC(AggregativeQuantifier): def __init__(self, classifier: BaseEstimator): self.classifier = classifier - def fit(self, data: LabelledCollection, fit_classifier=True): + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): """ - Trains the Classify & Count method unless `fit_classifier` is False, in which case, the classifier is assumed to - be already fit and there is nothing else to do. + Nothing to do here! - :param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data - :param fit_classifier: if False, the classifier is assumed to be fit - :return: self + :param classif_predictions: this is actually None """ - self.classifier, _ = _training_helper(self.classifier, data, fit_classifier) - return self + pass def aggregate(self, classif_predictions: np.ndarray): """ @@ -249,50 +324,37 @@ class CC(AggregativeQuantifier): return F.prevalence_from_labels(classif_predictions, self.classes_) -class ACC(AggregativeQuantifier): +class ACC(AggregativeCrispQuantifier): """ `Adjusted Classify & Count `_, the "adjusted" variant of :class:`CC`, that corrects the predictions of CC according to the `misclassification rates`. :param classifier: a sklearn's Estimator that generates a classifier - :param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the - misclassification rates are to be estimated. - This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of - validation data, or as an integer, indicating that the misclassification rates should be estimated via - `k`-fold cross validation (this integer stands for the number of folds `k`), or as a - :class:`quapy.data.base.LabelledCollection` (the split itself). + :param val_split: specifies the data used for generating classifier predictions. This specification + can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to + be extracted from the training set (default 0.4); or as an integer, indicating that the predictions + are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value + for `k`); or as a collection defining the specific set of data to use for validation. + Alternatively, this set can be specified at fit time by indicating the exact set of data + on which the predictions are to be generated. + :param n_jobs: number of parallel workers """ - def __init__(self, classifier: BaseEstimator, val_split=0.4, n_jobs=None): + def __init__(self, classifier: BaseEstimator, val_split=5, n_jobs=None): self.classifier = classifier self.val_split = val_split self.n_jobs = qp._get_njobs(n_jobs) - def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, int, LabelledCollection] = None): + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): """ - Trains a ACC quantifier. + Estimates the misclassification rates. - :param data: the training set - :param fit_classifier: set to False to bypass the training (the learner is assumed to be already fit) - :param val_split: either a float in (0,1) indicating the proportion of training instances to use for - validation (e.g., 0.3 for using 30% of the training set as validation data), or a LabelledCollection - indicating the validation set itself, or an int indicating the number `k` of folds to be used in `k`-fold - cross validation to estimate the parameters - :return: self + :param classif_predictions: classifier predictions with true labels """ - - if val_split is None: - val_split = self.val_split - - self.classifier, y, y_, classes, class_count = cross_generate_predictions( - data, self.classifier, val_split, probabilistic=False, fit_classifier=fit_classifier, n_jobs=self.n_jobs - ) - + pred_labels, true_labels = classif_predictions.Xy self.cc = CC(self.classifier) - self.Pte_cond_estim_ = self.getPteCondEstim(self.classifier.classes_, y, y_) - - return self + self.Pte_cond_estim_ = self.getPteCondEstim(self.classifier.classes_, true_labels, pred_labels) @classmethod def getPteCondEstim(cls, classes, y, y_): @@ -308,9 +370,6 @@ class ACC(AggregativeQuantifier): conf[:, i] /= class_counts[i] return conf - def classify(self, data): - return self.cc.classify(data) - def aggregate(self, classif_predictions): prevs_estim = self.cc.aggregate(classif_predictions) return ACC.solve_adjustment(self.Pte_cond_estim_, prevs_estim) @@ -337,7 +396,7 @@ class ACC(AggregativeQuantifier): return adjusted_prevs -class PCC(AggregativeProbabilisticQuantifier): +class PCC(AggregativeSoftQuantifier): """ `Probabilistic Classify & Count `_, the probabilistic variant of CC that relies on the posterior probabilities returned by a probabilistic classifier. @@ -348,58 +407,51 @@ class PCC(AggregativeProbabilisticQuantifier): def __init__(self, classifier: BaseEstimator): self.classifier = classifier - def fit(self, data: LabelledCollection, fit_classifier=True): - self.classifier, _ = _training_helper(self.classifier, data, fit_classifier, ensure_probabilistic=True) - return self + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + """ + Nothing to do here! + + :param classif_predictions: this is actually None + """ + pass def aggregate(self, classif_posteriors): return F.prevalence_from_probabilities(classif_posteriors, binarize=False) -class PACC(AggregativeProbabilisticQuantifier): +class PACC(AggregativeSoftQuantifier): """ `Probabilistic Adjusted Classify & Count `_, the probabilistic variant of ACC that relies on the posterior probabilities returned by a probabilistic classifier. :param classifier: a sklearn's Estimator that generates a classifier - :param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the - misclassification rates are to be estimated. - This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of - validation data, or as an integer, indicating that the misclassification rates should be estimated via - `k`-fold cross validation (this integer stands for the number of folds `k`), or as a - :class:`quapy.data.base.LabelledCollection` (the split itself). + :param val_split: specifies the data used for generating classifier predictions. This specification + can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to + be extracted from the training set (default 0.4); or as an integer, indicating that the predictions + are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value + for `k`). Alternatively, this set can be specified at fit time by indicating the exact set of data + on which the predictions are to be generated. :param n_jobs: number of parallel workers """ - def __init__(self, classifier: BaseEstimator, val_split=0.4, n_jobs=None): + def __init__(self, classifier: BaseEstimator, val_split=5, n_jobs=None): self.classifier = classifier self.val_split = val_split self.n_jobs = qp._get_njobs(n_jobs) - def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, int, LabelledCollection] = None): + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): """ - Trains a PACC quantifier. + Estimates the misclassification rates - :param data: the training set - :param fit_classifier: set to False to bypass the training (the learner is assumed to be already fit) - :param val_split: either a float in (0,1) indicating the proportion of training instances to use for - validation (e.g., 0.3 for using 30% of the training set as validation data), or a LabelledCollection - indicating the validation set itself, or an int indicating the number k of folds to be used in kFCV - to estimate the parameters - :return: self + :param classif_predictions: classifier soft predictions with true labels """ - - if val_split is None: - val_split = self.val_split - - self.classifier, y, y_, classes, class_count = cross_generate_predictions( - data, self.classifier, val_split, probabilistic=True, fit_classifier=fit_classifier, n_jobs=self.n_jobs - ) - + posteriors, true_labels = classif_predictions.Xy self.pcc = PCC(self.classifier) - self.Pte_cond_estim_ = self.getPteCondEstim(classes, y, y_) + self.Pte_cond_estim_ = self.getPteCondEstim(self.classifier.classes_, true_labels, posteriors) - return self + def aggregate(self, classif_posteriors): + prevs_estim = self.pcc.aggregate(classif_posteriors) + return ACC.solve_adjustment(self.Pte_cond_estim_, prevs_estim) @classmethod def getPteCondEstim(cls, classes, y, y_): @@ -414,15 +466,8 @@ class PACC(AggregativeProbabilisticQuantifier): return confusion.T - def aggregate(self, classif_posteriors): - prevs_estim = self.pcc.aggregate(classif_posteriors) - return ACC.solve_adjustment(self.Pte_cond_estim_, prevs_estim) - def classify(self, data): - return self.pcc.classify(data) - - -class EMQ(AggregativeProbabilisticQuantifier): +class EMQ(AggregativeSoftQuantifier): """ `Expectation Maximization for Quantification `_ (EMQ), aka `Saerens-Latinne-Decaestecker` (SLD) algorithm. @@ -431,60 +476,30 @@ class EMQ(AggregativeProbabilisticQuantifier): maximum-likelihood estimation, in a mutually recursive way, until convergence. :param classifier: a sklearn's Estimator that generates a classifier - :param exact_train_prev: set to True (default) for using, as the initial observation, the true training prevalence; - or set to False for computing the training prevalence as an estimate, akin to PCC, i.e., as the expected - value of the posterior probabilities of the training instances as suggested in - `Alexandari et al. paper `_: - :param recalib: a string indicating the method of recalibration. Available choices include "nbvs" (No-Bias Vector - Scaling), "bcts" (Bias-Corrected Temperature Scaling), "ts" (Temperature Scaling), and "vs" (Vector Scaling). - The default value is None, indicating no recalibration. """ MAX_ITER = 1000 EPSILON = 1e-4 - def __init__(self, classifier: BaseEstimator, exact_train_prev=True, recalib=None): + def __init__(self, classifier: BaseEstimator): self.classifier = classifier - self.non_calibrated = classifier - self.exact_train_prev = exact_train_prev - self.recalib = recalib - def fit(self, data: LabelledCollection, fit_classifier=True): - if self.recalib is not None: - if self.recalib == 'nbvs': - self.classifier = NBVSCalibration(self.non_calibrated) - elif self.recalib == 'bcts': - self.classifier = BCTSCalibration(self.non_calibrated) - elif self.recalib == 'ts': - self.classifier = TSCalibration(self.non_calibrated) - elif self.recalib == 'vs': - self.classifier = VSCalibration(self.non_calibrated) - elif self.recalib == 'platt': - self.classifier = CalibratedClassifierCV(self.classifier, ensemble=False) - else: - raise ValueError('invalid param argument for recalibration method; available ones are ' - '"nbvs", "bcts", "ts", and "vs".') - self.recalib = None - else: - self.classifier = self.non_calibrated - self.classifier, _ = _training_helper(self.classifier, data, fit_classifier, ensure_probabilistic=True) - if self.exact_train_prev: - self.train_prevalence = F.prevalence_from_labels(data.labels, self.classes_) - else: - self.train_prevalence = qp.model_selection.cross_val_predict( - quantifier=PCC(deepcopy(self.classifier)), - data=data, - nfolds=3, - random_state=0 - ) - return self + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + self.train_prevalence = data.prevalence() def aggregate(self, classif_posteriors, epsilon=EPSILON): priors, posteriors = self.EM(self.train_prevalence, classif_posteriors, epsilon) return priors def predict_proba(self, instances, epsilon=EPSILON): - classif_posteriors = self.classifier.predict_proba(instances) + """ + Returns the posterior probabilities updated by the EM algorithm. + + :param instances: np.ndarray of shape `(n_instances, n_dimensions)` + :param epsilon: error tolerance + :return: np.ndarray of shape `(n_instances, n_classes)` + """ + classif_posteriors = self.classify(instances) priors, posteriors = self.EM(self.train_prevalence, classif_posteriors, epsilon) return posteriors @@ -527,7 +542,94 @@ class EMQ(AggregativeProbabilisticQuantifier): return qs, ps -class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier): +class EMQrecalib(AggregativeSoftQuantifier): + """ + `Expectation Maximization for Quantification `_ (EMQ), + aka `Saerens-Latinne-Decaestecker` (SLD) algorithm, with the heuristics proposed by + `Alexandari et al. paper `_. + + These heuristics consist of using, as the training prevalence, an estimate of it obtained via k-fold cross + validation (instead of the true training prevalence), and to recalibrate the posterior probabilities of + the classifier. + + :param classifier: a sklearn's Estimator that generates a classifier + :param val_split: specifies the data used for generating classifier predictions. This specification + can be made as float in (0, 1) indicating the proportion of stratified held-out validation set to + be extracted from the training set (default 0.4); or as an integer, indicating that the predictions + are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value + for `k`, default 5); or as a collection defining the specific set of data to use for validation. + Alternatively, this set can be specified at fit time by indicating the exact set of data + on which the predictions are to be generated. + :param exact_train_prev: set to True (default) for using, as the initial observation, the true training prevalence; + or set to False for computing the training prevalence as an estimate of it, i.e., as the expected + value of the posterior probabilities of the training instances + :param recalib: a string indicating the method of recalibration. + Available choices include "nbvs" (No-Bias Vector Scaling), "bcts" (Bias-Corrected Temperature Scaling, + default), "ts" (Temperature Scaling), and "vs" (Vector Scaling). + :param n_jobs: number of parallel workers + """ + + MAX_ITER = 1000 + EPSILON = 1e-4 + + def __init__(self, classifier: BaseEstimator, val_split=5, exact_train_prev=False, recalib='bcts', n_jobs=None): + self.classifier = classifier + self.val_split = val_split + self.exact_train_prev = exact_train_prev + self.recalib = recalib + self.n_jobs = n_jobs + + def classify(self, instances): + """ + Provides the posterior probabilities for the given instances. If the classifier is + recalibrated, then these posteriors will be recalibrated accordingly. + + :param instances: array-like of shape `(n_instances, n_dimensions,)` + :return: np.ndarray of shape `(n_instances, n_classes,)` with posterior probabilities + """ + posteriors = self.classifier.predict_proba(instances) + if hasattr(self, 'calibration_function') and self.calibration_function is not None: + posteriors = self.calibration_function(posteriors) + return posteriors + + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + if self.recalib is not None: + P, y = classif_predictions.Xy + if self.recalib == 'nbvs': + calibrator = NoBiasVectorScaling() + elif self.recalib == 'bcts': + calibrator = TempScaling(bias_positions='all') + elif self.recalib == 'ts': + calibrator = TempScaling() + elif self.recalib == 'vs': + calibrator = VectorScaling() + else: + raise ValueError('invalid param argument for recalibration method; available ones are ' + '"nbvs", "bcts", "ts", and "vs".') + + self.calibration_function = calibrator(P, np.eye(data.n_classes)[y], posterior_supplied=True) + + if self.exact_train_prev: + self.train_prevalence = F.prevalence_from_labels(data.labels, self.classes_) + else: + if self.recalib is not None: + train_posteriors = self.classify(data.X) + else: + train_posteriors = classif_predictions.X + + self.train_prevalence = np.mean(train_posteriors, axis=0) + + def aggregate(self, classif_posteriors, epsilon=EPSILON): + priors, posteriors = EMQ.EM(self.train_prevalence, classif_posteriors, epsilon) + return priors + + def predict_proba(self, instances, epsilon=EPSILON): + classif_posteriors = self.classify(instances) + priors, posteriors = EMQ.EM(self.train_prevalence, classif_posteriors, epsilon) + return posteriors + + +class HDy(AggregativeSoftQuantifier, BinaryAggregativeQuantifier): """ `Hellinger Distance y `_ (HDy). HDy is a probabilistic method for training binary quantifiers, that models quantification as the problem of @@ -539,14 +641,14 @@ class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier): :param classifier: a sklearn's Estimator that generates a binary classifier :param val_split: a float in range (0,1) indicating the proportion of data to be used as a stratified held-out - validation distribution, or a :class:`quapy.data.base.LabelledCollection` (the split itself). + validation distribution, or a :class:`quapy.data.base.LabelledCollection` (the split itself), or an integer indicating the number of folds (default 5).. """ - def __init__(self, classifier: BaseEstimator, val_split=0.4): + def __init__(self, classifier: BaseEstimator, val_split=5): self.classifier = classifier self.val_split = val_split - def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None): + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): """ Trains a HDy quantifier. @@ -557,22 +659,21 @@ class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier): :class:`quapy.data.base.LabelledCollection` indicating the validation set itself :return: self """ - if val_split is None: - val_split = self.val_split + P, y = classif_predictions.Xy + Px = P[:, self.pos_label] # takes only the P(y=+1|x) + self.Pxy1 = Px[y == self.pos_label] + self.Pxy0 = Px[y == self.neg_label] - self._check_binary(data, self.__class__.__name__) - self.classifier, validation = _training_helper( - self.classifier, data, fit_classifier, ensure_probabilistic=True, val_split=val_split) - Px = self.classify(validation.instances)[:, 1] # takes only the P(y=+1|x) - self.Pxy1 = Px[validation.labels == self.classifier.classes_[1]] - self.Pxy0 = Px[validation.labels == self.classifier.classes_[0]] # pre-compute the histogram for positive and negative examples self.bins = np.linspace(10, 110, 11, dtype=int) # [10, 20, 30, ..., 100, 110] + def hist(P, bins): h = np.histogram(P, bins=bins, range=(0, 1), density=True)[0] return h / h.sum() + self.Pxy1_density = {bins: hist(self.Pxy1, bins) for bins in self.bins} self.Pxy0_density = {bins: hist(self.Pxy0, bins) for bins in self.bins} + return self def aggregate(self, classif_posteriors): @@ -580,7 +681,7 @@ class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier): # and the final estimated a priori probability was taken as the median of these 11 estimates." # (González-Castro, et al., 2013). - Px = classif_posteriors[:, 1] # takes only the P(y=+1|x) + Px = classif_posteriors[:, self.pos_label] # takes only the P(y=+1|x) prev_estimations = [] # for bins in np.linspace(10, 110, 11, dtype=int): #[10, 20, 30, ..., 100, 110] @@ -596,7 +697,7 @@ class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier): # at small steps (modern implementations resort to an optimization procedure, # see class DistributionMatching) prev_selected, min_dist = None, None - for prev in F.prevalence_linspace(n_prevalences=100, repeats=1, smooth_limits_epsilon=0.0): + for prev in F.prevalence_linspace(n_prevalences=101, repeats=1, smooth_limits_epsilon=0.0): Px_train = prev * Pxy1_density + (1 - prev) * Pxy0_density hdy = F.HellingerDistance(Px_train, Px_test) if prev_selected is None or hdy < min_dist: @@ -604,10 +705,10 @@ class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier): prev_estimations.append(prev_selected) class1_prev = np.median(prev_estimations) - return np.asarray([1 - class1_prev, class1_prev]) + return F.as_binary_prevalence(class1_prev) -class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier): +class DyS(AggregativeSoftQuantifier, BinaryAggregativeQuantifier): """ `DyS framework `_ (DyS). DyS is a generalization of HDy method, using a Ternary Search in order to find the prevalence that @@ -616,14 +717,14 @@ class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier): :param classifier: a sklearn's Estimator that generates a binary classifier :param val_split: a float in range (0,1) indicating the proportion of data to be used as a stratified held-out - validation distribution, or a :class:`quapy.data.base.LabelledCollection` (the split itself). + validation distribution, or a :class:`quapy.data.base.LabelledCollection` (the split itself), or an integer indicating the number of folds (default 5).. :param n_bins: an int with the number of bins to use to compute the histograms. :param divergence: a str indicating the name of divergence (currently supported ones are "HD" or "topsoe"), or a callable function computes the divergence between two distributions (two equally sized arrays). :param tol: a float with the tolerance for the ternary search algorithm. """ - def __init__(self, classifier: BaseEstimator, val_split=0.4, n_bins=8, divergence: Union[str, Callable]= 'HD', tol=1e-05): + def __init__(self, classifier: BaseEstimator, val_split=5, n_bins=8, divergence: Union[str, Callable]= 'HD', tol=1e-05): self.classifier = classifier self.val_split = val_split self.tol = tol @@ -646,22 +747,17 @@ class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier): # Left and right are the current bounds; the maximum is between them return (left + right) / 2 - def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None): - if val_split is None: - val_split = self.val_split - - self._check_binary(data, self.__class__.__name__) - self.classifier, validation = _training_helper( - self.classifier, data, fit_classifier, ensure_probabilistic=True, val_split=val_split) - Px = self.classify(validation.instances)[:, 1] # takes only the P(y=+1|x) - self.Pxy1 = Px[validation.labels == self.classifier.classes_[1]] - self.Pxy0 = Px[validation.labels == self.classifier.classes_[0]] + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + Px, y = classif_predictions.Xy + Px = Px[:, self.pos_label] # takes only the P(y=+1|x) + self.Pxy1 = Px[y == self.pos_label] + self.Pxy0 = Px[y == self.neg_label] self.Pxy1_density = np.histogram(self.Pxy1, bins=self.n_bins, range=(0, 1), density=True)[0] self.Pxy0_density = np.histogram(self.Pxy0, bins=self.n_bins, range=(0, 1), density=True)[0] return self def aggregate(self, classif_posteriors): - Px = classif_posteriors[:, 1] # takes only the P(y=+1|x) + Px = classif_posteriors[:, self.pos_label] # takes only the P(y=+1|x) Px_test = np.histogram(Px, bins=self.n_bins, range=(0, 1), density=True)[0] divergence = get_divergence(self.divergence) @@ -671,49 +767,42 @@ class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier): return divergence(Px_train, Px_test) class1_prev = self._ternary_search(f=distribution_distance, left=0, right=1, tol=self.tol) - return np.asarray([1 - class1_prev, class1_prev]) + return F.as_binary_prevalence(class1_prev) -class SMM(AggregativeProbabilisticQuantifier, BinaryQuantifier): +class SMM(AggregativeSoftQuantifier, BinaryAggregativeQuantifier): """ `SMM method `_ (SMM). SMM is a simplification of matching distribution methods where the representation of the examples - is created using the mean instead of a histogram. + is created using the mean instead of a histogram (conceptually equivalent to PACC). :param classifier: a sklearn's Estimator that generates a binary classifier. :param val_split: a float in range (0,1) indicating the proportion of data to be used as a stratified held-out - validation distribution, or a :class:`quapy.data.base.LabelledCollection` (the split itself). + validation distribution, or a :class:`quapy.data.base.LabelledCollection` (the split itself), or an integer indicating the number of folds (default 5).. """ - def __init__(self, classifier: BaseEstimator, val_split=0.4): + def __init__(self, classifier: BaseEstimator, val_split=5): self.classifier = classifier self.val_split = val_split - def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None): - if val_split is None: - val_split = self.val_split - - self._check_binary(data, self.__class__.__name__) - self.classifier, validation = _training_helper( - self.classifier, data, fit_classifier, ensure_probabilistic=True, val_split=val_split) - Px = self.classify(validation.instances)[:, 1] # takes only the P(y=+1|x) - self.Pxy1 = Px[validation.labels == self.classifier.classes_[1]] - self.Pxy0 = Px[validation.labels == self.classifier.classes_[0]] - self.Pxy1_mean = np.mean(self.Pxy1) - self.Pxy0_mean = np.mean(self.Pxy0) + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + Px, y = classif_predictions.Xy + Px = Px[:, self.pos_label] # takes only the P(y=+1|x) + self.Pxy1 = Px[y == self.pos_label] + self.Pxy0 = Px[y == self.neg_label] + self.Pxy1_mean = np.mean(self.Pxy1) # equiv. TPR + self.Pxy0_mean = np.mean(self.Pxy0) # equiv. FPR return self def aggregate(self, classif_posteriors): - Px = classif_posteriors[:, 1] # takes only the P(y=+1|x) + Px = classif_posteriors[:, self.pos_label] # takes only the P(y=+1|x) Px_mean = np.mean(Px) class1_prev = (Px_mean - self.Pxy0_mean)/(self.Pxy1_mean - self.Pxy0_mean) - class1_prev = np.clip(class1_prev, 0, 1) - - return np.asarray([1 - class1_prev, class1_prev]) + return F.as_binary_prevalence(class1_prev, clip_if_necessary=True) -class DMy(AggregativeProbabilisticQuantifier): +class DMy(AggregativeSoftQuantifier): """ Generic Distribution Matching quantifier for binary or multiclass quantification based on the space of posterior probabilities. This implementation takes the number of bins, the divergence, and the possibility to work on CDF @@ -722,9 +811,9 @@ class DMy(AggregativeProbabilisticQuantifier): :param classifier: a `sklearn`'s Estimator that generates a probabilistic classifier :param val_split: indicates the proportion of data to be used as a stratified held-out validation set to model the validation distribution. - This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of + This parameter can be indicated as a real value (between 0 and 1), representing a proportion of validation data, or as an integer, indicating that the validation distribution should be estimated via - `k`-fold cross validation (this integer stands for the number of folds `k`), or as a + `k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a :class:`quapy.data.base.LabelledCollection` (the split itself). :param nbins: number of bins used to discretize the distributions (default 8) :param divergence: a string representing a divergence measure (currently, "HD" and "topsoe" are implemented) @@ -734,7 +823,7 @@ class DMy(AggregativeProbabilisticQuantifier): :param n_jobs: number of parallel workers (default None) """ - def __init__(self, classifier, val_split=0.4, nbins=8, divergence: Union[str, Callable]='HD', + def __init__(self, classifier, val_split=5, nbins=8, divergence: Union[str, Callable]='HD', cdf=False, search='optim_minimize', n_jobs=None): self.classifier = classifier self.val_split = val_split @@ -744,15 +833,15 @@ class DMy(AggregativeProbabilisticQuantifier): self.search = search self.n_jobs = n_jobs - @classmethod - def HDy(cls, classifier, val_split=0.4, n_jobs=None): - from quapy.method.meta import MedianEstimator + # @classmethod + # def HDy(cls, classifier, val_split=5, n_jobs=None): + # from quapy.method.meta import MedianEstimator + # + # hdy = DMy(classifier=classifier, val_split=val_split, search='linear_search', divergence='HD') + # hdy = AggregativeMedianEstimator(hdy, param_grid={'nbins': np.linspace(10, 110, 11).astype(int)}, n_jobs=n_jobs) + # return hdy - hdy = DMy(classifier=classifier, val_split=val_split, search='linear_search', divergence='HD') - hdy = MedianEstimator(hdy, param_grid={'nbins': np.linspace(10, 110, 11).astype(int)}, n_jobs=n_jobs) - return hdy - - def __get_distributions(self, posteriors): + def _get_distributions(self, posteriors): histograms = [] post_dims = posteriors.shape[1] if post_dims == 2: @@ -768,7 +857,7 @@ class DMy(AggregativeProbabilisticQuantifier): distributions = np.cumsum(distributions, axis=1) return distributions - def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None): + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): """ Trains the classifier (if requested) and generates the validation distributions out of the training data. The validation distributions have shape `(n, ch, nbins)`, with `n` the number of classes, `ch` the number of @@ -784,19 +873,16 @@ class DMy(AggregativeProbabilisticQuantifier): indicating the validation set itself, or an int indicating the number k of folds to be used in kFCV to estimate the parameters """ - if val_split is None: - val_split = self.val_split + posteriors, true_labels = classif_predictions.Xy + n_classes = len(self.classifier.classes_) - self.classifier, y, posteriors, classes, class_count = cross_generate_predictions( - data, self.classifier, val_split, probabilistic=True, fit_classifier=fit_classifier, n_jobs=self.n_jobs + self.validation_distribution = qp.util.parallel( + func=self._get_distributions, + args=[posteriors[true_labels==cat] for cat in range(n_classes)], + n_jobs=self.n_jobs, + backend='threading' ) - self.validation_distribution = np.asarray( - [self.__get_distributions(posteriors[y==cat]) for cat in range(data.n_classes)] - ) - - return self - def aggregate(self, posteriors: np.ndarray): """ Searches for the mixture model parameter (the sought prevalence values) that yields a validation distribution @@ -808,7 +894,7 @@ class DMy(AggregativeProbabilisticQuantifier): :param posteriors: posterior probabilities of the instances in the sample :return: a vector of class prevalence estimates """ - test_distribution = self.__get_distributions(posteriors) + test_distribution = self._get_distributions(posteriors) divergence = get_divergence(self.divergence) n_classes, n_channels, nbins = self.validation_distribution.shape def loss(prev): @@ -972,7 +1058,7 @@ def newSVMRAE(svmperf_base=None, C=1): return newELM(svmperf_base, loss='mrae', C=C) -class ThresholdOptimization(AggregativeQuantifier, BinaryQuantifier): +class ThresholdOptimization(BinaryAggregativeQuantifier): """ Abstract class of Threshold Optimization variants for :class:`ACC` as proposed by `Forman 2006 `_ and @@ -985,35 +1071,19 @@ class ThresholdOptimization(AggregativeQuantifier, BinaryQuantifier): :param classifier: a sklearn's Estimator that generates a classifier :param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the misclassification rates are to be estimated. - This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of + This parameter can be indicated as a real value (between 0 and 1), representing a proportion of validation data, or as an integer, indicating that the misclassification rates should be estimated via - `k`-fold cross validation (this integer stands for the number of folds `k`), or as a + `k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a :class:`quapy.data.base.LabelledCollection` (the split itself). """ - def __init__(self, classifier: BaseEstimator, val_split=0.4, n_jobs=None): + def __init__(self, classifier: BaseEstimator, val_split=5, n_jobs=None): self.classifier = classifier self.val_split = val_split self.n_jobs = qp._get_njobs(n_jobs) - def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, int, LabelledCollection] = None): - self._check_binary(data, "Threshold Optimization") - - if val_split is None: - val_split = self.val_split - - self.classifier, y, y_, classes, class_count = cross_generate_predictions( - data, self.classifier, val_split, probabilistic=True, fit_classifier=fit_classifier, n_jobs=self.n_jobs - ) - - self.cc = CC(self.classifier) - - self.tpr, self.fpr = self._optimize_threshold(y, y_) - - return self - @abstractmethod - def _condition(self, tpr, fpr) -> float: + def condition(self, tpr, fpr) -> float: """ Implements the criterion according to which the threshold should be selected. This function should return the (float) score to be minimized. @@ -1024,45 +1094,65 @@ class ThresholdOptimization(AggregativeQuantifier, BinaryQuantifier): """ ... - def _optimize_threshold(self, y, probabilities): + def discard(self, tpr, fpr) -> bool: + """ + Indicates whether a combination of tpr and fpr should be discarded + + :param tpr: float, true positive rate + :param fpr: float, false positive rate + :return: true if the combination is to be discarded, false otherwise + """ + return (tpr - fpr) == 0 + + + def _eval_candidate_thresholds(self, decision_scores, y): """ Seeks for the best `tpr` and `fpr` according to the score obtained at different decision thresholds. The scoring function is implemented in function `_condition`. + :param decision_scores: array-like with the classification scores :param y: predicted labels for the validation set (or for the training set via `k`-fold cross validation) - :param probabilities: array-like with the posterior probabilities - :return: best `tpr` and `fpr` according to `_condition` + :return: best `tpr` and `fpr` and `threshold` according to `_condition` """ - best_candidate_threshold_score = None - best_tpr = 0 - best_fpr = 0 - candidate_thresholds = np.unique(probabilities[:, 1]) + candidate_thresholds = np.unique(decision_scores) + + candidates = [] + scores = [] for candidate_threshold in candidate_thresholds: - y_ = [self.classes_[1] if p > candidate_threshold else self.classes_[0] for p in probabilities[:, 1]] + y_ = self.classes_[1 * (decision_scores >= candidate_threshold)] TP, FP, FN, TN = self._compute_table(y, y_) - tpr = self._compute_tpr(TP, FP) + tpr = self._compute_tpr(TP, FN) fpr = self._compute_fpr(FP, TN) - condition_score = self._condition(tpr, fpr) - if best_candidate_threshold_score is None or condition_score < best_candidate_threshold_score: - best_candidate_threshold_score = condition_score - best_tpr = tpr - best_fpr = fpr + if not self.discard(tpr, fpr): + candidate_score = self.condition(tpr, fpr) + candidates.append([tpr, fpr, candidate_threshold]) + scores.append(candidate_score) - return best_tpr, best_fpr + if len(candidates) == 0: + # if no candidate gives rise to a valid combination of tpr and fpr, this method defaults to the standard + # classify & count; this is akin to assign tpr=1, fpr=0, threshold=0 + tpr, fpr, threshold = 1, 0, 0 + candidates.append([tpr, fpr, threshold]) + scores.append(0) - def aggregate(self, classif_predictions): - prevs_estim = self.cc.aggregate(classif_predictions) - if self.tpr - self.fpr == 0: - return prevs_estim - adjusted_prevs_estim = np.clip((prevs_estim[1] - self.fpr) / (self.tpr - self.fpr), 0, 1) - adjusted_prevs_estim = np.array((1 - adjusted_prevs_estim, adjusted_prevs_estim)) - return adjusted_prevs_estim + candidates = np.asarray(candidates) + candidates = candidates[np.argsort(scores)] # sort candidates by candidate_score + + return candidates + + def aggregate_with_threshold(self, classif_predictions, tprs, fprs, thresholds): + # This function performs the adjusted count for given tpr, fpr, and threshold. + # Note that, due to broadcasting, tprs, fprs, and thresholds could be arrays of length > 1 + prevs_estims = np.mean(classif_predictions[:, None] >= thresholds, axis=0) + prevs_estims = (prevs_estims - fprs) / (tprs - fprs) + prevs_estims = F.as_binary_prevalence(prevs_estims, clip_if_necessary=True) + return prevs_estims.squeeze() def _compute_table(self, y, y_): - TP = np.logical_and(y == y_, y == self.classes_[1]).sum() - FP = np.logical_and(y != y_, y == self.classes_[0]).sum() - FN = np.logical_and(y != y_, y == self.classes_[1]).sum() - TN = np.logical_and(y == y_, y == self.classes_[0]).sum() + TP = np.logical_and(y == y_, y == self.pos_label).sum() + FP = np.logical_and(y != y_, y == self.neg_label).sum() + FN = np.logical_and(y != y_, y == self.pos_label).sum() + TN = np.logical_and(y == y_, y == self.neg_label).sum() return TP, FP, FN, TN def _compute_tpr(self, TP, FP): @@ -1075,28 +1165,38 @@ class ThresholdOptimization(AggregativeQuantifier, BinaryQuantifier): return 0 return FP / (FP + TN) + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + decision_scores, y = classif_predictions.Xy + # the standard behavior is to keep the best threshold only + self.tpr, self.fpr, self.threshold = self._eval_candidate_thresholds(decision_scores, y)[0] + return self + + def aggregate(self, classif_predictions: np.ndarray): + # the standard behavior is to compute the adjusted count using the best threshold found + return self.aggregate_with_threshold(classif_predictions, self.tpr, self.fpr, self.threshold) + class T50(ThresholdOptimization): """ Threshold Optimization variant for :class:`ACC` as proposed by `Forman 2006 `_ and `Forman 2008 `_ that looks - for the threshold that makes `tpr` cosest to 0.5. + for the threshold that makes `tpr` closest to 0.5. The goal is to bring improved stability to the denominator of the adjustment. :param classifier: a sklearn's Estimator that generates a classifier :param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the misclassification rates are to be estimated. - This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of + This parameter can be indicated as a real value (between 0 and 1), representing a proportion of validation data, or as an integer, indicating that the misclassification rates should be estimated via - `k`-fold cross validation (this integer stands for the number of folds `k`), or as a + `k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a :class:`quapy.data.base.LabelledCollection` (the split itself). """ - def __init__(self, classifier: BaseEstimator, val_split=0.4): + def __init__(self, classifier: BaseEstimator, val_split=5): super().__init__(classifier, val_split) - def _condition(self, tpr, fpr) -> float: + def condition(self, tpr, fpr) -> float: return abs(tpr - 0.5) @@ -1111,16 +1211,16 @@ class MAX(ThresholdOptimization): :param classifier: a sklearn's Estimator that generates a classifier :param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the misclassification rates are to be estimated. - This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of + This parameter can be indicated as a real value (between 0 and 1), representing a proportion of validation data, or as an integer, indicating that the misclassification rates should be estimated via - `k`-fold cross validation (this integer stands for the number of folds `k`), or as a + `k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a :class:`quapy.data.base.LabelledCollection` (the split itself). """ - def __init__(self, classifier: BaseEstimator, val_split=0.4): + def __init__(self, classifier: BaseEstimator, val_split=5): super().__init__(classifier, val_split) - def _condition(self, tpr, fpr) -> float: + def condition(self, tpr, fpr) -> float: # MAX strives to maximize (tpr - fpr), which is equivalent to minimize (fpr - tpr) return (fpr - tpr) @@ -1136,16 +1236,16 @@ class X(ThresholdOptimization): :param classifier: a sklearn's Estimator that generates a classifier :param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the misclassification rates are to be estimated. - This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of + This parameter can be indicated as a real value (between 0 and 1), representing a proportion of validation data, or as an integer, indicating that the misclassification rates should be estimated via - `k`-fold cross validation (this integer stands for the number of folds `k`), or as a + `k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a :class:`quapy.data.base.LabelledCollection` (the split itself). """ - def __init__(self, classifier: BaseEstimator, val_split=0.4): + def __init__(self, classifier: BaseEstimator, val_split=5): super().__init__(classifier, val_split) - def _condition(self, tpr, fpr) -> float: + def condition(self, tpr, fpr) -> float: return abs(1 - (tpr + fpr)) @@ -1160,29 +1260,31 @@ class MS(ThresholdOptimization): :param classifier: a sklearn's Estimator that generates a classifier :param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the misclassification rates are to be estimated. - This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of + This parameter can be indicated as a real value (between 0 and 1), representing a proportion of validation data, or as an integer, indicating that the misclassification rates should be estimated via - `k`-fold cross validation (this integer stands for the number of folds `k`), or as a + `k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a :class:`quapy.data.base.LabelledCollection` (the split itself). """ - def __init__(self, classifier: BaseEstimator, val_split=0.4): + def __init__(self, classifier: BaseEstimator, val_split=5): super().__init__(classifier, val_split) - def _condition(self, tpr, fpr) -> float: - pass + def condition(self, tpr, fpr) -> float: + return 1 - def _optimize_threshold(self, y, probabilities): - tprs = [] - fprs = [] - candidate_thresholds = np.unique(probabilities[:, 1]) - for candidate_threshold in candidate_thresholds: - y_ = [self.classes_[1] if p > candidate_threshold else self.classes_[0] for p in probabilities[:, 1]] - TP, FP, FN, TN = self._compute_table(y, y_) - tpr = self._compute_tpr(TP, FP) - fpr = self._compute_fpr(FP, TN) - tprs.append(tpr) - fprs.append(fpr) - return np.median(tprs), np.median(fprs) + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + decision_scores, y = classif_predictions.Xy + # keeps all candidates + tprs_fprs_thresholds = self._eval_candidate_thresholds(decision_scores, y) + self.tprs = tprs_fprs_thresholds[:, 0] + self.fprs = tprs_fprs_thresholds[:, 1] + self.thresholds = tprs_fprs_thresholds[:, 2] + return self + + def aggregate(self, classif_predictions: np.ndarray): + prevalences = self.aggregate_with_threshold(classif_predictions, self.tprs, self.fprs, self.thresholds) + if prevalences.ndim==2: + prevalences = np.median(prevalences, axis=0) + return prevalences class MS2(MS): @@ -1197,27 +1299,16 @@ class MS2(MS): :param classifier: a sklearn's Estimator that generates a classifier :param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the misclassification rates are to be estimated. - This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of + This parameter can be indicated as a real value (between 0 and 1), representing a proportion of validation data, or as an integer, indicating that the misclassification rates should be estimated via - `k`-fold cross validation (this integer stands for the number of folds `k`), or as a + `k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a :class:`quapy.data.base.LabelledCollection` (the split itself). """ - def __init__(self, classifier: BaseEstimator, val_split=0.4): + def __init__(self, classifier: BaseEstimator, val_split=5): super().__init__(classifier, val_split) - def _optimize_threshold(self, y, probabilities): - tprs = [0, 1] - fprs = [0, 1] - candidate_thresholds = np.unique(probabilities[:, 1]) - for candidate_threshold in candidate_thresholds: - y_ = [self.classes_[1] if p > candidate_threshold else self.classes_[0] for p in probabilities[:, 1]] - TP, FP, FN, TN = self._compute_table(y, y_) - tpr = self._compute_tpr(TP, FP) - fpr = self._compute_fpr(FP, TN) - if (tpr - fpr) > 0.25: - tprs.append(tpr) - fprs.append(fpr) - return np.median(tprs), np.median(fprs) + def discard(self, tpr, fpr) -> bool: + return (tpr-fpr) <= 0.25 class OneVsAllAggregative(OneVsAllGeneric, AggregativeQuantifier): @@ -1261,7 +1352,7 @@ class OneVsAllAggregative(OneVsAllGeneric, AggregativeQuantifier): """ classif_predictions = self._parallel(self._delayed_binary_classification, instances) - if isinstance(self.binary_quantifier, AggregativeProbabilisticQuantifier): + if isinstance(self.binary_quantifier, AggregativeSoftQuantifier): return np.swapaxes(classif_predictions, 0, 1) else: return classif_predictions.T @@ -1278,6 +1369,113 @@ class OneVsAllAggregative(OneVsAllGeneric, AggregativeQuantifier): return self.dict_binary_quantifiers[c].aggregate(classif_predictions[:, c])[1] +class AggregativeMedianEstimator(BinaryQuantifier): + """ + This method is a meta-quantifier that returns, as the estimated class prevalence values, the median of the + estimation returned by differently (hyper)parameterized base quantifiers. + The median of unit-vectors is only guaranteed to be a unit-vector for n=2 dimensions, + i.e., in cases of binary quantification. + + :param base_quantifier: the base, binary quantifier + :param random_state: a seed to be set before fitting any base quantifier (default None) + :param param_grid: the grid or parameters towards which the median will be computed + :param n_jobs: number of parllel workes + """ + def __init__(self, base_quantifier: AggregativeQuantifier, param_grid: dict, random_state=None, n_jobs=None): + self.base_quantifier = base_quantifier + self.param_grid = param_grid + self.random_state = random_state + self.n_jobs = qp._get_njobs(n_jobs) + + def get_params(self, deep=True): + return self.base_quantifier.get_params(deep) + + def set_params(self, **params): + self.base_quantifier.set_params(**params) + + def _delayed_fit(self, args): + with qp.util.temp_seed(self.random_state): + params, training = args + model = deepcopy(self.base_quantifier) + model.set_params(**params) + model.fit(training) + return model + + def _delayed_fit_classifier(self, args): + with qp.util.temp_seed(self.random_state): + print('enter job') + cls_params, training, kwargs = args + model = deepcopy(self.base_quantifier) + model.set_params(**cls_params) + predictions = model.classifier_fit_predict(training, **kwargs) + print('exit job') + return (model, predictions) + + def _delayed_fit_aggregation(self, args): + with qp.util.temp_seed(self.random_state): + ((model, predictions), q_params), training = args + model = deepcopy(model) + model.set_params(**q_params) + model.aggregation_fit(predictions, training) + return model + + + def fit(self, training: LabelledCollection, **kwargs): + import itertools + + self._check_binary(training, self.__class__.__name__) + + if isinstance(self.base_quantifier, AggregativeQuantifier): + cls_configs, q_configs = qp.model_selection.group_params(self.param_grid) + + if len(cls_configs) > 1: + models_preds = qp.util.parallel( + self._delayed_fit_classifier, + ((params, training, kwargs) for params in cls_configs), + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs, + asarray=False, + backend='threading' + ) + else: + print('only 1') + model = self.base_quantifier + model.set_params(**cls_configs[0]) + predictions = model.classifier_fit_predict(training, **kwargs) + models_preds = [(model, predictions)] + + self.models = qp.util.parallel( + self._delayed_fit_aggregation, + ((setup, training) for setup in itertools.product(models_preds, q_configs)), + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs, + backend='threading' + ) + else: + configs = qp.model_selection.expand_grid(self.param_grid) + self.models = qp.util.parallel( + self._delayed_fit, + ((params, training) for params in configs), + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs, + backend='threading' + ) + return self + + def _delayed_predict(self, args): + model, instances = args + return model.quantify(instances) + + def quantify(self, instances): + prev_preds = qp.util.parallel( + self._delayed_predict, + ((model, instances) for model in self.models), + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs, + backend='threading' + ) + return np.median(prev_preds, axis=0) + #--------------------------------------------------------------- # aliases #--------------------------------------------------------------- diff --git a/quapy/method/base.py b/quapy/method/base.py index e0363f1..f34acf6 100644 --- a/quapy/method/base.py +++ b/quapy/method/base.py @@ -63,7 +63,7 @@ def newOneVsAll(binary_quantifier, n_jobs=None): return OneVsAllGeneric(binary_quantifier, n_jobs) -class OneVsAllGeneric(OneVsAll,BaseQuantifier): +class OneVsAllGeneric(OneVsAll, BaseQuantifier): """ Allows any binary quantifier to perform quantification on single-label datasets. The method maintains one binary quantifier for each class, and then l1-normalizes the outputs so that the class prevelence values sum up to 1. diff --git a/quapy/method/kdey.py b/quapy/method/kdey.py new file mode 100644 index 0000000..b3cdf0e --- /dev/null +++ b/quapy/method/kdey.py @@ -0,0 +1,214 @@ +from typing import Union +import numpy as np +from sklearn.base import BaseEstimator +from sklearn.neighbors import KernelDensity + +import quapy as qp +from quapy.data import LabelledCollection +from quapy.method.aggregative import AggregativeSoftQuantifier +import quapy.functional as F + +from sklearn.metrics.pairwise import rbf_kernel + + +class KDEBase: + + BANDWIDTH_METHOD = ['scott', 'silverman'] + + @classmethod + def _check_bandwidth(cls, bandwidth): + assert bandwidth in KDEBase.BANDWIDTH_METHOD or isinstance(bandwidth, float), \ + f'invalid bandwidth, valid ones are {KDEBase.BANDWIDTH_METHOD} or float values' + if isinstance(bandwidth, float): + assert 0 < bandwidth < 1, "the bandwith for KDEy should be in (0,1), since this method models the unit simplex" + + def get_kde_function(self, X, bandwidth): + return KernelDensity(bandwidth=bandwidth).fit(X) + + def pdf(self, kde, X): + return np.exp(kde.score_samples(X)) + + def get_mixture_components(self, X, y, n_classes, bandwidth): + return [self.get_kde_function(X[y == cat], bandwidth) for cat in range(n_classes)] + + + +class KDEyML(AggregativeSoftQuantifier, KDEBase): + + def __init__(self, classifier: BaseEstimator, val_split=10, bandwidth=0.1, n_jobs=None, random_state=0): + self._check_bandwidth(bandwidth) + self.classifier = classifier + self.val_split = val_split + self.bandwidth = bandwidth + self.n_jobs = n_jobs + self.random_state=random_state + + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.n_classes, self.bandwidth) + return self + + def aggregate(self, posteriors: np.ndarray): + """ + Searches for the mixture model parameter (the sought prevalence values) that maximizes the likelihood + of the data (i.e., that minimizes the negative log-likelihood) + + :param posteriors: instances in the sample converted into posterior probabilities + :return: a vector of class prevalence estimates + """ + np.random.RandomState(self.random_state) + epsilon = 1e-10 + n_classes = len(self.mix_densities) + test_densities = [self.pdf(kde_i, posteriors) for kde_i in self.mix_densities] + + def neg_loglikelihood(prev): + test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip (prev, test_densities)) + test_loglikelihood = np.log(test_mixture_likelihood + epsilon) + return -np.sum(test_loglikelihood) + + return F.optim_minimize(neg_loglikelihood, n_classes) + + +class KDEyHD(AggregativeSoftQuantifier, KDEBase): + + def __init__(self, classifier: BaseEstimator, val_split=10, divergence: str='HD', + bandwidth=0.1, n_jobs=None, random_state=0, montecarlo_trials=10000): + + self._check_bandwidth(bandwidth) + self.classifier = classifier + self.val_split = val_split + self.divergence = divergence + self.bandwidth = bandwidth + self.n_jobs = n_jobs + self.random_state=random_state + self.montecarlo_trials = montecarlo_trials + + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + self.mix_densities = self.get_mixture_components(*classif_predictions.Xy, data.n_classes, self.bandwidth) + + N = self.montecarlo_trials + rs = self.random_state + n = data.n_classes + self.reference_samples = np.vstack([kde_i.sample(N//n, random_state=rs) for kde_i in self.mix_densities]) + self.reference_classwise_densities = np.asarray([self.pdf(kde_j, self.reference_samples) for kde_j in self.mix_densities]) + self.reference_density = np.mean(self.reference_classwise_densities, axis=0) # equiv. to (uniform @ self.reference_classwise_densities) + + return self + + def aggregate(self, posteriors: np.ndarray): + # we retain all n*N examples (sampled from a mixture with uniform parameter), and then + # apply importance sampling (IS). In this version we compute D(p_alpha||q) with IS + n_classes = len(self.mix_densities) + + test_kde = self.get_kde_function(posteriors, self.bandwidth) + test_densities = self.pdf(test_kde, self.reference_samples) + + def f_squared_hellinger(u): + return (np.sqrt(u)-1)**2 + + # todo: this will fail when self.divergence is a callable, and is not the right place to do it anyway + if self.divergence.lower() == 'hd': + f = f_squared_hellinger + else: + raise ValueError('only squared HD is currently implemented') + + epsilon = 1e-10 + qs = test_densities + epsilon + rs = self.reference_density + epsilon + iw = qs/rs #importance weights + p_class = self.reference_classwise_densities + epsilon + fracs = p_class/qs + + def divergence(prev): + # ps / qs = (prev @ p_class) / qs = prev @ (p_class / qs) = prev @ fracs + ps_div_qs = prev @ fracs + return np.mean( f(ps_div_qs) * iw ) + + return F.optim_minimize(divergence, n_classes) + + +class KDEyCS(AggregativeSoftQuantifier): + + def __init__(self, classifier: BaseEstimator, val_split=10, bandwidth=0.1, n_jobs=None, random_state=0): + KDEBase._check_bandwidth(bandwidth) + self.classifier = classifier + self.val_split = val_split + self.bandwidth = bandwidth + self.n_jobs = n_jobs + self.random_state=random_state + + def gram_matrix_mix_sum(self, X, Y=None): + # this adapts the output of the rbf_kernel function (pairwise evaluations of Gaussian kernels k(x,y)) + # to contain pairwise evaluations of N(x|mu,Sigma1+Sigma2) with mu=y and Sigma1 and Sigma2 are + # two "scalar matrices" (h^2)*I each, so Sigma1+Sigma2 has scalar 2(h^2) (h is the bandwidth) + h = self.bandwidth + variance = 2 * (h**2) + nD = X.shape[1] + gamma = 1/(2*variance) + norm_factor = 1/np.sqrt(((2*np.pi)**nD) * (variance**(nD))) + gram = norm_factor * rbf_kernel(X, Y, gamma=gamma) + return gram.sum() + + def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection): + + P, y = classif_predictions.Xy + n = data.n_classes + + assert all(sorted(np.unique(y)) == np.arange(n)), \ + 'label name gaps not allowed in current implementation' + + + # counts_inv keeps track of the relative weight of each datapoint within its class + # (i.e., the weight in its KDE model) + counts_inv = 1 / (data.counts()) + + # tr_tr_sums corresponds to symbol \overline{B} in the paper + tr_tr_sums = np.zeros(shape=(n,n), dtype=float) + for i in range(n): + for j in range(n): + if i > j: + tr_tr_sums[i,j] = tr_tr_sums[j,i] + else: + block = self.gram_matrix_mix_sum(P[y == i], P[y == j] if i!=j else None) + tr_tr_sums[i, j] = block + + # keep track of these data structures for the test phase + self.Ptr = P + self.ytr = y + self.tr_tr_sums = tr_tr_sums + self.counts_inv = counts_inv + + return self + + + def aggregate(self, posteriors: np.ndarray): + Ptr = self.Ptr + Pte = posteriors + y = self.ytr + tr_tr_sums = self.tr_tr_sums + + M, nD = Pte.shape + Minv = (1/M) # t in the paper + n = Ptr.shape[1] + + + # becomes a constant that does not affect the optimization, no need to compute it + # partC = 0.5*np.log(self.gram_matrix_mix_sum(Pte) * Kinv * Kinv) + + # tr_te_sums corresponds to \overline{a}*(1/Li)*(1/M) in the paper (note the constants + # are already aggregated to tr_te_sums, so these multiplications are not carried out + # at each iteration of the optimization phase) + tr_te_sums = np.zeros(shape=n, dtype=float) + for i in range(n): + tr_te_sums[i] = self.gram_matrix_mix_sum(Ptr[y==i], Pte) + + def divergence(alpha): + # called \overline{r} in the paper + alpha_ratio = alpha * self.counts_inv + + # recal that tr_te_sums already accounts for the constant terms (1/Li)*(1/M) + partA = -np.log((alpha_ratio @ tr_te_sums) * Minv) + partB = 0.5 * np.log(alpha_ratio @ tr_tr_sums @ alpha_ratio) + return partA + partB #+ partC + + return F.optim_minimize(divergence, n) + diff --git a/quapy/method/meta.py b/quapy/method/meta.py index 7f111c0..d29433e 100644 --- a/quapy/method/meta.py +++ b/quapy/method/meta.py @@ -12,7 +12,7 @@ from quapy import functional as F from quapy.data import LabelledCollection from quapy.model_selection import GridSearchQ from quapy.method.base import BaseQuantifier, BinaryQuantifier -from quapy.method.aggregative import CC, ACC, PACC, HDy, EMQ +from quapy.method.aggregative import CC, ACC, PACC, HDy, EMQ, AggregativeQuantifier try: from . import neural @@ -26,6 +26,65 @@ else: QuaNet = "QuaNet is not available due to missing torch package" +class MedianEstimator2(BinaryQuantifier): + """ + This method is a meta-quantifier that returns, as the estimated class prevalence values, the median of the + estimation returned by differently (hyper)parameterized base quantifiers. + The median of unit-vectors is only guaranteed to be a unit-vector for n=2 dimensions, + i.e., in cases of binary quantification. + + :param base_quantifier: the base, binary quantifier + :param random_state: a seed to be set before fitting any base quantifier (default None) + :param param_grid: the grid or parameters towards which the median will be computed + :param n_jobs: number of parllel workes + """ + def __init__(self, base_quantifier: BinaryQuantifier, param_grid: dict, random_state=None, n_jobs=None): + self.base_quantifier = base_quantifier + self.param_grid = param_grid + self.random_state = random_state + self.n_jobs = qp._get_njobs(n_jobs) + + def get_params(self, deep=True): + return self.base_quantifier.get_params(deep) + + def set_params(self, **params): + self.base_quantifier.set_params(**params) + + def _delayed_fit(self, args): + with qp.util.temp_seed(self.random_state): + params, training = args + model = deepcopy(self.base_quantifier) + model.set_params(**params) + model.fit(training) + return model + + def fit(self, training: LabelledCollection): + self._check_binary(training, self.__class__.__name__) + + configs = qp.model_selection.expand_grid(self.param_grid) + self.models = qp.util.parallel( + self._delayed_fit, + ((params, training) for params in configs), + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs + ) + return self + + def _delayed_predict(self, args): + model, instances = args + return model.quantify(instances) + + def quantify(self, instances): + prev_preds = qp.util.parallel( + self._delayed_predict, + ((model, instances) for model in self.models), + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs + ) + prev_preds = np.asarray(prev_preds) + return np.median(prev_preds, axis=0) + + class MedianEstimator(BinaryQuantifier): """ This method is a meta-quantifier that returns, as the estimated class prevalence values, the median of the @@ -58,17 +117,64 @@ class MedianEstimator(BinaryQuantifier): model.fit(training) return model + def _delayed_fit_classifier(self, args): + with qp.util.temp_seed(self.random_state): + print('enter job') + cls_params, training = args + model = deepcopy(self.base_quantifier) + model.set_params(**cls_params) + predictions = model.classifier_fit_predict(training, predict_on=model.val_split) + print('exit job') + return (model, predictions) + + def _delayed_fit_aggregation(self, args): + with qp.util.temp_seed(self.random_state): + print('\tenter job') + ((model, predictions), q_params), training = args + model = deepcopy(model) + model.set_params(**q_params) + model.aggregation_fit(predictions, training) + print('\texit job') + return model + + def fit(self, training: LabelledCollection): self._check_binary(training, self.__class__.__name__) - params_keys = list(self.param_grid.keys()) - params_values = list(self.param_grid.values()) - hyper = [dict({k: val[i] for i, k in enumerate(params_keys)}) for val in itertools.product(*params_values)] - self.models = qp.util.parallel( - self._delayed_fit, - ((params, training) for params in hyper), - seed=qp.environ.get('_R_SEED', None), - n_jobs=self.n_jobs - ) + + if isinstance(self.base_quantifier, AggregativeQuantifier): + cls_configs, q_configs = qp.model_selection.group_params(self.param_grid) + + if len(cls_configs) > 1: + models_preds = qp.util.parallel( + self._delayed_fit_classifier, + ((params, training) for params in cls_configs), + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs, + asarray=False + ) + else: + print('only 1') + model = self.base_quantifier + model.set_params(**cls_configs[0]) + predictions = model.classifier_fit_predict(training, predict_on=model.val_split) + models_preds = [(model, predictions)] + + self.models = qp.util.parallel( + self._delayed_fit_aggregation, + ((setup, training) for setup in itertools.product(models_preds, q_configs)), + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs, + asarray=False + ) + else: + configs = qp.model_selection.expand_grid(self.param_grid) + self.models = qp.util.parallel( + self._delayed_fit, + ((params, training) for params in configs), + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs, + asarray=False + ) return self def _delayed_predict(self, args): @@ -80,13 +186,13 @@ class MedianEstimator(BinaryQuantifier): self._delayed_predict, ((model, instances) for model in self.models), seed=qp.environ.get('_R_SEED', None), - n_jobs=self.n_jobs + n_jobs=self.n_jobs, + asarray=False ) prev_preds = np.asarray(prev_preds) return np.median(prev_preds, axis=0) - class Ensemble(BaseQuantifier): VALID_POLICIES = {'ave', 'ptr', 'ds'} | qp.error.QUANTIFICATION_ERROR_NAMES diff --git a/quapy/method/neural.py b/quapy/method/neural.py index 2478055..330ac60 100644 --- a/quapy/method/neural.py +++ b/quapy/method/neural.py @@ -194,7 +194,7 @@ class QuaNetTrainer(BaseQuantifier): label_predictions = np.argmax(posteriors, axis=-1) prevs_estim = [] for quantifier in self.quantifiers.values(): - predictions = posteriors if isinstance(quantifier, AggregativeProbabilisticQuantifier) else label_predictions + predictions = posteriors if isinstance(quantifier, AggregativeSoftQuantifier) else label_predictions prevs_estim.extend(quantifier.aggregate(predictions)) # there is no real need for adding static estims like the TPR or FPR from training since those are constant diff --git a/quapy/method/non_aggregative.py b/quapy/method/non_aggregative.py index 87e59fb..6048bf6 100644 --- a/quapy/method/non_aggregative.py +++ b/quapy/method/non_aggregative.py @@ -1,7 +1,7 @@ from typing import Union, Callable import numpy as np -from functional import get_divergence +from quapy.functional import get_divergence from quapy.data import LabelledCollection from quapy.method.base import BaseQuantifier, BinaryQuantifier import quapy.functional as F diff --git a/quapy/model_selection.py b/quapy/model_selection.py index f02d9dc..307e7d3 100644 --- a/quapy/model_selection.py +++ b/quapy/model_selection.py @@ -1,7 +1,9 @@ import itertools import signal from copy import deepcopy +from enum import Enum from typing import Union, Callable +from functools import wraps import numpy as np from sklearn import clone @@ -10,10 +12,37 @@ import quapy as qp from quapy import evaluation from quapy.protocol import AbstractProtocol, OnLabelledCollectionProtocol from quapy.data.base import LabelledCollection -from quapy.method.aggregative import BaseQuantifier +from quapy.method.aggregative import BaseQuantifier, AggregativeQuantifier +from quapy.util import timeout from time import time +class Status(Enum): + SUCCESS = 1 + TIMEOUT = 2 + INVALID = 3 + ERROR = 4 + + +class ConfigStatus: + def __init__(self, params, status, msg=''): + self.params = params + self.status = status + self.msg = msg + + def __str__(self): + return f':params:{self.params} :status:{self.status} ' + self.msg + + def __repr__(self): + return str(self) + + def success(self): + return self.status == Status.SUCCESS + + def failed(self): + return self.status != Status.SUCCESS + + class GridSearchQ(BaseQuantifier): """Grid Search optimization targeting a quantification-oriented metric. @@ -26,11 +55,14 @@ class GridSearchQ(BaseQuantifier): :param protocol: a sample generation protocol, an instance of :class:`quapy.protocol.AbstractProtocol` :param error: an error function (callable) or a string indicating the name of an error function (valid ones are those in :class:`quapy.error.QUANTIFICATION_ERROR` - :param refit: whether or not to refit the model on the whole labelled collection (training+validation) with + :param refit: whether to refit the model on the whole labelled collection (training+validation) with the best chosen hyperparameter combination. Ignored if protocol='gen' :param timeout: establishes a timer (in seconds) for each of the hyperparameters configurations being tested. Whenever a run takes longer than this timer, that configuration will be ignored. If all configurations end up being ignored, a TimeoutError exception is raised. If -1 (default) then no time bound is set. + :param raise_errors: boolean, if True then raises an exception when a param combination yields any error, if + otherwise is False (default), then the combination is marked with an error status, but the process goes on. + However, if no configuration yields a valid model, then a ValueError exception will be raised. :param verbose: set to True to get information through the stdout """ @@ -42,6 +74,7 @@ class GridSearchQ(BaseQuantifier): refit=True, timeout=-1, n_jobs=None, + raise_errors=False, verbose=False): self.model = model @@ -50,6 +83,7 @@ class GridSearchQ(BaseQuantifier): self.refit = refit self.timeout = timeout self.n_jobs = qp._get_njobs(n_jobs) + self.raise_errors = raise_errors self.verbose = verbose self.__check_error(error) assert isinstance(protocol, AbstractProtocol), 'unknown protocol' @@ -69,6 +103,98 @@ class GridSearchQ(BaseQuantifier): raise ValueError(f'unexpected error type; must either be a callable function or a str representing\n' f'the name of an error function in {qp.error.QUANTIFICATION_ERROR_NAMES}') + def _prepare_classifier(self, cls_params): + model = deepcopy(self.model) + + def job(cls_params): + model.set_params(**cls_params) + predictions = model.classifier_fit_predict(self._training) + return predictions + + predictions, status, took = self._error_handler(job, cls_params) + self._sout(f'[classifier fit] hyperparams={cls_params} [took {took:.3f}s]') + return model, predictions, status, took + + def _prepare_aggregation(self, args): + model, predictions, cls_took, cls_params, q_params = args + model = deepcopy(model) + params = {**cls_params, **q_params} + + def job(q_params): + model.set_params(**q_params) + model.aggregation_fit(predictions, self._training) + score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error) + return score + + score, status, aggr_took = self._error_handler(job, q_params) + self._print_status(params, score, status, aggr_took) + return model, params, score, status, (cls_took+aggr_took) + + def _prepare_nonaggr_model(self, params): + model = deepcopy(self.model) + + def job(params): + model.set_params(**params) + model.fit(self._training) + score = evaluation.evaluate(model, protocol=self.protocol, error_metric=self.error) + return score + + score, status, took = self._error_handler(job, params) + self._print_status(params, score, status, took) + return model, params, score, status, took + + def _compute_scores_aggregative(self, training): + # break down the set of hyperparameters into two: classifier-specific, quantifier-specific + cls_configs, q_configs = group_params(self.param_grid) + + # train all classifiers and get the predictions + self._training = training + cls_outs = qp.util.parallel( + self._prepare_classifier, + cls_configs, + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs + ) + + # filter out classifier configurations that yielded any error + success_outs = [] + for (model, predictions, status, took), cls_config in zip(cls_outs, cls_configs): + if status.success(): + success_outs.append((model, predictions, took, cls_config)) + else: + self.error_collector.append(status) + + if len(success_outs) == 0: + raise ValueError('No valid configuration found for the classifier!') + + # explore the quantifier-specific hyperparameters for each valid training configuration + aggr_configs = [(*out, q_config) for out, q_config in itertools.product(success_outs, q_configs)] + aggr_outs = qp.util.parallel( + self._prepare_aggregation, + aggr_configs, + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs + ) + + return aggr_outs + + def _compute_scores_nonaggregative(self, training): + configs = expand_grid(self.param_grid) + self._training = training + scores = qp.util.parallel( + self._prepare_nonaggr_model, + configs, + seed=qp.environ.get('_R_SEED', None), + n_jobs=self.n_jobs + ) + return scores + + def _print_status(self, params, score, status, took): + if status.success(): + self._sout(f'hyperparams=[{params}]\t got {self.error.__name__} = {score:.5f} [took {took:.3f}s]') + else: + self._sout(f'error={status}') + def fit(self, training: LabelledCollection): """ Learning routine. Fits methods with all combinations of hyperparameters and selects the one minimizing the error metric. @@ -76,97 +202,63 @@ class GridSearchQ(BaseQuantifier): :param training: the training set on which to optimize the hyperparameters :return: self """ - params_keys = list(self.param_grid.keys()) - params_values = list(self.param_grid.values()) - protocol = self.protocol - - self.param_scores_ = {} - self.best_score_ = None + if self.refit and not isinstance(self.protocol, OnLabelledCollectionProtocol): + raise RuntimeWarning( + f'"refit" was requested, but the protocol does not implement ' + f'the {OnLabelledCollectionProtocol.__name__} interface' + ) tinit = time() - hyper = [dict({k: val[i] for i, k in enumerate(params_keys)}) for val in itertools.product(*params_values)] - self._sout(f'starting model selection with {self.n_jobs =}') - #pass a seed to parallel so it is set in clild processes - scores = qp.util.parallel( - self._delayed_eval, - ((params, training) for params in hyper), - seed=qp.environ.get('_R_SEED', None), - n_jobs=self.n_jobs - ) + self.error_collector = [] - for params, score, model in scores: - if score is not None: + self._sout(f'starting model selection with n_jobs={self.n_jobs}') + if isinstance(self.model, AggregativeQuantifier): + results = self._compute_scores_aggregative(training) + else: + results = self._compute_scores_nonaggregative(training) + + self.param_scores_ = {} + self.best_score_ = None + for model, params, score, status, took in results: + if status.success(): if self.best_score_ is None or score < self.best_score_: self.best_score_ = score self.best_params_ = params self.best_model_ = model self.param_scores_[str(params)] = score else: - self.param_scores_[str(params)] = 'timeout' + self.param_scores_[str(params)] = status.status + self.error_collector.append(status) tend = time()-tinit if self.best_score_ is None: - raise TimeoutError('no combination of hyperparameters seem to work') + raise ValueError('no combination of hyperparameters seemed to work') self._sout(f'optimization finished: best params {self.best_params_} (score={self.best_score_:.5f}) ' f'[took {tend:.4f}s]') + no_errors = len(self.error_collector) + if no_errors>0: + self._sout(f'warning: {no_errors} errors found') + for err in self.error_collector: + self._sout(f'\t{str(err)}') + if self.refit: - if isinstance(protocol, OnLabelledCollectionProtocol): + if isinstance(self.protocol, OnLabelledCollectionProtocol): + tinit = time() self._sout(f'refitting on the whole development set') - self.best_model_.fit(training + protocol.get_labelled_collection()) + self.best_model_.fit(training + self.protocol.get_labelled_collection()) + tend = time() - tinit + self.refit_time_ = tend else: - raise RuntimeWarning(f'"refit" was requested, but the protocol does not ' - f'implement the {OnLabelledCollectionProtocol.__name__} interface') + # already checked + raise RuntimeWarning(f'the model cannot be refit on the whole dataset') return self - def _delayed_eval(self, args): - params, training = args - - protocol = self.protocol - error = self.error - - if self.timeout > 0: - def handler(signum, frame): - raise TimeoutError() - - signal.signal(signal.SIGALRM, handler) - - tinit = time() - - if self.timeout > 0: - signal.alarm(self.timeout) - - try: - model = deepcopy(self.model) - # overrides default parameters with the parameters being explored at this iteration - model.set_params(**params) - model.fit(training) - score = evaluation.evaluate(model, protocol=protocol, error_metric=error) - - ttime = time()-tinit - self._sout(f'hyperparams={params}\t got {error.__name__} score {score:.5f} [took {ttime:.4f}s]') - - if self.timeout > 0: - signal.alarm(0) - except TimeoutError: - self._sout(f'timeout ({self.timeout}s) reached for config {params}') - score = None - except ValueError as e: - self._sout(f'the combination of hyperparameters {params} is invalid') - raise e - except Exception as e: - self._sout(f'something went wrong for config {params}; skipping:') - self._sout(f'\tException: {e}') - score = None - - return params, score, model - - def quantify(self, instances): """Estimate class prevalence values using the best model found after calling the :meth:`fit` method. @@ -203,7 +295,42 @@ class GridSearchQ(BaseQuantifier): return self.best_model_ raise ValueError('best_model called before fit') + def _error_handler(self, func, params): + """ + Endorses one job with two returned values: the status, and the time of execution + :param func: the function to be called + :param params: parameters of the function + :return: `tuple(out, status, time)` where `out` is the function output, + `status` is an enum value from `Status`, and `time` is the time it + took to complete the call + """ + + output = None + + def _handle(status, exception): + if self.raise_errors: + raise exception + else: + return ConfigStatus(params, status, str(e)) + + try: + with timeout(self.timeout): + tinit = time() + output = func(params) + status = ConfigStatus(params, Status.SUCCESS) + + except TimeoutError as e: + status = _handle(Status.TIMEOUT, str(e)) + + except ValueError as e: + status = _handle(Status.INVALID, str(e)) + + except Exception as e: + status = _handle(Status.ERROR, str(e)) + + took = time() - tinit + return output, status, took def cross_val_predict(quantifier: BaseQuantifier, data: LabelledCollection, nfolds=3, random_state=0): @@ -229,3 +356,43 @@ def cross_val_predict(quantifier: BaseQuantifier, data: LabelledCollection, nfol return total_prev +def expand_grid(param_grid: dict): + """ + Expands a param_grid dictionary as a list of configurations. + Example: + + >>> combinations = expand_grid({'A': [1, 10, 100], 'B': [True, False]}) + >>> print(combinations) + >>> [{'A': 1, 'B': True}, {'A': 1, 'B': False}, {'A': 10, 'B': True}, {'A': 10, 'B': False}, {'A': 100, 'B': True}, {'A': 100, 'B': False}] + + :param param_grid: dictionary with keys representing hyper-parameter names, and values representing the range + to explore for that hyper-parameter + :return: a list of configurations, i.e., combinations of hyper-parameter assignments in the grid. + """ + params_keys = list(param_grid.keys()) + params_values = list(param_grid.values()) + configs = [{k: combs[i] for i, k in enumerate(params_keys)} for combs in itertools.product(*params_values)] + return configs + + +def group_params(param_grid: dict): + """ + Partitions a param_grid dictionary as two lists of configurations, one for the classifier-specific + hyper-parameters, and another for que quantifier-specific hyper-parameters + + :param param_grid: dictionary with keys representing hyper-parameter names, and values representing the range + to explore for that hyper-parameter + :return: two expanded grids of configurations, one for the classifier, another for the quantifier + """ + classifier_params, quantifier_params = {}, {} + for key, values in param_grid.items(): + if key.startswith('classifier__') or key == 'val_split': + classifier_params[key] = values + else: + quantifier_params[key] = values + + classifier_configs = expand_grid(classifier_params) + quantifier_configs = expand_grid(quantifier_params) + + return classifier_configs, quantifier_configs + diff --git a/quapy/tests/test_hierarchy.py b/quapy/tests/test_hierarchy.py index 2ea3af5..b0842e5 100644 --- a/quapy/tests/test_hierarchy.py +++ b/quapy/tests/test_hierarchy.py @@ -22,9 +22,9 @@ class HierarchyTestCase(unittest.TestCase): def test_probabilistic(self): lr = LogisticRegression() for m in [CC(lr), ACC(lr)]: - self.assertEqual(isinstance(m, AggregativeProbabilisticQuantifier), False) + self.assertEqual(isinstance(m, AggregativeSoftQuantifier), False) for m in [PCC(lr), PACC(lr)]: - self.assertEqual(isinstance(m, AggregativeProbabilisticQuantifier), True) + self.assertEqual(isinstance(m, AggregativeSoftQuantifier), True) if __name__ == '__main__': diff --git a/quapy/util.py b/quapy/util.py index 733fbb8..de5c131 100644 --- a/quapy/util.py +++ b/quapy/util.py @@ -10,6 +10,8 @@ import quapy as qp import numpy as np from joblib import Parallel, delayed +from time import time +import signal def _get_parallel_slices(n_tasks, n_jobs): @@ -38,7 +40,7 @@ def map_parallel(func, args, n_jobs): return list(itertools.chain.from_iterable(results)) -def parallel(func, args, n_jobs, seed=None): +def parallel(func, args, n_jobs, seed=None, asarray=True, backend='loky'): """ A wrapper of multiprocessing: @@ -58,9 +60,12 @@ def parallel(func, args, n_jobs, seed=None): stack.enter_context(qp.util.temp_seed(seed)) return func(*args) - return Parallel(n_jobs=n_jobs)( + out = Parallel(n_jobs=n_jobs, backend=backend)( delayed(func_dec)(qp.environ, None if seed is None else seed+i, args_i) for i, args_i in enumerate(args) ) + if asarray: + out = np.asarray(out) + return out @contextlib.contextmanager @@ -254,3 +259,35 @@ class EarlyStop: if self.patience <= 0: self.STOP = True + +@contextlib.contextmanager +def timeout(seconds): + """ + Opens a context that will launch an exception if not closed after a given number of seconds + + >>> def func(start_msg, end_msg): + >>> print(start_msg) + >>> sleep(2) + >>> print(end_msg) + >>> + >>> with timeout(1): + >>> func('begin function', 'end function') + >>> Out[] + >>> begin function + >>> TimeoutError + + + :param seconds: number of seconds, set to <=0 to ignore the timer + """ + if seconds > 0: + def handler(signum, frame): + raise TimeoutError() + + signal.signal(signal.SIGALRM, handler) + signal.alarm(seconds) + + yield + + if seconds > 0: + signal.alarm(0) +