QuAcc/jiang18_trustscore/trustscore_evaluation.py

287 lines
11 KiB
Python

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from sklearn.model_selection import StratifiedShuffleSplit
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
import matplotlib.cm as cm
from sklearn.metrics import precision_recall_curve
import tensorflow as tf
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
def run_logistic(X_train, y_train, X_test, y_test, get_training=False):
model = LogisticRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
all_confidence = model.predict_proba(X_test)
confidences = all_confidence[range(len(y_pred)), y_pred]
if not get_training:
return y_pred, confidences
y_pred_training = model.predict(X_train)
all_confidence_training = model.predict_proba(X_train)
confidence_training = all_confidence_training[range(len(y_pred_training)),
y_pred_training]
return y_pred, confidences, y_pred_training, confidence_training
def run_linear_svc(X_train, y_train, X_test, y_test, get_training=False):
model = LinearSVC()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
all_confidence = model.decision_function(X_test)
confidences = all_confidence[range(len(y_pred)), y_pred]
if not get_training:
return y_pred, confidences
y_pred_training = model.predict(X_train)
all_confidence_training = model.decision_function(X_train)
confidence_training = all_confidence_training[range(len(y_pred_training)),
y_pred_training]
return y_pred, confidences, y_pred_training, confidence_training
def run_random_forest(X_train, y_train, X_test, y_test, get_training=False):
model = RandomForestClassifier()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
all_confidence = model.predict_proba(X_test)
confidences = all_confidence[range(len(y_pred)), y_pred]
if not get_training:
return y_pred, confidences
y_pred_training = model.predict(X_train)
all_confidence_training = model.predict_proba(X_train)
confidence_training = all_confidence_training[range(len(y_pred_training)),
y_pred_training]
return y_pred, confidences, y_pred_training, confidence_training
def run_simple_NN(X,
y,
X_test,
y_test,
num_iter=10000,
hidden_units=100,
learning_rate=0.05,
batch_size=100,
display_steps=1000,
n_layers=1,
get_training=False):
"""Run a NN with a single layer on some data.
Returns the predicted values as well as the confidences.
"""
n_labels = np.max(y) + 1
n_features = X.shape[1]
x = tf.placeholder(tf.float32, [None, n_features])
y_ = tf.placeholder(tf.float32, [None, n_labels])
def simple_NN(input_placeholder, n_layers):
W_in = weight_variable([n_features, hidden_units])
b_in = bias_variable([hidden_units])
W_mid = [
weight_variable([hidden_units, hidden_units])
for i in range(n_layers - 1)
]
b_mid = [bias_variable([hidden_units]) for i in range(n_layers - 1)]
W_out = weight_variable([hidden_units, n_labels])
b_out = bias_variable([n_labels])
layers = [tf.nn.relu(tf.matmul(input_placeholder, W_in) + b_in)]
for i in range(n_layers - 1):
layer = tf.nn.relu(tf.matmul(layers[-1], W_mid[i]) + b_mid[i])
layers.append(layer)
logits = tf.matmul(layers[-1], W_out) + b_out
return logits
NN_logits = simple_NN(x, n_layers)
cross_entropy = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=NN_logits))
train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)
correct_prediction = tf.equal(tf.argmax(NN_logits, 1), tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
def one_hot(ns):
return np.eye(n_labels)[ns]
y_onehot = one_hot(y)
y_test_onehot = one_hot(y_test)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for i in range(num_iter):
ns = np.random.randint(0, len(X), size=batch_size)
if (i + 1) % display_steps == 0:
train_accuracy = accuracy.eval(feed_dict={x: X, y_: y_onehot})
test_accuracy = accuracy.eval(feed_dict={x: X_test, y_: y_test_onehot})
print("step %d, training accuracy %g, test accuracy %g" %
(i + 1, train_accuracy, test_accuracy))
train_step.run(feed_dict={x: X[ns, :], y_: y_onehot[ns, :]})
testing_logits = NN_logits.eval(feed_dict={x: X_test})
testing_prediction = tf.argmax(NN_logits, 1).eval(feed_dict={x: X_test})
NN_softmax = tf.nn.softmax(NN_logits).eval(feed_dict={x: X_test})
testing_confidence_raw = tf.reduce_max(NN_softmax,
1).eval(feed_dict={x: X_test})
if not get_training:
return testing_prediction, testing_confidence_raw
training_prediction = tf.argmax(NN_logits, 1).eval(feed_dict={x: X})
NN_softmax = tf.nn.softmax(NN_logits).eval(feed_dict={x: X})
training_confidence_raw = tf.reduce_max(NN_softmax,
1).eval(feed_dict={x: X})
return testing_prediction, testing_confidence_raw, training_prediction, training_confidence_raw
def plot_precision_curve(
extra_plot_title,
percentile_levels,
signal_names,
final_TPs,
final_stderrs,
final_misclassification,
model_name="Model",
colors=["blue", "darkorange", "brown", "red", "purple"],
legend_loc=None,
figure_size=None,
ylim=None):
if figure_size is not None:
plt.figure(figsize=figure_size)
title = "Precision Curve" if extra_plot_title == "" else extra_plot_title
plt.title(title, fontsize=20)
colors = colors + list(cm.rainbow(np.linspace(0, 1, len(final_TPs))))
plt.xlabel("Percentile level", fontsize=18)
plt.ylabel("Precision", fontsize=18)
for i, signal_name in enumerate(signal_names):
ls = "--" if ("Model" in signal_name) else "-"
plt.plot(
percentile_levels, final_TPs[i], ls, c=colors[i], label=signal_name)
plt.fill_between(
percentile_levels,
final_TPs[i] - final_stderrs[i],
final_TPs[i] + final_stderrs[i],
color=colors[i],
alpha=0.1)
if legend_loc is None:
if 0. in percentile_levels:
plt.legend(loc="lower right", fontsize=14)
else:
plt.legend(loc="upper left", fontsize=14)
else:
if legend_loc == "outside":
plt.legend(bbox_to_anchor=(1.04, 1), loc="upper left", fontsize=14)
else:
plt.legend(loc=legend_loc, fontsize=14)
if ylim is not None:
plt.ylim(*ylim)
model_acc = 100 * (1 - final_misclassification)
plt.axvline(x=model_acc, linestyle="dotted", color="black")
plt.show()
def run_precision_recall_experiment_general(X,
y,
n_repeats,
percentile_levels,
trainer,
test_size=0.5,
extra_plot_title="",
signals=[],
signal_names=[],
predict_when_correct=False,
skip_print=False):
def get_stderr(L):
return np.std(L) / np.sqrt(len(L))
all_signal_names = ["Model Confidence"] + signal_names
all_TPs = [[[] for p in percentile_levels] for signal in all_signal_names]
misclassifications = []
sign = 1 if predict_when_correct else -1
sss = StratifiedShuffleSplit(
n_splits=n_repeats, test_size=test_size, random_state=0)
for train_idx, test_idx in sss.split(X, y):
X_train = X[train_idx, :]
y_train = y[train_idx]
X_test = X[test_idx, :]
y_test = y[test_idx]
testing_prediction, testing_confidence_raw = trainer(
X_train, y_train, X_test, y_test)
target_points = np.where(
testing_prediction == y_test)[0] if predict_when_correct else np.where(
testing_prediction != y_test)[0]
final_signals = [testing_confidence_raw]
for signal in signals:
signal.fit(X_train, y_train)
final_signals.append(signal.get_score(X_test, testing_prediction))
for p, percentile_level in enumerate(percentile_levels):
all_high_confidence_points = [
np.where(sign * signal >= np.percentile(sign *
signal, percentile_level))[0]
for signal in final_signals
]
if 0 in map(len, all_high_confidence_points):
continue
TP = [
len(np.intersect1d(high_confidence_points, target_points)) /
(1. * len(high_confidence_points))
for high_confidence_points in all_high_confidence_points
]
for i in range(len(all_signal_names)):
all_TPs[i][p].append(TP[i])
misclassifications.append(len(target_points) / (1. * len(X_test)))
final_TPs = [[] for signal in all_signal_names]
final_stderrs = [[] for signal in all_signal_names]
for p, percentile_level in enumerate(percentile_levels):
for i in range(len(all_signal_names)):
final_TPs[i].append(np.mean(all_TPs[i][p]))
final_stderrs[i].append(get_stderr(all_TPs[i][p]))
if not skip_print:
print("Precision at percentile", percentile_level)
ss = ""
for i, signal_name in enumerate(all_signal_names):
ss += (signal_name + (": %.4f " % final_TPs[i][p]))
print(ss)
print()
final_misclassification = np.mean(misclassifications)
if not skip_print:
print("Misclassification rate mean/std", np.mean(misclassifications),
get_stderr(misclassifications))
for i in range(len(all_signal_names)):
final_TPs[i] = np.array(final_TPs[i])
final_stderrs[i] = np.array(final_stderrs[i])
plot_precision_curve(extra_plot_title, percentile_levels, all_signal_names,
final_TPs, final_stderrs, final_misclassification)
return (all_signal_names, final_TPs, final_stderrs, final_misclassification)