gfun_multimodal/gfun/vgfs/learners/svms.py

355 lines
12 KiB
Python

import time
import numpy as np
from joblib import Parallel, delayed
from scipy.sparse import issparse
from sklearn.model_selection import GridSearchCV
from sklearn.multiclass import OneVsRestClassifier
from sklearn.preprocessing import normalize
from sklearn.svm import SVC
def _sort_if_sparse(X):
if issparse(X) and not X.has_sorted_indices:
X.sort_indices()
def get_learner(calibrate=False, kernel="linear", C=1):
"""
instantiate scikit Support Vector Classifier
:param calibrate: boolean, whether to return posterior probabilities or not
:param kernel: string,kernel to be applied to the SVC
:param C: int or dict {'C': list of integer}, Regularization parameter
:return: Support Vector Classifier
"""
return SVC(
kernel=kernel,
probability=calibrate,
cache_size=1000,
C=C,
random_state=1,
gamma="auto",
verbose=False,
)
def _joblib_transform_multiling(transformer, lX, n_jobs=-1):
if n_jobs == 1:
return {lang: transformer(lX[lang]) for lang in lX.keys()}
else:
langs = list(lX.keys())
transformations = Parallel(n_jobs=n_jobs)(
delayed(transformer)(lX[lang]) for lang in langs
)
return {lang: transformations[i] for i, lang in enumerate(langs)}
class MonolingualClassifier:
def __init__(self, base_learner, parameters=None, n_jobs=-1):
self.learner = base_learner
self.parameters = parameters
self.model = None
self.best_params_ = None
self.n_jobs = n_jobs
def fit(self, X, y):
tinit = time.time()
_sort_if_sparse(X)
self.empty_categories = np.argwhere(np.sum(y, axis=0) == 0).flatten()
# multi-class format
if len(y.shape) == 2:
if self.parameters is not None:
self.parameters = [
{"estimator__" + key: params[key] for key in params.keys()}
for params in self.parameters
]
self.model = OneVsRestClassifier(self.learner, n_jobs=self.n_jobs)
else:
self.model = self.learner
raise NotImplementedError(
"not working as a base-classifier for funneling if there are gaps in "
"the labels across languages"
)
# parameter optimization?
if self.parameters:
self.model = GridSearchCV(
self.model,
param_grid=self.parameters,
refit=True,
cv=5,
n_jobs=self.n_jobs,
error_score=0,
verbose=1,
)
# print(f"-- Fitting learner on matrices X={X.shape} Y={y.shape}")
self.model.fit(X, y)
if isinstance(self.model, GridSearchCV):
self.best_params_ = self.model.best_params_
print("best parameters: ", self.best_params_)
self.time = time.time() - tinit
return self
def decision_function(self, X):
assert self.model is not None, "predict called before fit"
_sort_if_sparse(X)
return self.model.decision_function(X)
def predict_proba(self, X):
assert self.model is not None, "predict called before fit"
assert hasattr(
self.model, "predict_proba"
), "the probability predictions are not enabled in this model"
_sort_if_sparse(X)
return self.model.predict_proba(X)
def predict(self, X):
assert self.model is not None, "predict called before fit"
_sort_if_sparse(X)
return self.model.predict(X)
def best_params(self):
raise NotImplementedError
class NaivePolylingualClassifier:
"""
Is a mere set of independet MonolingualClassifiers
"""
def __init__(self, base_learner, parameters=None, n_jobs=-1):
self.base_learner = base_learner
self.parameters = parameters
self.model = None
self.n_jobs = n_jobs
def fit(self, lX, ly):
"""
trains the independent monolingual classifiers
:param lX: a dictionary {language_label: X csr-matrix}
:param ly: a dictionary {language_label: y np.array}
:return: self
"""
tinit = time.time()
assert set(lX.keys()) == set(ly.keys()), "inconsistent language mappings in fit"
langs = list(lX.keys())
for lang in langs:
_sort_if_sparse(lX[lang])
models = Parallel(n_jobs=self.n_jobs)(
delayed(
MonolingualClassifier(self.base_learner, parameters=self.parameters).fit
)((lX[lang]), ly[lang])
for lang in langs
)
self.model = {lang: models[i] for i, lang in enumerate(langs)}
self.empty_categories = {
lang: self.model[lang].empty_categories for lang in langs
}
self.time = time.time() - tinit
return self
def decision_function(self, lX):
"""
:param lX: a dictionary {language_label: X csr-matrix}
:return: a dictionary of classification scores for each class
"""
assert self.model is not None, "predict called before fit"
assert set(lX.keys()).issubset(
set(self.model.keys())
), "unknown languages requested in decision function"
langs = list(lX.keys())
scores = Parallel(n_jobs=self.n_jobs)(
delayed(self.model[lang].decision_function)(lX[lang]) for lang in langs
)
return {lang: scores[i] for i, lang in enumerate(langs)}
def predict_proba(self, lX):
"""
:param lX: a dictionary {language_label: X csr-matrix}
:return: a dictionary of probabilities that each document belongs to each class
"""
assert self.model is not None, "predict called before fit"
if not set(lX.keys()).issubset(set(self.model.keys())):
langs = set(lX.keys()).intersection(set(self.model.keys()))
scores = Parallel(n_jobs=self.n_jobs, max_nbytes=None)(
delayed(self.model[lang].predict_proba)(lX[lang]) for lang in langs)
# res = {lang: None for lang in lX.keys()}
# for i, lang in enumerate(langs):
# res[lang] = scores[i]
# return res
return {lang: scores[i] for i, lang in enumerate(langs)}
# assert set(lX.keys()).issubset(
# set(self.model.keys())
# ), "unknown languages requested in decision function"
langs = list(lX.keys())
scores = Parallel(n_jobs=self.n_jobs, max_nbytes=None)(
delayed(self.model[lang].predict_proba)(lX[lang]) for lang in langs
)
return {lang: scores[i] for i, lang in enumerate(langs)}
def predict(self, lX):
"""
:param lX: a dictionary {language_label: X csr-matrix}
:return: a dictionary of predictions
"""
assert self.model is not None, "predict called before fit"
assert set(lX.keys()).issubset(
set(self.model.keys())
), "unknown languages requested in predict"
if self.n_jobs == 1:
return {lang: self.model[lang].transform(lX[lang]) for lang in lX.keys()}
else:
langs = list(lX.keys())
scores = Parallel(n_jobs=self.n_jobs)(
delayed(self.model[lang].predict)(lX[lang]) for lang in langs
)
return {lang: scores[i] for i, lang in enumerate(langs)}
def best_params(self):
return {lang: model.best_params() for lang, model in self.model.items()}
class MetaClassifier:
def __init__(
self,
meta_learner,
meta_parameters=None,
n_jobs=-1,
standardize_range=None,
verbose=True,
):
self.n_jobs = n_jobs
self.model = MonolingualClassifier(
base_learner=meta_learner, parameters=meta_parameters, n_jobs=self.n_jobs
)
self.standardize_range = standardize_range
self.verbose = verbose
def fit(self, lZ, lY):
tinit = time.time()
Z, y = self.stack(lZ, lY)
self.standardizer = StandardizeTransformer(range=self.standardize_range)
Z = self.standardizer.fit_transform(Z)
if self.verbose:
print(f"- fitting the metaclassifier on data shape: {Z.shape}")
self.model.fit(Z, y)
self.time = time.time() - tinit
def stack(self, lZ, lY=None):
langs = list(lZ.keys())
Z = np.vstack([lZ[lang] for lang in langs])
if lY is not None:
y = np.vstack([lY[lang] for lang in langs])
return Z, y
else:
return Z
def predict(self, lZ):
lZ = _joblib_transform_multiling(
self.standardizer.transform, lZ, n_jobs=self.n_jobs
)
return _joblib_transform_multiling(self.model.predict, lZ, n_jobs=self.n_jobs)
def predict_proba(self, lZ):
lZ = _joblib_transform_multiling(
self.standardizer.transform, lZ, n_jobs=self.n_jobs
)
return _joblib_transform_multiling(
self.model.predict_proba, lZ, n_jobs=self.n_jobs
)
class StandardizeTransformer:
def __init__(self, axis=0, range=None):
"""
:param axis:
:param range:
"""
assert range is None or isinstance(
range, slice
), "wrong format for range, should either be None or a slice"
self.axis = axis
self.yetfit = False
self.range = range
def fit(self, X):
# print("Applying z-score standardization...")
std = np.std(X, axis=self.axis, ddof=1)
self.std = np.clip(std, 1e-5, None)
self.mean = np.mean(X, axis=self.axis)
if self.range is not None:
ones = np.ones_like(self.std)
zeros = np.zeros_like(self.mean)
ones[self.range] = self.std[self.range]
zeros[self.range] = self.mean[self.range]
self.std = ones
self.mean = zeros
self.yetfit = True
return self
def transform(self, X):
if not self.yetfit:
"transform called before fit"
return (X - self.mean) / self.std
def fit_transform(self, X):
return self.fit(X).transform(X)
class FeatureSet2Posteriors:
"""
Takes care of recasting features outputted by the embedders to vecotrs of posterior probabilities by means of
a multiclass SVM.
"""
def __init__(self, verbose=True, l2=True, n_jobs=-1):
"""
Init the class.
:param embedder: ViewGen, view generators which does not natively outputs posterior probabilities.
:param l2: bool, whether to apply or not L2 normalization to the projection
:param n_jobs: int, number of concurrent workers.
"""
# self.embedder = embedder
self.l2 = l2
self.n_jobs = n_jobs
self.prob_classifier = MetaClassifier(
SVC(
kernel="rbf",
gamma="auto",
probability=True,
cache_size=1000,
random_state=1,
),
n_jobs=n_jobs,
verbose=verbose,
)
def fit(self, lX, lY):
self.prob_classifier.fit(lX, lY)
return self
def transform(self, lX):
lP = self.predict_proba(lX)
lP = _normalize(lP, self.l2)
return lP
def fit_transform(self, lX, lY):
return self.fit(lX, lY).transform(lX)
def predict(self, lX):
return self.prob_classifier.predict(lX)
def predict_proba(self, lX):
return self.prob_classifier.predict_proba(lX)
def _normalize(lX, l2=True):
return {lang: normalize(np.asarray(X)) for lang, X in lX.items()} if l2 else lX