Merge branch 'AICGijon-devel2' into devel

merged IFCB dataset
This commit is contained in:
Alejandro Moreo Fernandez 2024-02-07 18:46:12 +01:00
commit 4c77253f07
4 changed files with 383 additions and 19 deletions

View File

@ -6,6 +6,7 @@ from quapy.evaluation import evaluation_report
def newLR():
return LogisticRegression(n_jobs=-1)
<<<<<<< HEAD
quantifiers = [
('CC', qp.method.aggregative.CC(newLR())),
@ -18,6 +19,17 @@ quantifiers = [
for quant_name, quantifier in quantifiers:
=======
quantifiers = {'CC':qp.method.aggregative.CC(newLR()),
'ACC':qp.method.aggregative.ACC(newLR()),
'PCC':qp.method.aggregative.PCC(newLR()),
'PACC':qp.method.aggregative.PACC(newLR()),
'HDy':qp.method.aggregative.DistributionMatching(newLR()),
'EMQ':qp.method.aggregative.EMQ(newLR())
}
for quant_name, quantifier in quantifiers.items():
>>>>>>> 5566e0c97ae1b49b30874b6610d7f5b062009271
print("Experiment with "+quant_name)
train, test_gen = qp.datasets.fetch_IFCB()

View File

@ -1,5 +1,6 @@
import os
import pandas as pd
<<<<<<< HEAD
from quapy.protocol import AbstractProtocol
class IFCBTrainSamplesFromDir(AbstractProtocol):
@ -11,6 +12,55 @@ class IFCBTrainSamplesFromDir(AbstractProtocol):
for filename in os.listdir(path_dir):
if filename.endswith('.csv'):
self.samples.append(filename)
=======
import math
from quapy.protocol import AbstractProtocol
from pathlib import Path
def get_sample_list(path_dir):
"""Gets a sample list finding the csv files in a directory
Args:
path_dir (_type_): directory to look for samples
Returns:
_type_: list of samples
"""
samples = []
for filename in sorted(os.listdir(path_dir)):
if filename.endswith('.csv'):
samples.append(filename)
return samples
def generate_modelselection_split(samples, split=0.3):
"""This function generates a train/test split for model selection
without the use of random numbers so the split is always the same
Args:
samples (_type_): list of samples
split (float, optional): percentage saved for test. Defaults to 0.3.
Returns:
_type_: list of samples to use as train and list of samples to use as test
"""
num_items_to_pick = math.ceil(len(samples) * split)
step_size = math.floor(len(samples) / num_items_to_pick)
test_indices = [i * step_size for i in range(num_items_to_pick)]
test = [samples[i] for i in test_indices]
train = [item for i, item in enumerate(samples) if i not in test_indices]
return train, test
class IFCBTrainSamplesFromDir(AbstractProtocol):
def __init__(self, path_dir:str, classes: list, samples: list = None):
self.path_dir = path_dir
self.classes = classes
self.samples = []
if samples is not None:
self.samples = samples
else:
self.samples = get_sample_list(path_dir)
>>>>>>> 5566e0c97ae1b49b30874b6610d7f5b062009271
def __call__(self):
for sample in self.samples:
@ -28,6 +78,7 @@ class IFCBTrainSamplesFromDir(AbstractProtocol):
"""
return len(self.samples)
<<<<<<< HEAD
class IFCBTestSamples(AbstractProtocol):
@ -40,12 +91,43 @@ class IFCBTestSamples(AbstractProtocol):
#Load the sample from disk
X = pd.read_csv(os.path.join(self.path_dir,test_sample['sample']+'.csv')).to_numpy()
prevalences = test_sample.iloc[1:].to_numpy().astype(float)
=======
class IFCBTestSamples(AbstractProtocol):
def __init__(self, path_dir:str, test_prevalences: pd.DataFrame, samples: list = None, classes: list=None):
self.path_dir = path_dir
self.test_prevalences = test_prevalences
self.classes = classes
if samples is not None:
self.samples = samples
else:
self.samples = get_sample_list(path_dir)
def __call__(self):
for test_sample in self.samples:
s = pd.read_csv(os.path.join(self.path_dir,test_sample))
if self.test_prevalences is not None:
X = s
# If we are working with the test samples, we have a dataframe with the prevalences and no labels for the test
prevalences = self.test_prevalences.loc[self.test_prevalences['sample']==Path(test_sample).stem].to_numpy()[:,1:].flatten().astype(float)
else:
X = s.iloc[:, 1:].to_numpy()
y = s.iloc[:,0]
# In this case we compute the sample prevalences from the labels
prevalences = y[y.isin(self.classes)].value_counts().reindex(self.classes, fill_value=0).to_numpy()/len(s)
>>>>>>> 5566e0c97ae1b49b30874b6610d7f5b062009271
yield X, prevalences
def total(self):
"""
Returns the total number of samples that the protocol generates.
<<<<<<< HEAD
:return: The number of test samples to generate.
"""
return len(self.test_prevalences.index)
return len(self.test_prevalences.index)
=======
:return: The number of training samples to generate.
"""
return len(self.samples)
>>>>>>> 5566e0c97ae1b49b30874b6610d7f5b062009271

View File

@ -734,8 +734,7 @@ def fetch_lequa2022(task, data_home=None):
return train, val_gen, test_gen
def fetch_IFCB(single_sample_train=True, data_home=None):
def fetch_IFCB(single_sample_train=True, for_model_selection=False, data_home=None):
"""
Loads the IFCB dataset for quantification <https://zenodo.org/records/10036244>`. For more
information on this dataset check the zenodo site.
@ -746,21 +745,21 @@ def fetch_IFCB(single_sample_train=True, data_home=None):
The datasets are downloaded only once, and stored for fast reuse.
:param single_sample_train: boolean. If True (default), it returns the train dataset as an instance of
:param single_sample_train: a boolean. If true, it will return the train dataset as a
:class:`quapy.data.base.LabelledCollection` (all examples together).
If False, a generator of training samples will be returned.
Each example in the training set has an individual class label.
If false, a generator of training samples will be returned. Each example in the training set has an individual label.
:param for_model_selection: if True, then returns a split 30% of the training set (86 out of 286 samples) to be used for model selection;
if False, then returns the full training set as training set and the test set as the test set
:param data_home: specify the quapy home directory where collections will be dumped (leave empty to use the default
~/quay_data/ directory)
:return: a tuple `(train, test_gen)` where `train` is an instance of
:class:`quapy.data.base.LabelledCollection`, if `single_sample_train` is True or
:class:`quapy.data._ifcb.IFCBTrainSamplesFromDir` otherwise, i.e. a sampling protocol that
returns a series of samples labelled example by example.
test_gen is an instance of :class:`quapy.data._ifcb.IFCBTestSamples`,
:class:`quapy.data.base.LabelledCollection`, if `single_sample_train` is true or
:class:`quapy.data._ifcb.IFCBTrainSamplesFromDir`, i.e. a sampling protocol that returns a series of samples
labelled example by example. test_gen will be a :class:`quapy.data._ifcb.IFCBTestSamples`,
i.e., a sampling protocol that returns a series of samples labelled by prevalence.
"""
from quapy.data._ifcb import IFCBTrainSamplesFromDir, IFCBTestSamples
from quapy.data._ifcb import IFCBTrainSamplesFromDir, IFCBTestSamples, get_sample_list, generate_modelselection_split
if data_home is None:
data_home = get_quapy_home()
@ -791,25 +790,35 @@ def fetch_IFCB(single_sample_train=True, data_home=None):
test_true_prev = pd.read_csv(test_true_prev_path)
classes = test_true_prev.columns[1:]
#Load train samples
#Load train and test samples
train_samples_path = join(ifcb_dir,'train')
train_gen = IFCBTrainSamplesFromDir(path_dir=train_samples_path, classes=classes)
#Load test samples
test_samples_path = join(ifcb_dir,'test')
test_gen = IFCBTestSamples(path_dir=test_samples_path, test_prevalences_path=test_true_prev_path)
if for_model_selection:
# In this case, return 70% of training data as the training set and 30% as the test set
samples = get_sample_list(train_samples_path)
train, test = generate_modelselection_split(samples, split=0.3)
train_gen = IFCBTrainSamplesFromDir(path_dir=train_samples_path, classes=classes, samples=train)
# Test prevalence is computed from class labels
test_gen = IFCBTestSamples(path_dir=train_samples_path, test_prevalences=None, samples=test, classes=classes)
else:
# In this case, we use all training samples as the training set and the test samples as the test set
train_gen = IFCBTrainSamplesFromDir(path_dir=train_samples_path, classes=classes)
test_gen = IFCBTestSamples(path_dir=test_samples_path, test_prevalences=test_true_prev)
# In the case the user wants it, join all the train samples in one LabelledCollection
if single_sample_train:
X = []
y = []
X, y = [], []
for X_, y_ in train_gen():
X.append(X_)
y.append(y_)
X = np.vstack(X)
y = np.concatenate(y)
train = LabelledCollection(X,y, classes=classes)
train = LabelledCollection(X, y, classes = classes)
return train, test_gen
else:
return train_gen, test_gen

View File

@ -0,0 +1,261 @@
from abc import abstractmethod
import numpy as np
from sklearn.base import BaseEstimator
import quapy as qp
import quapy.functional as F
from quapy.data import LabelledCollection
from quapy.method.aggregative import BinaryAggregativeQuantifier
class ThresholdOptimization(BinaryAggregativeQuantifier):
"""
Abstract class of Threshold Optimization variants for :class:`ACC` as proposed by
`Forman 2006 <https://dl.acm.org/doi/abs/10.1145/1150402.1150423>`_ and
`Forman 2008 <https://link.springer.com/article/10.1007/s10618-008-0097-y>`_.
The goal is to bring improved stability to the denominator of the adjustment.
The different variants are based on different heuristics for choosing a decision threshold
that would allow for more true positives and many more false positives, on the grounds this
would deliver larger denominators.
:param classifier: a sklearn's Estimator that generates a classifier
:param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the
misclassification rates are to be estimated.
This parameter can be indicated as a real value (between 0 and 1), representing a proportion of
validation data, or as an integer, indicating that the misclassification rates should be estimated via
`k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a
:class:`quapy.data.base.LabelledCollection` (the split itself).
"""
def __init__(self, classifier: BaseEstimator, val_split=5, n_jobs=None):
self.classifier = classifier
self.val_split = val_split
self.n_jobs = qp._get_njobs(n_jobs)
@abstractmethod
def condition(self, tpr, fpr) -> float:
"""
Implements the criterion according to which the threshold should be selected.
This function should return the (float) score to be minimized.
:param tpr: float, true positive rate
:param fpr: float, false positive rate
:return: float, a score for the given `tpr` and `fpr`
"""
...
def discard(self, tpr, fpr) -> bool:
"""
Indicates whether a combination of tpr and fpr should be discarded
:param tpr: float, true positive rate
:param fpr: float, false positive rate
:return: true if the combination is to be discarded, false otherwise
"""
return (tpr - fpr) == 0
def _eval_candidate_thresholds(self, decision_scores, y):
"""
Seeks for the best `tpr` and `fpr` according to the score obtained at different
decision thresholds. The scoring function is implemented in function `_condition`.
:param decision_scores: array-like with the classification scores
:param y: predicted labels for the validation set (or for the training set via `k`-fold cross validation)
:return: best `tpr` and `fpr` and `threshold` according to `_condition`
"""
candidate_thresholds = np.unique(decision_scores)
candidates = []
scores = []
for candidate_threshold in candidate_thresholds:
y_ = self.classes_[1 * (decision_scores >= candidate_threshold)]
TP, FP, FN, TN = self._compute_table(y, y_)
tpr = self._compute_tpr(TP, FN)
fpr = self._compute_fpr(FP, TN)
if not self.discard(tpr, fpr):
candidate_score = self.condition(tpr, fpr)
candidates.append([tpr, fpr, candidate_threshold])
scores.append(candidate_score)
if len(candidates) == 0:
# if no candidate gives rise to a valid combination of tpr and fpr, this method defaults to the standard
# classify & count; this is akin to assign tpr=1, fpr=0, threshold=0
tpr, fpr, threshold = 1, 0, 0
candidates.append([tpr, fpr, threshold])
scores.append(0)
candidates = np.asarray(candidates)
candidates = candidates[np.argsort(scores)] # sort candidates by candidate_score
return candidates
def aggregate_with_threshold(self, classif_predictions, tprs, fprs, thresholds):
# This function performs the adjusted count for given tpr, fpr, and threshold.
# Note that, due to broadcasting, tprs, fprs, and thresholds could be arrays of length > 1
prevs_estims = np.mean(classif_predictions[:, None] >= thresholds, axis=0)
prevs_estims = (prevs_estims - fprs) / (tprs - fprs)
prevs_estims = F.as_binary_prevalence(prevs_estims, clip_if_necessary=True)
return prevs_estims.squeeze()
def _compute_table(self, y, y_):
TP = np.logical_and(y == y_, y == self.pos_label).sum()
FP = np.logical_and(y != y_, y == self.neg_label).sum()
FN = np.logical_and(y != y_, y == self.pos_label).sum()
TN = np.logical_and(y == y_, y == self.neg_label).sum()
return TP, FP, FN, TN
def _compute_tpr(self, TP, FP):
if TP + FP == 0:
return 1
return TP / (TP + FP)
def _compute_fpr(self, FP, TN):
if FP + TN == 0:
return 0
return FP / (FP + TN)
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
decision_scores, y = classif_predictions.Xy
# the standard behavior is to keep the best threshold only
self.tpr, self.fpr, self.threshold = self._eval_candidate_thresholds(decision_scores, y)[0]
return self
def aggregate(self, classif_predictions: np.ndarray):
# the standard behavior is to compute the adjusted count using the best threshold found
return self.aggregate_with_threshold(classif_predictions, self.tpr, self.fpr, self.threshold)
class T50(ThresholdOptimization):
"""
Threshold Optimization variant for :class:`ACC` as proposed by
`Forman 2006 <https://dl.acm.org/doi/abs/10.1145/1150402.1150423>`_ and
`Forman 2008 <https://link.springer.com/article/10.1007/s10618-008-0097-y>`_ that looks
for the threshold that makes `tpr` closest to 0.5.
The goal is to bring improved stability to the denominator of the adjustment.
:param classifier: a sklearn's Estimator that generates a classifier
:param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the
misclassification rates are to be estimated.
This parameter can be indicated as a real value (between 0 and 1), representing a proportion of
validation data, or as an integer, indicating that the misclassification rates should be estimated via
`k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a
:class:`quapy.data.base.LabelledCollection` (the split itself).
"""
def __init__(self, classifier: BaseEstimator, val_split=5):
super().__init__(classifier, val_split)
def condition(self, tpr, fpr) -> float:
return abs(tpr - 0.5)
class MAX(ThresholdOptimization):
"""
Threshold Optimization variant for :class:`ACC` as proposed by
`Forman 2006 <https://dl.acm.org/doi/abs/10.1145/1150402.1150423>`_ and
`Forman 2008 <https://link.springer.com/article/10.1007/s10618-008-0097-y>`_ that looks
for the threshold that maximizes `tpr-fpr`.
The goal is to bring improved stability to the denominator of the adjustment.
:param classifier: a sklearn's Estimator that generates a classifier
:param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the
misclassification rates are to be estimated.
This parameter can be indicated as a real value (between 0 and 1), representing a proportion of
validation data, or as an integer, indicating that the misclassification rates should be estimated via
`k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a
:class:`quapy.data.base.LabelledCollection` (the split itself).
"""
def __init__(self, classifier: BaseEstimator, val_split=5):
super().__init__(classifier, val_split)
def condition(self, tpr, fpr) -> float:
# MAX strives to maximize (tpr - fpr), which is equivalent to minimize (fpr - tpr)
return (fpr - tpr)
class X(ThresholdOptimization):
"""
Threshold Optimization variant for :class:`ACC` as proposed by
`Forman 2006 <https://dl.acm.org/doi/abs/10.1145/1150402.1150423>`_ and
`Forman 2008 <https://link.springer.com/article/10.1007/s10618-008-0097-y>`_ that looks
for the threshold that yields `tpr=1-fpr`.
The goal is to bring improved stability to the denominator of the adjustment.
:param classifier: a sklearn's Estimator that generates a classifier
:param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the
misclassification rates are to be estimated.
This parameter can be indicated as a real value (between 0 and 1), representing a proportion of
validation data, or as an integer, indicating that the misclassification rates should be estimated via
`k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a
:class:`quapy.data.base.LabelledCollection` (the split itself).
"""
def __init__(self, classifier: BaseEstimator, val_split=5):
super().__init__(classifier, val_split)
def condition(self, tpr, fpr) -> float:
return abs(1 - (tpr + fpr))
class MS(ThresholdOptimization):
"""
Median Sweep. Threshold Optimization variant for :class:`ACC` as proposed by
`Forman 2006 <https://dl.acm.org/doi/abs/10.1145/1150402.1150423>`_ and
`Forman 2008 <https://link.springer.com/article/10.1007/s10618-008-0097-y>`_ that generates
class prevalence estimates for all decision thresholds and returns the median of them all.
The goal is to bring improved stability to the denominator of the adjustment.
:param classifier: a sklearn's Estimator that generates a classifier
:param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the
misclassification rates are to be estimated.
This parameter can be indicated as a real value (between 0 and 1), representing a proportion of
validation data, or as an integer, indicating that the misclassification rates should be estimated via
`k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a
:class:`quapy.data.base.LabelledCollection` (the split itself).
"""
def __init__(self, classifier: BaseEstimator, val_split=5):
super().__init__(classifier, val_split)
def condition(self, tpr, fpr) -> float:
return 1
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
decision_scores, y = classif_predictions.Xy
# keeps all candidates
tprs_fprs_thresholds = self._eval_candidate_thresholds(decision_scores, y)
self.tprs = tprs_fprs_thresholds[:, 0]
self.fprs = tprs_fprs_thresholds[:, 1]
self.thresholds = tprs_fprs_thresholds[:, 2]
return self
def aggregate(self, classif_predictions: np.ndarray):
prevalences = self.aggregate_with_threshold(classif_predictions, self.tprs, self.fprs, self.thresholds)
if prevalences.ndim==2:
prevalences = np.median(prevalences, axis=0)
return prevalences
class MS2(MS):
"""
Median Sweep 2. Threshold Optimization variant for :class:`ACC` as proposed by
`Forman 2006 <https://dl.acm.org/doi/abs/10.1145/1150402.1150423>`_ and
`Forman 2008 <https://link.springer.com/article/10.1007/s10618-008-0097-y>`_ that generates
class prevalence estimates for all decision thresholds and returns the median of for cases in
which `tpr-fpr>0.25`
The goal is to bring improved stability to the denominator of the adjustment.
:param classifier: a sklearn's Estimator that generates a classifier
:param val_split: indicates the proportion of data to be used as a stratified held-out validation set in which the
misclassification rates are to be estimated.
This parameter can be indicated as a real value (between 0 and 1), representing a proportion of
validation data, or as an integer, indicating that the misclassification rates should be estimated via
`k`-fold cross validation (this integer stands for the number of folds `k`, defaults 5), or as a
:class:`quapy.data.base.LabelledCollection` (the split itself).
"""
def __init__(self, classifier: BaseEstimator, val_split=5):
super().__init__(classifier, val_split)
def discard(self, tpr, fpr) -> bool:
return (tpr-fpr) <= 0.25