Compare commits

...

10 Commits

18 changed files with 365 additions and 350 deletions

.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,33 @@
name: CI
on:
pull_request:
push:
branches:
- main
- devel
jobs:
# run unit tests
test:
name: Unit tests (Python ${{ matrix.python-version }})
runs-on: ubuntu-latest
strategy:
matrix:
python-version:
- "3.11"
env:
QUAPY_TESTS_OMIT_LARGE_DATASETS: True
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip setuptools wheel
python -m pip install -e .[bayes,tests]
- name: Test with unittest
run: python -m unittest
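
The workflow exports QUAPY_TESTS_OMIT_LARGE_DATASETS so that the test suite can skip downloads of large corpora. A minimal sketch of how a test can honor that flag (the class and test names are hypothetical; the refactored tests further down use an early return instead of skipTest):

import os
import unittest

class LargeDatasetTest(unittest.TestCase):
    def test_large_download(self):
        # skip when the CI flag defined above is present
        if os.environ.get('QUAPY_TESTS_OMIT_LARGE_DATASETS'):
            self.skipTest('QUAPY_TESTS_OMIT_LARGE_DATASETS is set')
        # ...download and exercise a large dataset here...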

View File

@ -1,10 +1,17 @@
Change Log 0.1.9
----------------
- Added Continuous Integration with GitHub Actions (thanks to Mirko Bunse!)
- Added Bayesian CC method (thanks to Pawel Czyz!). The method is described in detail in the paper
Ziegler, Albert, and Paweł Czyż. "Bayesian Quantification with Black-Box Estimators."
arXiv preprint arXiv:2302.09159 (2023).
- Removed binary UCI datasets {acute.a, acute.b, balance.2} from the list qp.data.datasets.UCI_BINARY_DATASETS
(the datasets are still loadable from the fetch_UCIBinaryLabelledCollection and fetch_UCIBinaryDataset
functions, though). The reason is that these datasets tend to yield results (for all methods) that are
one or two orders of magnitude greater than for other datasets, and this has a disproportionate impact on
the methods' averages (I suspect there is something wrong with those datasets).
Change Log 0.1.8
----------------
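
A minimal sketch of the delisting described in the 0.1.9 entry above: the datasets disappear from the public list but remain loadable through the fetch functions (assumes QuaPy 0.1.9):

from quapy.data.datasets import UCI_BINARY_DATASETS, fetch_UCIBinaryLabelledCollection

assert 'acute.a' not in UCI_BINARY_DATASETS             # no longer listed
acute_a = fetch_UCIBinaryLabelledCollection('acute.a')  # but still loadable on demand
acute_a.stats()                                         # prints basic statistics of the collection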

View File

@ -12,12 +12,11 @@ from time import time
In this example, we show how to perform model selection on a DistributionMatching quantifier.
"""
model = KDEyML(LogisticRegression())
model = DMy(LogisticRegression())
qp.environ['SAMPLE_SIZE'] = 100
qp.environ['N_JOBS'] = -1
# training, test = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=5).train_test
training, test = qp.datasets.fetch_UCIMulticlassDataset('letter').train_test
with qp.util.temp_seed(0):
@ -34,19 +33,21 @@ with qp.util.temp_seed(0):
# We will explore a classification-dependent hyper-parameter (e.g., the 'C'
# hyper-parameter of LogisticRegression) and a quantification-dependent hyper-parameter
# (e.g., the number of bins in a DistributionMatching quantifier.
# (e.g., the number of bins in a DistributionMatching quantifier).
# Classifier-dependent hyper-parameters have to be marked with a prefix "classifier__"
# in order to let the quantifier know this hyper-parameter belongs to its underlying
# classifier.
# We consider 7 values for the classifier and 7 values for the quantifier.
# QuaPy is optimized so that only 7 classifiers are trained, and then reused to test the
# different configurations of the quantifier. In other words, QuaPy avoids training
# the classifier 7x7 times.
param_grid = {
'classifier__C': np.logspace(-3,3,7),
'classifier__class_weight': ['balanced', None],
'bandwidth': np.linspace(0.01, 0.2, 20),
'nbins': [2, 3, 4, 5, 10, 15, 20]
}
tinit = time()
# model = OLD_GridSearchQ(
model = qp.model_selection.GridSearchQ(
model=model,
param_grid=param_grid,

View File

@ -123,7 +123,7 @@ class LabelledCollection:
if len(prevs) == self.n_classes - 1:
prevs = prevs + (1 - sum(prevs),)
assert len(prevs) == self.n_classes, 'unexpected number of prevalences'
assert sum(prevs) == 1, f'prevalences ({prevs}) wrong range (sum={sum(prevs)})'
assert np.isclose(sum(prevs), 1), f'prevalences ({prevs}) wrong range (sum={sum(prevs)})'
# Decide how many instances should be taken for each class in order to satisfy the requested prevalence
# accurately, and the number of instances in the sample (exactly). If int(size * prevs[i]) (which is
@ -549,7 +549,7 @@ class Dataset:
yield Dataset(train, test, name=f'fold {(i % nfolds) + 1}/{nfolds} (round={(i // nfolds) + 1})')
def reduce(self, n_train=100, n_test=100):
def reduce(self, n_train=100, n_test=100, random_state=None):
"""
Reduce the number of instances in place for quick experiments. Preserves the prevalence of each set.
@ -557,6 +557,14 @@ class Dataset:
:param n_test: number of test documents to keep (default 100)
:return: self
"""
self.training = self.training.sampling(n_train, *self.training.prevalence())
self.test = self.test.sampling(n_test, *self.test.prevalence())
self.training = self.training.sampling(
n_train,
*self.training.prevalence(),
random_state=random_state
)
self.test = self.test.sampling(
n_test,
*self.test.prevalence(),
random_state=random_state
)
return self
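
A minimal sketch of the new random_state argument of Dataset.reduce, which makes quick-experiment subsampling reproducible:

import quapy as qp

run_a = qp.datasets.fetch_UCIMulticlassDataset('letter').reduce(n_train=100, n_test=100, random_state=0)
run_b = qp.datasets.fetch_UCIMulticlassDataset('letter').reduce(n_train=100, n_test=100, random_state=0)
# with a fixed random_state, both reductions keep exactly the same instances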

View File

@ -20,8 +20,11 @@ TWITTER_SENTIMENT_DATASETS_TEST = ['gasp', 'hcr', 'omd', 'sanders',
TWITTER_SENTIMENT_DATASETS_TRAIN = ['gasp', 'hcr', 'omd', 'sanders',
'semeval', 'semeval16',
'sst', 'wa', 'wb']
UCI_BINARY_DATASETS = ['acute.a', 'acute.b',
'balance.1', 'balance.2', 'balance.3',
UCI_BINARY_DATASETS = [
#'acute.a', 'acute.b',
'balance.1',
#'balance.2',
'balance.3',
'breast-cancer',
'cmc.1', 'cmc.2', 'cmc.3',
'ctg.1', 'ctg.2', 'ctg.3',
@ -50,7 +53,9 @@ UCI_MULTICLASS_DATASETS = ['dry-bean',
'digits',
'letter']
LEQUA2022_TASKS = ['T1A', 'T1B', 'T2A', 'T2B']
LEQUA2022_VECTOR_TASKS = ['T1A', 'T1B']
LEQUA2022_TEXT_TASKS = ['T2A', 'T2B']
LEQUA2022_TASKS = LEQUA2022_VECTOR_TASKS + LEQUA2022_TEXT_TASKS
_TXA_SAMPLE_SIZE = 250
_TXB_SAMPLE_SIZE = 1000
@ -209,7 +214,7 @@ def fetch_UCIBinaryDataset(dataset_name, data_home=None, test_split=0.3, verbose
:return: a :class:`quapy.data.base.Dataset` instance
"""
data = fetch_UCIBinaryLabelledCollection(dataset_name, data_home, verbose)
return Dataset(*data.split_stratified(1 - test_split, random_state=0))
return Dataset(*data.split_stratified(1 - test_split, random_state=0), name=dataset_name)
def fetch_UCIBinaryLabelledCollection(dataset_name, data_home=None, verbose=False) -> LabelledCollection:
@ -583,7 +588,7 @@ def fetch_UCIMulticlassDataset(dataset_name, data_home=None, test_split=0.3, ver
:return: a :class:`quapy.data.base.Dataset` instance
"""
data = fetch_UCIMulticlassLabelledCollection(dataset_name, data_home, verbose)
return Dataset(*data.split_stratified(1 - test_split, random_state=0))
return Dataset(*data.split_stratified(1 - test_split, random_state=0), name=dataset_name)
def fetch_UCIMulticlassLabelledCollection(dataset_name, data_home=None, verbose=False) -> LabelledCollection:
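
A minimal sketch of the two changes above: the LeQua 2022 task lists are now split into vector and text tasks, and fetched UCI datasets carry their name (assumes QuaPy 0.1.9):

import quapy as qp
from quapy.data.datasets import LEQUA2022_TASKS, LEQUA2022_VECTOR_TASKS, LEQUA2022_TEXT_TASKS

assert LEQUA2022_TASKS == LEQUA2022_VECTOR_TASKS + LEQUA2022_TEXT_TASKS  # ['T1A', 'T1B'] + ['T2A', 'T2B']

dataset = qp.datasets.fetch_UCIBinaryDataset('ionosphere')
print(dataset.name)  # 'ionosphere'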

View File

@ -189,6 +189,19 @@ def check_prevalence_vector(prevalences: ArrayLike, raise_exception: bool=False,
return valid
def uniform_prevalence(n_classes):
"""
Returns a vector representing the uniform distribution for `n_classes`
:param n_classes: number of classes
:return: np.ndarray with all values 1/n_classes
"""
assert isinstance(n_classes, int) and n_classes>0, \
(f'param {n_classes} not understood; must be a positive integer representing the '
f'number of classes')
return np.full(shape=n_classes, fill_value=1./n_classes)
def normalize_prevalence(prevalences: ArrayLike, method='l1'):
"""
Normalizes a vector or matrix of prevalence values. The normalization consists of applying a L1 normalization in
@ -606,3 +619,5 @@ def solve_adjustment(
raise ValueError(f"Solver {solver} not known.")
else:
raise ValueError(f'unknown {solver=}')
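
A minimal usage sketch of the new uniform_prevalence helper:

import quapy.functional as F

prev = F.uniform_prevalence(4)          # array([0.25, 0.25, 0.25, 0.25])
assert F.check_prevalence_vector(prev)  # non-negative and sums (approximately) to 1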

View File

@ -3,6 +3,7 @@ from . import aggregative
from . import non_aggregative
from . import meta
AGGREGATIVE_METHODS = {
aggregative.CC,
aggregative.ACC,

View File

@ -27,7 +27,7 @@ class ThresholdOptimization(BinaryAggregativeQuantifier):
:class:`quapy.data.base.LabelledCollection` (the split itself).
"""
def __init__(self, classifier: BaseEstimator, val_split=5, n_jobs=None):
def __init__(self, classifier: BaseEstimator, val_split=None, n_jobs=None):
self.classifier = classifier
self.val_split = val_split
self.n_jobs = qp._get_njobs(n_jobs)

View File

@ -82,6 +82,13 @@ class AggregativeQuantifier(BaseQuantifier, ABC):
:param data: a :class:`quapy.data.base.LabelledCollection` consisting of the training data
:param fit_classifier: whether to train the learner (default is True). Set to False if the
learner has been trained outside the quantifier.
:param val_split: specifies the data used for generating classifier predictions. This specification
can be made as a float in (0, 1), indicating the proportion of a stratified held-out validation set to
be extracted from the training set; or as an integer (default 5), indicating that the predictions
are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
for `k`); or as a collection defining the specific set of data to use for validation.
Alternatively, this set can be specified at fit time by indicating the exact set of data
on which the predictions are to be generated.
:return: self
"""
self._check_init_parameters()
@ -111,6 +118,12 @@ class AggregativeQuantifier(BaseQuantifier, ABC):
if fit_classifier:
self._check_non_empty_classes(data)
if predict_on is None:
if not fit_classifier:
predict_on = data
if isinstance(self.val_split, LabelledCollection) and self.val_split!=predict_on:
raise ValueError(f'{fit_classifier=} but a LabelledCollection was provided as val_split '
f'in __init__ that is not the same as the LabelledCollection provided in fit.')
if predict_on is None:
predict_on = self.val_split
@ -467,7 +480,7 @@ class ACC(AggregativeCrispQuantifier):
if self.method not in ACC.METHODS:
raise ValueError(f"unknown method; valid ones are {ACC.METHODS}")
if self.norm not in ACC.NORMALIZATIONS:
raise ValueError(f"unknown clipping; valid ones are {ACC.NORMALIZATIONS}")
raise ValueError(f"unknown normalization; valid ones are {ACC.NORMALIZATIONS}")
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
"""
@ -577,8 +590,8 @@ class PACC(AggregativeSoftQuantifier):
raise ValueError(f"unknown solver; valid ones are {ACC.SOLVERS}")
if self.method not in ACC.METHODS:
raise ValueError(f"unknown method; valid ones are {ACC.METHODS}")
if self.clipping not in ACC.NORMALIZATIONS:
raise ValueError(f"unknown clipping; valid ones are {ACC.NORMALIZATIONS}")
if self.norm not in ACC.NORMALIZATIONS:
raise ValueError(f"unknown normalization; valid ones are {ACC.NORMALIZATIONS}")
def aggregation_fit(self, classif_predictions: LabelledCollection, data: LabelledCollection):
"""

View File

@ -54,7 +54,7 @@ class OneVsAll:
pass
def newOneVsAll(binary_quantifier, n_jobs=None):
def newOneVsAll(binary_quantifier: BaseQuantifier, n_jobs=None):
assert isinstance(binary_quantifier, BaseQuantifier), \
f'{binary_quantifier} does not seem to be a Quantifier'
if isinstance(binary_quantifier, qp.method.aggregative.AggregativeQuantifier):
@ -69,7 +69,7 @@ class OneVsAllGeneric(OneVsAll, BaseQuantifier):
quantifier for each class, and then l1-normalizes the outputs so that the class prevalence values sum up to 1.
"""
def __init__(self, binary_quantifier, n_jobs=None):
def __init__(self, binary_quantifier: BaseQuantifier, n_jobs=None):
assert isinstance(binary_quantifier, BaseQuantifier), \
f'{binary_quantifier} does not seem to be a Quantifier'
if isinstance(binary_quantifier, qp.method.aggregative.AggregativeQuantifier):
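
A short usage sketch of the type-annotated newOneVsAll factory (the import path quapy.method.base is an assumption, since the file name is not shown in this diff):

from sklearn.linear_model import LogisticRegression
from quapy.method.aggregative import PACC
from quapy.method.base import newOneVsAll

# wraps the binary quantifier with one instance per class; outputs are l1-normalized
ova_pacc = newOneVsAll(PACC(LogisticRegression()))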

View File

@ -1,5 +1,11 @@
import pytest
import unittest
def test_import():
import quapy as qp
assert qp.__version__ is not None
class ImportTest(unittest.TestCase):
def test_import(self):
import quapy as qp
self.assertIsNotNone(qp.__version__)
if __name__ == '__main__':
unittest.main()

View File

@ -1,61 +1,127 @@
import pytest
import os
import unittest
from quapy.data.datasets import REVIEWS_SENTIMENT_DATASETS, TWITTER_SENTIMENT_DATASETS_TEST, \
TWITTER_SENTIMENT_DATASETS_TRAIN, UCI_BINARY_DATASETS, LEQUA2022_TASKS, UCI_MULTICLASS_DATASETS,\
fetch_reviews, fetch_twitter, fetch_UCIBinaryDataset, fetch_lequa2022, fetch_UCIMulticlassLabelledCollection
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
import quapy.functional as F
from quapy.method.aggregative import PCC
from quapy.data.datasets import *
@pytest.mark.parametrize('dataset_name', REVIEWS_SENTIMENT_DATASETS)
def test_fetch_reviews(dataset_name):
dataset = fetch_reviews(dataset_name)
print(f'Dataset {dataset_name}')
print('Training set stats')
dataset.training.stats()
print('Test set stats')
dataset.test.stats()
class TestDatasets(unittest.TestCase):
def new_quantifier(self):
return PCC(LogisticRegression(C=0.001, max_iter=100))
@pytest.mark.parametrize('dataset_name', TWITTER_SENTIMENT_DATASETS_TEST + TWITTER_SENTIMENT_DATASETS_TRAIN)
def test_fetch_twitter(dataset_name):
try:
dataset = fetch_twitter(dataset_name)
except ValueError as ve:
if dataset_name == 'semeval' and ve.args[0].startswith(
'dataset "semeval" can only be used for model selection.'):
dataset = fetch_twitter(dataset_name, for_model_selection=True)
print(f'Dataset {dataset_name}')
print('Training set stats')
dataset.training.stats()
print('Test set stats')
def _check_dataset(self, dataset):
q = self.new_quantifier()
print(f'testing method {q} in {dataset.name}...', end='')
q.fit(dataset.training)
estim_prevalences = q.quantify(dataset.test.instances)
self.assertTrue(F.check_prevalence_vector(estim_prevalences))
print(f'[done]')
def _check_samples(self, gen, q, max_samples_test=5, vectorizer=None):
for X, p in gen():
if vectorizer is not None:
X = vectorizer.transform(X)
estim_prevalences = q.quantify(X)
self.assertTrue(F.check_prevalence_vector(estim_prevalences))
max_samples_test -= 1
if max_samples_test == 0:
break
@pytest.mark.parametrize('dataset_name', UCI_BINARY_DATASETS)
def test_fetch_UCIDataset(dataset_name):
try:
dataset = fetch_UCIBinaryDataset(dataset_name)
except FileNotFoundError as fnfe:
if dataset_name == 'pageblocks.5' and fnfe.args[0].find(
'If this is the first time you attempt to load this dataset') > 0:
print('The pageblocks.5 dataset requires some hand processing to be usable, skipping this test.')
def test_reviews(self):
for dataset_name in REVIEWS_SENTIMENT_DATASETS:
print(f'loading dataset {dataset_name}...', end='')
dataset = fetch_reviews(dataset_name, tfidf=True, min_df=10)
dataset.stats()
dataset.reduce()
print(f'[done]')
self._check_dataset(dataset)
def test_twitter(self):
for dataset_name in TWITTER_SENTIMENT_DATASETS_TEST:
print(f'loading dataset {dataset_name}...', end='')
dataset = fetch_twitter(dataset_name, min_df=10)
dataset.stats()
dataset.reduce()
print(f'[done]')
self._check_dataset(dataset)
def test_UCIBinaryDataset(self):
for dataset_name in UCI_BINARY_DATASETS:
try:
print(f'loading dataset {dataset_name}...', end='')
dataset = fetch_UCIBinaryDataset(dataset_name)
dataset.stats()
dataset.reduce()
print(f'[done]')
self._check_dataset(dataset)
except FileNotFoundError as fnfe:
if dataset_name == 'pageblocks.5' and fnfe.args[0].find(
'If this is the first time you attempt to load this dataset') > 0:
print('The pageblocks.5 dataset requires some hand processing to be usable; skipping this test.')
continue
def test_UCIMultiDataset(self):
for dataset_name in UCI_MULTICLASS_DATASETS:
print(f'loading dataset {dataset_name}...', end='')
dataset = fetch_UCIMulticlassDataset(dataset_name)
dataset.stats()
n_classes = dataset.n_classes
uniform_prev = F.uniform_prevalence(n_classes)
dataset.training = dataset.training.sampling(100, *uniform_prev)
dataset.test = dataset.test.sampling(100, *uniform_prev)
print(f'[done]')
self._check_dataset(dataset)
def test_lequa2022(self):
if os.environ.get('QUAPY_TESTS_OMIT_LARGE_DATASETS'):
print("omitting test_lequa2022 because QUAPY_TESTS_OMIT_LARGE_DATASETS is set")
return
print(f'Dataset {dataset_name}')
print('Training set stats')
dataset.training.stats()
print('Test set stats')
for dataset_name in LEQUA2022_VECTOR_TASKS:
print(f'loading dataset {dataset_name}...', end='')
train, gen_val, gen_test = fetch_lequa2022(dataset_name)
train.stats()
n_classes = train.n_classes
train = train.sampling(100, *F.uniform_prevalence(n_classes))
q = self.new_quantifier()
q.fit(train)
self._check_samples(gen_val, q, max_samples_test=5)
self._check_samples(gen_test, q, max_samples_test=5)
for dataset_name in LEQUA2022_TEXT_TASKS:
print(f'loading dataset {dataset_name}...', end='')
train, gen_val, gen_test = fetch_lequa2022(dataset_name)
train.stats()
n_classes = train.n_classes
train = train.sampling(100, *F.uniform_prevalence(n_classes))
tfidf = TfidfVectorizer()
train.instances = tfidf.fit_transform(train.instances)
q = self.new_quantifier()
q.fit(train)
self._check_samples(gen_val, q, max_samples_test=5, vectorizer=tfidf)
self._check_samples(gen_test, q, max_samples_test=5, vectorizer=tfidf)
@pytest.mark.parametrize('dataset_name', UCI_MULTICLASS_DATASETS)
def test_fetch_UCIMultiDataset(dataset_name):
dataset = fetch_UCIMulticlassLabelledCollection(dataset_name)
print(f'Dataset {dataset_name}')
print('Training set stats')
dataset.stats()
print('Test set stats')
def test_IFCB(self):
if os.environ.get('QUAPY_TESTS_OMIT_LARGE_DATASETS'):
print("omitting test_IFCB because QUAPY_TESTS_OMIT_LARGE_DATASETS is set")
return
print(f'loading dataset IFCB.')
for mod_sel in [False, True]:
train, gen = fetch_IFCB(single_sample_train=True, for_model_selection=mod_sel)
train.stats()
n_classes = train.n_classes
train = train.sampling(100, *F.uniform_prevalence(n_classes))
q = self.new_quantifier()
q.fit(train)
self._check_samples(gen, q, max_samples_test=5)
@pytest.mark.parametrize('dataset_name', LEQUA2022_TASKS)
def test_fetch_lequa2022(dataset_name):
train, gen_val, gen_test = fetch_lequa2022(dataset_name)
print(train.stats())
print('Val:', gen_val.total())
print('Test:', gen_test.total())
if __name__ == '__main__':
unittest.main()

View File

@ -15,9 +15,9 @@ class HierarchyTestCase(unittest.TestCase):
def test_inspect_aggregative(self):
import quapy.method.aggregative as aggregative
import quapy.method.aggregative as methods
members = inspect.getmembers(aggregative)
members = inspect.getmembers(methods)
classes = set([cls for name, cls in members if inspect.isclass(cls)])
quantifiers = [cls for cls in classes if issubclass(cls, BaseQuantifier)]
quantifiers = [cls for cls in quantifiers if issubclass(cls, AggregativeQuantifier)]
@ -31,25 +31,6 @@ class HierarchyTestCase(unittest.TestCase):
for m in BINARY_METHODS:
self.assertEqual(isinstance(m(lr), BinaryQuantifier), True)
def test_inspect_binary(self):
import quapy.method.base as base
import quapy.method.aggregative as aggregative
import quapy.method.non_aggregative as non_aggregative
import quapy.method.meta as meta
members = inspect.getmembers(base)
members+= inspect.getmembers(aggregative)
members += inspect.getmembers(non_aggregative)
members += inspect.getmembers(meta)
classes = set([cls for name, cls in members if inspect.isclass(cls)])
quantifiers = [cls for cls in classes if issubclass(cls, BaseQuantifier)]
quantifiers = [cls for cls in quantifiers if issubclass(cls, BinaryQuantifier)]
quantifiers = [cls for cls in quantifiers if not inspect.isabstract(cls) ]
for cls in quantifiers:
self.assertIn(cls, BINARY_METHODS)
def test_probabilistic(self):
lr = LogisticRegression()
for m in [CC(lr), ACC(lr)]:

View File

@ -1,234 +1,92 @@
import numpy as np
import pytest
import itertools
import unittest
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
import method.aggregative
import quapy as qp
from quapy.model_selection import GridSearchQ
from quapy.method.base import BinaryQuantifier
from quapy.data import Dataset, LabelledCollection
from quapy.method import AGGREGATIVE_METHODS, NON_AGGREGATIVE_METHODS
from quapy.method.aggregative import ACC
from quapy.method.meta import Ensemble
from quapy.protocol import APP
from quapy.method.aggregative import DMy
from quapy.method.meta import MedianEstimator
from quapy.method import AGGREGATIVE_METHODS, BINARY_METHODS, NON_AGGREGATIVE_METHODS
from quapy.functional import check_prevalence_vector
# datasets = [pytest.param(qp.datasets.fetch_twitter('hcr', pickle=True), id='hcr'),
# pytest.param(qp.datasets.fetch_UCIDataset('ionosphere'), id='ionosphere')]
class TestMethods(unittest.TestCase):
tinydatasets = [pytest.param(qp.datasets.fetch_twitter('hcr', pickle=True).reduce(), id='tiny_hcr'),
pytest.param(qp.datasets.fetch_UCIBinaryDataset('ionosphere').reduce(), id='tiny_ionosphere')]
tiny_dataset_multiclass = qp.datasets.fetch_UCIMulticlassDataset('academic-success').reduce(n_test=10)
tiny_dataset_binary = qp.datasets.fetch_UCIBinaryDataset('ionosphere').reduce(n_test=10)
datasets = [tiny_dataset_binary, tiny_dataset_multiclass]
learners = [LogisticRegression, LinearSVC]
def test_aggregative(self):
for dataset in TestMethods.datasets:
learner = LogisticRegression()
learner.fit(*dataset.training.Xy)
for model in AGGREGATIVE_METHODS:
if not dataset.binary and model in BINARY_METHODS:
print(f'skipping the test of binary model {model.__name__} on multiclass dataset {dataset.name}')
continue
@pytest.mark.parametrize('dataset', tinydatasets)
@pytest.mark.parametrize('aggregative_method', AGGREGATIVE_METHODS)
@pytest.mark.parametrize('learner', learners)
def test_aggregative_methods(dataset: Dataset, aggregative_method, learner):
model = aggregative_method(learner())
q = model(learner)
print('testing', q)
q.fit(dataset.training, fit_classifier=False)
estim_prevalences = q.quantify(dataset.test.X)
self.assertTrue(check_prevalence_vector(estim_prevalences))
if isinstance(model, BinaryQuantifier) and not dataset.binary:
print(f'skipping the test of binary model {type(model)} on non-binary dataset {dataset}')
return
def test_non_aggregative(self):
for dataset in TestMethods.datasets:
model.fit(dataset.training)
for model in NON_AGGREGATIVE_METHODS:
if not dataset.binary and model in BINARY_METHODS:
print(f'skipping the test of binary model {model.__name__} on multiclass dataset {dataset.name}')
continue
estim_prevalences = model.quantify(dataset.test.instances)
q = model()
print(f'testing {q} on dataset {dataset.name}')
q.fit(dataset.training)
estim_prevalences = q.quantify(dataset.test.X)
self.assertTrue(check_prevalence_vector(estim_prevalences))
true_prevalences = dataset.test.prevalence()
error = qp.error.mae(true_prevalences, estim_prevalences)
def test_ensembles(self):
assert type(error) == np.float64
qp.environ['SAMPLE_SIZE'] = 10
base_quantifier = ACC(LogisticRegression())
for dataset, policy in itertools.product(TestMethods.datasets, Ensemble.VALID_POLICIES):
if not dataset.binary and policy == 'ds':
print(f'skipping the test of binary policy ds on non-binary dataset {dataset}')
continue
@pytest.mark.parametrize('dataset', tinydatasets)
@pytest.mark.parametrize('non_aggregative_method', NON_AGGREGATIVE_METHODS)
def test_non_aggregative_methods(dataset: Dataset, non_aggregative_method):
model = non_aggregative_method()
print(f'testing {base_quantifier} on dataset {dataset.name} with {policy=}')
ensemble = Ensemble(quantifier=base_quantifier, size=3, policy=policy, n_jobs=-1)
ensemble.fit(dataset.training)
estim_prevalences = ensemble.quantify(dataset.test.instances)
self.assertTrue(check_prevalence_vector(estim_prevalences))
if isinstance(model, BinaryQuantifier) and not dataset.binary:
print(f'skipping the test of binary model {model} on non-binary dataset {dataset}')
return
def test_quanet(self):
try:
import quapy.classification.neural
except ModuleNotFoundError:
print('the torch package is not installed; skipping unit test for QuaNet')
return
model.fit(dataset.training)
qp.environ['SAMPLE_SIZE'] = 10
estim_prevalences = model.quantify(dataset.test.instances)
# load the kindle dataset as text, and convert words to numerical indexes
dataset = qp.datasets.fetch_reviews('kindle', pickle=True).reduce()
qp.data.preprocessing.index(dataset, min_df=5, inplace=True)
true_prevalences = dataset.test.prevalence()
error = qp.error.mae(true_prevalences, estim_prevalences)
from quapy.classification.neural import CNNnet
cnn = CNNnet(dataset.vocabulary_size, dataset.n_classes)
assert type(error) == np.float64
from quapy.classification.neural import NeuralClassifierTrainer
learner = NeuralClassifierTrainer(cnn, device='cpu')
from quapy.method.meta import QuaNet
model = QuaNet(learner, device='cpu', n_epochs=2, tr_iter_per_poch=10, va_iter_per_poch=10, patience=2)
@pytest.mark.parametrize('base_method', [method.aggregative.ACC, method.aggregative.PACC])
@pytest.mark.parametrize('learner', [LogisticRegression])
@pytest.mark.parametrize('dataset', tinydatasets)
@pytest.mark.parametrize('policy', Ensemble.VALID_POLICIES)
def test_ensemble_method(base_method, learner, dataset: Dataset, policy):
model.fit(dataset.training)
estim_prevalences = model.quantify(dataset.test.instances)
self.assertTrue(check_prevalence_vector(estim_prevalences))
qp.environ['SAMPLE_SIZE'] = 20
base_quantifier=base_method(learner())
if not dataset.binary and policy=='ds':
print(f'skipping the test of binary policy ds on non-binary dataset {dataset}')
return
model = Ensemble(quantifier=base_quantifier, size=3, policy=policy, n_jobs=-1)
model.fit(dataset.training)
estim_prevalences = model.quantify(dataset.test.instances)
true_prevalences = dataset.test.prevalence()
error = qp.error.mae(true_prevalences, estim_prevalences)
assert type(error) == np.float64
def test_quanet_method():
try:
import quapy.classification.neural
except ModuleNotFoundError:
print('skipping QuaNet test due to missing torch package')
return
qp.environ['SAMPLE_SIZE'] = 100
# load the kindle dataset as text, and convert words to numerical indexes
dataset = qp.datasets.fetch_reviews('kindle', pickle=True).reduce(200, 200)
qp.data.preprocessing.index(dataset, min_df=5, inplace=True)
from quapy.classification.neural import CNNnet
cnn = CNNnet(dataset.vocabulary_size, dataset.n_classes)
from quapy.classification.neural import NeuralClassifierTrainer
learner = NeuralClassifierTrainer(cnn, device='cuda')
from quapy.method.meta import QuaNet
model = QuaNet(learner, device='cuda')
if isinstance(model, BinaryQuantifier) and not dataset.binary:
print(f'skipping the test of binary model {model} on non-binary dataset {dataset}')
return
model.fit(dataset.training)
estim_prevalences = model.quantify(dataset.test.instances)
true_prevalences = dataset.test.prevalence()
error = qp.error.mae(true_prevalences, estim_prevalences)
assert type(error) == np.float64
def test_str_label_names():
model = qp.method.aggregative.CC(LogisticRegression())
dataset = qp.datasets.fetch_reviews('imdb', pickle=True)
dataset = Dataset(dataset.training.sampling(1000, *dataset.training.prevalence()),
dataset.test.sampling(1000, 0.25, 0.75))
qp.data.preprocessing.text2tfidf(dataset, min_df=5, inplace=True)
np.random.seed(0)
model.fit(dataset.training)
int_estim_prevalences = model.quantify(dataset.test.instances)
true_prevalences = dataset.test.prevalence()
error = qp.error.mae(true_prevalences, int_estim_prevalences)
assert type(error) == np.float64
dataset_str = Dataset(LabelledCollection(dataset.training.instances,
['one' if label == 1 else 'zero' for label in dataset.training.labels]),
LabelledCollection(dataset.test.instances,
['one' if label == 1 else 'zero' for label in dataset.test.labels]))
assert all(dataset_str.training.classes_ == dataset_str.test.classes_), 'wrong indexation'
np.random.seed(0)
model.fit(dataset_str.training)
str_estim_prevalences = model.quantify(dataset_str.test.instances)
true_prevalences = dataset_str.test.prevalence()
error = qp.error.mae(true_prevalences, str_estim_prevalences)
assert type(error) == np.float64
print(true_prevalences)
print(int_estim_prevalences)
print(str_estim_prevalences)
np.testing.assert_almost_equal(int_estim_prevalences[1],
str_estim_prevalences[list(model.classes_).index('one')])
# helper
def __fit_test(quantifier, train, test):
quantifier.fit(train)
test_samples = APP(test)
true_prevs, estim_prevs = qp.evaluation.prediction(quantifier, test_samples)
return qp.error.mae(true_prevs, estim_prevs), estim_prevs
def test_median_meta():
"""
This test compares the performance of the MedianEstimator with that of computing the median of the predictions
of differently parameterized quantifiers. We use the DistributionMatching base quantifier, and the median is
computed across different values of nbins.
"""
qp.environ['SAMPLE_SIZE'] = 100
# grid of values
nbins_grid = list(range(2, 11))
dataset = 'kindle'
train, test = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=10).train_test
prevs = []
errors = []
for nbins in nbins_grid:
with qp.util.temp_seed(0):
q = DMy(LogisticRegression(), nbins=nbins)
mae, estim_prevs = __fit_test(q, train, test)
prevs.append(estim_prevs)
errors.append(mae)
print(f'{dataset} DistributionMatching(nbins={nbins}) got MAE {mae:.4f}')
prevs = np.asarray(prevs)
mae = np.mean(errors)
print(f'\tMAE={mae:.4f}')
q = DMy(LogisticRegression())
q = MedianEstimator(q, param_grid={'nbins': nbins_grid}, random_state=0, n_jobs=-1)
median_mae, prev = __fit_test(q, train, test)
print(f'\tMAE={median_mae:.4f}')
np.testing.assert_almost_equal(np.median(prevs, axis=0), prev)
assert median_mae < mae, 'the median-based quantifier provided a higher error...'
def test_median_meta_modsel():
"""
This test checks the median-meta quantifier with model selection
"""
qp.environ['SAMPLE_SIZE'] = 100
dataset = 'kindle'
train, test = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=10).train_test
train, val = train.split_stratified(random_state=0)
nbins_grid = [2, 4, 5, 10, 15]
q = DMy(LogisticRegression())
q = MedianEstimator(q, param_grid={'nbins': nbins_grid}, random_state=0, n_jobs=-1)
median_mae, _ = __fit_test(q, train, test)
print(f'\tMAE={median_mae:.4f}')
q = DMy(LogisticRegression())
lr_params = {'classifier__C': np.logspace(-1, 1, 3)}
q = MedianEstimator(q, param_grid={'nbins': nbins_grid}, random_state=0, n_jobs=-1)
q = GridSearchQ(q, param_grid=lr_params, protocol=APP(val), n_jobs=-1)
optimized_median_ave, _ = __fit_test(q, train, test)
print(f'\tMAE={optimized_median_ave:.4f}')
assert optimized_median_ave < median_mae, "the optimized method yielded worse performance..."
if __name__ == '__main__':
unittest.main()

View File

@ -2,7 +2,6 @@ import unittest
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
import quapy as qp
from quapy.method.aggregative import PACC
@ -14,13 +13,16 @@ import time
class ModselTestCase(unittest.TestCase):
def test_modsel(self):
"""
Checks whether a model selection exploration selects a good hyperparameter
"""
q = PACC(LogisticRegression(random_state=1, max_iter=5000))
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10)
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10).reduce(random_state=1)
training, validation = data.training.split_stratified(0.7, random_state=1)
param_grid = {'classifier__C': np.logspace(-3,3,7)}
param_grid = {'classifier__C': [0.000001, 10.]}
app = APP(validation, sample_size=100, random_state=1)
q = GridSearchQ(
q, param_grid, protocol=app, error='mae', refit=True, timeout=-1, verbose=True
@ -32,54 +34,40 @@ class ModselTestCase(unittest.TestCase):
self.assertEqual(q.best_model().get_params()['classifier__C'], 10.0)
def test_modsel_parallel(self):
"""
Checks whether a parallelized model selection is actually faster than a sequential exploration while
obtaining the same optimal parameters
"""
q = PACC(LogisticRegression(random_state=1, max_iter=5000))
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10)
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10).reduce(n_train=500, random_state=1)
training, validation = data.training.split_stratified(0.7, random_state=1)
# test = data.test
param_grid = {'classifier__C': np.logspace(-3,3,7)}
app = APP(validation, sample_size=100, random_state=1)
q = GridSearchQ(
print('starting model selection in sequential exploration')
tinit = time.time()
modsel = GridSearchQ(
q, param_grid, protocol=app, error='mae', refit=True, timeout=-1, n_jobs=1, verbose=True
).fit(training)
tend_seq = time.time()-tinit
best_c_seq = modsel.best_params_['classifier__C']
print(f'[done] took {tend_seq:.2f}s best C = {best_c_seq}')
print('starting model selection in parallel exploration')
tinit = time.time()
modsel = GridSearchQ(
q, param_grid, protocol=app, error='mae', refit=True, timeout=-1, n_jobs=-1, verbose=True
).fit(training)
print('best params', q.best_params_)
print('best score', q.best_score_)
tend_par = time.time() - tinit
best_c_par = modsel.best_params_['classifier__C']
print(f'[done] took {tend_par:.2f}s best C = {best_c_par}')
self.assertEqual(q.best_params_['classifier__C'], 10.0)
self.assertEqual(q.best_model().get_params()['classifier__C'], 10.0)
self.assertEqual(best_c_seq, best_c_par)
self.assertLess(tend_par, tend_seq)
def test_modsel_parallel_speedup(self):
class SlowLR(LogisticRegression):
def fit(self, X, y, sample_weight=None):
time.sleep(1)
return super(SlowLR, self).fit(X, y, sample_weight)
q = PACC(SlowLR(random_state=1, max_iter=5000))
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10)
training, validation = data.training.split_stratified(0.7, random_state=1)
param_grid = {'classifier__C': np.logspace(-3, 3, 7)}
app = APP(validation, sample_size=100, random_state=1)
tinit = time.time()
GridSearchQ(
q, param_grid, protocol=app, error='mae', refit=False, timeout=-1, n_jobs=1, verbose=True
).fit(training)
tend_nooptim = time.time()-tinit
tinit = time.time()
GridSearchQ(
q, param_grid, protocol=app, error='mae', refit=False, timeout=-1, n_jobs=-1, verbose=True
).fit(training)
tend_optim = time.time() - tinit
print(f'parallel training took {tend_optim:.4f}s')
print(f'sequential training took {tend_nooptim:.4f}s')
self.assertEqual(tend_optim < (0.5*tend_nooptim), True)
def test_modsel_timeout(self):
@ -91,11 +79,10 @@ class ModselTestCase(unittest.TestCase):
q = PACC(SlowLR())
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10)
data = qp.datasets.fetch_reviews('imdb', tfidf=True, min_df=10).reduce(random_state=1)
training, validation = data.training.split_stratified(0.7, random_state=1)
# test = data.test
param_grid = {'classifier__C': np.logspace(-3,3,7)}
param_grid = {'classifier__C': np.logspace(-1,1,3)}
app = APP(validation, sample_size=100, random_state=1)
print('Expecting TimeoutError to be raised')

View File

@ -8,7 +8,7 @@ from quapy.method.aggregative import PACC
import quapy.functional as F
class MyTestCase(unittest.TestCase):
class TestReplicability(unittest.TestCase):
def test_prediction_replicability(self):
@ -26,7 +26,7 @@ class MyTestCase(unittest.TestCase):
prev2 = pacc.fit(dataset.training).quantify(dataset.test.X)
str_prev2 = strprev(prev2, prec=5)
self.assertEqual(str_prev1, str_prev2) # add assertion here
self.assertEqual(str_prev1, str_prev2)
def test_sampling_replicability(self):
@ -78,7 +78,7 @@ class MyTestCase(unittest.TestCase):
def test_parallel_replicability(self):
train, test = qp.datasets.fetch_UCIMulticlassDataset('dry-bean').train_test
train, test = qp.datasets.fetch_UCIMulticlassDataset('dry-bean').reduce().train_test
test = test.sampling(500, *[0.1, 0.0, 0.1, 0.1, 0.2, 0.5, 0.0])

View File

@ -125,6 +125,7 @@ setup(
# projects.
extras_require={ # Optional
'bayes': ['jax', 'jaxlib', 'numpyro'],
'tests': ['certifi'],
},
# If there are data files included in your packages that need to be

todo_refactor.txt Normal file
View File

@ -0,0 +1,33 @@
Add unit test for all options and bifurcations
Revisit the classifier_helper
Options

             type    example  init
kFCV         int     10       y
heldout      float   0.6      y
sample       tuple   X,y      n

init:
    kFCV:    Q(val_split=10, random_seed=None)
    held:    Q(val_split=0.7, random_seed=None)
    sample:  --

the classifier is not fit:
    fit end-to-end (classification -> predictions -> aggregation):
        kFCV:    fit(X, y, val_split=10)
        held:    fit(X, y, val_split=0.7)
        sample:  fit(X, y, val_split=(X, y))

the classifier is fit:
    fit only aggregation (predictions -> aggregation):
        sample:  fit_aggregation(X, y, transform_X=True)
    fit only aggregation (aggregation):
        sample:  fit_aggregation(P, y, transform_X=False)
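
A purely illustrative, runnable stub of the dispatch sketched in this note; the class is hypothetical and not part of QuaPy:

class ValSplitDemo:
    """Illustrates how the three val_split options would be told apart."""
    def __init__(self, val_split=10, random_seed=None):
        self.val_split = val_split
        self.random_seed = random_seed

    def describe(self):
        if isinstance(self.val_split, int):      # kFCV
            return f'{self.val_split}-fold cross-validation'
        if isinstance(self.val_split, float):    # heldout
            return f'held-out split with proportion {self.val_split}'
        if isinstance(self.val_split, tuple):    # sample
            return 'explicit validation sample (X, y)'
        raise ValueError('unexpected val_split specification')

print(ValSplitDemo(10).describe())        # kFCV
print(ValSplitDemo(0.7).describe())       # heldout
print(ValSplitDemo(([], [])).describe())  # sample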