1
0
Fork 0

added HDx and an example comparing HDy vs HDx

This commit is contained in:
Alejandro Moreo Fernandez 2023-11-08 15:34:17 +01:00
parent 8a6579428b
commit 76cf784844
5 changed files with 166 additions and 6 deletions

View File

@ -0,0 +1,74 @@
from sklearn.linear_model import LogisticRegression
from time import time
import pandas as pd
from tqdm import tqdm
import quapy as qp
from quapy.protocol import APP
from quapy.method.aggregative import HDy
from quapy.method.non_aggregative import HDx
"""
This example is meant to experimentally compare HDy and HDx.
The implementations of these methods adhere to the original design of the methods; in particular, this means that
the number of bins is not an hyperparameter, but is something that the method explores internally (returning the
median of the estimates as the final prevalence prediction), and the prevalence is not searched through any
numerical optimization procedure, but simply as a linear search between 0 and 1 steppy by 0.01.
See <https://www.sciencedirect.com/science/article/pii/S0020025512004069>`_ for further details
"""
qp.environ['SAMPLE_SIZE']=100
df = pd.DataFrame(columns=('method', 'dataset', 'MAE', 'MRAE', 'tr-time', 'te-time'))
for dataset_name in tqdm(qp.datasets.UCI_DATASETS, total=len(qp.datasets.UCI_DATASETS)):
if dataset_name in ['acute.a', 'acute.b', 'balance.2', 'iris.1']: continue
collection = qp.datasets.fetch_UCILabelledCollection(dataset_name, verbose=False)
train, test = collection.split_stratified()
# HDy............................................
tinit = time()
hdy = HDy(LogisticRegression()).fit(train)
t_hdy_train = time()-tinit
tinit = time()
hdy_report = qp.evaluation.evaluation_report(hdy, APP(test), error_metrics=['mae', 'mrae']).mean()
t_hdy_test = time() - tinit
df.loc[len(df)] = ['HDy', dataset_name, hdy_report['mae'], hdy_report['mrae'], t_hdy_train, t_hdy_test]
# HDx............................................
tinit = time()
hdx = HDx().fit(train)
t_hdx_train = time() - tinit
tinit = time()
hdx_report = qp.evaluation.evaluation_report(hdx, APP(test), error_metrics=['mae', 'mrae']).mean()
t_hdx_test = time() - tinit
df.loc[len(df)] = ['HDx', dataset_name, hdx_report['mae'], hdx_report['mrae'], t_hdx_train, t_hdx_test]
# evaluation reports
print('\n'*3)
print('='*80)
print('Comparison in terms of performance')
print('='*80)
pv = df.pivot_table(index='dataset', columns='method', values=['MAE', 'MRAE'])
print(pv)
print('\nAveraged values:')
print(pv.mean())
print('\n'*3)
print('='*80)
print('Comparison in terms of efficiency')
print('='*80)
pv = df.pivot_table(index='dataset', columns='method', values=['tr-time', 'te-time'])
print(pv)
print('\nAveraged values:')
print(pv.mean())

View File

@ -369,6 +369,7 @@ def fetch_UCILabelledCollection(dataset_name, data_home=None, verbose=False) ->
elif verbose:
print('no file description available')
if verbose:
print(f'Loading {dataset_name} ({fullname})')
if identifier == 'acute':
df = pd.read_csv(data_path, header=None, encoding='utf-16', sep='\t')
@ -550,6 +551,7 @@ def fetch_UCILabelledCollection(dataset_name, data_home=None, verbose=False) ->
y = binarize(y, pos_class='NUC')
data = LabelledCollection(X, y)
if verbose:
data.stats()
return data

View File

