diff --git a/.coverage b/.coverage
index e04d9e2..c9d78c7 100644
Binary files a/.coverage and b/.coverage differ
diff --git a/jiang18_trustscore/__pycache__/trustscore.cpython-311.pyc b/jiang18_trustscore/__pycache__/trustscore.cpython-311.pyc
new file mode 100644
index 0000000..bfba8b5
Binary files /dev/null and b/jiang18_trustscore/__pycache__/trustscore.cpython-311.pyc differ
diff --git a/jiang18_trustscore/trustscore.py b/jiang18_trustscore/trustscore.py
new file mode 100644
index 0000000..9b6d417
--- /dev/null
+++ b/jiang18_trustscore/trustscore.py
@@ -0,0 +1,141 @@
+# Copyright 2018 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+from sklearn.neighbors import KDTree, KNeighborsClassifier
+
+
+class TrustScore:
+    """
+    Trust Score: a measure of classifier uncertainty based on nearest neighbors.
+    """
+
+    def __init__(self, k=10, alpha=0.0, filtering="none", min_dist=1e-12):
+        """
+        k and alpha are the tuning parameters for the filtering.
+        filtering: filtering method; options are "none", "density",
+            "uncertainty".
+        min_dist: a small constant to mitigate possible division by 0.
+        """
+        self.k = k
+        self.filtering = filtering
+        self.alpha = alpha
+        self.min_dist = min_dist
+
+    def filter_by_density(self, X: np.ndarray):
+        """Filter out points with low kNN density.
+
+        Args:
+            X: an array of sample points.
+
+        Returns:
+            The subset of X with the alpha-fraction of lowest kNN-density points
+            (those with the largest distance to their k-th neighbor) removed.
+        """
+        kdtree = KDTree(X)
+        knn_radii = kdtree.query(X, k=self.k)[0][:, -1]
+        eps = np.percentile(knn_radii, (1 - self.alpha) * 100)
+        return X[np.where(knn_radii <= eps)[0], :]
+
+    def filter_by_uncertainty(self, X: np.ndarray, y: np.ndarray):
+        """Filter out points with high label disagreement among their k nearest neighbors.
+
+        Args:
+            X: an array of sample points.
+            y: corresponding labels.
+
+        Returns:
+            The subset of (X, y) with the alpha-fraction of samples whose k nearest
+            neighbors disagree most removed.
+        """
+        neigh = KNeighborsClassifier(n_neighbors=self.k)
+        neigh.fit(X, y)
+        confidence = neigh.predict_proba(X)
+        cutoff = np.percentile(confidence, self.alpha * 100)
+        unfiltered_idxs = np.where(confidence >= cutoff)[0]
+        return X[unfiltered_idxs, :], y[unfiltered_idxs]
+
+    def fit(self, X: np.ndarray, y: np.ndarray):
+        """Initialize trust score precomputations with training data.
+
+        WARNING: assumes that the labels are 0-indexed (i.e.
+        0, 1, ..., n_labels-1).
+
+        Args:
+            X: an array of sample points.
+            y: corresponding labels.
+        """
+        self.n_labels = np.max(y) + 1
+        self.kdtrees = [None] * self.n_labels
+        if self.filtering == "uncertainty":
+            X_filtered, y_filtered = self.filter_by_uncertainty(X, y)
+        for label in range(self.n_labels):
+            if self.filtering == "none":
+                X_to_use = X[np.where(y == label)[0]]
+                self.kdtrees[label] = KDTree(X_to_use)
+            elif self.filtering == "density":
+                X_to_use = self.filter_by_density(X[np.where(y == label)[0]])
+                self.kdtrees[label] = KDTree(X_to_use)
+            elif self.filtering == "uncertainty":
+                X_to_use = X_filtered[np.where(y_filtered == label)[0]]
+                self.kdtrees[label] = KDTree(X_to_use)
+
+            if len(X_to_use) == 0:
+                print(
+                    "Filtered too much or missing examples from a label! Please lower "
+                    "alpha or check data."
+                )
+
+    def get_score(self, X: np.ndarray, y_pred: np.ndarray):
+        """Compute the trust scores.
+
+        Given a set of points, determines the distance to each class.
+
+        Args:
+            X: an array of sample points.
+            y_pred: the predicted labels for these points.
+
+        Returns:
+            The trust score: the ratio of the distance to the closest class other
+            than the predicted class to the distance to the predicted class.
+        """
+        d = np.tile(None, (X.shape[0], self.n_labels))
+        for label_idx in range(self.n_labels):
+            # k=2 so that a point that is also in the training set does not report
+            # a zero distance to itself; the farther of the two neighbors is used.
+            d[:, label_idx] = self.kdtrees[label_idx].query(X, k=2)[0][:, -1]
+
+        sorted_d = np.sort(d, axis=1)
+        d_to_pred = d[range(d.shape[0]), y_pred]
+        d_to_closest_not_pred = np.where(
+            sorted_d[:, 0] != d_to_pred, sorted_d[:, 0], sorted_d[:, 1]
+        )
+        return d_to_closest_not_pred / (d_to_pred + self.min_dist)
+
+
+class KNNConfidence:
+    """Baseline that scores a prediction by its agreement with a kNN classifier:
+    the fraction of the k nearest neighbors whose label matches the prediction.
+    """
+
+    def __init__(self, k=10):
+        self.k = k
+
+    def fit(self, X, y):
+        self.kdtree = KDTree(X)
+        self.y = y
+
+    def get_score(self, X, y_pred):
+        knn_idxs = self.kdtree.query(X, k=self.k)[1]
+        knn_outputs = self.y[knn_idxs]
+        return np.mean(
+            knn_outputs == np.transpose(np.tile(y_pred, (self.k, 1))), axis=1
+        )
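+
+
+# Minimal usage sketch (the names X_train, y_train, X_test and the fitted
+# classifier `clf` are illustrative only):
+#
+#     ts = TrustScore(k=10, alpha=0.1, filtering="density")
+#     ts.fit(X_train, y_train)
+#     scores = ts.get_score(X_test, clf.predict(X_test))
+#
+# Higher scores mean a point is much closer to its predicted class than to any
+# other class. KNNConfidence exposes the same fit/get_score interface and can
+# be used as a drop-in baseline signal.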
+ """ + + self.n_labels = np.max(y) + 1 + self.kdtrees = [None] * self.n_labels + if self.filtering == "uncertainty": + X_filtered, y_filtered = self.filter_by_uncertainty(X, y) + for label in range(self.n_labels): + if self.filtering == "none": + X_to_use = X[np.where(y == label)[0]] + self.kdtrees[label] = KDTree(X_to_use) + elif self.filtering == "density": + X_to_use = self.filter_by_density(X[np.where(y == label)[0]]) + self.kdtrees[label] = KDTree(X_to_use) + elif self.filtering == "uncertainty": + X_to_use = X_filtered[np.where(y_filtered == label)[0]] + self.kdtrees[label] = KDTree(X_to_use) + + if len(X_to_use) == 0: + print( + "Filtered too much or missing examples from a label! Please lower " + "alpha or check data." + ) + + def get_score(self, X: np.array, y_pred: np.array): + """Compute the trust scores. + + Given a set of points, determines the distance to each class. + + Args: + X: an array of sample points. + y_pred: The predicted labels for these points. + + Returns: + The trust score, which is ratio of distance to closest class that was not + the predicted class to the distance to the predicted class. + """ + d = np.tile(None, (X.shape[0], self.n_labels)) + for label_idx in range(self.n_labels): + d[:, label_idx] = self.kdtrees[label_idx].query(X, k=2)[0][:, -1] + + sorted_d = np.sort(d, axis=1) + d_to_pred = d[range(d.shape[0]), y_pred] + d_to_closest_not_pred = np.where( + sorted_d[:, 0] != d_to_pred, sorted_d[:, 0], sorted_d[:, 1] + ) + return d_to_closest_not_pred / (d_to_pred + self.min_dist) + + +class KNNConfidence: + """Baseline which uses disagreement to kNN classifier. + """ + + def __init__(self, k=10): + self.k = k + + def fit(self, X, y): + self.kdtree = KDTree(X) + self.y = y + + def get_score(self, X, y_pred): + knn_idxs = self.kdtree.query(X, k=self.k)[1] + knn_outputs = self.y[knn_idxs] + return np.mean( + knn_outputs == np.transpose(np.tile(y_pred, (self.k, 1))), axis=1 + ) diff --git a/jiang18_trustscore/trustscore_evaluation.py b/jiang18_trustscore/trustscore_evaluation.py new file mode 100644 index 0000000..78f50ec --- /dev/null +++ b/jiang18_trustscore/trustscore_evaluation.py @@ -0,0 +1,286 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import numpy as np
+from sklearn.model_selection import StratifiedShuffleSplit
+import matplotlib.pyplot as plt
+from sklearn.decomposition import PCA
+import matplotlib.cm as cm
+from sklearn.metrics import precision_recall_curve
+import tensorflow as tf
+
+from sklearn.linear_model import LogisticRegression
+from sklearn.svm import LinearSVC
+from sklearn.ensemble import RandomForestClassifier
+
+
+def run_logistic(X_train, y_train, X_test, y_test, get_training=False):
+    model = LogisticRegression()
+    model.fit(X_train, y_train)
+    y_pred = model.predict(X_test)
+    all_confidence = model.predict_proba(X_test)
+    confidences = all_confidence[range(len(y_pred)), y_pred]
+    if not get_training:
+        return y_pred, confidences
+    y_pred_training = model.predict(X_train)
+    all_confidence_training = model.predict_proba(X_train)
+    confidence_training = all_confidence_training[
+        range(len(y_pred_training)), y_pred_training
+    ]
+    return y_pred, confidences, y_pred_training, confidence_training
+
+
+def run_linear_svc(X_train, y_train, X_test, y_test, get_training=False):
+    model = LinearSVC()
+    model.fit(X_train, y_train)
+    y_pred = model.predict(X_test)
+    all_confidence = model.decision_function(X_test)
+    confidences = all_confidence[range(len(y_pred)), y_pred]
+    if not get_training:
+        return y_pred, confidences
+    y_pred_training = model.predict(X_train)
+    all_confidence_training = model.decision_function(X_train)
+    confidence_training = all_confidence_training[
+        range(len(y_pred_training)), y_pred_training
+    ]
+    return y_pred, confidences, y_pred_training, confidence_training
+
+
+def run_random_forest(X_train, y_train, X_test, y_test, get_training=False):
+    model = RandomForestClassifier()
+    model.fit(X_train, y_train)
+    y_pred = model.predict(X_test)
+    all_confidence = model.predict_proba(X_test)
+    confidences = all_confidence[range(len(y_pred)), y_pred]
+    if not get_training:
+        return y_pred, confidences
+    y_pred_training = model.predict(X_train)
+    all_confidence_training = model.predict_proba(X_train)
+    confidence_training = all_confidence_training[
+        range(len(y_pred_training)), y_pred_training
+    ]
+    return y_pred, confidences, y_pred_training, confidence_training
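+
+
+# run_logistic, run_linear_svc and run_random_forest above, and run_simple_NN
+# below, all implement the `trainer` interface expected by
+# run_precision_recall_experiment_general: given (X_train, y_train, X_test,
+# y_test) they return the test predictions together with the confidence
+# assigned to each predicted label.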
+ """ + n_labels = np.max(y) + 1 + n_features = X.shape[1] + + x = tf.placeholder(tf.float32, [None, n_features]) + y_ = tf.placeholder(tf.float32, [None, n_labels]) + + def simple_NN(input_placeholder, n_layers): + + W_in = weight_variable([n_features, hidden_units]) + b_in = bias_variable([hidden_units]) + W_mid = [ + weight_variable([hidden_units, hidden_units]) + for i in range(n_layers - 1) + ] + b_mid = [bias_variable([hidden_units]) for i in range(n_layers - 1)] + W_out = weight_variable([hidden_units, n_labels]) + b_out = bias_variable([n_labels]) + + layers = [tf.nn.relu(tf.matmul(input_placeholder, W_in) + b_in)] + for i in range(n_layers - 1): + layer = tf.nn.relu(tf.matmul(layers[-1], W_mid[i]) + b_mid[i]) + layers.append(layer) + + logits = tf.matmul(layers[-1], W_out) + b_out + return logits + + NN_logits = simple_NN(x, n_layers) + + cross_entropy = tf.reduce_mean( + tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=NN_logits)) + train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy) + correct_prediction = tf.equal(tf.argmax(NN_logits, 1), tf.argmax(y_, 1)) + accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) + + def one_hot(ns): + return np.eye(n_labels)[ns] + + y_onehot = one_hot(y) + y_test_onehot = one_hot(y_test) + + with tf.Session() as sess: + sess.run(tf.global_variables_initializer()) + for i in range(num_iter): + ns = np.random.randint(0, len(X), size=batch_size) + if (i + 1) % display_steps == 0: + train_accuracy = accuracy.eval(feed_dict={x: X, y_: y_onehot}) + test_accuracy = accuracy.eval(feed_dict={x: X_test, y_: y_test_onehot}) + + print("step %d, training accuracy %g, test accuracy %g" % + (i + 1, train_accuracy, test_accuracy)) + train_step.run(feed_dict={x: X[ns, :], y_: y_onehot[ns, :]}) + + testing_logits = NN_logits.eval(feed_dict={x: X_test}) + testing_prediction = tf.argmax(NN_logits, 1).eval(feed_dict={x: X_test}) + NN_softmax = tf.nn.softmax(NN_logits).eval(feed_dict={x: X_test}) + testing_confidence_raw = tf.reduce_max(NN_softmax, + 1).eval(feed_dict={x: X_test}) + + if not get_training: + return testing_prediction, testing_confidence_raw + training_prediction = tf.argmax(NN_logits, 1).eval(feed_dict={x: X}) + NN_softmax = tf.nn.softmax(NN_logits).eval(feed_dict={x: X}) + training_confidence_raw = tf.reduce_max(NN_softmax, + 1).eval(feed_dict={x: X}) + return testing_prediction, testing_confidence_raw, training_prediction, training_confidence_raw + + +def plot_precision_curve( + extra_plot_title, + percentile_levels, + signal_names, + final_TPs, + final_stderrs, + final_misclassification, + model_name="Model", + colors=["blue", "darkorange", "brown", "red", "purple"], + legend_loc=None, + figure_size=None, + ylim=None): + if figure_size is not None: + plt.figure(figsize=figure_size) + title = "Precision Curve" if extra_plot_title == "" else extra_plot_title + plt.title(title, fontsize=20) + colors = colors + list(cm.rainbow(np.linspace(0, 1, len(final_TPs)))) + + plt.xlabel("Percentile level", fontsize=18) + plt.ylabel("Precision", fontsize=18) + for i, signal_name in enumerate(signal_names): + ls = "--" if ("Model" in signal_name) else "-" + plt.plot( + percentile_levels, final_TPs[i], ls, c=colors[i], label=signal_name) + + plt.fill_between( + percentile_levels, + final_TPs[i] - final_stderrs[i], + final_TPs[i] + final_stderrs[i], + color=colors[i], + alpha=0.1) + + if legend_loc is None: + if 0. 
+
+
+def plot_precision_curve(
+    extra_plot_title,
+    percentile_levels,
+    signal_names,
+    final_TPs,
+    final_stderrs,
+    final_misclassification,
+    model_name="Model",
+    colors=["blue", "darkorange", "brown", "red", "purple"],
+    legend_loc=None,
+    figure_size=None,
+    ylim=None,
+):
+    if figure_size is not None:
+        plt.figure(figsize=figure_size)
+    title = "Precision Curve" if extra_plot_title == "" else extra_plot_title
+    plt.title(title, fontsize=20)
+    colors = colors + list(cm.rainbow(np.linspace(0, 1, len(final_TPs))))
+
+    plt.xlabel("Percentile level", fontsize=18)
+    plt.ylabel("Precision", fontsize=18)
+    for i, signal_name in enumerate(signal_names):
+        ls = "--" if ("Model" in signal_name) else "-"
+        plt.plot(
+            percentile_levels, final_TPs[i], ls, c=colors[i], label=signal_name
+        )
+
+        plt.fill_between(
+            percentile_levels,
+            final_TPs[i] - final_stderrs[i],
+            final_TPs[i] + final_stderrs[i],
+            color=colors[i],
+            alpha=0.1,
+        )
+
+    if legend_loc is None:
+        if 0. in percentile_levels:
+            plt.legend(loc="lower right", fontsize=14)
+        else:
+            plt.legend(loc="upper left", fontsize=14)
+    else:
+        if legend_loc == "outside":
+            plt.legend(bbox_to_anchor=(1.04, 1), loc="upper left", fontsize=14)
+        else:
+            plt.legend(loc=legend_loc, fontsize=14)
+    if ylim is not None:
+        plt.ylim(*ylim)
+    model_acc = 100 * (1 - final_misclassification)
+    plt.axvline(x=model_acc, linestyle="dotted", color="black")
+    plt.show()
+
+
+def run_precision_recall_experiment_general(
+    X,
+    y,
+    n_repeats,
+    percentile_levels,
+    trainer,
+    test_size=0.5,
+    extra_plot_title="",
+    signals=[],
+    signal_names=[],
+    predict_when_correct=False,
+    skip_print=False,
+):
+    """For each of n_repeats stratified splits, fit a model via `trainer`, compute
+    every confidence signal on the test split, and record the precision with which
+    the top (or bottom, when predicting errors) percentile of each signal captures
+    the target points (correct predictions if predict_when_correct, errors
+    otherwise). Averages across splits are plotted with plot_precision_curve."""
+
+    def get_stderr(L):
+        return np.std(L) / np.sqrt(len(L))
+
+    all_signal_names = ["Model Confidence"] + signal_names
+    all_TPs = [[[] for p in percentile_levels] for signal in all_signal_names]
+    misclassifications = []
+    sign = 1 if predict_when_correct else -1
+    sss = StratifiedShuffleSplit(
+        n_splits=n_repeats, test_size=test_size, random_state=0
+    )
+    for train_idx, test_idx in sss.split(X, y):
+        X_train = X[train_idx, :]
+        y_train = y[train_idx]
+        X_test = X[test_idx, :]
+        y_test = y[test_idx]
+        testing_prediction, testing_confidence_raw = trainer(
+            X_train, y_train, X_test, y_test
+        )
+        target_points = (
+            np.where(testing_prediction == y_test)[0]
+            if predict_when_correct
+            else np.where(testing_prediction != y_test)[0]
+        )
+
+        final_signals = [testing_confidence_raw]
+        for signal in signals:
+            signal.fit(X_train, y_train)
+            final_signals.append(signal.get_score(X_test, testing_prediction))
+
+        for p, percentile_level in enumerate(percentile_levels):
+            all_high_confidence_points = [
+                np.where(
+                    sign * signal >= np.percentile(sign * signal, percentile_level)
+                )[0]
+                for signal in final_signals
+            ]
+
+            if 0 in map(len, all_high_confidence_points):
+                continue
+            TP = [
+                len(np.intersect1d(high_confidence_points, target_points))
+                / (1. * len(high_confidence_points))
+                for high_confidence_points in all_high_confidence_points
+            ]
+            for i in range(len(all_signal_names)):
+                all_TPs[i][p].append(TP[i])
+        misclassifications.append(len(target_points) / (1. * len(X_test)))
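+
+    # Average the per-split precisions (and their standard errors) for each
+    # signal at every percentile level.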
+    final_TPs = [[] for signal in all_signal_names]
+    final_stderrs = [[] for signal in all_signal_names]
+    for p, percentile_level in enumerate(percentile_levels):
+        for i in range(len(all_signal_names)):
+            final_TPs[i].append(np.mean(all_TPs[i][p]))
+            final_stderrs[i].append(get_stderr(all_TPs[i][p]))
+
+        if not skip_print:
+            print("Precision at percentile", percentile_level)
+            ss = ""
+            for i, signal_name in enumerate(all_signal_names):
+                ss += signal_name + (": %.4f " % final_TPs[i][p])
+            print(ss)
+            print()
+
+    final_misclassification = np.mean(misclassifications)
+
+    if not skip_print:
+        print(
+            "Misclassification rate mean/std",
+            np.mean(misclassifications),
+            get_stderr(misclassifications),
+        )
+
+    for i in range(len(all_signal_names)):
+        final_TPs[i] = np.array(final_TPs[i])
+        final_stderrs[i] = np.array(final_stderrs[i])
+
+    plot_precision_curve(
+        extra_plot_title,
+        percentile_levels,
+        all_signal_names,
+        final_TPs,
+        final_stderrs,
+        final_misclassification,
+    )
+    return (all_signal_names, final_TPs, final_stderrs, final_misclassification)
diff --git a/pyproject.toml b/pyproject.toml
index 49bfad4..9ca845e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,7 +23,7 @@ pytest-mock = "^3.11.1"
 pytest-cov = "^4.1.0"
 
 [tool.pytest.ini_options]
-addopts = "--cov=quacc"
+addopts = "--cov=quacc --capture=tee-sys"
 
 [build-system]
 requires = ["poetry-core"]
diff --git a/quacc/baseline.py b/quacc/baseline.py
index 2cc95d0..32dcacc 100644
--- a/quacc/baseline.py
+++ b/quacc/baseline.py
@@ -10,7 +10,7 @@ from garg22_ATC.ATC_helper import (
     get_max_conf,
 )
 import numpy as np
-
+from jiang18_trustscore.trustscore import TrustScore
 
 
 def kfcv(c_model: BaseEstimator, validation: LabelledCollection) -> Dict:
@@ -43,10 +43,11 @@ def ATC_MC(
     ATC_accuracy = get_ATC_acc(ATC_thres, test_scores)
 
     return {
-        "true_acc": 100*np.mean(np.argmax(test_probs, axis=-1) == test.y),
-        "pred_acc": ATC_accuracy
+        "true_acc": 100 * np.mean(np.argmax(test_probs, axis=-1) == test.y),
+        "pred_acc": ATC_accuracy,
     }
 
+
 def ATC_NE(
     c_model: BaseEstimator,
     validation: LabelledCollection,
@@ -71,7 +72,23 @@ def ATC_NE(
     ATC_accuracy = get_ATC_acc(ATC_thres, test_scores)
 
     return {
-        "true_acc": 100*np.mean(np.argmax(test_probs, axis=-1) == test.y),
-        "pred_acc": ATC_accuracy
+        "true_acc": 100 * np.mean(np.argmax(test_probs, axis=-1) == test.y),
+        "pred_acc": ATC_accuracy,
     }
+
+
+def trust_score(
+    c_model: BaseEstimator,
+    validation: LabelledCollection,
+    test: LabelledCollection,
+    predict_method="predict",
+):
+    c_model_predict = getattr(c_model, predict_method)
+
+    test_pred = c_model_predict(test.X)
+
+    trust_model = TrustScore()
+    trust_model.fit(validation.X, validation.y)
+
+    return trust_model.get_score(test.X, test_pred)
+
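+# Example (sketch, mirroring the other baselines above): with `c_model` already
+# fitted on the training split,
+#
+#     scores = trust_score(c_model, validation, test)
+#
+# returns one trust score per test sample (len(scores) == len(test.y)); higher
+# values mean the sample is much closer to its predicted class than to any
+# other class.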
assert "f1_score" in kfcv(c_model, validation) + + def test_trust_score(self): + train, validation, test = get_spambase() + c_model = LogisticRegression() + c_model.fit(train.X, train.y) + trustscore = trust_score(c_model, train, test) + assert len(trustscore) == len(test.y) \ No newline at end of file