diff --git a/examples/bayesian_quantification.py b/examples/bayesian_quantification.py index 3bca084..06c928c 100644 --- a/examples/bayesian_quantification.py +++ b/examples/bayesian_quantification.py @@ -20,30 +20,21 @@ Due to a low sample size and the fact that classes 2 and 3 are hard to distingui it is hard to estimate the proportions accurately, what is visible by looking at the posterior samples, showing large uncertainty. """ -from dataclasses import dataclass import numpy as np import matplotlib.pyplot as plt +import quapy as qp from sklearn.ensemble import RandomForestClassifier from quapy.method.aggregative import BayesianCC, ACC, PACC -from quapy.data import LabelledCollection +from quapy.data import LabelledCollection, Dataset + FIGURE_PATH = "bayesian_quantification.pdf" -@dataclass -class SimulatedData: - """Auxiliary class to keep the training and test data sets.""" - n_classes: int - X_train: np.ndarray - Y_train: np.ndarray - X_test: np.ndarray - Y_test: np.ndarray - - -def simulate_data(rng) -> SimulatedData: +def simulate_data(rng) -> Dataset: """Generates a simulated data set with three classes.""" # Number of examples of each class in both data sets @@ -53,44 +44,33 @@ def simulate_data(rng) -> SimulatedData: # Mean vectors and shared covariance of P(X|Y) distributions mus = [np.zeros(2), np.array([1, 1.5]), np.array([1.5, 1])] cov = np.eye(2) - + + def gen_Xy(centers, sizes): + X = np.concatenate([rng.multivariate_normal(mu_i, cov, size_i) for mu_i, size_i in zip(centers, sizes)]) + y = np.concatenate([[i] * n for i, n in enumerate(sizes)]) + return X, y + # Generate the features accordingly - X_train = np.concatenate([ - rng.multivariate_normal(mus[i], cov, size=n_train[i]) - for i in range(3) - ]) + train = LabelledCollection(*gen_Xy(centers=mus, sizes=n_train)) + test = LabelledCollection(*gen_Xy(centers=mus, sizes=n_test)) - X_test = np.concatenate([ - rng.multivariate_normal(mus[i], cov, size=n_test[i]) - for i in range(3) - ]) - - Y_train = np.concatenate([[i] * n for i, n in enumerate(n_train)]) - Y_test = np.concatenate([[i] * n for i, n in enumerate(n_test)]) - - return SimulatedData( - n_classes=3, - X_train=X_train, - X_test=X_test, - Y_train=Y_train, - Y_test=Y_test, - ) + return Dataset(training=train, test=test) -def plot_simulated_data(axs, data: SimulatedData) -> None: +def plot_simulated_data(axs, data: Dataset) -> None: """Plots a simulated data set. - - Args: - axs: a list of three `plt.Axes` objects, on which the samples will be plotted. - data: the simulated data set. + + :param axs: a list of three `plt.Axes` objects, on which the samples will be plotted. + :param data: the simulated data set. """ + train, test = data.train_test xlim = ( - -0.3 + min(data.X_train[:, 0].min(), data.X_test[:, 0].min()), - 0.3 + max(data.X_train[:, 0].max(), data.X_test[:, 0].max()) + -0.3 + min(train.X[:, 0].min(), test.X[:, 0].min()), + 0.3 + max(train.X[:, 0].max(), test.X[:, 0].max()) ) ylim = ( - -0.3 + min(data.X_train[:, 1].min(), data.X_test[:, 1].min()), - 0.3 + max(data.X_train[:, 1].max(), data.X_test[:, 1].max()) + -0.3 + min(train.X[:, 1].min(), test.X[:, 1].min()), + 0.3 + max(train.X[:, 1].max(), test.X[:, 1].max()) ) for ax in axs: @@ -105,63 +85,23 @@ def plot_simulated_data(axs, data: SimulatedData) -> None: ax = axs[0] ax.set_title("Training set") for i in range(data.n_classes): - ax.scatter(data.X_train[data.Y_train == i, 0], data.X_train[data.Y_train == i, 1], c=f"C{i}", s=3, rasterized=True) + ax.scatter(train.X[train.y == i, 0], train.X[train.y == i, 1], c=f"C{i}", s=3, rasterized=True) ax = axs[1] ax.set_title("Test set\n(with labels)") for i in range(data.n_classes): - ax.scatter(data.X_test[data.Y_test == i, 0], data.X_test[data.Y_test == i, 1], c=f"C{i}", s=3, rasterized=True) + ax.scatter(test.X[test.y == i, 0], test.X[test.y == i, 1], c=f"C{i}", s=3, rasterized=True) ax = axs[2] ax.set_title("Test set\n(as observed)") - ax.scatter(data.X_test[:, 0], data.X_test[:, 1], c="C5", s=3, rasterized=True) + ax.scatter(test.X[:, 0], test.X[:, 1], c="C5", s=3, rasterized=True) -def get_random_forest() -> RandomForestClassifier: - """An auxiliary factory method to generate a random forest.""" - return RandomForestClassifier(n_estimators=10, random_state=5) - - -def train_and_plot_bayesian_quantification(ax: plt.Axes, training: LabelledCollection, test: np.ndarray, n_classes: int) -> None: - """Fits Bayesian quantification and plots posterior mean as well as individual samples""" - quantifier = BayesianCC(classifier=get_random_forest()) - quantifier.fit(training) - - # Obtain mean prediction - mean_prediction = quantifier.quantify(test) - x_ax = np.arange(n_classes) - ax.plot(x_ax, mean_prediction, c="salmon", linewidth=2, linestyle=":", label="Bayesian") - - # Obtain individual samples - samples = quantifier.get_prevalence_samples() - for sample in samples[::5, :]: - ax.plot(x_ax, sample, c="salmon", alpha=0.1, linewidth=0.3, rasterized=True) - - -def _get_estimate(estimator_class, training: LabelledCollection, test: np.ndarray) -> None: - """Auxiliary method for running ACC and PACC.""" - estimator = estimator_class(get_random_forest()) - estimator.fit(training) - return estimator.quantify(test) - - -def train_and_plot_acc(ax: plt.Axes, training: LabelledCollection, test: np.ndarray, n_classes: int) -> None: - estimate = _get_estimate(ACC, training, test) - ax.plot(np.arange(n_classes), estimate, c="darkblue", linewidth=2, linestyle=":", label="ACC") - - -def train_and_plot_pacc(ax: plt.Axes, training: LabelledCollection, test: np.ndarray, n_classes: int) -> None: - estimate = _get_estimate(PACC, training, test) - ax.plot(np.arange(n_classes), estimate, c="limegreen", linewidth=2, linestyle=":", label="PACC") - - -def plot_true_proportions(ax: plt.Axes, test_labels: np.ndarray, n_classes: int) -> None: +def plot_true_proportions(ax: plt.Axes, test_prevalence: np.ndarray) -> None: """Plots the true proportions.""" - counts = np.bincount(test_labels, minlength=n_classes) - proportion = counts / counts.sum() - + n_classes = len(test_prevalence) x_ax = np.arange(n_classes) - ax.plot(x_ax, proportion, c="black", linewidth=2, label="True") + ax.plot(x_ax, test_prevalence, c="black", linewidth=2, label="True") ax.set_xlabel("Class") ax.set_ylabel("Prevalence") @@ -171,11 +111,59 @@ def plot_true_proportions(ax: plt.Axes, test_labels: np.ndarray, n_classes: int) ax.set_ylim(-0.01, 1.01) +def get_random_forest() -> RandomForestClassifier: + """An auxiliary factory method to generate a random forest.""" + return RandomForestClassifier(n_estimators=10, random_state=5) + + +def _get_estimate(estimator_class, training: LabelledCollection, test: np.ndarray) -> None: + """Auxiliary method for running ACC and PACC.""" + estimator = estimator_class(get_random_forest()) + estimator.fit(training) + return estimator.quantify(test) + + +def train_and_plot_bayesian_quantification(ax: plt.Axes, training: LabelledCollection, test: LabelledCollection) -> None: + """Fits Bayesian quantification and plots posterior mean as well as individual samples""" + print('training model Bayesian CC...', end='') + quantifier = BayesianCC(classifier=get_random_forest()) + quantifier.fit(training) + + # Obtain mean prediction + mean_prediction = quantifier.quantify(test.X) + mae = qp.error.mae(test.prevalence(), mean_prediction) + x_ax = np.arange(training.n_classes) + ax.plot(x_ax, mean_prediction, c="salmon", linewidth=2, linestyle=":", label="Bayesian") + + # Obtain individual samples + samples = quantifier.get_prevalence_samples() + for sample in samples[::5, :]: + ax.plot(x_ax, sample, c="salmon", alpha=0.1, linewidth=0.3, rasterized=True) + print(f'MAE={mae:.4f} [done]') + + +def train_and_plot_acc(ax: plt.Axes, training: LabelledCollection, test: LabelledCollection) -> None: + print('training model ACC...', end='') + estimate = _get_estimate(ACC, training, test.X) + mae = qp.error.mae(test.prevalence(), estimate) + ax.plot(np.arange(training.n_classes), estimate, c="darkblue", linewidth=2, linestyle=":", label="ACC") + print(f'MAE={mae:.4f} [done]') + + +def train_and_plot_pacc(ax: plt.Axes, training: LabelledCollection, test: LabelledCollection) -> None: + print('training model PACC...', end='') + estimate = _get_estimate(PACC, training, test.X) + mae = qp.error.mae(test.prevalence(), estimate) + ax.plot(np.arange(training.n_classes), estimate, c="limegreen", linewidth=2, linestyle=":", label="PACC") + print(f'MAE={mae:.4f} [done]') + def main() -> None: # --- Simulate data --- + print('generating simulated data') rng = np.random.default_rng(42) data = simulate_data(rng) + training, test = data.train_test # --- Plot simulated data --- fig, axs = plt.subplots(1, 4, figsize=(13, 3), dpi=300) @@ -185,17 +173,19 @@ def main() -> None: # --- Plot quantification results --- ax = axs[3] - plot_true_proportions(ax, test_labels=data.Y_test, n_classes=data.n_classes) - - training = LabelledCollection(data.X_train, data.Y_train) - train_and_plot_acc(ax, training=training, test=data.X_test, n_classes=data.n_classes) - train_and_plot_pacc(ax, training=training, test=data.X_test, n_classes=data.n_classes) - train_and_plot_bayesian_quantification(ax=ax, training=training, test=data.X_test, n_classes=data.n_classes) + plot_true_proportions(ax, test_prevalence=test.prevalence()) + + train_and_plot_acc(ax, training=training, test=test) + train_and_plot_pacc(ax, training=training, test=test) + train_and_plot_bayesian_quantification(ax=ax, training=training, test=test) + print('[done]') ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left', frameon=False) + print(f'saving plot in path {FIGURE_PATH}...', end='') fig.tight_layout() fig.savefig(FIGURE_PATH) + print('[done]') if __name__ == '__main__':