"""
Compares quantification methods from QuaPy against two census-style baselines on
simulated population data: for each experiment, a quantifier is trained on the
in-sample individuals and used to estimate the prevalence of the positive class
in every geographic area; the absolute error (AE) per area and the mean absolute
error (MAE) across areas are reported.
"""
import numpy as np
import pandas as pd
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression, LogisticRegressionCV

from quapy.data import LabelledCollection
from quapy.model_selection import GridSearchQ
from quapy.protocol import APP
from quapy.method.aggregative import PACC, PCC, EMQ, DMy, ACC, KDEyML, CC
import quapy.functional as F


def show_data(X, y=None, nbins=50):
    """Plots a histogram of the covariate X; if labels y are given, overlays
    per-class histograms sharing the same bin edges."""
    import matplotlib.pyplot as plt
    if y is None:
        plt.hist(X, bins=nbins, edgecolor='black')
    else:
        pos = X[y == 1]
        neg = X[y == 0]
        bins = np.histogram_bin_edges(X, bins=nbins)
        plt.hist(pos, bins=bins, edgecolor='black', label='positive', alpha=0.5)
        plt.hist(neg, bins=bins, edgecolor='black', label='negative', alpha=0.5)
    plt.xlabel('value')
    plt.ylabel('frequency')
    plt.show()


df = pd.read_csv('./data/Simulated_PopulationData.csv', index_col=0)

# one covariate, a binary label, and an area identifier per individual
X = df.X.values.reshape(-1, 1)
y = df.Y.values
A = df.area.values

# X[y==1] += 2  # (debug) artificially shifts the positive class

show_data(X, y, nbins=50)
show_data(X, nbins=50)

areas = sorted(np.unique(A))
n_areas = len(areas)

N_EXPERIMENTS = 2

# print(list(df.columns))

for experiment_id in range(1, N_EXPERIMENTS + 1):
    # boolean mask of the individuals sampled for training in this experiment
    in_sample = df[f'InSample_{experiment_id}'].values.astype(dtype=bool)
    Xtr = X[in_sample]
    ytr = y[in_sample]
    Atr = A[in_sample]

    show_data(Xtr, ytr)
    show_data(Xtr)

    # evaluation is carried out on the whole population; the commented lines
    # below restrict it to the out-of-sample individuals instead
    # Xte = X[~in_sample]
    # yte = y[~in_sample]
    # Ate = A[~in_sample]
    # baseline_soft = df[f'PrCens_{experiment_id}'].values[~in_sample]
    # baseline_hard = df[f'YCens_{experiment_id}'].values[~in_sample]
    Xte = X
    yte = y
    Ate = A
    # 'soft' baseline: per-individual probabilities (PrCens_*);
    # 'hard' baseline: the corresponding binarized predictions (YCens_*)
    baseline_soft = df[f'PrCens_{experiment_id}'].values
    baseline_hard = df[f'YCens_{experiment_id}'].values

    train = LabelledCollection(Xtr, ytr, classes=[0, 1])
    # print(f'Experiment {experiment_id}: training prevalence = {train.prevalence()[1]:.3f}')

    # the quantifier under test (other methods tried are left commented out)
    # q = CC(classifier=LogisticRegression())
    # q = PACC(classifier=LogisticRegression())
    # q = EMQ(classifier=LogisticRegression())
    # q = KDEyML(classifier=LogisticRegression(), bandwidth=0.001)
    q = PCC(classifier=LogisticRegression(C=1))
    # q = DMy(classifier=LogisticRegression(), nbins=16)
    q.fit(train)

    # optional model selection with a quantification-oriented protocol;
    # note that 'bandwidth' only applies when the model is KDEyML
    # tr, val = train.split_stratified(random_state=0)
    # mod_sel = GridSearchQ(
    #     model=q,
    #     param_grid={
    #         'classifier__C': np.logspace(-3, 3, 7),
    #         'classifier__class_weight': ['balanced', None],
    #         'bandwidth': np.linspace(0.02, 0.20, 19)
    #     },
    #     protocol=APP(data=val, sample_size=100, n_prevalences=21, repeats=10, random_state=0),
    #     refit=True,
    #     n_jobs=-1
    # ).fit(tr)
    # q = mod_sel.best_model_

    mae = []
    mae_baseline_soft = []
    mae_baseline_hard = []
    for area in areas:
        # sel_tr_a = Atr == area
        sel_te_a = Ate == area

        # train_A = LabelledCollection(Xtr[sel_tr_a], ytr[sel_tr_a], classes=[0, 1])
        test_A = LabelledCollection(Xte[sel_te_a], yte[sel_te_a], classes=[0, 1])
        # if np.prod(train_A.prevalence()) == 0: continue
        # print(f'train-prev A = {train_A.prevalence()} n_instances={len(train_A)}')
        # q = DMy(classifier=LogisticRegression())
        # q.fit(train_A)

        # positive-class prevalence as estimated by the quantifier vs. the truth
        pred_prev = q.quantify(test_A.X)[1]
        true_prev = test_A.prevalence()[1]
        ae = abs(pred_prev - true_prev)
        mae.append(ae)

        # each baseline estimates the area prevalence as the mean of its predictions
        baseline_soft_estim = np.mean(baseline_soft[sel_te_a])
        ae_baseline_soft = abs(baseline_soft_estim - true_prev)
        mae_baseline_soft.append(ae_baseline_soft)

        baseline_hard_estim = np.mean(baseline_hard[sel_te_a])
        ae_baseline_hard = abs(baseline_hard_estim - true_prev)
        mae_baseline_hard.append(ae_baseline_hard)

        print(f'Area {area} true={true_prev:.2f} '
              f'baseline-soft={baseline_soft_estim:.3f} (AE={ae_baseline_soft:.3f}) '
              f'baseline-hard={baseline_hard_estim:.3f} (AE={ae_baseline_hard:.3f}) '
              f'predicted={pred_prev:.3f} (AE={ae:.3f})')

    mae = np.mean(mae)
    mae_baseline_soft = np.mean(mae_baseline_soft)
    mae_baseline_hard = np.mean(mae_baseline_hard)

    print(f'Experiment {experiment_id} '
          f'Baseline(soft)={mae_baseline_soft:.3f} '
          f'Baseline(hard)={mae_baseline_hard:.3f} '
          f'MAE={mae:.3f}')
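
# Optional follow-up (a sketch, not part of the experiment above): QuaPy also ships a
# protocol-based evaluation routine, quapy.evaluation.evaluate, which stress-tests a
# fitted quantifier across many artificially generated prevalence values via the APP
# protocol imported above. Uncomment to check the last fitted quantifier q this way
# (this treats the whole population as a stand-in for a held-out set, mirroring the
# area-level loop above):
#
# from quapy.evaluation import evaluate
#
# population = LabelledCollection(Xte, yte, classes=[0, 1])
# app_mae = evaluate(
#     q,
#     protocol=APP(data=population, sample_size=100, n_prevalences=21, repeats=10, random_state=0),
#     error_metric='mae'
# )
# print(f'APP-based MAE of the last fitted quantifier: {app_mae:.4f}')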