small refactoring to reuse labelled collections and dataset classes instead of new dataclasses specific to it

This commit is contained in:
Alejandro Moreo Fernandez 2024-03-18 11:36:27 +01:00
parent 2db7cf20bd
commit 6ca89d0e55
1 changed files with 84 additions and 94 deletions

View File

@ -20,30 +20,21 @@ Due to a low sample size and the fact that classes 2 and 3 are hard to distingui
it is hard to estimate the proportions accurately, what is visible by looking at the posterior samples, it is hard to estimate the proportions accurately, what is visible by looking at the posterior samples,
showing large uncertainty. showing large uncertainty.
""" """
from dataclasses import dataclass
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import quapy as qp
from sklearn.ensemble import RandomForestClassifier from sklearn.ensemble import RandomForestClassifier
from quapy.method.aggregative import BayesianCC, ACC, PACC from quapy.method.aggregative import BayesianCC, ACC, PACC
from quapy.data import LabelledCollection from quapy.data import LabelledCollection, Dataset
FIGURE_PATH = "bayesian_quantification.pdf" FIGURE_PATH = "bayesian_quantification.pdf"
@dataclass def simulate_data(rng) -> Dataset:
class SimulatedData:
"""Auxiliary class to keep the training and test data sets."""
n_classes: int
X_train: np.ndarray
Y_train: np.ndarray
X_test: np.ndarray
Y_test: np.ndarray
def simulate_data(rng) -> SimulatedData:
"""Generates a simulated data set with three classes.""" """Generates a simulated data set with three classes."""
# Number of examples of each class in both data sets # Number of examples of each class in both data sets
@ -54,43 +45,32 @@ def simulate_data(rng) -> SimulatedData:
mus = [np.zeros(2), np.array([1, 1.5]), np.array([1.5, 1])] mus = [np.zeros(2), np.array([1, 1.5]), np.array([1.5, 1])]
cov = np.eye(2) cov = np.eye(2)
def gen_Xy(centers, sizes):
X = np.concatenate([rng.multivariate_normal(mu_i, cov, size_i) for mu_i, size_i in zip(centers, sizes)])
y = np.concatenate([[i] * n for i, n in enumerate(sizes)])
return X, y
# Generate the features accordingly # Generate the features accordingly
X_train = np.concatenate([ train = LabelledCollection(*gen_Xy(centers=mus, sizes=n_train))
rng.multivariate_normal(mus[i], cov, size=n_train[i]) test = LabelledCollection(*gen_Xy(centers=mus, sizes=n_test))
for i in range(3)
])
X_test = np.concatenate([ return Dataset(training=train, test=test)
rng.multivariate_normal(mus[i], cov, size=n_test[i])
for i in range(3)
])
Y_train = np.concatenate([[i] * n for i, n in enumerate(n_train)])
Y_test = np.concatenate([[i] * n for i, n in enumerate(n_test)])
return SimulatedData(
n_classes=3,
X_train=X_train,
X_test=X_test,
Y_train=Y_train,
Y_test=Y_test,
)
def plot_simulated_data(axs, data: SimulatedData) -> None: def plot_simulated_data(axs, data: Dataset) -> None:
"""Plots a simulated data set. """Plots a simulated data set.
Args: :param axs: a list of three `plt.Axes` objects, on which the samples will be plotted.
axs: a list of three `plt.Axes` objects, on which the samples will be plotted. :param data: the simulated data set.
data: the simulated data set.
""" """
train, test = data.train_test
xlim = ( xlim = (
-0.3 + min(data.X_train[:, 0].min(), data.X_test[:, 0].min()), -0.3 + min(train.X[:, 0].min(), test.X[:, 0].min()),
0.3 + max(data.X_train[:, 0].max(), data.X_test[:, 0].max()) 0.3 + max(train.X[:, 0].max(), test.X[:, 0].max())
) )
ylim = ( ylim = (
-0.3 + min(data.X_train[:, 1].min(), data.X_test[:, 1].min()), -0.3 + min(train.X[:, 1].min(), test.X[:, 1].min()),
0.3 + max(data.X_train[:, 1].max(), data.X_test[:, 1].max()) 0.3 + max(train.X[:, 1].max(), test.X[:, 1].max())
) )
for ax in axs: for ax in axs:
@ -105,63 +85,23 @@ def plot_simulated_data(axs, data: SimulatedData) -> None:
ax = axs[0] ax = axs[0]
ax.set_title("Training set") ax.set_title("Training set")
for i in range(data.n_classes): for i in range(data.n_classes):
ax.scatter(data.X_train[data.Y_train == i, 0], data.X_train[data.Y_train == i, 1], c=f"C{i}", s=3, rasterized=True) ax.scatter(train.X[train.y == i, 0], train.X[train.y == i, 1], c=f"C{i}", s=3, rasterized=True)
ax = axs[1] ax = axs[1]
ax.set_title("Test set\n(with labels)") ax.set_title("Test set\n(with labels)")
for i in range(data.n_classes): for i in range(data.n_classes):
ax.scatter(data.X_test[data.Y_test == i, 0], data.X_test[data.Y_test == i, 1], c=f"C{i}", s=3, rasterized=True) ax.scatter(test.X[test.y == i, 0], test.X[test.y == i, 1], c=f"C{i}", s=3, rasterized=True)
ax = axs[2] ax = axs[2]
ax.set_title("Test set\n(as observed)") ax.set_title("Test set\n(as observed)")
ax.scatter(data.X_test[:, 0], data.X_test[:, 1], c="C5", s=3, rasterized=True) ax.scatter(test.X[:, 0], test.X[:, 1], c="C5", s=3, rasterized=True)
def get_random_forest() -> RandomForestClassifier: def plot_true_proportions(ax: plt.Axes, test_prevalence: np.ndarray) -> None:
"""An auxiliary factory method to generate a random forest."""
return RandomForestClassifier(n_estimators=10, random_state=5)
def train_and_plot_bayesian_quantification(ax: plt.Axes, training: LabelledCollection, test: np.ndarray, n_classes: int) -> None:
"""Fits Bayesian quantification and plots posterior mean as well as individual samples"""
quantifier = BayesianCC(classifier=get_random_forest())
quantifier.fit(training)
# Obtain mean prediction
mean_prediction = quantifier.quantify(test)
x_ax = np.arange(n_classes)
ax.plot(x_ax, mean_prediction, c="salmon", linewidth=2, linestyle=":", label="Bayesian")
# Obtain individual samples
samples = quantifier.get_prevalence_samples()
for sample in samples[::5, :]:
ax.plot(x_ax, sample, c="salmon", alpha=0.1, linewidth=0.3, rasterized=True)
def _get_estimate(estimator_class, training: LabelledCollection, test: np.ndarray) -> None:
"""Auxiliary method for running ACC and PACC."""
estimator = estimator_class(get_random_forest())
estimator.fit(training)
return estimator.quantify(test)
def train_and_plot_acc(ax: plt.Axes, training: LabelledCollection, test: np.ndarray, n_classes: int) -> None:
estimate = _get_estimate(ACC, training, test)
ax.plot(np.arange(n_classes), estimate, c="darkblue", linewidth=2, linestyle=":", label="ACC")
def train_and_plot_pacc(ax: plt.Axes, training: LabelledCollection, test: np.ndarray, n_classes: int) -> None:
estimate = _get_estimate(PACC, training, test)
ax.plot(np.arange(n_classes), estimate, c="limegreen", linewidth=2, linestyle=":", label="PACC")
def plot_true_proportions(ax: plt.Axes, test_labels: np.ndarray, n_classes: int) -> None:
"""Plots the true proportions.""" """Plots the true proportions."""
counts = np.bincount(test_labels, minlength=n_classes) n_classes = len(test_prevalence)
proportion = counts / counts.sum()
x_ax = np.arange(n_classes) x_ax = np.arange(n_classes)
ax.plot(x_ax, proportion, c="black", linewidth=2, label="True") ax.plot(x_ax, test_prevalence, c="black", linewidth=2, label="True")
ax.set_xlabel("Class") ax.set_xlabel("Class")
ax.set_ylabel("Prevalence") ax.set_ylabel("Prevalence")
@ -171,11 +111,59 @@ def plot_true_proportions(ax: plt.Axes, test_labels: np.ndarray, n_classes: int)
ax.set_ylim(-0.01, 1.01) ax.set_ylim(-0.01, 1.01)
def get_random_forest() -> RandomForestClassifier:
"""An auxiliary factory method to generate a random forest."""
return RandomForestClassifier(n_estimators=10, random_state=5)
def _get_estimate(estimator_class, training: LabelledCollection, test: np.ndarray) -> None:
"""Auxiliary method for running ACC and PACC."""
estimator = estimator_class(get_random_forest())
estimator.fit(training)
return estimator.quantify(test)
def train_and_plot_bayesian_quantification(ax: plt.Axes, training: LabelledCollection, test: LabelledCollection) -> None:
"""Fits Bayesian quantification and plots posterior mean as well as individual samples"""
print('training model Bayesian CC...', end='')
quantifier = BayesianCC(classifier=get_random_forest())
quantifier.fit(training)
# Obtain mean prediction
mean_prediction = quantifier.quantify(test.X)
mae = qp.error.mae(test.prevalence(), mean_prediction)
x_ax = np.arange(training.n_classes)
ax.plot(x_ax, mean_prediction, c="salmon", linewidth=2, linestyle=":", label="Bayesian")
# Obtain individual samples
samples = quantifier.get_prevalence_samples()
for sample in samples[::5, :]:
ax.plot(x_ax, sample, c="salmon", alpha=0.1, linewidth=0.3, rasterized=True)
print(f'MAE={mae:.4f} [done]')
def train_and_plot_acc(ax: plt.Axes, training: LabelledCollection, test: LabelledCollection) -> None:
print('training model ACC...', end='')
estimate = _get_estimate(ACC, training, test.X)
mae = qp.error.mae(test.prevalence(), estimate)
ax.plot(np.arange(training.n_classes), estimate, c="darkblue", linewidth=2, linestyle=":", label="ACC")
print(f'MAE={mae:.4f} [done]')
def train_and_plot_pacc(ax: plt.Axes, training: LabelledCollection, test: LabelledCollection) -> None:
print('training model PACC...', end='')
estimate = _get_estimate(PACC, training, test.X)
mae = qp.error.mae(test.prevalence(), estimate)
ax.plot(np.arange(training.n_classes), estimate, c="limegreen", linewidth=2, linestyle=":", label="PACC")
print(f'MAE={mae:.4f} [done]')
def main() -> None: def main() -> None:
# --- Simulate data --- # --- Simulate data ---
print('generating simulated data')
rng = np.random.default_rng(42) rng = np.random.default_rng(42)
data = simulate_data(rng) data = simulate_data(rng)
training, test = data.train_test
# --- Plot simulated data --- # --- Plot simulated data ---
fig, axs = plt.subplots(1, 4, figsize=(13, 3), dpi=300) fig, axs = plt.subplots(1, 4, figsize=(13, 3), dpi=300)
@ -185,17 +173,19 @@ def main() -> None:
# --- Plot quantification results --- # --- Plot quantification results ---
ax = axs[3] ax = axs[3]
plot_true_proportions(ax, test_labels=data.Y_test, n_classes=data.n_classes) plot_true_proportions(ax, test_prevalence=test.prevalence())
training = LabelledCollection(data.X_train, data.Y_train) train_and_plot_acc(ax, training=training, test=test)
train_and_plot_acc(ax, training=training, test=data.X_test, n_classes=data.n_classes) train_and_plot_pacc(ax, training=training, test=test)
train_and_plot_pacc(ax, training=training, test=data.X_test, n_classes=data.n_classes) train_and_plot_bayesian_quantification(ax=ax, training=training, test=test)
train_and_plot_bayesian_quantification(ax=ax, training=training, test=data.X_test, n_classes=data.n_classes) print('[done]')
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left', frameon=False) ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left', frameon=False)
print(f'saving plot in path {FIGURE_PATH}...', end='')
fig.tight_layout() fig.tight_layout()
fig.savefig(FIGURE_PATH) fig.savefig(FIGURE_PATH)
print('[done]')
if __name__ == '__main__': if __name__ == '__main__':