adding DistributionMatchingX, the covariate-specific equivalent counterpart of DistributionMatching

This commit is contained in:
Alejandro Moreo Fernandez 2023-11-08 16:13:48 +01:00
parent 76cf784844
commit c3cf0e2d49
5 changed files with 132 additions and 37 deletions

View File

@ -21,7 +21,7 @@ See <https://www.sciencedirect.com/science/article/pii/S0020025512004069>`_ for
qp.environ['SAMPLE_SIZE']=100
df = pd.DataFrame(columns=('method', 'dataset', 'MAE', 'MRAE', 'tr-time', 'te-time'))
df = pd.DataFrame(columns=['method', 'dataset', 'MAE', 'MRAE', 'tr-time', 'te-time'])
for dataset_name in tqdm(qp.datasets.UCI_DATASETS, total=len(qp.datasets.UCI_DATASETS)):

View File

@ -1,5 +1,7 @@
import itertools
from collections import defaultdict
from typing import Union, Callable
import scipy
import numpy as np
@ -276,3 +278,16 @@ def check_prevalence_vector(p, raise_exception=False, toleranze=1e-08):
return False
return True
def get_divergence(divergence: Union[str, Callable]):
if isinstance(divergence, str):
if divergence=='HD':
return HellingerDistance
elif divergence=='topsoe':
return TopsoeDistance
else:
raise ValueError(f'unknown divergence {divergence}')
elif callable(divergence):
return divergence
else:
raise ValueError(f'argument "divergence" not understood; use a str or a callable function')

View File

