starting refactor...

Alejandro Moreo Fernandez 2024-04-24 18:01:05 +02:00
parent 9274ea21aa
commit 3051c08184
3 changed files with 110 additions and 103 deletions

quapy/method/aggregative.py

@@ -75,11 +75,12 @@ class AggregativeQuantifier(BaseQuantifier, ABC):
empty_class_names = data.classes_[empty_classes]
raise ValueError(f'classes {empty_class_names} have no training examples')
def fit(self, data: LabelledCollection, fit_classifier=True, val_split=None):
def fit(self, X, y, fit_classifier=True, val_split=None):
"""
Trains the aggregative quantifier. This comes down to training a classifier and an aggregation function.
:param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data
:param X: `array-like` of shape `(n_samples, n_features)` consisting of the training covariates
:param y: `array-like` of shape `(n_samples,)` consisting of the instance labels
:param fit_classifier: whether to train the learner (default is True). Set to False if the
learner has been trained outside the quantifier.
:param val_split: specifies the data used for generating classifier predictions. This specification
@@ -92,16 +93,17 @@ class AggregativeQuantifier(BaseQuantifier, ABC):
:return: self
"""
self._check_init_parameters()
classif_predictions = self.classifier_fit_predict(data, fit_classifier, predict_on=val_split)
self.aggregation_fit(classif_predictions, data)
P, y = self.classifier_fit_predict(X, y, fit_classifier, predict_on=val_split)
self.aggregation_fit(P, y)
return self
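For orientation, a minimal end-to-end sketch of the refactored interface (illustrative data; CC is the classify-and-count quantifier shown further down in this diff):

import numpy as np
from sklearn.linear_model import LogisticRegression
# CC assumed imported from quapy.method.aggregative

X = np.random.rand(200, 5)              # synthetic training covariates
y = np.random.randint(0, 2, size=200)   # synthetic training labels

q = CC(LogisticRegression())
q.fit(X, y)              # trains the classifier and the aggregation function
prevs = q.quantify(X)    # np.ndarray of shape (n_classes,) with prevalence estimates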
def classifier_fit_predict(self, data: LabelledCollection, fit_classifier=True, predict_on=None):
def classifier_fit_predict(self, X, y, fit_classifier=True, predict_on=None):
"""
Trains the classifier if requested (`fit_classifier=True`) and generates the necessary predictions to
train the aggregation function.
:param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data
:param X: `array-like` of shape `(n_samples, n_features)` consisting of the training covariates
:param y: `array-like` of shape `(n_samples,)` consisting of the instance labels
:param fit_classifier: whether to train the learner (default is True). Set to False if the
learner has been trained outside the quantifier.
:param predict_on: specifies the set on which predictions need to be issued. This parameter can
@@ -113,10 +115,11 @@ class AggregativeQuantifier(BaseQuantifier, ABC):
"""
assert isinstance(fit_classifier, bool), 'unexpected type for "fit_classifier", must be boolean'
data = LabelledCollection(X, y)
self._check_classifier(adapt_if_necessary=(self._classifier_method() == 'predict_proba'))
if fit_classifier:
self._check_non_empty_classes(data)
self._check_non_empty_classes(y)
if predict_on is None:
if not fit_classifier:
@@ -170,16 +173,16 @@ class AggregativeQuantifier(BaseQuantifier, ABC):
f'use either a float indicating the split proportion, or a '
f'tuple (X,y) indicating the validation partition')
return predictions
return predictions.Xy
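Continuing the usage sketch above, val_split admits either form named in the error message (X_val, y_val are a hypothetical held-out partition):

q.fit(X, y, val_split=0.4)             # hold out 40% of the training data for classifier predictions
q.fit(X, y, val_split=(X_val, y_val))  # or supply an explicit validation partition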
@abstractmethod
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
def aggregation_fit(self, classif_predictions: np.ndarray, y: np.ndarray):
"""
Trains the aggregation function.
:param classif_predictions: a :class:`quapy.data.base.LabelledCollection` containing,
as instances, the predictions issued by the classifier and, as labels, the true labels
:param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data
:param classif_predictions: `array-like` of shape `(n_samples, n_classes)` consisting of the classifier
predictions for each class
:param y: `array-like` of shape `(n_samples,)` consisting of the instance labels
"""
...
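To illustrate the new array-based contract, a hypothetical subclass (not part of this commit) might implement the abstract pair like so:

import numpy as np

class MemorizePrevalence(AggregativeSoftQuantifier):
    # toy quantifier: ignores the classifier outputs at aggregation time
    def __init__(self, classifier):
        self.classifier = classifier

    def aggregation_fit(self, classif_predictions: np.ndarray, y: np.ndarray):
        # classif_predictions: (n_samples, n_classes) posteriors; y: (n_samples,) integer labels
        self.train_prev_ = np.bincount(y, minlength=classif_predictions.shape[1]) / len(y)

    def aggregate(self, classif_predictions: np.ndarray):
        return self.train_prev_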
@@ -201,16 +204,16 @@ class AggregativeQuantifier(BaseQuantifier, ABC):
"""
self.classifier_ = classifier
def classify(self, instances):
def classify(self, X):
"""
Provides the label predictions for the given instances. The predictions should respect the format expected by
:meth:`aggregate`, e.g., posterior probabilities for probabilistic quantifiers, or crisp predictions for
non-probabilistic quantifiers. The default classifier method is "decision_function".
:param instances: array-like of shape `(n_instances, n_features,)`
:param X: array-like of shape `(n_instances, n_features,)`
:return: np.ndarray of shape `(n_instances,)` with label predictions
"""
return getattr(self.classifier, self._classifier_method())(instances)
return getattr(self.classifier, self._classifier_method())(X)
def _classifier_method(self):
"""
@@ -230,15 +233,15 @@ class AggregativeQuantifier(BaseQuantifier, ABC):
assert hasattr(self.classifier, self._classifier_method()), \
f"the method does not implement the required {self._classifier_method()} method"
def quantify(self, instances):
def quantify(self, X):
"""
Generate class prevalence estimates for the sample's instances by aggregating the label predictions generated
by the classifier.
:param instances: array-like
:param X: array-like
:return: `np.ndarray` of shape `(n_classes,)` with class prevalence estimates.
"""
classif_predictions = self.classify(instances)
classif_predictions = self.classify(X)
return self.aggregate(classif_predictions)
@abstractmethod
@@ -328,9 +331,9 @@ class BinaryAggregativeQuantifier(AggregativeQuantifier, BinaryQuantifier):
def neg_label(self):
return self.classifier.classes_[0]
def fit(self, data: LabelledCollection, fit_classifier=True, val_split=None):
self._check_binary(data, self.__class__.__name__)
return super().fit(data, fit_classifier, val_split)
def fit(self, X, y, fit_classifier=True, val_split=None):
self._check_binary(y, self.__class__.__name__)
return super().fit(X, y, fit_classifier, val_split)
# Methods
@@ -346,12 +349,12 @@ class CC(AggregativeCrispQuantifier):
def __init__(self, classifier: BaseEstimator):
self.classifier = classifier
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
def aggregation_fit(self, classif_predictions: np.ndarray, y: np.ndarray):
"""
Nothing to do here!
:param classif_predictions: not used
:param data: not used
:param y: not used
"""
pass
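With the new signature, CC's aggregate step amounts to counting the crisp predictions; a minimal equivalent (illustrative):

import numpy as np

pred_labels = np.array([0, 1, 1, 0, 1])                           # crisp classifier outputs
prevs = np.bincount(pred_labels, minlength=2) / len(pred_labels)  # array([0.4, 0.6])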
@@ -376,12 +379,12 @@ class PCC(AggregativeSoftQuantifier):
def __init__(self, classifier: BaseEstimator):
self.classifier = classifier
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
def aggregation_fit(self, classif_posteriors: np.ndarray, y: np.ndarray):
"""
Nothing to do here!
:param classif_predictions: not used
:param data: not used
:param classif_posteriors: not used
:param y: not used
"""
pass
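Likewise, PCC's aggregation reduces to the column-wise mean of the posterior probabilities; schematically:

import numpy as np

posteriors = np.array([[0.9, 0.1],
                       [0.6, 0.4],
                       [0.2, 0.8]])
prevs = posteriors.mean(axis=0)   # approx. array([0.567, 0.433])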
@@ -482,17 +485,16 @@ class ACC(AggregativeCrispQuantifier):
if self.norm not in ACC.NORMALIZATIONS:
raise ValueError(f"unknown normalization; valid ones are {ACC.NORMALIZATIONS}")
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
def aggregation_fit(self, classif_predictions: np.ndarray, y: np.ndarray):
"""
Estimates the misclassification rates.
:param classif_predictions: a :class:`quapy.data.base.LabelledCollection` containing,
as instances, the label predictions issued by the classifier and, as labels, the true labels
:param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data
:param classif_predictions: `array-like` of shape `(n_samples,)` consisting of the label
predictions issued by the classifier for the training examples
:param y: `array-like` of shape `(n_samples,)` consisting of the instance labels
"""
pred_labels, true_labels = classif_predictions.Xy
self.cc = CC(self.classifier)
self.Pte_cond_estim_ = ACC.getPteCondEstim(self.classifier.classes_, true_labels, pred_labels)
self.Pte_cond_estim_ = ACC.getPteCondEstim(self.classifier.classes_, y, classif_predictions)
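For reference, getPteCondEstim estimates a column-normalized confusion matrix, with entry (i, j) approximating P(prediction = class_i | true class = class_j); a hedged equivalent sketch (the guard against empty classes is an assumption, not part of this commit):

import numpy as np
from sklearn.metrics import confusion_matrix

def pte_cond_estim_sketch(classes, y_true, y_pred):
    # rows index the predicted class, columns the true class
    conf = confusion_matrix(y_true, y_pred, labels=classes).T.astype(float)
    col_sums = conf.sum(axis=0, keepdims=True)
    return conf / np.where(col_sums == 0, 1, col_sums)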
@classmethod
def getPteCondEstim(cls, classes, y, y_):
@@ -593,17 +595,15 @@ class PACC(AggregativeSoftQuantifier):
if self.norm not in ACC.NORMALIZATIONS:
raise ValueError(f"unknown normalization; valid ones are {ACC.NORMALIZATIONS}")
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
def aggregation_fit(self, classif_predictions: np.ndarray, y: np.ndarray):
"""
Estimates the misclassification rates
:param classif_predictions: a :class:`quapy.data.base.LabelledCollection` containing,
as instances, the posterior probabilities issued by the classifier and, as labels, the true labels
:param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data
"""
posteriors, true_labels = classif_predictions.Xy
:param classif_predictions: `array-like` of shape `(n_samples, n_classes)`
consisting of the posterior probabilities of the training examples
:param y: `array-like` of shape `(n_samples,)` consisting of the instance labels
"""
self.pcc = PCC(self.classifier)
self.Pte_cond_estim_ = PACC.getPteCondEstim(self.classifier.classes_, true_labels, posteriors)
self.Pte_cond_estim_ = PACC.getPteCondEstim(self.classifier.classes_, y, classif_predictions)
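PACC's soft counterpart averages the posteriors per true class instead of counting crisp labels; a sketch under the same column convention (assumes every class occurs in y_true):

import numpy as np

def soft_pte_cond_estim_sketch(n_classes, y_true, posteriors):
    M = np.zeros((n_classes, n_classes))
    for j in range(n_classes):
        M[:, j] = posteriors[y_true == j].mean(axis=0)  # mean posterior among true-class-j examples
    return M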
def aggregate(self, classif_posteriors):
prevs_estim = self.pcc.aggregate(classif_posteriors)

quapy/method/base.py

@@ -19,21 +19,22 @@ class BaseQuantifier(BaseEstimator):
"""
@abstractmethod
def fit(self, data: LabelledCollection):
def fit(self, X, y):
"""
Trains a quantifier.
:param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data
:param X: `array-like` of shape `(n_samples, n_features)` consisting of the training covariates
:param y: `array-like` of shape `(n_samples,)` consisting of the instance labels
:return: self
"""
...
@abstractmethod
def quantify(self, instances):
def quantify(self, X):
"""
Generate class prevalence estimates for the sample's instances
:param instances: array-like
:param X: `array-like` of shape `(n_samples, n_features)` consisting of the test covariates
:return: `np.ndarray` of shape `(n_classes,)` with class prevalence estimates.
"""
...
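A minimal quantifier conforming to this contract might look as follows (a hypothetical sketch; MLPE further below is the simplest concrete example in the library):

import numpy as np

class UniformQuantifier(BaseQuantifier):
    # toy quantifier that always returns the uniform distribution
    def fit(self, X, y):
        self.n_classes_ = len(np.unique(y))
        return self

    def quantify(self, X):
        return np.full(self.n_classes_, 1 / self.n_classes_)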
@@ -45,8 +46,9 @@ class BinaryQuantifier(BaseQuantifier):
(typically, to be interpreted as one class and its complement).
"""
def _check_binary(self, data: LabelledCollection, quantifier_name):
assert data.binary, f'{quantifier_name} works only on problems of binary classification. ' \
def _check_binary(self, y, quantifier_name):
n_classes = len(np.unique(y))
assert n_classes == 2, f'{quantifier_name} works only on problems of binary classification. ' \
f'Use the class OneVsAll to enable {quantifier_name} to work on single-label data.'
@@ -78,7 +80,8 @@ class OneVsAllGeneric(OneVsAll, BaseQuantifier):
self.binary_quantifier = binary_quantifier
self.n_jobs = qp._get_njobs(n_jobs)
def fit(self, data: LabelledCollection, fit_classifier=True):
def fit(self, X, y, fit_classifier=True):
data = LabelledCollection(X, y)
assert not data.binary, f'{self.__class__.__name__} expects non-binary data'
assert fit_classifier == True, 'fit_classifier must be True'
@@ -93,8 +96,8 @@ class OneVsAllGeneric(OneVsAll, BaseQuantifier):
)
)
def quantify(self, instances):
prevalences = self._parallel(self._delayed_binary_predict, instances)
def quantify(self, X):
prevalences = self._parallel(self._delayed_binary_predict, X)
return qp.functional.normalize_prevalence(prevalences)
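Schematically, each binary quantifier contributes the prevalence of its positive class, and the resulting vector is normalized to sum to 1 (assuming normalize_prevalence's default L1 behavior):

import numpy as np

binary_prevs = np.array([0.2, 0.5, 0.4])    # positive prevalence per class, illustrative
prevs = binary_prevs / binary_prevs.sum()   # normalized across the three classes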
@property

quapy/method/non_aggregative.py

@@ -8,7 +8,7 @@ from quapy.method.base import BaseQuantifier, BinaryQuantifier
import quapy.functional as F
class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier):
class MLPE(BaseQuantifier):
"""
The `Maximum Likelihood Prevalence Estimation` (MLPE) method is a lazy method that assumes there is no prior
probability shift between training and test instances (put another way, that the i.i.d. assumption holds).
@@ -20,13 +20,15 @@ class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier):
def __init__(self):
self._classes_ = None
def fit(self, data: LabelledCollection):
def fit(self, X, y):
"""
Computes the training prevalence and stores it.
:param data: the training sample
:param X: `array-like` of shape `(n_samples, n_features)` consisting of the training covariates
:param y: `array-like` of shape `(n_samples,)` consisting of the instance labels
:return: self
"""
data = LabelledCollection(X, y)
self.estimated_prevalence = data.prevalence()
return self
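Since the (X, y) pair is wrapped back into a LabelledCollection only to compute its prevalence, the equivalent direct computation is simply:

import numpy as np

classes, counts = np.unique(y, return_counts=True)
estimated_prevalence = counts / counts.sum()   # training class prevalence, in class order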
@@ -100,7 +102,7 @@ class DMx(BaseQuantifier):
return distributions
def fit(self, data: LabelledCollection):
def fit(self, X, y):
"""
Generates the validation distributions out of the training data (covariates).
The validation distributions have shape `(n, nfeats, nbins)`, with `n` the number of classes, `nfeats`
@@ -109,15 +111,16 @@ class DMx(BaseQuantifier):
training data labelled with class `i`; while `dij = di[j]` is the discrete distribution for feature j in
training data labelled with class `i`, and `dij[k]` is the fraction of instances with a value in the `k`-th bin.
:param data: the training set
:param X: `array-like` of shape `(n_samples, n_features)` consisting of the training covariates
:param y: `array-like` of shape `(n_samples,)` consisting of the instance labels
:return: self
"""
X, y = data.Xy
data = LabelledCollection(X, y)
self.nfeats = X.shape[1]
self.feat_ranges = _get_features_range(X)
self.validation_distribution = np.asarray(
[self.__get_distributions(X[y==cat]) for cat in range(data.n_classes)]
[self.__get_distributions(X[y==cat]) for cat in data.classes_]
)
return self
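A hedged sketch of the per-class, per-feature histograms this step stores (__get_distributions is private; the helper name and exact binning here are assumptions):

import numpy as np

def per_feature_histograms(Xc, feat_ranges, nbins):
    # Xc: (n_c, nfeats) instances of one class -> (nfeats, nbins) normalized histograms
    hists = []
    for j, (lo, hi) in enumerate(feat_ranges):
        h, _ = np.histogram(Xc[:, j], bins=nbins, range=(lo, hi))
        hists.append(h / max(1, len(Xc)))
    return np.asarray(hists)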
@@ -147,53 +150,53 @@ class DMx(BaseQuantifier):
return F.argmin_prevalence(loss, n_classes, method=self.search)
class ReadMe(BaseQuantifier):
def __init__(self, bootstrap_trials=100, bootstrap_range=100, bagging_trials=100, bagging_range=25, **vectorizer_kwargs):
raise NotImplementedError('under development ...')
self.bootstrap_trials = bootstrap_trials
self.bootstrap_range = bootstrap_range
self.bagging_trials = bagging_trials
self.bagging_range = bagging_range
self.vectorizer_kwargs = vectorizer_kwargs
def fit(self, data: LabelledCollection):
X, y = data.Xy
self.vectorizer = CountVectorizer(binary=True, **self.vectorizer_kwargs)
X = self.vectorizer.fit_transform(X)
self.class_conditional_X = {i: X[y==i] for i in data.classes_}
def quantify(self, instances):
X = self.vectorizer.transform(instances)
# number of features
num_docs, num_feats = X.shape
# bootstrap
p_boots = []
for _ in range(self.bootstrap_trials):
docs_idx = np.random.choice(num_docs, size=self.bootstrap_range, replace=False)
class_conditional_X = {i: X[docs_idx] for i, X in self.class_conditional_X.items()}
Xboot = X[docs_idx]
# bagging
p_bags = []
for _ in range(self.bagging_trials):
feat_idx = np.random.choice(num_feats, size=self.bagging_range, replace=False)
class_conditional_Xbag = {i: X[:, feat_idx] for i, X in class_conditional_X.items()}
Xbag = Xboot[:,feat_idx]
p = self.std_constrained_linear_ls(Xbag, class_conditional_Xbag)
p_bags.append(p)
p_boots.append(np.mean(p_bags, axis=0))
p_mean = np.mean(p_boots, axis=0)
p_std = np.std(p_boots, axis=0)
return p_mean
def std_constrained_linear_ls(self, X, class_cond_X: dict):
pass
# class ReadMe(BaseQuantifier):
#
# def __init__(self, bootstrap_trials=100, bootstrap_range=100, bagging_trials=100, bagging_range=25, **vectorizer_kwargs):
# raise NotImplementedError('under development ...')
# self.bootstrap_trials = bootstrap_trials
# self.bootstrap_range = bootstrap_range
# self.bagging_trials = bagging_trials
# self.bagging_range = bagging_range
# self.vectorizer_kwargs = vectorizer_kwargs
#
# def fit(self, data: LabelledCollection):
# X, y = data.Xy
# self.vectorizer = CountVectorizer(binary=True, **self.vectorizer_kwargs)
# X = self.vectorizer.fit_transform(X)
# self.class_conditional_X = {i: X[y==i] for i in data.classes_}
#
# def quantify(self, instances):
# X = self.vectorizer.transform(instances)
#
# # number of features
# num_docs, num_feats = X.shape
#
# # bootstrap
# p_boots = []
# for _ in range(self.bootstrap_trials):
# docs_idx = np.random.choice(num_docs, size=self.bootstrap_range, replace=False)
# class_conditional_X = {i: X[docs_idx] for i, X in self.class_conditional_X.items()}
# Xboot = X[docs_idx]
#
# # bagging
# p_bags = []
# for _ in range(self.bagging_trials):
# feat_idx = np.random.choice(num_feats, size=self.bagging_range, replace=False)
# class_conditional_Xbag = {i: X[:, feat_idx] for i, X in class_conditional_X.items()}
# Xbag = Xboot[:,feat_idx]
# p = self.std_constrained_linear_ls(Xbag, class_conditional_Xbag)
# p_bags.append(p)
# p_boots.append(np.mean(p_bags, axis=0))
#
# p_mean = np.mean(p_boots, axis=0)
# p_std = np.std(p_boots, axis=0)
#
# return p_mean
#
#
# def std_constrained_linear_ls(self, X, class_cond_X: dict):
# pass
def _get_features_range(X):
@@ -209,4 +212,5 @@ def _get_features_range(X):
# aliases
#---------------------------------------------------------------
MaximumLikelihoodPrevalenceEstimation = MLPE
DistributionMatchingX = DMx