From ceb88792c5669dea84de244783570c1e37f627d6 Mon Sep 17 00:00:00 2001
From: Alejandro Moreo
Date: Tue, 31 Jan 2023 15:08:58 +0100
Subject: [PATCH] added DistributionMatching method, a generic model for
 distribution matching for multiclass quantification problems that takes the
 divergence and number of bins as hyperparameters

---
 quapy/CHANGE_LOG.txt        |  16 +++-
 quapy/method/aggregative.py | 158 ++++++++++++++++++++++++++++--------
 2 files changed, 136 insertions(+), 38 deletions(-)

diff --git a/quapy/CHANGE_LOG.txt b/quapy/CHANGE_LOG.txt
index c450b41..f2deea0 100644
--- a/quapy/CHANGE_LOG.txt
+++ b/quapy/CHANGE_LOG.txt
@@ -25,9 +25,9 @@
 - cross_val_predict (for quantification) added to model_selection: would be nice to allow the user specifies
   a test protocol maybe, or None for bypassing it?
 
-- I think Pablo added DyS, Topsoe distance and binary search.
+- DyS, Topsoe distance and binary search (thanks to Pablo González)
 
-- I think Pablo added multi-thread reproducibility.
+- Multi-thread reproducibility via seeding (thanks to Pablo González)
 
 - Bugfix: adding two labelled collections (with +) now checks for consistency in the classes
 
@@ -40,8 +40,16 @@
 - the internal classifier of aggregative methods is now called "classifier" instead of "learner"
 
 - when optimizing the hyperparameters of an aggregative quantifier, the classifier's specific hyperparameters
-  should be marked with a "classifier__" prefix (just like in scikit-learn), while the quantifier's specific
-  hyperparameters are named directly. For example, PCC(LogisticRegression()) quantifier has
+  should be marked with a "classifier__" prefix (just like in scikit-learn with estimators), while the quantifier's
+  specific hyperparameters are named directly. For example, the PCC(LogisticRegression()) quantifier has
+  hyperparameters "classifier__C", "classifier__class_weight", etc., instead of "C" and "class_weight" as in v0.1.6.
+
+- hyperparameter values leading to inconsistent runs now raise a ValueError exception, while hyperparameter
+  combinations causing internal errors in surrogate functions are reported and skipped, without stopping the grid search.
+
+- DistributionMatching methods added. This is a general framework for distribution matching methods that caters for
+  multiclass quantification. That is to say, one can obtain a multiclass variant of the (originally binary) HDy
+  method aligned with Firat's formulation.
 
 Things to fix:
 - calibration with recalibration methods has to be fixed for exact_train_prev in EMQ (conflicts with clone, deepcopy, etc.)
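For illustration, the new naming convention can be exercised as follows (a minimal sketch, assuming only the scikit-learn-style set_params interface that the grid-search note above implies):

    from sklearn.linear_model import LogisticRegression
    from quapy.method.aggregative import PCC

    pcc = PCC(LogisticRegression())
    # classifier-specific hyperparameters take the "classifier__" prefix (new in v0.1.7)
    pcc.set_params(classifier__C=10, classifier__class_weight='balanced')
    # quantifier-specific hyperparameters keep their plain names, e.g., for DyS:
    # DyS(LogisticRegression()).set_params(n_bins=16, divergence='topsoe')
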
diff --git a/quapy/method/aggregative.py b/quapy/method/aggregative.py
index 3246b9f..87b682e 100644
--- a/quapy/method/aggregative.py
+++ b/quapy/method/aggregative.py
@@ -3,6 +3,7 @@ from copy import deepcopy
 from typing import Callable, Union
 import numpy as np
 from joblib import Parallel, delayed
+from scipy import optimize
 from sklearn.base import BaseEstimator, clone
 from sklearn.calibration import CalibratedClassifierCV
 from sklearn.metrics import confusion_matrix
@@ -10,8 +11,7 @@ from sklearn.model_selection import StratifiedKFold, cross_val_predict
 from tqdm import tqdm
 import quapy as qp
 import quapy.functional as F
-from classification.calibration import RecalibratedProbabilisticClassifier, NBVSCalibration, BCTSCalibration, TSCalibration, \
-    VSCalibration
+from classification.calibration import NBVSCalibration, BCTSCalibration, TSCalibration, VSCalibration
 from quapy.classification.svmperf import SVMperf
 from quapy.data import LabelledCollection
 from quapy.method.base import BaseQuantifier, BinaryQuantifier
@@ -91,25 +91,6 @@ class AggregativeQuantifier(BaseQuantifier):
     """
     ...
 
-    # def get_params(self, deep=True):
-    #     """
-    #     Return the current parameters of the quantifier.
-    #
-    #     :param deep: for compatibility with sklearn
-    #     :return: a dictionary of param-value pairs
-    #     """
-    #
-    #     return self.learner.get_params()
-
-    # def set_params(self, **parameters):
-    #     """
-    #     Set the parameters of the quantifier.
-    #
-    #     :param parameters: dictionary of param-value pairs
-    #     """
-    #
-    #     self.learner.set_params(**parameters)
-
     @property
     def classes_(self):
         """
@@ -690,16 +671,16 @@ class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier):
     :param val_split: a float in range (0,1) indicating the proportion of data to be used as a stratified held-out
         validation distribution, or a :class:`quapy.data.base.LabelledCollection` (the split itself).
     :param n_bins: an int with the number of bins to use to compute the histograms.
-    :param distance: an str with a distance already included in the librar (HD or topsoe), of a function
-        that computes the distance between two distributions.
+    :param divergence: a str indicating the name of the divergence (currently supported ones are "HD" and "topsoe"),
+        or a callable function that computes the divergence between two distributions (two equally sized arrays).
     :param tol: a float with the tolerance for the ternary search algorithm.
     """
 
-    def __init__(self, classifier: BaseEstimator, val_split=0.4, n_bins=8, distance: Union[str, Callable]='HD', tol=1e-05):
+    def __init__(self, classifier: BaseEstimator, val_split=0.4, n_bins=8, divergence: Union[str, Callable]='HD', tol=1e-05):
         self.classifier = classifier
         self.val_split = val_split
         self.tol = tol
-        self.distance = distance
+        self.divergence = divergence
         self.n_bins = n_bins
 
     def _ternary_search(self, f, left, right, tol):
@@ -718,14 +699,6 @@ class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier):
         # Left and right are the current bounds; the maximum is between them
         return (left + right) / 2
 
-    def _compute_distance(self, Px_train, Px_test, distance: Union[str, Callable]='HD'):
-        if distance == 'HD':
-            return F.HellingerDistance(Px_train, Px_test)
-        elif distance == 'topsoe':
-            return F.TopsoeDistance(Px_train, Px_test)
-        else:
-            return distance(Px_train, Px_test)
-
     def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None):
         if val_split is None:
             val_split = self.val_split
@@ -744,10 +717,11 @@ class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier):
         Px = classif_posteriors[:, 1]  # takes only the P(y=+1|x)
         Px_test = np.histogram(Px, bins=self.n_bins, range=(0, 1), density=True)[0]
 
+        divergence = _get_divergence(self.divergence)
         def distribution_distance(prev):
             Px_train = prev * self.Pxy1_density + (1 - prev) * self.Pxy0_density
-            return self._compute_distance(Px_train,Px_test,self.distance)
+            return divergence(Px_train, Px_test)
 
         class1_prev = self._ternary_search(f=distribution_distance, left=0, right=1, tol=self.tol)
         return np.asarray([1 - class1_prev, class1_prev])
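The renamed `divergence` argument of DyS accepts either one of the names registered in the library or a user-supplied callable. A short sketch of both usages (constructor calls only; fitting and quantifying proceed as for any other aggregative quantifier):

    import numpy as np
    from sklearn.linear_model import LogisticRegression
    from quapy.method.aggregative import DyS

    # by name: a divergence already included in the library ("HD" or "topsoe")
    dys = DyS(LogisticRegression(), val_split=0.4, n_bins=8, divergence='topsoe')

    # by callable: any function of two equally sized arrays, e.g., total variation distance
    def total_variation(p, q):
        return 0.5 * np.abs(np.asarray(p) - np.asarray(q)).sum()

    dys = DyS(LogisticRegression(), divergence=total_variation)
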
""" - def __init__(self, classifier: BaseEstimator, val_split=0.4, n_bins=8, distance: Union[str, Callable]='HD', tol=1e-05): + def __init__(self, classifier: BaseEstimator, val_split=0.4, n_bins=8, divergence: Union[str, Callable]= 'HD', tol=1e-05): self.classifier = classifier self.val_split = val_split self.tol = tol - self.distance = distance + self.divergence = divergence self.n_bins = n_bins def _ternary_search(self, f, left, right, tol): @@ -718,14 +699,6 @@ class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier): # Left and right are the current bounds; the maximum is between them return (left + right) / 2 - def _compute_distance(self, Px_train, Px_test, distance: Union[str, Callable]='HD'): - if distance == 'HD': - return F.HellingerDistance(Px_train, Px_test) - elif distance == 'topsoe': - return F.TopsoeDistance(Px_train, Px_test) - else: - return distance(Px_train, Px_test) - def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None): if val_split is None: val_split = self.val_split @@ -744,10 +717,11 @@ class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier): Px = classif_posteriors[:, 1] # takes only the P(y=+1|x) Px_test = np.histogram(Px, bins=self.n_bins, range=(0, 1), density=True)[0] + divergence = _get_divergence(self.divergence) def distribution_distance(prev): Px_train = prev * self.Pxy1_density + (1 - prev) * self.Pxy0_density - return self._compute_distance(Px_train,Px_test,self.distance) + return divergence(Px_train, Px_test) class1_prev = self._ternary_search(f=distribution_distance, left=0, right=1, tol=self.tol) return np.asarray([1 - class1_prev, class1_prev]) @@ -791,6 +765,122 @@ class SMM(AggregativeProbabilisticQuantifier, BinaryQuantifier): return np.asarray([1 - class1_prev, class1_prev]) +def _get_divergence(divergence: Union[str, Callable]): + if isinstance(divergence, str): + if divergence=='HD': + return F.HellingerDistance + elif divergence=='topsoe': + return F.TopsoeDistance + else: + raise ValueError(f'unknown divergence {divergence}') + elif callable(divergence): + return divergence + else: + raise ValueError(f'argument "divergence" not understood; use a str or a callable function') + +class DistributionMatching(AggregativeProbabilisticQuantifier): + """ + Generic Distribution Matching quantifier for binary or multiclass quantification. + This implementation takes the number of bins, the divergence, and the possibility to work on CDF as hyperparameters. + + :param classifier: a sklearn's Estimator that generates a probabilistic classifier + :param val_split: indicates the proportion of data to be used as a stratified held-out validation set to model the + validation distribution. + This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of + validation data, or as an integer, indicating that the validation distribution should be estimated via + `k`-fold cross validation (this integer stands for the number of folds `k`), or as a + :class:`quapy.data.base.LabelledCollection` (the split itself). 
+class DistributionMatching(AggregativeProbabilisticQuantifier):
+    """
+    Generic Distribution Matching quantifier for binary or multiclass quantification.
+    This implementation takes the number of bins, the divergence, and whether to work on CDFs instead of PDFs as
+    hyperparameters.
+
+    :param classifier: an sklearn estimator that generates a probabilistic classifier
+    :param val_split: indicates the proportion of data to be used as a stratified held-out validation set to model the
+        validation distribution.
+        This parameter can be indicated as a real value (between 0 and 1, default 0.4), representing a proportion of
+        validation data, or as an integer, indicating that the validation distribution should be estimated via
+        `k`-fold cross validation (this integer stands for the number of folds `k`), or as a
+        :class:`quapy.data.base.LabelledCollection` (the split itself).
+    :param nbins: number of bins used to discretize the distributions (default 8)
+    :param divergence: a string representing a divergence measure (currently, "HD" and "topsoe" are implemented)
+        or a callable function taking two ndarrays of the same dimension as input (default "HD", meaning Hellinger
+        Distance)
+    :param cdf: whether or not to use CDF instead of PDF (default False)
+    :param n_jobs: number of parallel workers (default None)
+    """
+
+    def __init__(self, classifier, val_split=0.4, nbins=8, divergence: Union[str, Callable]='HD', cdf=False, n_jobs=None):
+        self.classifier = classifier
+        self.val_split = val_split
+        self.nbins = nbins
+        self.divergence = divergence
+        self.cdf = cdf
+        self.n_jobs = n_jobs
+
+    def __get_distributions(self, posteriors):
+        histograms = []
+        post_dims = posteriors.shape[1]
+        if post_dims == 2:
+            # in binary quantification we can use only one class, since the other one is its complement
+            post_dims = 1
+        for dim in range(post_dims):
+            hist = np.histogram(posteriors[:, dim], bins=self.nbins, range=(0, 1))[0]
+            histograms.append(hist)
+
+        counts = np.vstack(histograms)
+        distributions = counts / counts.sum(axis=1)[:, np.newaxis]
+        if self.cdf:
+            distributions = np.cumsum(distributions, axis=1)
+        return distributions
+
+    def fit(self, data: LabelledCollection, fit_classifier=True, val_split: Union[float, LabelledCollection] = None):
+        """
+        Trains the classifier (if requested) and generates the validation distributions out of the training data.
+        The validation distributions have shape `(n, ch, nbins)`, with `n` the number of classes, `ch` the number of
+        channels, and `nbins` the number of bins. In particular, let `V` be the validation distributions; `di=V[i]`
+        are the distributions obtained from training data labelled with class `i`; `dij = di[j]` is the discrete
+        distribution of posterior probabilities `P(Y=j|X=x)` for training data labelled with class `i`, and `dij[k]`
+        is the fraction of instances with a value in the `k`-th bin.
+
+        :param data: the training set
+        :param fit_classifier: set to False to bypass the training (the learner is assumed to be already fit)
+        :param val_split: either a float in (0,1) indicating the proportion of training instances to use for
+            validation (e.g., 0.3 for using 30% of the training set as validation data), or a LabelledCollection
+            indicating the validation set itself, or an int indicating the number `k` of folds to be used in `k`-fold
+            cross validation to estimate the parameters
+        """
+        if val_split is None:
+            val_split = self.val_split
+
+        self.classifier, y, posteriors, classes, class_count = cross_generate_predictions(
+            data, self.classifier, val_split, probabilistic=True, fit_classifier=fit_classifier, n_jobs=self.n_jobs
+        )
+
+        self.validation_distribution = np.asarray(
+            [self.__get_distributions(posteriors[y == cat]) for cat in range(data.n_classes)]
+        )
+
+        return self
+
+    def aggregate(self, posteriors: np.ndarray):
+        """
+        Searches for the mixture model parameter (the sought prevalence values) that yields a validation distribution
+        (the mixture) that best matches the test distribution, in terms of the divergence measure of choice.
+        In the multiclass case, with `n` the number of classes, the test and mixture distributions contain
+        `n` channels (proper distributions of binned posterior probabilities), on which the divergence is computed
+        independently. The matching is computed as an average of the divergence across all channels.
+
+        :param posteriors: posterior probabilities of the instances in the sample
+        :return: a vector of class prevalence estimates
+        """
+        test_distribution = self.__get_distributions(posteriors)
+        divergence = _get_divergence(self.divergence)
+        n_classes, n_channels, nbins = self.validation_distribution.shape
+
+        def match(prev):
+            prev = np.expand_dims(prev, axis=0)
+            mixture_distribution = (prev @ self.validation_distribution.reshape(n_classes, -1)).reshape(n_channels, -1)
+            divs = [divergence(test_distribution[ch], mixture_distribution[ch]) for ch in range(n_channels)]
+            return np.mean(divs)
+
+        # the initial point is set as the uniform distribution
+        uniform_distribution = np.full(fill_value=1 / n_classes, shape=(n_classes,))
+
+        # solutions are bounded to those contained in the unit-simplex
+        bounds = tuple((0, 1) for _ in range(n_classes))  # values in [0,1]
+        constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)})  # values summing up to 1
+        r = optimize.minimize(match, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints)
+        return r.x
+
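End-to-end usage of the new quantifier might look as follows (a sketch: `train`, a LabelledCollection, and `test_instances` are placeholders for the user's data):

    from sklearn.linear_model import LogisticRegression
    from quapy.method.aggregative import DistributionMatching

    # a multiclass analogue of HDy: Hellinger Distance over PDFs of binned posteriors
    dm = DistributionMatching(LogisticRegression(), val_split=0.4, nbins=8, divergence='HD', cdf=False)
    dm.fit(train)
    prevalences = dm.quantify(test_instances)  # n_classes values summing (approximately) to 1
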
 
 class ELM(AggregativeQuantifier, BinaryQuantifier):
     """