@ -64,7 +64,7 @@ def prevalence_from_probabilities(posteriors, binarize: bool = False):
return prevalences
def HellingerDistance(P, Q):
def HellingerDistance(P, Q) -> float:
"""
Computes the Hellingher Distance (HD) between (discretized) distributions `P` and `Q`.
The HD for two discrete distributions of `k` bins is defined as:

View File

@ -530,7 +530,7 @@ class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier):
"""
`Hellinger Distance y <https://www.sciencedirect.com/science/article/pii/S0020025512004069>`_ (HDy).
HDy is a probabilistic method for training binary quantifiers, that models quantification as the problem of
minimizing the divergence (in terms of the Hellinger Distance) between two cumulative distributions of posterior
minimizing the divergence (in terms of the Hellinger Distance) between two distributions of posterior
probabilities returned by the classifier. One of the distributions is generated from the unlabelled examples and
the other is generated from a validation set. This latter distribution is defined as a mixture of the
class-conditional distributions of the posterior probabilities returned for the positive and negative validation
@ -590,6 +590,9 @@ class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier):
Px_test, _ = np.histogram(Px, bins=bins, range=(0, 1), density=True)
# the authors proposed to search for the prevalence yielding the best matching as a linear search
# at small steps (modern implementations resort to an optimization procedure,
# see class DistributionMatching)
prev_selected, min_dist = None, None
for prev in F.prevalence_linspace(n_prevalences=100, repeats=1, smooth_limits_epsilon=0.0):
Px_train = prev * Pxy1_density + (1 - prev) * Pxy0_density

View File

@ -1,6 +1,7 @@
import numpy as np
from quapy.data import LabelledCollection
from .base import BaseQuantifier
from quapy.method.base import BaseQuantifier, BinaryQuantifier
import quapy.functional as F
class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier):
"""
@ -33,3 +34,83 @@ class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier):
"""
return self.estimated_prevalence
class HDx(BinaryQuantifier):
"""
`Hellinger Distance x <https://www.sciencedirect.com/science/article/pii/S0020025512004069>`_ (HDx).
HDx is a method for training binary quantifiers, that models quantification as the problem of
minimizing the average divergence (in terms of the Hellinger Distance) across the feature-specific normalized
histograms of two representations, one for the unlabelled examples, and another generated from the training
examples as a mixture model of the class-specific representations. The parameters of the mixture thus represent
the estimates of the class prevalence values. The method computes all matchings for nbins in [10, 20, ..., 110]
and reports the mean of the median. The best prevalence is searched via linear search, from 0 to 1 steppy by 0.01.
"""
def __init__(self):
self.feat_ranges = None
def get_features_range(self, X):
feat_ranges = []
ncols = X.shape[1]
for col_idx in range(ncols):
feature = X[:,col_idx]
feat_ranges.append((np.min(feature), np.max(feature)))
return feat_ranges
def covariate_histograms(self, X, nbins):
assert self.feat_ranges is not None, 'quantify called before fit'
histograms = []
for col_idx in range(self.ncols):
feature = X[:,col_idx]
feat_range = self.feat_ranges[col_idx]
histograms.append(np.histogram(feature, bins=nbins, range=feat_range, density=True)[0])
return np.vstack(histograms).T
def fit(self, data: LabelledCollection):
"""
Trains a HDx quantifier.
:param data: the training set
:return: self
"""
self._check_binary(data, self.__class__.__name__)
X, y = data.Xy
self.ncols = X.shape[1]
self.feat_ranges = self.get_features_range(X)
# pre-compute the representation for positive and negative examples
self.bins = np.linspace(10, 110, 11, dtype=int) # [10, 20, 30, ..., 100, 110]
self.H0 = {bins:self.covariate_histograms(X[y == 0], bins) for bins in self.bins}
self.H1 = {bins:self.covariate_histograms(X[y == 1], bins) for bins in self.bins}
return self
def quantify(self, X):
# "In this work, the number of bins b used in HDx and HDy was chosen from 10 to 110 in steps of 10,
# and the final estimated a priori probability was taken as the median of these 11 estimates."
# (González-Castro, et al., 2013).
assert X.shape[1] == self.ncols, f'wrong shape in quantify; expected {self.ncols}, found {X.shape[1]}'
prev_estimations = []
for nbins in self.bins:
Ht = self.covariate_histograms(X, nbins=nbins)
H0 = self.H0[nbins]
H1 = self.H1[nbins]
# the authors proposed to search for the prevalence yielding the best matching as a linear search
# at small steps (modern implementations resort to an optimization procedure)
prev_selected, min_dist = None, None
for prev in F.prevalence_linspace(n_prevalences=100, repeats=1, smooth_limits_epsilon=0.0):
Hx = prev * H1 + (1 - prev) * H0
hdx = np.mean([F.HellingerDistance(Hx[:,col], Ht[:,col]) for col in range(self.ncols)])
if prev_selected is None or hdx < min_dist:
prev_selected, min_dist = prev, hdx
prev_estimations.append(prev_selected)
class1_prev = np.median(prev_estimations)
return np.asarray([1 - class1_prev, class1_prev])