Compare commits

..

22 Commits

Author SHA1 Message Date
Alejandro Moreo Fernandez ccb634fae5 step rate adaptation 2025-11-14 16:09:34 +01:00
Alejandro Moreo Fernandez 3dba708fe4 Merge branch 'devel' into kdeplus 2025-11-14 12:09:23 +01:00
Alejandro Moreo Fernandez 7ad5311fac merging from devel 2025-11-14 12:07:32 +01:00
Alejandro Moreo Fernandez 400edfdb63 bayesian plot 2025-11-13 19:43:07 +01:00
Alejandro Moreo Fernandez 3c09b1c98a adding prev@densities in KDEyML, huge efficiency improvement... 2025-11-13 18:45:07 +01:00
Alejandro Moreo Fernandez 3e7a431d26 bayesian kdey 2025-11-13 18:43:03 +01:00
Alejandro Moreo Fernandez f227ed2f60 adding kdex 2025-10-23 14:12:39 +02:00
Alejandro Moreo Fernandez 41baeb78ca lazy index construction in labelled collection 2025-10-23 12:35:01 +02:00
Alejandro Moreo Fernandez 088ebcdd31 adding non aggregative methods experimental 2025-10-23 12:10:21 +02:00
Alejandro Moreo Fernandez c11b99e08a improved ReadMe method 2025-10-22 18:51:35 +02:00
Alejandro Moreo Fernandez 854b3ba3f9 documented ReadMe 2025-10-20 18:33:45 +02:00
Alejandro Moreo Fernandez eafe486893 adding readme to non-aggregative 2025-10-20 18:13:34 +02:00
Alejandro Moreo Fernandez 1fb8500e87 improved doc 2025-10-09 12:49:08 +02:00
Alejandro Moreo Fernandez d597820a59 missing doc of confidence methods 2025-10-09 12:10:26 +02:00
Alejandro Moreo Fernandez 010676df12 starting devel 0.2.1 2025-10-07 10:27:59 +02:00
Alejandro Moreo Fernandez bd9f8e2cb4 show results fix 2025-09-30 12:02:13 +02:00
Alejandro Moreo Fernandez 79f3709e6f adding mrae 2025-09-27 18:12:56 +02:00
Alejandro Moreo Fernandez e4cb7868c7 adding mrae 2025-09-27 18:03:24 +02:00
Alejandro Moreo Fernandez 606ec2b89c with tables in pdf 2025-09-27 18:01:39 +02:00
Alejandro Moreo Fernandez 27afea8bf1 with tables in pdf 2025-09-27 18:01:21 +02:00
Alejandro Moreo Fernandez 29bb261f62 testing kde normal 2025-09-27 17:47:52 +02:00
Alejandro Moreo Fernandez c3fd92efde experiment 2025-09-27 17:41:12 +02:00
19 changed files with 1304 additions and 84 deletions

View File

@@ -0,0 +1,127 @@
from sklearn.linear_model import LogisticRegression
import quapy as qp
from BayesianKDEy.plot_simplex import plot_prev_points, plot_prev_points_matplot
from method.confidence import ConfidenceIntervals
from quapy.functional import strprev
from quapy.method.aggregative import KDEyML
from quapy.protocol import UPP
import quapy.functional as F
import numpy as np
from tqdm import tqdm
from scipy.stats import dirichlet
def bayesian(kdey, data, probabilistic_classifier, init=None, MAX_ITER=100_000, warmup=3_000):
"""
Bayes:
P(prev|data) = P(data|prev) P(prev) / P(data)
i.e.,
posterior = likelihood * prior / evidence
we assume the likelihood to be:
prev @ [kde_i_likelihood(data) for i in 1..n]
and the prior to be uniform on the simplex
"""
def pdf(kde, X):
# todo: remove exp, since we are then doing the log every time? (not sure)
return np.exp(kde.score_samples(X))
X = probabilistic_classifier.predict_proba(data)
kdes = kdey.mix_densities
test_densities = np.asarray([pdf(kde_i, X) for kde_i in kdes])
def log_likelihood(prev, epsilon=1e-10):
test_likelihoods = prev @ test_densities
test_loglikelihood = np.log(test_likelihoods + epsilon)
return np.sum(test_loglikelihood)
# def log_prior(prev):
# todo: adapt to arbitrary prior knowledge (e.g., something around training prevalence)
# return 1/np.sum((prev-init)**2) # it is not 1, but we assume a uniform prior, so anyway it is a useless constant
# def log_prior(prev, alpha_scale=1000):
# alpha = np.array(init) * alpha_scale
# return dirichlet.logpdf(prev, alpha)
def log_prior(prev):
return 0
def sample_neighbour(prev, step_size=0.05):
# random-walk Metropolis-Hastings
dir_noise = np.random.normal(scale=step_size, size=len(prev))
neighbour = F.normalize_prevalence(prev + dir_noise, method='mapsimplex')
return neighbour
n_classes = len(probabilistic_classifier.classes_)
current_prev = F.uniform_prevalence(n_classes) if init is None else init
current_likelihood = log_likelihood(current_prev) + log_prior(current_prev)
# Metropolis-Hastings with adaptive rate
step_size = 0.05
target_acceptance = 0.3
adapt_rate = 0.01
acceptance_history = []
samples = []
for i in tqdm(range(MAX_ITER), total=MAX_ITER):
proposed_prev = sample_neighbour(current_prev, step_size)
# probability of acceptance
proposed_likelihood = log_likelihood(proposed_prev) + log_prior(proposed_prev)
acceptance = proposed_likelihood - current_likelihood
# decide acceptance
accepted = np.log(np.random.rand()) < acceptance
if accepted:
current_prev = proposed_prev
current_likelihood = proposed_likelihood
samples.append(current_prev)
acceptance_history.append(1. if accepted else 0.)
if i < warmup and i%10==0 and len(acceptance_history)>=100:
recent_accept_rate = np.mean(acceptance_history[-100:])
# print(f'{i=} recent_accept_rate={recent_accept_rate:.4f} (current step_size={step_size:.4f})')
step_size *= np.exp(adapt_rate * (recent_accept_rate - target_acceptance))
# remove "warmup" initial iterations
samples = np.asarray(samples[warmup:])
return samples
if __name__ == '__main__':
qp.environ["SAMPLE_SIZE"] = 500
cls = LogisticRegression()
kdey = KDEyML(cls)
train, test = qp.datasets.fetch_UCIMulticlassDataset('academic-success', standardize=True).train_test
with qp.util.temp_seed(2):
print('fitting KDEy')
kdey.fit(*train.Xy)
# shifted = test.sampling(500, *[0.7, 0.1, 0.2])
# shifted = test.sampling(500, *test.prevalence()[::-1])
shifted = test.sampling(500, *F.uniform_prevalence_sampling(train.n_classes))
prev_hat = kdey.predict(shifted.X)
mae = qp.error.mae(shifted.prevalence(), prev_hat)
print(f'true_prev={strprev(shifted.prevalence())}')
print(f'prev_hat={strprev(prev_hat)}, {mae=:.4f}')
h = kdey.classifier
samples = bayesian(kdey, shifted.X, h, init=None, MAX_ITER=5_000, warmup=3_000)
conf_interval = ConfidenceIntervals(samples, confidence_level=0.95)
mae = qp.error.mae(shifted.prevalence(), conf_interval.point_estimate())
print(f'mean posterior {strprev(samples.mean(axis=0))}, {mae=:.4f}')
print(f'CI={conf_interval}')
print(f'\tcontains true={conf_interval.coverage(true_value=shifted.prevalence())==1}')
print(f'\tamplitude={conf_interval.montecarlo_proportion(50_000)*100.:.20f}%')
if train.n_classes == 3:
plot_prev_points(samples, true_prev=shifted.prevalence(), point_estim=prev_hat, train_prev=train.prevalence())
# plot_prev_points_matplot(samples)
# report = qp.evaluation.evaluation_report(kdey, protocol=UPP(test), verbose=True)
# print(report.mean(numeric_only=True))

View File

@@ -0,0 +1,112 @@
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde
def plot_prev_points(prevs, true_prev, point_estim, train_prev):
plt.rcParams.update({
'font.size': 10, # base size for all text
'axes.titlesize': 12, # axes title
'axes.labelsize': 10, # axis labels
'xtick.labelsize': 8, # tick labels
'ytick.labelsize': 8,
'legend.fontsize': 9, # legend
})
def cartesian(p):
dim = p.shape[-1]
p = p.reshape(-1,dim)
x = p[:, 1] + p[:, 2] * 0.5
y = p[:, 2] * np.sqrt(3) / 2
return x, y
# simplex coordinates
v1 = np.array([0, 0])
v2 = np.array([1, 0])
v3 = np.array([0.5, np.sqrt(3)/2])
# Plot
fig, ax = plt.subplots(figsize=(6, 6))
ax.scatter(*cartesian(prevs), s=10, alpha=0.5, edgecolors='none', label='samples')
ax.scatter(*cartesian(prevs.mean(axis=0)), s=10, alpha=1, label='sample-mean', edgecolors='black')
ax.scatter(*cartesian(true_prev), s=10, alpha=1, label='true-prev', edgecolors='black')
ax.scatter(*cartesian(point_estim), s=10, alpha=1, label='KDEy-estim', edgecolors='black')
ax.scatter(*cartesian(train_prev), s=10, alpha=1, label='train-prev', edgecolors='black')
# edges
triangle = np.array([v1, v2, v3, v1])
ax.plot(triangle[:, 0], triangle[:, 1], color='black')
# vertex labels
ax.text(-0.05, -0.05, "y=0", ha='right', va='top')
ax.text(1.05, -0.05, "y=1", ha='left', va='top')
ax.text(0.5, np.sqrt(3)/2 + 0.05, "y=2", ha='center', va='bottom')
ax.set_aspect('equal')
ax.axis('off')
plt.legend(
loc='center left',
bbox_to_anchor=(1.05, 0.5),
# ncol=3,
# frameon=False
)
plt.tight_layout()
plt.show()
def plot_prev_points_matplot(points):
# project 2D
v1 = np.array([0, 0])
v2 = np.array([1, 0])
v3 = np.array([0.5, np.sqrt(3) / 2])
x = points[:, 1] + points[:, 2] * 0.5
y = points[:, 2] * np.sqrt(3) / 2
# kde
xy = np.vstack([x, y])
kde = gaussian_kde(xy, bw_method=0.25)
xmin, xmax = 0, 1
ymin, ymax = 0, np.sqrt(3) / 2
# grid
xx, yy = np.mgrid[xmin:xmax:200j, ymin:ymax:200j]
positions = np.vstack([xx.ravel(), yy.ravel()])
zz = np.reshape(kde(positions).T, xx.shape)
# mask points in simplex
def in_triangle(x, y):
return (y >= 0) & (y <= np.sqrt(3) * np.minimum(x, 1 - x))
mask = in_triangle(xx, yy)
zz_masked = np.ma.array(zz, mask=~mask)
# plot
fig, ax = plt.subplots(figsize=(6, 6))
ax.imshow(
np.rot90(zz_masked),
cmap=plt.cm.viridis,
extent=[xmin, xmax, ymin, ymax],
alpha=0.8,
)
# triangle edges
triangle = np.array([v1, v2, v3, v1])
ax.plot(triangle[:, 0], triangle[:, 1], color='black', lw=2)
# points (optional)
ax.scatter(x, y, s=5, c='white', alpha=0.3)
# vertex labels
ax.text(-0.05, -0.05, "A (1,0,0)", ha='right', va='top')
ax.text(1.05, -0.05, "B (0,1,0)", ha='left', va='top')
ax.text(0.5, np.sqrt(3) / 2 + 0.05, "C (0,0,1)", ha='center', va='bottom')
ax.set_aspect('equal')
ax.axis('off')
plt.show()
if __name__ == '__main__':
n = 1000
points = np.random.dirichlet([2, 3, 4], size=n)
plot_prev_points_matplot(points)

View File

@@ -1,3 +1,10 @@
Change Log 0.2.1
-----------------
- Improved documentation of confidence regions.
- Added ReadMe method by Daniel Hopkins and Gary King
- Internal index in LabelledCollection is now "lazy", and is only constructed if required.
Change Log 0.2.0
-----------------

KDEyAitchison/commons.py Normal file
View File

@@ -0,0 +1,56 @@
import numpy as np
import pandas as pd
from quapy.method.aggregative import EMQ, KDEyML, PACC
from sklearn.linear_model import LogisticRegression
METHODS = ['PACC',
'EMQ',
'KDEy-ML',
'KDEy-MLA'
]
# common hyperparameters
hyper_LR = {
'classifier__C': np.logspace(-3, 3, 7),
'classifier__class_weight': ['balanced', None]
}
hyper_kde = {
'bandwidth': np.linspace(0.001, 0.5, 100)
}
hyper_kde_aitchison = {
'bandwidth': np.linspace(0.01, 2, 100)
}
# instantiates a new quantifier based on a string name
def new_method(method, **lr_kwargs):
lr = LogisticRegression(**lr_kwargs)
if method == 'KDEy-ML':
param_grid = {**hyper_kde, **hyper_LR}
quantifier = KDEyML(lr, kernel='gaussian')
elif method == 'KDEy-MLA':
param_grid = {**hyper_kde_aitchison, **hyper_LR}
quantifier = KDEyML(lr, kernel='aitchison')
elif method == 'EMQ':
param_grid = hyper_LR
quantifier = EMQ(lr)
elif method == 'PACC':
param_grid = hyper_LR
quantifier = PACC(lr)
else:
raise NotImplementedError('unknown method', method)
return param_grid, quantifier
def show_results(result_path):
df = pd.read_csv(result_path+'.csv', sep='\t')
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pv = df.pivot_table(index='Dataset', columns="Method", values=["MAE", "MRAE"])
print(pv)

View File

@@ -0,0 +1,38 @@
import pickle
import os
import sys
import pandas as pd
import quapy as qp
from quapy.model_selection import GridSearchQ
from quapy.protocol import UPP
from commons import METHODS, new_method, show_results
from new_table import LatexTable
SEED = 1
if __name__ == '__main__':
print(qp.datasets.UCI_MULTICLASS_DATASETS)
for optim in ['mae', 'mrae']:
table = LatexTable()
result_dir = f'results/ucimulti/{optim}'
for method in METHODS:
print()
global_result_path = f'{result_dir}/{method}'
print(f'Method\tDataset\tMAE\tMRAE\tKLD')
for dataset in qp.datasets.UCI_MULTICLASS_DATASETS:
# print(dataset)
local_result_path = global_result_path + '_' + dataset
if os.path.exists(local_result_path + '.dataframe'):
report = pd.read_csv(local_result_path+'.dataframe')
print(f'{method}\t{dataset}\t{report[optim].mean():.5f}')
table.add(benchmark=dataset, method=method, v=report[optim].values)
else:
print(dataset, 'not found for method', method)
table.latexPDF(f'./tables/{optim}.pdf', landscape=False)

View File

@@ -0,0 +1,94 @@
import pickle
import os
import sys
import pandas as pd
import quapy as qp
from quapy.model_selection import GridSearchQ
from quapy.protocol import UPP
from commons import METHODS, new_method, show_results
SEED = 1
if __name__ == '__main__':
qp.environ['SAMPLE_SIZE'] = 500
qp.environ['N_JOBS'] = -1
n_bags_val = 250
n_bags_test = 1000
for optim in ['mae', 'mrae']:
result_dir = f'results/ucimulti/{optim}'
os.makedirs(result_dir, exist_ok=True)
for method in METHODS:
print('Init method', method)
global_result_path = f'{result_dir}/{method}'
# show_results(global_result_path)
# sys.exit(0)
if not os.path.exists(global_result_path + '.csv'):
with open(global_result_path + '.csv', 'wt') as csv:
csv.write(f'Method\tDataset\tMAE\tMRAE\tKLD\n')
with open(global_result_path + '.csv', 'at') as csv:
for dataset in qp.datasets.UCI_MULTICLASS_DATASETS:
print('init', dataset)
local_result_path = global_result_path + '_' + dataset
if os.path.exists(local_result_path + '.dataframe'):
print(f'result file {local_result_path}.dataframe already exists; skipping')
report = pd.read_csv(local_result_path+'.dataframe')
print(report["mae"].mean())
# data = qp.datasets.fetch_UCIMulticlassDataset(dataset)
# csv.write(f'{method}\t{data.name}\t{report["mae"].mean():.5f}\t{report["mrae"].mean():.5f}\t{report["kld"].mean():.5f}\n')
continue
with qp.util.temp_seed(SEED):
param_grid, quantifier = new_method(method, max_iter=3000)
data = qp.datasets.fetch_UCIMulticlassDataset(dataset)
# model selection
train, test = data.train_test
train, val = train.split_stratified(random_state=SEED)
protocol = UPP(val, repeats=n_bags_val)
modsel = GridSearchQ(
quantifier, param_grid, protocol, refit=True, n_jobs=-1, verbose=True, error=optim
)
try:
modsel.fit(*train.Xy)
print(f'best params {modsel.best_params_}')
print(f'best score {modsel.best_score_}')
pickle.dump(
(modsel.best_params_, modsel.best_score_,),
open(f'{local_result_path}.hyper.pkl', 'wb'), pickle.HIGHEST_PROTOCOL)
quantifier = modsel.best_model()
except Exception:
print('something went wrong... trying to fit the default model')
quantifier.fit(*train.Xy)
protocol = UPP(test, repeats=n_bags_test)
report = qp.evaluation.evaluation_report(
quantifier, protocol, error_metrics=['mae', 'mrae', 'kld'], verbose=True
)
report.to_csv(f'{local_result_path}.dataframe')
print(f'{method}\t{data.name}\t{report["mae"].mean():.5f}\t{report["mrae"].mean():.5f}\t{report["kld"].mean():.5f}\n')
csv.write(f'{method}\t{data.name}\t{report["mae"].mean():.5f}\t{report["mrae"].mean():.5f}\t{report["kld"].mean():.5f}\n')
csv.flush()
show_results(global_result_path)

View File

@@ -1,19 +1,53 @@
Adapt examples; remaining: example 4-onwards
not working: 15 (qunfold)
Solve the warnings issue; right now there is a warning ignore in method/__init__.py:
Add 'platt' to calib options in EMQ?
Allow n_prevpoints in APP to be specified by a user-defined grid?
Add the fix suggested by Alexander?
"For a more general application, I would maybe first establish a per-class threshold value of plausible prevalence
Update READMEs, wiki, & examples for new fit-predict interface
Add the fix suggested by Alexander:
For a more general application, I would maybe first establish a per-class threshold value of plausible prevalence
based on the number of actual positives and the required sample size; e.g., for sample_size=100 and actual
positives [10, 100, 500] -> [0.1, 1.0, 1.0], meaning that class 0 can be sampled at most at 0.1 prevalence, while
the others can be sampled up to 1. prevalence. Then, when a prevalence value is requested, e.g., [0.33, 0.33, 0.33],
we may either clip each value and normalize (as you suggest for the extreme case, e.g., [0.1, 0.33, 0.33]/sum) or
scale each value by per-class thresholds, i.e., [0.33*0.1, 0.33*1, 0.33*1]/sum."
scale each value by per-class thresholds, i.e., [0.33*0.1, 0.33*1, 0.33*1]/sum.
- This affects LabelledCollection
- This functionality should be accessible via sampling protocols and evaluation functions
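A hypothetical sketch of the clip-and-normalize variant suggested above (illustrative only; the function name is made up):
import numpy as np

def clip_and_normalize(requested_prev, per_class_caps):
    # clip each requested prevalence to its plausible per-class cap, then renormalize
    clipped = np.minimum(requested_prev, per_class_caps)
    return clipped / clipped.sum()

# caps [0.1, 1.0, 1.0] derive from actual positives [10, 100, 500] with sample_size=100
print(clip_and_normalize(np.array([0.33, 0.33, 0.33]), np.array([0.1, 1.0, 1.0])))  # ~[0.13, 0.43, 0.43]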
Solve the pre-trained classifier issues. An example is the coptic-codes script I did, which needed a mock_lr
in order to have access to classes_; consider also the case in which the precomputed outputs have already been
generated, as in the unifying-problems code.
To remove LabelledCollection from the methods:
- The mess comes from the confusing semantics of fit in aggregative methods, which receives 3 parameters:
- data: LabelledCollection, which can be:
- the training set, if the classifier must be trained
- None, if the classifier must not be trained
- the validation set (which conflicts with val_split), if the classifier must not be trained
- fit_classifier: states whether the classifier must be trained or not, and this changes the semantics of the other parameters
- val_split: which can be:
- a number: the number of folds for kFCV, which implies fit_classifier=True and data=the whole training set
- a fraction in [0,1]: the portion used for validation; implies fit_classifier=True and data=train+val
- a LabelledCollection: the specific validation set; implies neither fit_classifier=True nor False
- The way to remove the methods' dependency on LabelledCollection should be:
- The constructor states whether the classifier received as a parameter must be trained or is already trained;
that is, there is a fit_classifier=True or False.
- fit_classifier=True:
- data in fit is the whole training set, validation included
- val_split:
- int: number of folds in kFCV
- proportion in [0,1]
- fit_classifier=False:
- [TODO] document confidence in manuals
- [TODO] Test the return_type="index" in protocols and finish the "distributing_samples.py" example
- [TODO] Add EDy (an implementation is available at quantificationlib)

View File

@@ -604,7 +604,10 @@ estim_prevalence = model.predict(dataset.test.X)
_(New in v0.2.0!)_ Some quantification methods go beyond providing a single point estimate of class prevalence values and also produce confidence regions, which characterize the uncertainty around the point estimate. In QuaPy, two such methods are currently implemented:
* Aggregative Bootstrap: The Aggregative Bootstrap method extends any aggregative quantifier by generating confidence regions for class prevalence estimates through bootstrapping. Key features of this method include:
* Aggregative Bootstrap: The Aggregative Bootstrap method extends any aggregative quantifier by generating confidence regions for class prevalence estimates through bootstrapping. The method is described in the paper [Moreo, A., Salvati, N.
An Efficient Method for Deriving Confidence Intervals in Aggregative Quantification.
Learning to Quantify: Methods and Applications (LQ 2025), co-located at ECML-PKDD 2025.
pp 12-33, Porto (Portugal)](https://lq-2025.github.io/proceedings/CompleteVolume.pdf). Key features of this method include:
* Optimized Computation: The bootstrap is applied to pre-classified instances, significantly speeding up training and inference.
During training, bootstrap repetitions are performed only after training the classifier once. These repetitions are used to train multiple aggregation functions.
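For instance, a minimal usage sketch (assuming `AggregativeBootstrap` can be imported from `quapy.method.confidence`, the module documented below; the dataset choice is arbitrary):

from sklearn.linear_model import LogisticRegression
import quapy as qp
from quapy.method.aggregative import PACC
from quapy.method.confidence import AggregativeBootstrap

train, test = qp.datasets.fetch_UCIMulticlassDataset('academic-success').train_test
quantifier = AggregativeBootstrap(PACC(LogisticRegression()))  # wraps any aggregative quantifier
quantifier.fit(*train.Xy)  # the classifier is trained once; bootstrap repetitions act on the aggregation phase
point_estimate, conf_region = quantifier.predict_conf(test.X, confidence_level=0.95)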

View File

@@ -60,6 +60,14 @@ quapy.method.composable module
:undoc-members:
:show-inheritance:
quapy.method.confidence module
------------------------------
.. automodule:: quapy.method.confidence
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------

View File

@@ -36,7 +36,7 @@ with qp.util.temp_seed(0):
true_prev = shifted_test.prevalence()
# by calling "quantify_conf", we obtain the point estimate and the confidence intervals around it
pred_prev, conf_intervals = pacc.quantify_conf(shifted_test.X)
pred_prev, conf_intervals = pacc.predict_conf(shifted_test.X)
# conf_intervals is an instance of ConfidenceRegionABC, which provides some useful utilities like:
# - coverage: a function which computes the fraction of true values that belong to the confidence region

View File

@@ -0,0 +1,60 @@
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_selection import SelectKBest, chi2
import quapy as qp
from quapy.method.non_aggregative import ReadMe
import quapy.functional as F
from sklearn.pipeline import Pipeline
"""
This example showcases how to use the non-aggregative method ReadMe proposed by Hopkins and King.
This method is for text analysis, so let us first instantiate a dataset for sentiment quantification (we
use IMDb for this example). The method is quite computationally expensive, so we will restrict the training
set to 1000 documents only.
"""
reviews = qp.datasets.fetch_reviews('imdb').reduce(n_train=1000, random_state=0)
"""
We need to convert text to bag-of-words representations. Actually, ReadMe requires the representations to be
binary (i.e., storing a 1 whenever a document contains a certain word, and 0 otherwise), so we will not use
TFIDF weighting. We will also retain the top 1000 most important features according to chi2.
"""
encode_0_1 = Pipeline([
('0_1_terms', CountVectorizer(min_df=5, binary=True)),
('feat_sel', SelectKBest(chi2, k=1000))
])
train, test = qp.data.preprocessing.instance_transformation(reviews, encode_0_1, inplace=True).train_test
"""
We now instantiate ReadMe with prob_model='full' (the default behaviour, implementing Hopkins and King's original
idea). This method estimates Q(Y) by solving:
Q(X) = \sum_i Q(X|Y=i) Q(Y=i)
as a linear least-squares problem, i.e., without resorting to estimating the posteriors Q(Y=i|X).
However, since Q(X) and Q(X|Y=i) are matrices of shape (2^K, 1) and (2^K, n), with K the number of features
and n the number of classes, their calculation becomes intractable. ReadMe instead performs bagging (i.e., it
samples small sets of features and averages the results) thus reducing K to a few terms. In our example we
set K (bagging_range) to 20, and the number of bagging_trials to 100.
ReadMe also computes confidence intervals via bootstrap. We set the number of bootstrap trials to 100.
"""
readme = ReadMe(prob_model='full', bootstrap_trials=100, bagging_trials=100, bagging_range=20, random_state=0, verbose=True)
readme.fit(*train.Xy) # <- there is actually nothing happening here (only bootstrap resampling); the method is "lazy"
# and postpones most of the calculations to the test phase.
# since the method is slow, we will only test 3 cases with different imbalances
few_negatives = [0.25, 0.75]
balanced = [0.5, 0.5]
few_positives = [0.75, 0.25]
for test_prev in [few_negatives, balanced, few_positives]:
sample = reviews.test.sampling(500, *test_prev, random_state=0) # draw sets of 500 documents with desired prevs
prev_estim, conf = readme.predict_conf(sample.X)
err = qp.error.mae(sample.prevalence(), prev_estim)
print(f'true-prevalence={F.strprev(sample.prevalence())},\n'
f'predicted-prevalence={F.strprev(prev_estim)}, with confidence intervals {conf},\n'
f'MAE={err:.4f}')

View File

@@ -0,0 +1,254 @@
from scipy.sparse import csc_matrix, csr_matrix
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import TfidfTransformer, TfidfVectorizer, CountVectorizer
import numpy as np
from joblib import Parallel, delayed
import sklearn
import math
from scipy.stats import t
class ContTable:
def __init__(self, tp=0, tn=0, fp=0, fn=0):
self.tp=tp
self.tn=tn
self.fp=fp
self.fn=fn
def get_d(self): return self.tp + self.tn + self.fp + self.fn
def get_c(self): return self.tp + self.fn
def get_not_c(self): return self.tn + self.fp
def get_f(self): return self.tp + self.fp
def get_not_f(self): return self.tn + self.fn
def p_c(self): return (1.0*self.get_c())/self.get_d()
def p_not_c(self): return 1.0-self.p_c()
def p_f(self): return (1.0*self.get_f())/self.get_d()
def p_not_f(self): return 1.0-self.p_f()
def p_tp(self): return (1.0*self.tp) / self.get_d()
def p_tn(self): return (1.0*self.tn) / self.get_d()
def p_fp(self): return (1.0*self.fp) / self.get_d()
def p_fn(self): return (1.0*self.fn) / self.get_d()
def tpr(self):
c = 1.0*self.get_c()
return self.tp / c if c > 0.0 else 0.0
def fpr(self):
_c = 1.0*self.get_not_c()
return self.fp / _c if _c > 0.0 else 0.0
def __ig_factor(p_tc, p_t, p_c):
den = p_t * p_c
if den != 0.0 and p_tc != 0:
return p_tc * math.log(p_tc / den, 2)
else:
return 0.0
def information_gain(cell):
return __ig_factor(cell.p_tp(), cell.p_f(), cell.p_c()) + \
__ig_factor(cell.p_fp(), cell.p_f(), cell.p_not_c()) +\
__ig_factor(cell.p_fn(), cell.p_not_f(), cell.p_c()) + \
__ig_factor(cell.p_tn(), cell.p_not_f(), cell.p_not_c())
def squared_information_gain(cell):
return information_gain(cell)**2
def posneg_information_gain(cell):
ig = information_gain(cell)
if cell.tpr() < cell.fpr():
return -ig
else:
return ig
def pos_information_gain(cell):
if cell.tpr() < cell.fpr():
return 0
else:
return information_gain(cell)
def pointwise_mutual_information(cell):
return __ig_factor(cell.p_tp(), cell.p_f(), cell.p_c())
def gss(cell):
return cell.p_tp()*cell.p_tn() - cell.p_fp()*cell.p_fn()
def chi_square(cell):
den = cell.p_f() * cell.p_not_f() * cell.p_c() * cell.p_not_c()
if den==0.0: return 0.0
num = gss(cell)**2
return num / den
def conf_interval(xt, n):
if n>30:
z2 = 3.84145882069 # norm.ppf(0.5+0.95/2.0)**2
else:
z2 = t.ppf(0.5 + 0.95 / 2.0, df=max(n-1,1)) ** 2
p = (xt + 0.5 * z2) / (n + z2)
amplitude = 0.5 * z2 * math.sqrt((p * (1.0 - p)) / (n + z2))
return p, amplitude
def strength(minPosRelFreq, minPos, maxNeg):
if minPos > maxNeg:
return math.log(2.0 * minPosRelFreq, 2.0)
else:
return 0.0
# set cancel_features=True to allow some features to be weighted as 0 (as in the original article);
# however, for some extremely imbalanced datasets this caused all documents to be 0
def conf_weight(cell, cancel_features=False):
c = cell.get_c()
not_c = cell.get_not_c()
tp = cell.tp
fp = cell.fp
pos_p, pos_amp = conf_interval(tp, c)
neg_p, neg_amp = conf_interval(fp, not_c)
min_pos = pos_p-pos_amp
max_neg = neg_p+neg_amp
den = (min_pos + max_neg)
minpos_relfreq = min_pos / (den if den != 0 else 1)
str_tplus = strength(minpos_relfreq, min_pos, max_neg)
if str_tplus == 0 and not cancel_features:
return 1e-20
return str_tplus
def get_tsr_matrix(cell_matrix, tsr_score_function):
nC = len(cell_matrix)
nF = len(cell_matrix[0])
tsr_matrix = [[tsr_score_function(cell_matrix[c,f]) for f in range(nF)] for c in range(nC)]
return np.array(tsr_matrix)
def feature_label_contingency_table(positive_document_indexes, feature_document_indexes, nD):
tp_ = len(positive_document_indexes & feature_document_indexes)
fp_ = len(feature_document_indexes - positive_document_indexes)
fn_ = len(positive_document_indexes - feature_document_indexes)
tn_ = nD - (tp_ + fp_ + fn_)
return ContTable(tp=tp_, tn=tn_, fp=fp_, fn=fn_)
def category_tables(feature_sets, category_sets, c, nD, nF):
return [feature_label_contingency_table(category_sets[c], feature_sets[f], nD) for f in range(nF)]
def get_supervised_matrix(coocurrence_matrix, label_matrix, n_jobs=-1):
"""
Computes the nC x nF supervised matrix M where Mcf is the 4-cell contingency table for feature f and class c.
Efficiency O(nF x nC x log(S)) where S is the sparse factor
"""
nD, nF = coocurrence_matrix.shape
nD2, nC = label_matrix.shape
if nD != nD2:
raise ValueError('Number of rows in coocurrence matrix shape %s and label matrix shape %s is not consistent' %
(coocurrence_matrix.shape,label_matrix.shape))
def nonzero_set(matrix, col):
return set(matrix[:, col].nonzero()[0])
if isinstance(coocurrence_matrix, csr_matrix):
coocurrence_matrix = csc_matrix(coocurrence_matrix)
feature_sets = [nonzero_set(coocurrence_matrix, f) for f in range(nF)]
category_sets = [nonzero_set(label_matrix, c) for c in range(nC)]
cell_matrix = Parallel(n_jobs=n_jobs, backend="threading")(
delayed(category_tables)(feature_sets, category_sets, c, nD, nF) for c in range(nC)
)
return np.array(cell_matrix)
class TSRweighting(BaseEstimator,TransformerMixin):
"""
Supervised Term Weighting function based on any Term Selection Reduction (TSR) function (e.g., information gain,
chi-square, etc.) or, more generally, on any function that could be computed on the 4-cell contingency table for
each category-feature pair.
The supervised_4cell_matrix is a `(n_classes, n_words)` matrix containing the 4-cell contingency tables
for each class-word pair, and can be pre-computed (e.g., during the feature selection phase) and passed as an
argument.
When `n_classes>1`, i.e., in multiclass scenarios, a global_policy is used in order to determine a
single feature-score which informs about its relevance. Accepted policies include "max" (takes the max score
across categories), "ave" and "wave" (take the average, or weighted average, across all categories -- weights
correspond to the class prevalence), and "sum" (which sums all category scores).
"""
def __init__(self, tsr_function, global_policy='max', supervised_4cell_matrix=None, sublinear_tf=True, norm='l2', min_df=3, n_jobs=-1):
if global_policy not in ['max', 'ave', 'wave', 'sum']: raise ValueError('Global policy should be in {"max", "ave", "wave", "sum"}')
self.tsr_function = tsr_function
self.global_policy = global_policy
self.supervised_4cell_matrix = supervised_4cell_matrix
self.sublinear_tf = sublinear_tf
self.norm = norm
self.min_df = min_df
self.n_jobs = n_jobs
def fit(self, X, y):
self.count_vectorizer = CountVectorizer(min_df=self.min_df)
X = self.count_vectorizer.fit_transform(X)
self.tf_vectorizer = TfidfTransformer(
norm=None, use_idf=False, smooth_idf=False, sublinear_tf=self.sublinear_tf
).fit(X)
if len(y.shape) == 1:
y = np.expand_dims(y, axis=1)
nD, nC = y.shape
nF = len(self.tf_vectorizer.get_feature_names_out())
if self.supervised_4cell_matrix is None:
self.supervised_4cell_matrix = get_supervised_matrix(X, y, n_jobs=self.n_jobs)
else:
if self.supervised_4cell_matrix.shape != (nC, nF):
raise ValueError("Shape of supervised information matrix is inconsistent with X and y")
tsr_matrix = get_tsr_matrix(self.supervised_4cell_matrix, self.tsr_function)
if self.global_policy == 'ave':
self.global_tsr_vector = np.average(tsr_matrix, axis=0)
elif self.global_policy == 'wave':
category_prevalences = [sum(y[:,c])*1.0/nD for c in range(nC)]
self.global_tsr_vector = np.average(tsr_matrix, axis=0, weights=category_prevalences)
elif self.global_policy == 'sum':
self.global_tsr_vector = np.sum(tsr_matrix, axis=0)
elif self.global_policy == 'max':
self.global_tsr_vector = np.amax(tsr_matrix, axis=0)
return self
def fit_transform(self, X, y):
return self.fit(X,y).transform(X)
def transform(self, X):
if not hasattr(self, 'global_tsr_vector'): raise NameError('TSRweighting: transform method called before fit.')
X = self.count_vectorizer.transform(X)
tf_X = self.tf_vectorizer.transform(X).toarray()
weighted_X = np.multiply(tf_X, self.global_tsr_vector)
if self.norm is not None and self.norm!='none':
weighted_X = sklearn.preprocessing.normalize(weighted_X, norm=self.norm, axis=1, copy=False)
return csr_matrix(weighted_X)
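# A minimal usage sketch of TSRweighting on a toy corpus (hypothetical data; relies on the
# information_gain function defined above):
if __name__ == '__main__':
    toy_texts = ["good movie", "bad movie", "great film", "awful film"] * 10
    toy_labels = np.array([1, 0, 1, 0] * 10)
    tsr = TSRweighting(tsr_function=information_gain, global_policy='max', min_df=1)
    weighted = tsr.fit_transform(toy_texts, toy_labels)  # csr_matrix of TSR-weighted, l2-normalized tf scores
    print(weighted.shape)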

View File

@@ -0,0 +1,208 @@
from scipy.sparse import issparse
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
import quapy as qp
from data import LabelledCollection
import numpy as np
from experimental_non_aggregative.custom_vectorizers import *
from method._kdey import KDEBase
from protocol import APP
from quapy.method.aggregative import HDy, DistributionMatchingY
from quapy.method.base import BaseQuantifier
from scipy import optimize
import pandas as pd
import quapy.functional as F
# TODO: explore the bernoulli (term presence/absence) variant
# TODO: explore the multinomial (term frequency) variant
# TODO: explore the multinomial + length normalization variant
# TODO: consolidate the TSR-variant (e.g., using information gain) variant;
# - works better with the idf?
# - works better with length normalization?
# - etc
class DxS(BaseQuantifier):
def __init__(self, vectorizer=None, divergence='topsoe'):
self.vectorizer = vectorizer
self.divergence = divergence
# def __as_distribution(self, instances):
# return np.asarray(instances.sum(axis=0) / instances.sum()).flatten()
def __as_distribution(self, instances):
dist = instances.mean(axis=0)
return np.asarray(dist).flatten()
def fit(self, text_instances, labels):
classes = np.unique(labels)
if self.vectorizer is not None:
text_instances = self.vectorizer.fit_transform(text_instances, y=labels)
distributions = []
for class_i in classes:
distributions.append(self.__as_distribution(text_instances[labels == class_i]))
self.validation_distribution = np.asarray(distributions)
return self
def predict(self, text_instances):
if self.vectorizer is not None:
text_instances = self.vectorizer.transform(text_instances)
test_distribution = self.__as_distribution(text_instances)
divergence = qp.functional.get_divergence(self.divergence)
n_classes, n_feats = self.validation_distribution.shape
def match(prev):
prev = np.expand_dims(prev, axis=0)
mixture_distribution = (prev @ self.validation_distribution).flatten()
return divergence(test_distribution, mixture_distribution)
# the initial point is set as the uniform distribution
uniform_distribution = np.full(fill_value=1 / n_classes, shape=(n_classes,))
# solutions are bounded to those contained in the unit-simplex
bounds = tuple((0, 1) for x in range(n_classes)) # values in [0,1]
constraints = ({'type': 'eq', 'fun': lambda x: 1 - sum(x)}) # values summing up to 1
r = optimize.minimize(match, x0=uniform_distribution, method='SLSQP', bounds=bounds, constraints=constraints)
return r.x
class KDExML(BaseQuantifier, KDEBase):
def __init__(self, bandwidth=0.1, standardize=False):
self._check_bandwidth(bandwidth)
self.bandwidth = bandwidth
self.standardize = standardize
def fit(self, X, y):
classes = sorted(np.unique(y))
if self.standardize:
self.scaler = StandardScaler()
X = self.scaler.fit_transform(X)
if issparse(X):
X = X.toarray()
self.mix_densities = self.get_mixture_components(X, y, classes, self.bandwidth)
return self
def predict(self, X):
"""
Searches for the mixture model parameter (the sought prevalence values) that maximizes the likelihood
of the data (i.e., that minimizes the negative log-likelihood)
:param X: instances in the sample
:return: a vector of class prevalence estimates
"""
epsilon = 1e-10
if issparse(X):
X = X.toarray()
n_classes = len(self.mix_densities)
if self.standardize:
X = self.scaler.transform(X)
test_densities = [self.pdf(kde_i, X) for kde_i in self.mix_densities]
def neg_loglikelihood(prev):
test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip (prev, test_densities))
test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
return -np.sum(test_loglikelihood)
return F.optim_minimize(neg_loglikelihood, n_classes)
if __name__ == '__main__':
qp.environ['SAMPLE_SIZE'] = 250
qp.environ['N_JOBS'] = -1
min_df = 10
# dataset = 'imdb'
repeats = 10
error = 'mae'
div = 'topsoe'
# generates tuples (dataset, method, method_name)
# (the dataset is needed for methods that process the dataset differently)
def gen_methods():
for dataset in qp.datasets.REVIEWS_SENTIMENT_DATASETS:
data = qp.datasets.fetch_reviews(dataset, tfidf=False)
# bernoulli_vectorizer = CountVectorizer(min_df=min_df, binary=True)
# dxs = DxS(divergence=div, vectorizer=bernoulli_vectorizer)
# yield data, dxs, 'DxS-Bernoulli'
#
# multinomial_vectorizer = CountVectorizer(min_df=min_df, binary=False)
# dxs = DxS(divergence=div, vectorizer=multinomial_vectorizer)
# yield data, dxs, 'DxS-multinomial'
#
# tf_vectorizer = TfidfVectorizer(sublinear_tf=False, use_idf=False, min_df=min_df, norm=None)
# dxs = DxS(divergence=div, vectorizer=tf_vectorizer)
# yield data, dxs, 'DxS-TF'
#
# logtf_vectorizer = TfidfVectorizer(sublinear_tf=True, use_idf=False, min_df=min_df, norm=None)
# dxs = DxS(divergence=div, vectorizer=logtf_vectorizer)
# yield data, dxs, 'DxS-logTF'
#
# tfidf_vectorizer = TfidfVectorizer(use_idf=True, min_df=min_df, norm=None)
# dxs = DxS(divergence=div, vectorizer=tfidf_vectorizer)
# yield data, dxs, 'DxS-TFIDF'
#
# tfidf_vectorizer = TfidfVectorizer(use_idf=True, min_df=min_df, norm='l2')
# dxs = DxS(divergence=div, vectorizer=tfidf_vectorizer)
# yield data, dxs, 'DxS-TFIDF-l2'
tsr_vectorizer = TSRweighting(tsr_function=information_gain, min_df=min_df, norm='l2')
dxs = DxS(divergence=div, vectorizer=tsr_vectorizer)
yield data, dxs, 'DxS-TFTSR-l2'
data = qp.datasets.fetch_reviews(dataset, tfidf=True, min_df=min_df)
kdex = KDExML()
reduction = TruncatedSVD(n_components=100, random_state=0)
red_data = qp.data.preprocessing.instance_transformation(data, transformer=reduction, inplace=False)
yield red_data, kdex, 'KDEx'
hdy = HDy(LogisticRegression())
yield data, hdy, 'HDy'
# dm = DistributionMatchingY(LogisticRegression(), divergence=div, nbins=5)
# yield data, dm, 'DM-5b'
#
# dm = DistributionMatchingY(LogisticRegression(), divergence=div, nbins=10)
# yield data, dm, 'DM-10b'
result_path = 'results.csv'
with open(result_path, 'wt') as csv:
csv.write(f'Method\tDataset\tMAE\tMRAE\n')
for data, quantifier, quant_name in gen_methods():
quantifier.fit(*data.training.Xy)
report = qp.evaluation.evaluation_report(quantifier, APP(data.test, repeats=repeats), error_metrics=['mae','mrae'], verbose=True)
means = report.mean(numeric_only=True)
csv.write(f'{quant_name}\t{data.name}\t{means["mae"]:.5f}\t{means["mrae"]:.5f}\n')
df = pd.read_csv(result_path, sep='\t')
# print(df)
pv = df.pivot_table(index='Method', columns="Dataset", values=["MAE", "MRAE"])
print(pv)

View File

@@ -13,7 +13,7 @@ from . import model_selection
from . import classification
import os
__version__ = '0.2.0'
__version__ = '0.2.1'
def _default_cls():

View File

@@ -33,7 +33,6 @@ class LabelledCollection:
else:
self.instances = np.asarray(instances)
self.labels = np.asarray(labels)
n_docs = len(self)
if classes is None:
self.classes_ = F.classes_from_labels(self.labels)
else:
@@ -41,7 +40,13 @@ class LabelledCollection:
self.classes_.sort()
if len(set(self.labels).difference(set(classes))) > 0:
raise ValueError(f'labels ({set(self.labels)}) contain values not included in classes_ ({set(classes)})')
self.index = {class_: np.arange(n_docs)[self.labels == class_] for class_ in self.classes_}
self._index = None
@property
def index(self):
if self._index is None:
self._index = {class_: np.arange(len(self))[self.labels == class_] for class_ in self.classes_}
return self._index
@classmethod
def load(cls, path: str, loader_func: callable, classes=None, **loader_kwargs):

View File

@@ -10,6 +10,37 @@ from quapy.util import map_parallel
from .base import LabelledCollection
def instance_transformation(dataset:Dataset, transformer, inplace=False):
"""
Transforms a :class:`quapy.data.base.Dataset` applying the `fit_transform` and `transform` functions
of a (sklearn's) transformer.
:param dataset: a :class:`quapy.data.base.Dataset` where the instances of training and test collections are
lists of str
:param transformer: TransformerMixin implementing `fit_transform` and `transform` functions
:param inplace: whether or not to apply the transformation inplace (True), or to a new copy (False, default)
:return: a new :class:`quapy.data.base.Dataset` with transformed instances (if inplace=False) or a reference to the
current Dataset (if inplace=True) where the instances have been transformed
"""
training_transformed = transformer.fit_transform(*dataset.training.Xy)
test_transformed = transformer.transform(dataset.test.X)
orig_name = dataset.name
if inplace:
dataset.training = LabelledCollection(training_transformed, dataset.training.labels, dataset.classes_)
dataset.test = LabelledCollection(test_transformed, dataset.test.labels, dataset.classes_)
if hasattr(transformer, 'vocabulary_'):
dataset.vocabulary = transformer.vocabulary_
return dataset
else:
training = LabelledCollection(training_transformed, dataset.training.labels.copy(), dataset.classes_)
test = LabelledCollection(test_transformed, dataset.test.labels.copy(), dataset.classes_)
vocab = None
if hasattr(transformer, 'vocabulary_'):
vocab = transformer.vocabulary_
return Dataset(training, test, vocabulary=vocab, name=orig_name)
def text2tfidf(dataset:Dataset, min_df=3, sublinear_tf=True, inplace=False, **kwargs):
"""
Transforms a :class:`quapy.data.base.Dataset` of textual instances into a :class:`quapy.data.base.Dataset` of
@@ -29,18 +60,7 @@ def text2tfidf(dataset:Dataset, min_df=3, sublinear_tf=True, inplace=False, **kw
__check_type(dataset.test.instances, np.ndarray, str)
vectorizer = TfidfVectorizer(min_df=min_df, sublinear_tf=sublinear_tf, **kwargs)
training_documents = vectorizer.fit_transform(dataset.training.instances)
test_documents = vectorizer.transform(dataset.test.instances)
if inplace:
dataset.training = LabelledCollection(training_documents, dataset.training.labels, dataset.classes_)
dataset.test = LabelledCollection(test_documents, dataset.test.labels, dataset.classes_)
dataset.vocabulary = vectorizer.vocabulary_
return dataset
else:
training = LabelledCollection(training_documents, dataset.training.labels.copy(), dataset.classes_)
test = LabelledCollection(test_documents, dataset.test.labels.copy(), dataset.classes_)
return Dataset(training, test, vectorizer.vocabulary_)
return instance_transformation(dataset, vectorizer, inplace)
def reduce_columns(dataset: Dataset, min_df=5, inplace=False):

View File

@@ -9,12 +9,28 @@ import quapy.functional as F
from sklearn.metrics.pairwise import rbf_kernel
# class KDE(KernelDensity):
#
# KERNELS = ['gaussian', 'aitchison']
#
# def __init__(self, bandwidth, kernel):
# assert kernel in KDE.KERNELS, f'unknown {kernel=}'
# self.bandwidth = bandwidth
# self.kernel = kernel
#
# def
class KDEBase:
"""
Common ancestor for KDE-based methods. Implements some common routines.
"""
BANDWIDTH_METHOD = ['scott', 'silverman']
KERNELS = ['gaussian', 'aitchison']
@classmethod
def _check_bandwidth(cls, bandwidth):
@@ -28,31 +44,62 @@ class KDEBase:
f'invalid bandwidth, valid ones are {KDEBase.BANDWIDTH_METHOD} or float values'
if isinstance(bandwidth, float):
assert 0 < bandwidth < 1, \
"the bandwith for KDEy should be in (0,1), since this method models the unit simplex"
"the bandwidth for KDEy should be in (0,1), since this method models the unit simplex"
return bandwidth
def get_kde_function(self, X, bandwidth):
@classmethod
def _check_kernel(cls, kernel):
"""
Checks that the kernel parameter is correct
:param kernel: str
:return: the validated kernel
"""
assert kernel in KDEBase.KERNELS, f'unknown {kernel=}'
return kernel
@classmethod
def clr_transform(cls, P, eps=1e-7):
"""
Centered-Log Ratio (CLR) transform.
P: array (n_samples, n_classes), every row is a point in the probability simplex
eps: smoothing, to avoid log(0)
"""
X_safe = np.clip(P, eps, None)
X_safe /= X_safe.sum(axis=1, keepdims=True) # renormalize
gm = np.exp(np.mean(np.log(X_safe), axis=1, keepdims=True))
return np.log(X_safe / gm)
def get_kde_function(self, X, bandwidth, kernel):
"""
Wraps the KDE function from scikit-learn.
:param X: data for which the density function is to be estimated
:param bandwidth: the bandwidth of the kernel
:param kernel: the kernel (valid ones are in KDEBase.KERNELS)
:return: a scikit-learn's KernelDensity object
"""
if kernel == 'aitchison':
X = KDEBase.clr_transform(X)
return KernelDensity(bandwidth=bandwidth).fit(X)
def pdf(self, kde, X):
def pdf(self, kde, X, kernel):
"""
Wraps the density evaluation of scikit-learn's KDE. Scikit-learn returns log-scores (s), so this
function returns :math:`e^{s}`
:param kde: a previously fit KDE function
:param X: the data for which the density is to be estimated
:param kernel: the kernel (valid ones are in KDEBase.KERNELS)
:return: np.ndarray with the densities
"""
if kernel == 'aitchison':
X = KDEBase.clr_transform(X)
return np.exp(kde.score_samples(X))
def get_mixture_components(self, X, y, classes, bandwidth):
def get_mixture_components(self, X, y, classes, bandwidth, kernel):
"""
Returns an array containing the mixture components, i.e., the KDE functions for each class.
@@ -60,6 +107,7 @@ class KDEBase:
:param y: the class labels
:param classes: the list of classes
:param bandwidth: float, the bandwidth of the kernel
:param kernel: the kernel (valid ones are in KDEBase.KERNELS)
:return: a list of KernelDensity objects, each fitted with the corresponding class-specific covariates
"""
class_cond_X = []
@@ -67,8 +115,12 @@ class KDEBase:
selX = X[y==cat]
if selX.size==0:
selX = [F.uniform_prevalence(len(classes))]
# if kernel == 'aitchison':
# this is already done within get_kde_function
# selX = KDEBase.clr_transform(selX)
class_cond_X.append(selX)
return [self.get_kde_function(X_cond_yi, bandwidth) for X_cond_yi in class_cond_X]
return [self.get_kde_function(X_cond_yi, bandwidth, kernel) for X_cond_yi in class_cond_X]
class KDEyML(AggregativeSoftQuantifier, KDEBase):
@@ -107,17 +159,19 @@ class KDEyML(AggregativeSoftQuantifier, KDEBase):
are to be generated in a `k`-fold cross-validation manner (with this integer indicating the value
for `k`); or as a tuple (X,y) defining the specific set of data to use for validation.
:param bandwidth: float, the bandwidth of the Kernel
:param kernel: kernel of KDE, valid ones are in KDEBase.KERNELS
:param random_state: a seed to be set before fitting any base quantifier (default None)
"""
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=0.1,
def __init__(self, classifier: BaseEstimator=None, fit_classifier=True, val_split=5, bandwidth=0.1, kernel='gaussian',
random_state=None):
super().__init__(classifier, fit_classifier, val_split)
self.bandwidth = KDEBase._check_bandwidth(bandwidth)
self.kernel = self._check_kernel(kernel)
self.random_state=random_state
def aggregation_fit(self, classif_predictions, labels):
self.mix_densities = self.get_mixture_components(classif_predictions, labels, self.classes_, self.bandwidth)
self.mix_densities = self.get_mixture_components(classif_predictions, labels, self.classes_, self.bandwidth, self.kernel)
return self
def aggregate(self, posteriors: np.ndarray):
@@ -131,10 +185,11 @@ class KDEyML(AggregativeSoftQuantifier, KDEBase):
with qp.util.temp_seed(self.random_state):
epsilon = 1e-10
n_classes = len(self.mix_densities)
test_densities = [self.pdf(kde_i, posteriors) for kde_i in self.mix_densities]
test_densities = [self.pdf(kde_i, posteriors, self.kernel) for kde_i in self.mix_densities]
def neg_loglikelihood(prev):
test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip (prev, test_densities))
# test_mixture_likelihood = sum(prev_i * dens_i for prev_i, dens_i in zip (prev, test_densities))
test_mixture_likelihood = prev @ test_densities
test_loglikelihood = np.log(test_mixture_likelihood + epsilon)
return -np.sum(test_loglikelihood)

View File

@@ -88,18 +88,30 @@ class WithConfidenceABC(ABC):
METHODS = ['intervals', 'ellipse', 'ellipse-clr']
@abstractmethod
def quantify_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
def predict_conf(self, instances, confidence_level=0.95) -> (np.ndarray, ConfidenceRegionABC):
"""
Adds the method `quantify_conf` to the interface. This method returns not only the point-estimate, but
Adds the method `predict_conf` to the interface. This method returns not only the point-estimate, but
also the confidence region around it.
:param instances: a np.ndarray of shape (n_instances, n_features,)
:confidence_level: float in (0, 1)
:param confidence_level: float in (0, 1), default is 0.95
:return: a tuple (`point_estimate`, `conf_region`), where `point_estimate` is a np.ndarray of shape
(n_classes,) and `conf_region` is an object from :class:`ConfidenceRegionABC`
"""
...
def quantify_conf(self, instances, confidence_level=0.95) -> (np.ndarray, ConfidenceRegionABC):
"""
Alias to `predict_conf`. This method returns not only the point-estimate, but
also the confidence region around it.
:param instances: a np.ndarray of shape (n_instances, n_features,)
:param confidence_level: float in (0, 1), default is 0.95
:return: a tuple (`point_estimate`, `conf_region`), where `point_estimate` is a np.ndarray of shape
(n_classes,) and `conf_region` is an object from :class:`ConfidenceRegionABC`
"""
return self.predict_conf(instances=instances, confidence_level=confidence_level)
@classmethod
def construct_region(cls, prev_estims, confidence_level=0.95, method='intervals'):
"""
@@ -227,6 +239,7 @@ class ConfidenceEllipseCLR(ConfidenceRegionABC):
"""
def __init__(self, X, confidence_level=0.95):
X = np.asarray(X)
self.clr = CLRtransformation()
Z = self.clr(X)
self.mean_ = np.mean(X, axis=0)
@@ -297,6 +310,9 @@ class ConfidenceIntervals(ConfidenceRegionABC):
return proportion
def __repr__(self):
return '['+', '.join(f'({low:.4f}, {high:.4f})' for (low,high) in zip(self.I_low, self.I_high))+']'
class CLRtransformation:
"""
@@ -339,6 +355,12 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
During inference, the bootstrap repetitions are applied to the pre-classified test instances.
See
`Moreo, A., Salvati, N.
An Efficient Method for Deriving Confidence Intervals in Aggregative Quantification.
Learning to Quantify: Methods and Applications (LQ 2025), co-located at ECML-PKDD 2025.
pp 12-33 <https://lq-2025.github.io/proceedings/CompleteVolume.pdf>`_
:param quantifier: an aggregative quantifier
:param n_train_samples: int, the number of training resamplings (defaults to 1, set to > 1 to activate a
model-based bootstrap approach)
@@ -423,7 +445,7 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
self.aggregation_fit(classif_predictions, labels)
return self
def quantify_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
predictions = self.quantifier.classify(instances)
return self.aggregate_conf(predictions, confidence_level=confidence_level)
@@ -437,7 +459,7 @@ class AggregativeBootstrap(WithConfidenceABC, AggregativeQuantifier):
class BayesianCC(AggregativeCrispQuantifier, WithConfidenceABC):
"""
`Bayesian quantification <https://arxiv.org/abs/2302.09159>`_ method,
`Bayesian quantification <https://arxiv.org/abs/2302.09159>`_ method (by Albert Ziegler and Paweł Czyż),
which is a variant of :class:`ACC` that calculates the posterior probability distribution
over the prevalence vectors, rather than providing a point estimate obtained
by matrix inversion.
@@ -543,7 +565,7 @@ class BayesianCC(AggregativeCrispQuantifier, WithConfidenceABC):
samples = self.sample_from_posterior(classif_predictions)[_bayesian.P_TEST_Y]
return np.asarray(samples.mean(axis=0), dtype=float)
def quantify_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
def predict_conf(self, instances, confidence_level=None) -> (np.ndarray, ConfidenceRegionABC):
classif_predictions = self.classify(instances)
point_estimate = self.aggregate(classif_predictions)
samples = self.get_prevalence_samples() # available after calling "aggregate" function

View File

@@ -1,11 +1,17 @@ from typing import Union, Callable
from typing import Union, Callable
from itertools import product
from tqdm import tqdm
from typing import Union, Callable, Counter
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.utils import resample
from sklearn.preprocessing import normalize
from method.confidence import WithConfidenceABC, ConfidenceRegionABC
from quapy.functional import get_divergence
from quapy.data import LabelledCollection
from quapy.method.base import BaseQuantifier, BinaryQuantifier
import quapy.functional as F
from scipy.optimize import lsq_linear
from scipy import sparse
class MaximumLikelihoodPrevalenceEstimation(BaseQuantifier):
@@ -149,53 +155,164 @@ class DMx(BaseQuantifier):
return F.argmin_prevalence(loss, n_classes, method=self.search)
# class ReadMe(BaseQuantifier):
#
# def __init__(self, bootstrap_trials=100, bootstrap_range=100, bagging_trials=100, bagging_range=25, **vectorizer_kwargs):
# raise NotImplementedError('under development ...')
# self.bootstrap_trials = bootstrap_trials
# self.bootstrap_range = bootstrap_range
# self.bagging_trials = bagging_trials
# self.bagging_range = bagging_range
# self.vectorizer_kwargs = vectorizer_kwargs
#
# def fit(self, data: LabelledCollection):
# X, y = data.Xy
# self.vectorizer = CountVectorizer(binary=True, **self.vectorizer_kwargs)
# X = self.vectorizer.fit_transform(X)
# self.class_conditional_X = {i: X[y==i] for i in range(data.classes_)}
#
# def predict(self, X):
# X = self.vectorizer.transform(X)
#
# # number of features
# num_docs, num_feats = X.shape
#
# # bootstrap
# p_boots = []
# for _ in range(self.bootstrap_trials):
# docs_idx = np.random.choice(num_docs, size=self.bootstrap_range, replace=False)
# class_conditional_X = {i: X[docs_idx] for i, X in self.class_conditional_X.items()}
# Xboot = X[docs_idx]
#
# # bagging
# p_bags = []
# for _ in range(self.bagging_trials):
# feat_idx = np.random.choice(num_feats, size=self.bagging_range, replace=False)
# class_conditional_Xbag = {i: X[:, feat_idx] for i, X in class_conditional_X.items()}
# Xbag = Xboot[:,feat_idx]
# p = self.std_constrained_linear_ls(Xbag, class_conditional_Xbag)
# p_bags.append(p)
# p_boots.append(np.mean(p_bags, axis=0))
#
# p_mean = np.mean(p_boots, axis=0)
# p_std = np.std(p_bags, axis=0)
#
# return p_mean
#
#
# def std_constrained_linear_ls(self, X, class_cond_X: dict):
# pass
class ReadMe(BaseQuantifier, WithConfidenceABC):
"""
ReadMe is a non-aggregative quantification system proposed by
`Daniel Hopkins and Gary King, 2010. A method of automated nonparametric content analysis for
social science. American Journal of Political Science, 54(1):229-247.
<https://onlinelibrary.wiley.com/doi/abs/10.1111/j.1540-5907.2009.00428.x>`_.
The idea is to estimate `Q(Y=i)` directly from:
:math:`Q(X)=\\sum_{i=1}^{n} Q(X|Y=i) Q(Y=i)`
via least-squares regression, i.e., without incurring the cost of computing posterior probabilities.
However, this poses a very difficult estimation problem, since the vector `Q(X)` and the matrix `Q(X|Y=i)`
can be of very high dimensionality. In order to render the problem tractable, ReadMe performs bagging in
the feature space. ReadMe also combines bagging with bootstrap in order to derive confidence intervals
around point estimates.
We use the same default parameters as in the official
`R implementation <https://github.com/iqss-research/ReadMeV1/blob/master/R/prototype.R>`_.
:param prob_model: str ('naive', or 'full'), selects the modality in which the probabilities `Q(X)` and
`Q(X|Y)` are to be modelled. Options include "full", which corresponds to the original formulation of
ReadMe, in which X is constrained to be a binary matrix (e.g., of term presence/absence) and in which
`Q(X)` and `Q(X|Y)` are modelled, respectively, as matrices of `(2^K, 1)` and `(2^K, n)` values, where
`K` is the number of columns in the data matrix (i.e., `bagging_range`), and `n` is the number of classes.
Of course, this approach is computationally prohibitive for large `K`, so the computation is restricted to data
matrices with `K<=25` (although we recommend even smaller values of `K`). A much faster model is "naive", which
models `Q(X)` and `Q(X|Y)` as multinomial distributions under the `bag-of-words` perspective. In this
case, `bagging_range` can be set to much larger values. Default is "full" (i.e., original ReadMe behavior).
:param bootstrap_trials: int, number of bootstrap trials (default 300)
:param bagging_trials: int, number of bagging trials (default 300)
:param bagging_range: int, number of features to keep for each bagging trial (default 15)
:param confidence_level: float, a value in (0,1) reflecting the desired confidence level (default 0.95)
:param region: str in 'intervals', 'ellipse', 'ellipse-clr'; indicates the preferred method for
defining the confidence region (see :class:`WithConfidenceABC`)
:param random_state: int or None, allows replicability (default None)
:param verbose: bool, whether to display information during the process (default False)
"""
MAX_FEATURES_FOR_EMPIRICAL_ESTIMATION = 25
PROBABILISTIC_MODELS = ["naive", "full"]
def __init__(self,
prob_model="full",
bootstrap_trials=300,
bagging_trials=300,
bagging_range=15,
confidence_level=0.95,
region='intervals',
random_state=None,
verbose=False):
assert prob_model in ReadMe.PROBABILISTIC_MODELS, \
f'unknown {prob_model=}, valid ones are {ReadMe.PROBABILISTIC_MODELS=}'
self.prob_model = prob_model
self.bootstrap_trials = bootstrap_trials
self.bagging_trials = bagging_trials
self.bagging_range = bagging_range
self.confidence_level = confidence_level
self.region = region
self.random_state = random_state
self.verbose = verbose
def fit(self, X, y):
self._check_matrix(X)
self.rng = np.random.default_rng(self.random_state)
self.classes_ = np.unique(y)
Xsize = X.shape[0]
# Bootstrap loop
self.Xboots, self.yboots = [], []
for _ in range(self.bootstrap_trials):
idx = self.rng.choice(Xsize, size=Xsize, replace=True)
self.Xboots.append(X[idx])
self.yboots.append(y[idx])
return self
def predict_conf(self, X, confidence_level=0.95) -> (np.ndarray, ConfidenceRegionABC):
self._check_matrix(X)
n_features = X.shape[1]
boots_prevalences = []
for Xboots, yboots in tqdm(
zip(self.Xboots, self.yboots),
desc='bootstrap predictions', total=self.bootstrap_trials, disable=not self.verbose
):
bagging_estimates = []
for _ in range(self.bagging_trials):
feat_idx = self.rng.choice(n_features, size=self.bagging_range, replace=False)
Xboots_bagging = Xboots[:, feat_idx]
X_boots_bagging = X[:, feat_idx]
bagging_prev = self._quantify_iteration(Xboots_bagging, yboots, X_boots_bagging)
bagging_estimates.append(bagging_prev)
boots_prevalences.append(np.mean(bagging_estimates, axis=0))
conf = WithConfidenceABC.construct_region(boots_prevalences, confidence_level, method=self.region)
prev_estim = conf.point_estimate()
return prev_estim, conf
def predict(self, X):
prev_estim, _ = self.predict_conf(X)
return prev_estim
def _quantify_iteration(self, Xtr, ytr, Xte):
"""Single ReadMe estimate."""
PX_given_Y = np.asarray([self._compute_P(Xtr[ytr == c]) for c in self.classes_])
PX = self._compute_P(Xte)
res = lsq_linear(A=PX_given_Y.T, b=PX, bounds=(0, 1))
pY = np.maximum(res.x, 0)
return pY / pY.sum()
def _check_matrix(self, X):
"""the "full" model requires estimating empirical distributions; due to the high computational cost,
this function is only made available for binary matrices"""
if self.prob_model == 'full' and not self._is_binary_matrix(X):
raise ValueError('the empirical distribution can only be computed efficiently on binary matrices')
def _is_binary_matrix(self, X):
data = X.data if sparse.issparse(X) else X
return np.all((data == 0) | (data == 1))
def _compute_P(self, X):
if self.prob_model == 'naive':
return self._multinomial_distribution(X)
elif self.prob_model == 'full':
return self._empirical_distribution(X)
else:
raise ValueError(f'unknown {self.prob_model}; valid ones are {ReadMe.PROBABILISTIC_MODELS=}')
def _empirical_distribution(self, X):
if X.shape[1] > self.MAX_FEATURES_FOR_EMPIRICAL_ESTIMATION:
raise ValueError(f'the empirical distribution can only be computed efficiently for dimensions '
f'less or equal than {self.MAX_FEATURES_FOR_EMPIRICAL_ESTIMATION}')
# we convert every binary row (e.g., 0 0 1 0 1) into the equivalent number (e.g., 5)
K = X.shape[1]
binary_powers = 1 << np.arange(K-1, -1, -1) # (2^(K-1), ..., 32, 16, 8, 4, 2, 1)
X_as_binary_numbers = X @ binary_powers
# count occurrences and compute probs
counts = np.bincount(X_as_binary_numbers, minlength=2 ** K).astype(float)
probs = counts / counts.sum()
return probs
def _multinomial_distribution(self, X):
PX = np.asarray(X.sum(axis=0))
PX = normalize(PX, norm='l1', axis=1)
return PX.ravel()
def _get_features_range(X):