@ -9,6 +9,7 @@ from sklearn.metrics import confusion_matrix
from sklearn.model_selection import cross_val_predict
import quapy as qp
import quapy.functional as F
from functional import get_divergence
from quapy.classification.calibration import NBVSCalibration, BCTSCalibration, TSCalibration, VSCalibration
from quapy.classification.svmperf import SVMperf
from quapy.data import LabelledCollection
@ -605,20 +606,6 @@ class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier):
return np.asarray([1 - class1_prev, class1_prev])
def _get_divergence(divergence: Union[str, Callable]):
if isinstance(divergence, str):
if divergence=='HD':
return F.HellingerDistance
elif divergence=='topsoe':
return F.TopsoeDistance
else:
raise ValueError(f'unknown divergence {divergence}')
elif callable(divergence):
return divergence
else:
raise ValueError(f'argument "divergence" not understood; use a str or a callable function')
class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier):
"""
`DyS framework <https://ojs.aaai.org/index.php/AAAI/article/view/4376>`_ (DyS).
@ -676,7 +663,7 @@ class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier):
Px = classif_posteriors[:, 1] # takes only the P(y=+1|x)
Px_test = np.histogram(Px, bins=self.n_bins, range=(0, 1), density=True)[0]
divergence = _get_divergence(self.divergence)
divergence = get_divergence(self.divergence)
def distribution_distance(prev):
Px_train = prev * self.Pxy1_density + (1 - prev) * self.Pxy0_density
@ -727,8 +714,9 @@ class SMM(AggregativeProbabilisticQuantifier, BinaryQuantifier):
class DistributionMatching(AggregativeProbabilisticQuantifier):
"""
Generic Distribution Matching quantifier for binary or multiclass quantification.
This implementation takes the number of bins, the divergence, and the possibility to work on CDF as hyperparameters.
Generic Distribution Matching quantifier for binary or multiclass quantification based on the space of posterior
probabilities. This implementation takes the number of bins, the divergence, and the possibility to work on CDF
as hyperparameters.
:param classifier: a `sklearn`'s Estimator that generates a probabilistic classifier
:param val_split: indicates the proportion of data to be used as a stratified held-out validation set to model the
@ -741,7 +729,7 @@ class DistributionMatching(AggregativeProbabilisticQuantifier):
:param divergence: a string representing a divergence measure (currently, "HD" and "topsoe" are implemented)
or a callable function taking two ndarrays of the same dimension as input (default "HD", meaning Hellinger
Distance)
:param cdf: whether or not to use CDF instead of PDF (default False)
:param cdf: whether to use CDF instead of PDF (default False)
:param n_jobs: number of parallel workers (default None)
"""
@ -773,8 +761,8 @@ class DistributionMatching(AggregativeProbabilisticQuantifier):
"""
Trains the classifier (if requested) and generates the validation distributions out of the training data.
The validation distributions have shape `(n, ch, nbins)`, with `n` the number of classes, `ch` the number of
channels, and `nbins` the number of bins. In particular, let `V` be the validation distributions; `di=V[i]`
are the distributions obtained from training data labelled with class `i`; `dij = di[j]` is the discrete
channels, and `nbins` the number of bins. In particular, let `V` be the validation distributions; then `di=V[i]`
are the distributions obtained from training data labelled with class `i`; while `dij = di[j]` is the discrete
distribution of posterior probabilities `P(Y=j|X=x)` for training data labelled with class `i`, and `dij[k]`
is the fraction of instances with a value in the `k`-th bin.
@ -810,7 +798,7 @@ class DistributionMatching(AggregativeProbabilisticQuantifier):
:return: a vector of class prevalence estimates
"""
test_distribution = self.__get_distributions(posteriors)
divergence = _get_divergence(self.divergence)
divergence = get_divergence(self.divergence)
n_classes, n_channels, nbins = self.validation_distribution.shape
def match(prev):
prev = np.expand_dims(prev, axis=0)

View File

@ -1,8 +1,14 @@
from typing import Union, Callable
import numpy as np
from scipy import optimize
from functional import get_divergence
from quapy.data import LabelledCollection
from quapy.method.base import BaseQuantifier, BinaryQuantifier
import quapy.functional as F
class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier):
"""
The `Maximum Likelihood Prevalence Estimation` (MLPE) method is a lazy method that assumes there is no prior
@ -35,6 +41,7 @@ class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier):
return self.estimated_prevalence
class HDx(BinaryQuantifier):
"""
`Hellinger Distance x <https://www.sciencedirect.com/science/article/pii/S0020025512004069>`_ (HDx).
@ -49,19 +56,11 @@ class HDx(BinaryQuantifier):
def __init__(self):
self.feat_ranges = None
def get_features_range(self, X):
feat_ranges = []
ncols = X.shape[1]
for col_idx in range(ncols):
feature = X[:,col_idx]
feat_ranges.append((np.min(feature), np.max(feature)))
return feat_ranges
def covariate_histograms(self, X, nbins):
assert self.feat_ranges is not None, 'quantify called before fit'
histograms = []
for col_idx in range(self.ncols):
for col_idx in range(self.nfeats):
feature = X[:,col_idx]
feat_range = self.feat_ranges[col_idx]
histograms.append(np.histogram(feature, bins=nbins, range=feat_range, density=True)[0])
@ -79,8 +78,8 @@ class HDx(BinaryQuantifier):
self._check_binary(data, self.__class__.__name__)
X, y = data.Xy
self.ncols = X.shape[1]
self.feat_ranges = self.get_features_range(X)
self.nfeats = X.shape[1]
self.feat_ranges = _get_features_range(X)
# pre-compute the representation for positive and negative examples
self.bins = np.linspace(10, 110, 11, dtype=int) # [10, 20, 30, ..., 100, 110]
@ -93,7 +92,7 @@ class HDx(BinaryQuantifier):
# and the final estimated a priori probability was taken as the median of these 11 estimates."
# (González-Castro, et al., 2013).
assert X.shape[1] == self.ncols, f'wrong shape in quantify; expected {self.ncols}, found {X.shape[1]}'
assert X.shape[1] == self.nfeats, f'wrong shape in quantify; expected {self.nfeats}, found {X.shape[1]}'
prev_estimations = []
for nbins in self.bins:
@ -106,11 +105,104 @@ class HDx(BinaryQuantifier):
prev_selected, min_dist = None, None
for prev in F.prevalence_linspace(n_prevalences=100, repeats=1, smooth_limits_epsilon=0.0):
Hx = prev * H1 + (1 - prev) * H0
hdx = np.mean([F.HellingerDistance(Hx[:,col], Ht[:,col]) for col in range(self.ncols)])
hdx = np.mean([F.HellingerDistance(Hx[:,col], Ht[:,col]) for col in range(self.nfeats)])
if prev_selected is None or hdx < min_dist:
prev_selected, min_dist = prev, hdx
prev_estimations.append(prev_selected)
class1_prev = np.median(prev_estimations)
return np.asarray([1 - class1_prev, class1_prev])
return np.asarray([1 - class1_prev, class1_prev])
class DistributionMatchingX(BaseQuantifier):
"""
Generic Distribution Matching quantifier for binary or multiclass quantification based on the space of covariates.
This implementation takes the number of bins, the divergence, and the possibility to work on CDF as hyperparameters.
:param nbins: number of bins used to discretize the distributions (default 8)
:param divergence: a string representing a divergence measure (currently, "HD" and "topsoe" are implemented)
or a callable function taking two ndarrays of the same dimension as input (default "HD", meaning Hellinger
Distance)
:param cdf: whether to use CDF instead of PDF (default False)
:param n_jobs: number of parallel workers (default None)
"""
def __init__(self, nbins=8, divergence: Union[str, Callable]='HD', cdf=False, n_jobs=None):
self.nbins = nbins
self.divergence = divergence
self.cdf = cdf
self.n_jobs = n_jobs
def __get_distributions(self, X):
histograms = []
for feat_idx in range(self.nfeats):
hist = np.histogram(X[:, feat_idx], bins=self.nbins, density=True, range=self.feat_ranges[feat_idx])[0]
histograms.append(hist)
distributions = np.vstack(histograms)
if self.cdf:
distributions = np.cumsum(distributions, axis=1)
return distributions
def fit(self, data: LabelledCollection):
"""
Generates the validation distributions out of the training data (covariates).
The validation distributions have shape `(n, nfeats, nbins)`, with `n` the number of classes, `nfeats`
the number of features, and `nbins` the number of bins.
In particular, let `V` be the validation distributions; then `di=V[i]` are the distributions obtained from
training data labelled with class `i`; while `dij = di[j]` is the discrete distribution for feature j in
training data labelled with class `i`, and `dij[k]` is the fraction of instances with a value in the `k`-th bin.
:param data: the training set
"""
X, y = data.Xy
self.nfeats = X.shape[1]
self.feat_ranges = _get_features_range(X)
self.validation_distribution = np.asarray(
[self.__get_distributions(X[y==cat]) for cat in range(data.n_classes)]
)
return self
def quantify(self, instances):
"""
Searches for the mixture model parameter (the sought prevalence values) that yields a validation distribution
(the mixture) that best matches the test distribution, in terms of the divergence measure of choice.
The matching is computed as the average dissimilarity (in terms of the dissimilarity measure of choice)
between all feature-specific discrete distributions.
:param instances: instances in the sample
:return: a vector of class prevalence estimates
"""
assert instances.shape[1] == self.nfeats, f'wrong shape; expected {self.nfeats}, found {instances.shape[1]}'
test_distribution = self.__get_distributions(instances)
divergence = get_divergence(self.divergence)
n_classes, n_feats, nbins = self.validation_distribution.shape
def match(prev):
prev = np.expand_dims(prev, axis=0)
mixture_distribution = (prev @ self.validation_distribution.reshape(n_classes,-1)).reshape(n_feats, -1)
divs = [divergence(test_distribution[feat], mixture_distribution[feat]) for feat in range(n_feats)]
return np.mean(divs)
# the initial point is set as the uniform distribution
uniform_distribution = np.full(fill_value=1 / n_classes, shape=(n_classes,))
# solutions are bounded to those contained in the unit-simplex
bounds = tuple((0, 1) for x in range(n_classes)) # values in [0,1]
constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)}) # values summing up to 1
r = optimize.minimize(match, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints)
return r.x
def _get_features_range(X):
feat_ranges = []
ncols = X.shape[1]
for col_idx in range(ncols):
feature = X[:,col_idx]
feat_ranges.append((np.min(feature), np.max(feature)))
return feat_ranges

View File

@ -236,7 +236,7 @@ class APP(AbstractStochasticSeededProtocol, OnLabelledCollectionProtocol):
raise RuntimeError(
f"Abort: the number of samples that will be generated by {self.__class__.__name__} ({n}) "
f"exceeds the maximum number of allowed samples ({sanity_check = }). Set 'sanity_check' to "
f"None for bypassing this check, or to a higher number.")
f"None, or to a higher number, for bypassing this check.")
self.collator = OnLabelledCollectionProtocol.get_collator(return_type)