From c3cf0e2d492335b03dc770075ff026d5a5e5e886 Mon Sep 17 00:00:00 2001 From: Alejandro Moreo Date: Wed, 8 Nov 2023 16:13:48 +0100 Subject: [PATCH] adding DistributionMatchingX, the covariate-specific equivalent counterpart of DistributionMatching --- examples/comparing_HDy_HDx.py | 2 +- quapy/functional.py | 15 ++++ quapy/method/aggregative.py | 30 +++----- quapy/method/non_aggregative.py | 120 ++++++++++++++++++++++++++++---- quapy/protocol.py | 2 +- 5 files changed, 132 insertions(+), 37 deletions(-) diff --git a/examples/comparing_HDy_HDx.py b/examples/comparing_HDy_HDx.py index 18dbe1b..025f7cd 100644 --- a/examples/comparing_HDy_HDx.py +++ b/examples/comparing_HDy_HDx.py @@ -21,7 +21,7 @@ See `_ for qp.environ['SAMPLE_SIZE']=100 -df = pd.DataFrame(columns=('method', 'dataset', 'MAE', 'MRAE', 'tr-time', 'te-time')) +df = pd.DataFrame(columns=['method', 'dataset', 'MAE', 'MRAE', 'tr-time', 'te-time']) for dataset_name in tqdm(qp.datasets.UCI_DATASETS, total=len(qp.datasets.UCI_DATASETS)): diff --git a/quapy/functional.py b/quapy/functional.py index edb9b4a..2f64c2b 100644 --- a/quapy/functional.py +++ b/quapy/functional.py @@ -1,5 +1,7 @@ import itertools from collections import defaultdict +from typing import Union, Callable + import scipy import numpy as np @@ -276,3 +278,16 @@ def check_prevalence_vector(p, raise_exception=False, toleranze=1e-08): return False return True + +def get_divergence(divergence: Union[str, Callable]): + if isinstance(divergence, str): + if divergence=='HD': + return HellingerDistance + elif divergence=='topsoe': + return TopsoeDistance + else: + raise ValueError(f'unknown divergence {divergence}') + elif callable(divergence): + return divergence + else: + raise ValueError(f'argument "divergence" not understood; use a str or a callable function') diff --git a/quapy/method/aggregative.py b/quapy/method/aggregative.py index c041d70..526414c 100644 --- a/quapy/method/aggregative.py +++ b/quapy/method/aggregative.py @@ -9,6 +9,7 @@ from sklearn.metrics import confusion_matrix from sklearn.model_selection import cross_val_predict import quapy as qp import quapy.functional as F +from functional import get_divergence from quapy.classification.calibration import NBVSCalibration, BCTSCalibration, TSCalibration, VSCalibration from quapy.classification.svmperf import SVMperf from quapy.data import LabelledCollection @@ -605,20 +606,6 @@ class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier): return np.asarray([1 - class1_prev, class1_prev]) -def _get_divergence(divergence: Union[str, Callable]): - if isinstance(divergence, str): - if divergence=='HD': - return F.HellingerDistance - elif divergence=='topsoe': - return F.TopsoeDistance - else: - raise ValueError(f'unknown divergence {divergence}') - elif callable(divergence): - return divergence - else: - raise ValueError(f'argument "divergence" not understood; use a str or a callable function') - - class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier): """ `DyS framework `_ (DyS). @@ -676,7 +663,7 @@ class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier): Px = classif_posteriors[:, 1] # takes only the P(y=+1|x) Px_test = np.histogram(Px, bins=self.n_bins, range=(0, 1), density=True)[0] - divergence = _get_divergence(self.divergence) + divergence = get_divergence(self.divergence) def distribution_distance(prev): Px_train = prev * self.Pxy1_density + (1 - prev) * self.Pxy0_density @@ -727,8 +714,9 @@ class SMM(AggregativeProbabilisticQuantifier, BinaryQuantifier): class DistributionMatching(AggregativeProbabilisticQuantifier): """ - Generic Distribution Matching quantifier for binary or multiclass quantification. - This implementation takes the number of bins, the divergence, and the possibility to work on CDF as hyperparameters. + Generic Distribution Matching quantifier for binary or multiclass quantification based on the space of posterior + probabilities. This implementation takes the number of bins, the divergence, and the possibility to work on CDF + as hyperparameters. :param classifier: a `sklearn`'s Estimator that generates a probabilistic classifier :param val_split: indicates the proportion of data to be used as a stratified held-out validation set to model the @@ -741,7 +729,7 @@ class DistributionMatching(AggregativeProbabilisticQuantifier): :param divergence: a string representing a divergence measure (currently, "HD" and "topsoe" are implemented) or a callable function taking two ndarrays of the same dimension as input (default "HD", meaning Hellinger Distance) - :param cdf: whether or not to use CDF instead of PDF (default False) + :param cdf: whether to use CDF instead of PDF (default False) :param n_jobs: number of parallel workers (default None) """ @@ -773,8 +761,8 @@ class DistributionMatching(AggregativeProbabilisticQuantifier): """ Trains the classifier (if requested) and generates the validation distributions out of the training data. The validation distributions have shape `(n, ch, nbins)`, with `n` the number of classes, `ch` the number of - channels, and `nbins` the number of bins. In particular, let `V` be the validation distributions; `di=V[i]` - are the distributions obtained from training data labelled with class `i`; `dij = di[j]` is the discrete + channels, and `nbins` the number of bins. In particular, let `V` be the validation distributions; then `di=V[i]` + are the distributions obtained from training data labelled with class `i`; while `dij = di[j]` is the discrete distribution of posterior probabilities `P(Y=j|X=x)` for training data labelled with class `i`, and `dij[k]` is the fraction of instances with a value in the `k`-th bin. @@ -810,7 +798,7 @@ class DistributionMatching(AggregativeProbabilisticQuantifier): :return: a vector of class prevalence estimates """ test_distribution = self.__get_distributions(posteriors) - divergence = _get_divergence(self.divergence) + divergence = get_divergence(self.divergence) n_classes, n_channels, nbins = self.validation_distribution.shape def match(prev): prev = np.expand_dims(prev, axis=0) diff --git a/quapy/method/non_aggregative.py b/quapy/method/non_aggregative.py index b5b70cd..2f19b9e 100644 --- a/quapy/method/non_aggregative.py +++ b/quapy/method/non_aggregative.py @@ -1,8 +1,14 @@ +from typing import Union, Callable + import numpy as np +from scipy import optimize + +from functional import get_divergence from quapy.data import LabelledCollection from quapy.method.base import BaseQuantifier, BinaryQuantifier import quapy.functional as F + class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier): """ The `Maximum Likelihood Prevalence Estimation` (MLPE) method is a lazy method that assumes there is no prior @@ -35,6 +41,7 @@ class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier): return self.estimated_prevalence + class HDx(BinaryQuantifier): """ `Hellinger Distance x `_ (HDx). @@ -49,19 +56,11 @@ class HDx(BinaryQuantifier): def __init__(self): self.feat_ranges = None - def get_features_range(self, X): - feat_ranges = [] - ncols = X.shape[1] - for col_idx in range(ncols): - feature = X[:,col_idx] - feat_ranges.append((np.min(feature), np.max(feature))) - return feat_ranges - def covariate_histograms(self, X, nbins): assert self.feat_ranges is not None, 'quantify called before fit' histograms = [] - for col_idx in range(self.ncols): + for col_idx in range(self.nfeats): feature = X[:,col_idx] feat_range = self.feat_ranges[col_idx] histograms.append(np.histogram(feature, bins=nbins, range=feat_range, density=True)[0]) @@ -79,8 +78,8 @@ class HDx(BinaryQuantifier): self._check_binary(data, self.__class__.__name__) X, y = data.Xy - self.ncols = X.shape[1] - self.feat_ranges = self.get_features_range(X) + self.nfeats = X.shape[1] + self.feat_ranges = _get_features_range(X) # pre-compute the representation for positive and negative examples self.bins = np.linspace(10, 110, 11, dtype=int) # [10, 20, 30, ..., 100, 110] @@ -93,7 +92,7 @@ class HDx(BinaryQuantifier): # and the final estimated a priori probability was taken as the median of these 11 estimates." # (González-Castro, et al., 2013). - assert X.shape[1] == self.ncols, f'wrong shape in quantify; expected {self.ncols}, found {X.shape[1]}' + assert X.shape[1] == self.nfeats, f'wrong shape in quantify; expected {self.nfeats}, found {X.shape[1]}' prev_estimations = [] for nbins in self.bins: @@ -106,11 +105,104 @@ class HDx(BinaryQuantifier): prev_selected, min_dist = None, None for prev in F.prevalence_linspace(n_prevalences=100, repeats=1, smooth_limits_epsilon=0.0): Hx = prev * H1 + (1 - prev) * H0 - hdx = np.mean([F.HellingerDistance(Hx[:,col], Ht[:,col]) for col in range(self.ncols)]) + hdx = np.mean([F.HellingerDistance(Hx[:,col], Ht[:,col]) for col in range(self.nfeats)]) if prev_selected is None or hdx < min_dist: prev_selected, min_dist = prev, hdx prev_estimations.append(prev_selected) class1_prev = np.median(prev_estimations) - return np.asarray([1 - class1_prev, class1_prev]) \ No newline at end of file + return np.asarray([1 - class1_prev, class1_prev]) + + +class DistributionMatchingX(BaseQuantifier): + """ + Generic Distribution Matching quantifier for binary or multiclass quantification based on the space of covariates. + This implementation takes the number of bins, the divergence, and the possibility to work on CDF as hyperparameters. + + :param nbins: number of bins used to discretize the distributions (default 8) + :param divergence: a string representing a divergence measure (currently, "HD" and "topsoe" are implemented) + or a callable function taking two ndarrays of the same dimension as input (default "HD", meaning Hellinger + Distance) + :param cdf: whether to use CDF instead of PDF (default False) + :param n_jobs: number of parallel workers (default None) + """ + + def __init__(self, nbins=8, divergence: Union[str, Callable]='HD', cdf=False, n_jobs=None): + self.nbins = nbins + self.divergence = divergence + self.cdf = cdf + self.n_jobs = n_jobs + + def __get_distributions(self, X): + histograms = [] + for feat_idx in range(self.nfeats): + hist = np.histogram(X[:, feat_idx], bins=self.nbins, density=True, range=self.feat_ranges[feat_idx])[0] + histograms.append(hist) + + distributions = np.vstack(histograms) + if self.cdf: + distributions = np.cumsum(distributions, axis=1) + return distributions + + def fit(self, data: LabelledCollection): + """ + Generates the validation distributions out of the training data (covariates). + The validation distributions have shape `(n, nfeats, nbins)`, with `n` the number of classes, `nfeats` + the number of features, and `nbins` the number of bins. + In particular, let `V` be the validation distributions; then `di=V[i]` are the distributions obtained from + training data labelled with class `i`; while `dij = di[j]` is the discrete distribution for feature j in + training data labelled with class `i`, and `dij[k]` is the fraction of instances with a value in the `k`-th bin. + + :param data: the training set + """ + X, y = data.Xy + + self.nfeats = X.shape[1] + self.feat_ranges = _get_features_range(X) + + self.validation_distribution = np.asarray( + [self.__get_distributions(X[y==cat]) for cat in range(data.n_classes)] + ) + + return self + + def quantify(self, instances): + """ + Searches for the mixture model parameter (the sought prevalence values) that yields a validation distribution + (the mixture) that best matches the test distribution, in terms of the divergence measure of choice. + The matching is computed as the average dissimilarity (in terms of the dissimilarity measure of choice) + between all feature-specific discrete distributions. + + :param instances: instances in the sample + :return: a vector of class prevalence estimates + """ + + assert instances.shape[1] == self.nfeats, f'wrong shape; expected {self.nfeats}, found {instances.shape[1]}' + + test_distribution = self.__get_distributions(instances) + divergence = get_divergence(self.divergence) + n_classes, n_feats, nbins = self.validation_distribution.shape + def match(prev): + prev = np.expand_dims(prev, axis=0) + mixture_distribution = (prev @ self.validation_distribution.reshape(n_classes,-1)).reshape(n_feats, -1) + divs = [divergence(test_distribution[feat], mixture_distribution[feat]) for feat in range(n_feats)] + return np.mean(divs) + + # the initial point is set as the uniform distribution + uniform_distribution = np.full(fill_value=1 / n_classes, shape=(n_classes,)) + + # solutions are bounded to those contained in the unit-simplex + bounds = tuple((0, 1) for x in range(n_classes)) # values in [0,1] + constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)}) # values summing up to 1 + r = optimize.minimize(match, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints) + return r.x + + +def _get_features_range(X): + feat_ranges = [] + ncols = X.shape[1] + for col_idx in range(ncols): + feature = X[:,col_idx] + feat_ranges.append((np.min(feature), np.max(feature))) + return feat_ranges \ No newline at end of file diff --git a/quapy/protocol.py b/quapy/protocol.py index 9bb716a..7d7d1df 100644 --- a/quapy/protocol.py +++ b/quapy/protocol.py @@ -236,7 +236,7 @@ class APP(AbstractStochasticSeededProtocol, OnLabelledCollectionProtocol): raise RuntimeError( f"Abort: the number of samples that will be generated by {self.__class__.__name__} ({n}) " f"exceeds the maximum number of allowed samples ({sanity_check = }). Set 'sanity_check' to " - f"None for bypassing this check, or to a higher number.") + f"None, or to a higher number, for bypassing this check.") self.collator = OnLabelledCollectionProtocol.get_collator(return_type)