1
0
Fork 0

Merge branch 'protocols' of github.com:HLT-ISTI/QuaPy into protocols

This commit is contained in:
Alejandro Moreo Fernandez 2022-11-04 15:04:50 +01:00
commit d75b777a13
5 changed files with 138 additions and 7 deletions

View File

@ -78,6 +78,12 @@ def HellingerDistance(P, Q):
"""
return np.sqrt(np.sum((np.sqrt(P) - np.sqrt(Q))**2))
def TopsoeDistance(P, Q, epsilon=1e-20):
""" Topsoe
"""
return np.sum(P*np.log((2*P+epsilon)/(P+Q+epsilon)) +
Q*np.log((2*Q+epsilon)/(P+Q+epsilon)))
def uniform_prevalence_sampling(n_classes, size=1):
"""

View File

@ -19,6 +19,8 @@ AGGREGATIVE_METHODS = {
aggregative.PACC,
aggregative.EMQ,
aggregative.HDy,
aggregative.DyS,
aggregative.SMM,
aggregative.X,
aggregative.T50,
aggregative.MAX,

View File

@ -1,6 +1,6 @@
from abc import abstractmethod
from copy import deepcopy
from typing import Union
from typing import Callable, Union
import numpy as np
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator
@ -638,6 +638,119 @@ class HDy(AggregativeProbabilisticQuantifier, BinaryQuantifier):
return np.asarray([1 - class1_prev, class1_prev])
class DyS(AggregativeProbabilisticQuantifier, BinaryQuantifier):
"""
`DyS framework <https://ojs.aaai.org/index.php/AAAI/article/view/4376>`_ (DyS).
DyS is a generalization of HDy method, using a Ternary Search in order to find the prevalence that
minimizes the distance between distributions.
Details for the ternary search have been got from <https://dl.acm.org/doi/pdf/10.1145/3219819.3220059>
:param learner: a sklearn's Estimator that generates a binary classifier
:param val_split: a float in range (0,1) indicating the proportion of data to be used as a stratified held-out
validation distribution, or a :class:`quapy.data.base.LabelledCollection` (the split itself).
:param n_bins: an int with the number of bins to use to compute the histograms.
:param distance: an str with a distance already included in the librar (HD or topsoe), of a function
that computes the distance between two distributions.
:param tol: a float with the tolerance for the ternary search algorithm.
"""
def __init__(self, learner: BaseEstimator, val_split=0.4, n_bins=8, distance: Union[str, Callable]='HD', tol=1e-05):
self.learner = learner
self.val_split = val_split
self.tol = tol
self.distance = distance
self.n_bins = n_bins
def _ternary_search(self, f, left, right, tol):
"""
Find maximum of unimodal function f() within [left, right]
"""
while abs(right - left) >= tol:
left_third = left + (right - left) / 3
right_third = right - (right - left) / 3
if f(left_third) > f(right_third):
left = left_third
else:
right = right_third
# Left and right are the current bounds; the maximum is between them
return (left + right) / 2
def _compute_distance(self, Px_train, Px_test, distance: Union[str, Callable]='HD'):
if distance=='HD':
return F.HellingerDistance(Px_train, Px_test)
elif distance=='topsoe':
return F.TopsoeDistance(Px_train, Px_test)
else:
return distance(Px_train, Px_test)
def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, LabelledCollection] = None):
if val_split is None:
val_split = self.val_split
self._check_binary(data, self.__class__.__name__)
self.learner, validation = _training_helper(
self.learner, data, fit_learner, ensure_probabilistic=True, val_split=val_split)
Px = self.classify(validation.instances)[:, 1] # takes only the P(y=+1|x)
self.Pxy1 = Px[validation.labels == self.learner.classes_[1]]
self.Pxy0 = Px[validation.labels == self.learner.classes_[0]]
self.Pxy1_density = np.histogram(self.Pxy1, bins=self.n_bins, range=(0, 1), density=True)[0]
self.Pxy0_density = np.histogram(self.Pxy0, bins=self.n_bins, range=(0, 1), density=True)[0]
return self
def aggregate(self, classif_posteriors):
Px = classif_posteriors[:, 1] # takes only the P(y=+1|x)
Px_test = np.histogram(Px, bins=self.n_bins, range=(0, 1), density=True)[0]
def distribution_distance(prev):
Px_train = prev * self.Pxy1_density + (1 - prev) * self.Pxy0_density
return self._compute_distance(Px_train,Px_test,self.distance)
class1_prev = self._ternary_search(f=distribution_distance, left=0, right=1, tol=self.tol)
return np.asarray([1 - class1_prev, class1_prev])
class SMM(AggregativeProbabilisticQuantifier, BinaryQuantifier):
"""
`SMM method <https://ieeexplore.ieee.org/document/9260028>`_ (SMM).
SMM is a simplification of matching distribution methods where the representation of the examples
is created using the mean instead of a histogram.
:param learner: a sklearn's Estimator that generates a binary classifier.
:param val_split: a float in range (0,1) indicating the proportion of data to be used as a stratified held-out
validation distribution, or a :class:`quapy.data.base.LabelledCollection` (the split itself).
"""
def __init__(self, learner: BaseEstimator, val_split=0.4):
self.learner = learner
self.val_split = val_split
def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, LabelledCollection] = None):
if val_split is None:
val_split = self.val_split
self._check_binary(data, self.__class__.__name__)
self.learner, validation = _training_helper(
self.learner, data, fit_learner, ensure_probabilistic=True, val_split=val_split)
Px = self.classify(validation.instances)[:, 1] # takes only the P(y=+1|x)
self.Pxy1 = Px[validation.labels == self.learner.classes_[1]]
self.Pxy0 = Px[validation.labels == self.learner.classes_[0]]
self.Pxy1_mean = np.mean(self.Pxy1)
self.Pxy0_mean = np.mean(self.Pxy0)
return self
def aggregate(self, classif_posteriors):
Px = classif_posteriors[:, 1] # takes only the P(y=+1|x)
Px_mean = np.mean(Px)
class1_prev = (Px_mean - self.Pxy0_mean)/(self.Pxy1_mean - self.Pxy0_mean)
class1_prev = np.clip(class1_prev, 0, 1)
return np.asarray([1 - class1_prev, class1_prev])
class ELM(AggregativeQuantifier, BinaryQuantifier):
"""
Class of Explicit Loss Minimization (ELM) quantifiers.

View File

@ -83,7 +83,8 @@ class GridSearchQ(BaseQuantifier):
tinit = time()
hyper = [dict({k: values[i] for i, k in enumerate(params_keys)}) for values in itertools.product(*params_values)]
scores = qp.util.parallel(self._delayed_eval, ((params, training) for params in hyper), n_jobs=self.n_jobs)
#pass a seed to parallel so it is set in clild processes
scores = qp.util.parallel(self._delayed_eval, ((params, training) for params in hyper), seed=qp.environ.get('_R_SEED', None), n_jobs=self.n_jobs)
for params, score, model in scores:
if score is not None:

View File

@ -5,6 +5,7 @@ import os
import pickle
import urllib
from pathlib import Path
from contextlib import ExitStack
import quapy as qp
import numpy as np
@ -36,7 +37,7 @@ def map_parallel(func, args, n_jobs):
return list(itertools.chain.from_iterable(results))
def parallel(func, args, n_jobs):
def parallel(func, args, n_jobs, seed = None):
"""
A wrapper of multiprocessing:
@ -44,14 +45,20 @@ def parallel(func, args, n_jobs):
>>> delayed(func)(args_i) for args_i in args
>>> )
that takes the `quapy.environ` variable as input silently
that takes the `quapy.environ` variable as input silently.
Seeds the child processes to ensure reproducibility when n_jobs>1
"""
def func_dec(environ, *args):
def func_dec(environ, seed, *args):
qp.environ = environ.copy()
qp.environ['N_JOBS'] = 1
return func(*args)
#set a context with a temporal seed to ensure results are reproducibles in parallel
with ExitStack() as stack:
if seed is not None:
stack.enter_context(qp.util.temp_seed(seed))
return func(*args)
return Parallel(n_jobs=n_jobs)(
delayed(func_dec)(qp.environ, args_i) for args_i in args
delayed(func_dec)(qp.environ, None if seed is None else seed+i, args_i) for i, args_i in enumerate(args)
)
@ -66,6 +73,8 @@ def temp_seed(random_state):
:param random_state: the seed to set within the "with" context
"""
state = np.random.get_state()
#save the seed just in case is needed (for instance for setting the seed to child processes)
qp.environ['_R_SEED'] = random_state
np.random.seed(random_state)
try:
yield