gfun_multimodal/gfun/vgfs/learners/svms.py

346 lines
11 KiB
Python

import time
import numpy as np
from joblib import Parallel, delayed
from scipy.sparse import issparse
from sklearn.model_selection import GridSearchCV
from sklearn.multiclass import OneVsRestClassifier
from sklearn.preprocessing import normalize
from sklearn.svm import SVC
def _sort_if_sparse(X):
    """
    Sort the indices of a scipy sparse matrix in place, when they are not
    already sorted. Any other input is left untouched.
    :param X: matrix-like object; only scipy sparse matrices are affected
    :return: None, the matrix is modified in place
    """
    if not issparse(X):
        return
    if X.has_sorted_indices:
        # nothing to do, indices are already in order
        return
    X.sort_indices()
def get_learner(calibrate=False, kernel="linear", C=1):
    """
    Build a scikit-learn Support Vector Classifier with the project defaults.
    :param calibrate: boolean, forwarded as `probability` (enables posterior
        probability estimates on the returned classifier)
    :param kernel: string, kernel to be applied to the SVC
    :param C: regularization parameter, forwarded to the SVC as is
    :return: a non-fitted Support Vector Classifier
    """
    svc_options = {
        "kernel": kernel,
        "C": C,
        "probability": calibrate,
        "gamma": "auto",
        "cache_size": 1000,
        "random_state": 1,
        "verbose": False,
    }
    return SVC(**svc_options)
def _joblib_transform_multiling(transformer, lX, n_jobs=-1):
    """
    Apply a callable to every language-specific entry of a dictionary.
    :param transformer: callable taking a single argument (the data of one language)
    :param lX: a dictionary {language_label: data}
    :param n_jobs: int, number of joblib workers; 1 runs sequentially in the
        calling process without going through joblib
    :return: a dictionary {language_label: transformer(data)}
    """
    if n_jobs == 1:
        return {lang: transformer(X) for lang, X in lX.items()}

    # Freeze the language order so that results can be matched back to keys.
    languages = list(lX.keys())
    pool = Parallel(n_jobs=n_jobs)
    outputs = pool(delayed(transformer)(lX[lang]) for lang in languages)
    return dict(zip(languages, outputs))
class MonolingualClassifier:
    """
    Thin wrapper around a scikit-learn style learner for a single language.
    At fit time the learner is wrapped in a OneVsRestClassifier and,
    when a parameter grid is given, in a 5-fold GridSearchCV.
    """

    def __init__(self, base_learner, parameters=None, n_jobs=-1):
        """
        :param base_learner: the (non-fitted) estimator to be trained
        :param parameters: None, or a parameter grid expressed on the base
            learner: a dict {param_name: list of values} or a list of such dicts
        :param n_jobs: int, number of workers used by OneVsRestClassifier and
            GridSearchCV
        """
        self.learner = base_learner
        self.parameters = parameters
        self.model = None
        self.best_params_ = None
        self.n_jobs = n_jobs

    def _prefixed_param_grid(self):
        """
        Build the grid addressed to the learner nested in OneVsRestClassifier.
        A fresh structure is returned and `self.parameters` is never modified,
        so calling `fit` repeatedly does not stack "estimator__" prefixes.
        :return: a list of dicts with "estimator__"-prefixed keys, or None when
            no (or an empty) grid was given
        """
        if not self.parameters:
            return None
        if isinstance(self.parameters, dict):
            grids = [self.parameters]
        else:
            grids = self.parameters
        return [
            {"estimator__" + key: values for key, values in grid.items()}
            for grid in grids
        ]

    def fit(self, X, y):
        """
        Train the classifier, optionally optimizing the hyper-parameters.
        :param X: training matrix (indices are sorted in place if it is sparse)
        :param y: 2-dimensional label matrix, one column per category
        :return: self
        :raises NotImplementedError: if y is not 2-dimensional
        """
        tinit = time.time()
        _sort_if_sparse(X)
        # indices of the columns of y whose sum is zero
        # (presumably categories with no positive example in this language)
        self.empty_categories = np.argwhere(np.sum(y, axis=0) == 0).flatten()

        if len(y.shape) != 2:
            self.model = self.learner
            raise NotImplementedError(
                "not working as a base-classifier for funneling if there are gaps in "
                "the labels across languages"
            )

        # multi-class format
        self.model = OneVsRestClassifier(self.learner, n_jobs=self.n_jobs)

        # parameter optimization?
        param_grid = self._prefixed_param_grid()
        if param_grid:
            self.model = GridSearchCV(
                self.model,
                param_grid=param_grid,
                refit=True,
                cv=5,
                n_jobs=self.n_jobs,
                error_score=0,
                verbose=1,
            )

        self.model.fit(X, y)
        if isinstance(self.model, GridSearchCV):
            self.best_params_ = self.model.best_params_
            print("best parameters: ", self.best_params_)
        self.time = time.time() - tinit
        return self

    def decision_function(self, X):
        """
        :param X: matrix of documents to score
        :return: the output of the fitted model's decision_function
        """
        assert self.model is not None, "predict called before fit"
        _sort_if_sparse(X)
        return self.model.decision_function(X)

    def predict_proba(self, X):
        """
        :param X: matrix of documents to score
        :return: the output of the fitted model's predict_proba
        """
        assert self.model is not None, "predict called before fit"
        assert hasattr(
            self.model, "predict_proba"
        ), "the probability predictions are not enabled in this model"
        _sort_if_sparse(X)
        return self.model.predict_proba(X)

    def predict(self, X):
        """
        :param X: matrix of documents to classify
        :return: the output of the fitted model's predict
        """
        assert self.model is not None, "predict called before fit"
        _sort_if_sparse(X)
        return self.model.predict(X)

    def best_params(self):
        """
        :return: the best parameters found by the grid search during the last
            fit, or None if no grid search has been run
        """
        return self.best_params_
class NaivePolylingualClassifier:
    """
    Is a mere set of independent MonolingualClassifiers, one per language.
    """

    def __init__(self, base_learner, parameters=None, n_jobs=-1):
        """
        :param base_learner: the estimator handed to every MonolingualClassifier
        :param parameters: None or parameter grid, handed to every
            MonolingualClassifier
        :param n_jobs: int, number of joblib workers used across languages
        """
        self.base_learner = base_learner
        self.parameters = parameters
        self.model = None
        self.n_jobs = n_jobs

    def fit(self, lX, ly):
        """
        trains the independent monolingual classifiers
        :param lX: a dictionary {language_label: X csr-matrix}
        :param ly: a dictionary {language_label: y np.array}
        :return: self
        """
        tinit = time.time()
        assert set(lX.keys()) == set(ly.keys()), "inconsistent language mappings in fit"
        langs = list(lX.keys())
        for lang in langs:
            _sort_if_sparse(lX[lang])
        models = Parallel(n_jobs=self.n_jobs)(
            delayed(
                MonolingualClassifier(self.base_learner, parameters=self.parameters).fit
            )(lX[lang], ly[lang])
            for lang in langs
        )
        # MonolingualClassifier.fit returns the fitted classifier itself
        self.model = dict(zip(langs, models))
        self.empty_categories = {
            lang: self.model[lang].empty_categories for lang in langs
        }
        self.time = time.time() - tinit
        return self

    def decision_function(self, lX):
        """
        :param lX: a dictionary {language_label: X csr-matrix}
        :return: a dictionary of classification scores for each class
        """
        assert self.model is not None, "predict called before fit"
        assert set(lX.keys()).issubset(
            set(self.model.keys())
        ), "unknown languages requested in decision function"
        langs = list(lX.keys())
        scores = Parallel(n_jobs=self.n_jobs)(
            delayed(self.model[lang].decision_function)(lX[lang]) for lang in langs
        )
        return dict(zip(langs, scores))

    def predict_proba(self, lX):
        """
        :param lX: a dictionary {language_label: X csr-matrix}
        :return: a dictionary of probabilities that each document belongs to each class
        """
        assert self.model is not None, "predict called before fit"
        assert set(lX.keys()).issubset(
            set(self.model.keys())
        ), "unknown languages requested in predict_proba"
        langs = list(lX.keys())
        scores = Parallel(n_jobs=self.n_jobs, max_nbytes=None)(
            delayed(self.model[lang].predict_proba)(lX[lang]) for lang in langs
        )
        return dict(zip(langs, scores))

    def predict(self, lX):
        """
        :param lX: a dictionary {language_label: X csr-matrix}
        :return: a dictionary of predictions
        """
        assert self.model is not None, "predict called before fit"
        assert set(lX.keys()).issubset(
            set(self.model.keys())
        ), "unknown languages requested in predict"
        if self.n_jobs == 1:
            # sequential path: MonolingualClassifier exposes `predict`
            # (it has no `transform` method)
            return {lang: self.model[lang].predict(X) for lang, X in lX.items()}
        langs = list(lX.keys())
        scores = Parallel(n_jobs=self.n_jobs)(
            delayed(self.model[lang].predict)(lX[lang]) for lang in langs
        )
        return dict(zip(langs, scores))

    def best_params(self):
        """
        :return: a dictionary {language_label: best parameters found by the
            grid search of that language's classifier (None if no grid search
            was run)}
        """
        assert self.model is not None, "best_params called before fit"
        return {lang: model.best_params_ for lang, model in self.model.items()}
class MetaClassifier:
    """
    Classifier trained on the vertically stacked, standardized outputs of all
    languages; predictions are then issued language by language.
    """

    def __init__(
        self,
        meta_learner,
        meta_parameters=None,
        n_jobs=-1,
        standardize_range=None,
        verbose=True,
    ):
        """
        :param meta_learner: estimator wrapped in a MonolingualClassifier
        :param meta_parameters: None or parameter grid for the meta learner
        :param n_jobs: int, number of workers
        :param standardize_range: None or slice, forwarded as `range` to the
            StandardizeTransformer created in fit
        :param verbose: boolean, whether fit prints the shape of the training data
        """
        self.n_jobs = n_jobs
        self.model = MonolingualClassifier(
            base_learner=meta_learner, parameters=meta_parameters, n_jobs=self.n_jobs
        )
        self.standardize_range = standardize_range
        self.verbose = verbose

    def fit(self, lZ, lY):
        """
        Stack all languages, standardize the result and train the meta learner.
        :param lZ: a dictionary {language_label: Z matrix}
        :param lY: a dictionary {language_label: y label matrix}
        :return: self
        """
        tinit = time.time()
        Z, y = self.stack(lZ, lY)
        self.standardizer = StandardizeTransformer(range=self.standardize_range)
        Z = self.standardizer.fit_transform(Z)
        if self.verbose:
            print(f"- fitting the metaclassifier on data shape: {Z.shape}")
        self.model.fit(Z, y)
        self.time = time.time() - tinit
        # returned for consistency with the other `fit` methods of this module
        return self

    def stack(self, lZ, lY=None):
        """
        Vertically stack the per-language matrices, following the key order of lZ.
        :param lZ: a dictionary {language_label: Z matrix}
        :param lY: optional dictionary {language_label: y label matrix}
        :return: Z if lY is None, otherwise the tuple (Z, y)
        """
        langs = list(lZ.keys())
        Z = np.vstack([lZ[lang] for lang in langs])
        if lY is None:
            return Z
        # labels are stacked in the same language order as Z
        y = np.vstack([lY[lang] for lang in langs])
        return Z, y

    def predict(self, lZ):
        """
        :param lZ: a dictionary {language_label: Z matrix}
        :return: a dictionary {language_label: predictions}
        """
        lZ = _joblib_transform_multiling(
            self.standardizer.transform, lZ, n_jobs=self.n_jobs
        )
        return _joblib_transform_multiling(self.model.predict, lZ, n_jobs=self.n_jobs)

    def predict_proba(self, lZ):
        """
        :param lZ: a dictionary {language_label: Z matrix}
        :return: a dictionary {language_label: posterior probabilities}
        """
        lZ = _joblib_transform_multiling(
            self.standardizer.transform, lZ, n_jobs=self.n_jobs
        )
        return _joblib_transform_multiling(
            self.model.predict_proba, lZ, n_jobs=self.n_jobs
        )
class StandardizeTransformer:
    """
    Z-score standardization: subtracts the mean and divides by the (sample)
    standard deviation computed at fit time, optionally on a slice only.
    """

    def __init__(self, axis=0, range=None):
        """
        :param axis: axis along which mean and standard deviation are computed
        :param range: None or a slice selecting the positions to standardize;
            positions outside the slice are left unchanged
        """
        assert range is None or isinstance(
            range, slice
        ), "wrong format for range, should either be None or a slice"
        self.axis = axis
        self.yetfit = False
        self.range = range

    def fit(self, X):
        """
        Compute and store the mean and standard deviation of X.
        :param X: array-like of values
        :return: self
        """
        std = np.std(X, axis=self.axis, ddof=1)
        # lower-bound the deviation so that transform never divides by ~0
        self.std = np.clip(std, 1e-5, None)
        self.mean = np.mean(X, axis=self.axis)
        if self.range is not None:
            # outside the slice use std=1 and mean=0, i.e. the identity
            ones = np.ones_like(self.std)
            zeros = np.zeros_like(self.mean)
            ones[self.range] = self.std[self.range]
            zeros[self.range] = self.mean[self.range]
            self.std = ones
            self.mean = zeros
        self.yetfit = True
        return self

    def transform(self, X):
        """
        :param X: array-like of values
        :return: (X - mean) / std, using the statistics stored by fit
        :raises RuntimeError: if called before fit
        """
        if not self.yetfit:
            raise RuntimeError("transform called before fit")
        return (X - self.mean) / self.std

    def fit_transform(self, X):
        """
        :param X: array-like of values
        :return: the standardized version of X, after fitting on X itself
        """
        return self.fit(X).transform(X)
class FeatureSet2Posteriors:
    """
    Recasts feature vectors into vectors of posterior probabilities by means
    of an RBF-kernel SVM wrapped in a MetaClassifier.
    """

    def __init__(self, verbose=True, l2=True, n_jobs=-1):
        """
        Init the class.
        :param verbose: bool, forwarded to the underlying MetaClassifier
        :param l2: bool, whether to apply or not L2 normalization to the projection
        :param n_jobs: int, number of concurrent workers.
        """
        self.l2 = l2
        self.n_jobs = n_jobs
        rbf_svm = SVC(
            kernel="rbf",
            gamma="auto",
            probability=True,
            cache_size=1000,
            random_state=1,
        )
        self.prob_classifier = MetaClassifier(rbf_svm, n_jobs=n_jobs, verbose=verbose)

    def fit(self, lX, lY):
        """
        Train the probabilistic classifier.
        :param lX: a dictionary {language_label: feature matrix}
        :param lY: a dictionary {language_label: label matrix}
        :return: self
        """
        classifier = self.prob_classifier
        classifier.fit(lX, lY)
        return self

    def transform(self, lX):
        """
        :param lX: a dictionary {language_label: feature matrix}
        :return: a dictionary {language_label: posterior probabilities},
            normalized when `l2` is set
        """
        return _normalize(self.predict_proba(lX), self.l2)

    def fit_transform(self, lX, lY):
        """
        Fit on (lX, lY) and return the transformation of lX.
        """
        fitted = self.fit(lX, lY)
        return fitted.transform(lX)

    def predict(self, lX):
        """
        :param lX: a dictionary {language_label: feature matrix}
        :return: the predictions of the underlying classifier
        """
        classifier = self.prob_classifier
        return classifier.predict(lX)

    def predict_proba(self, lX):
        """
        :param lX: a dictionary {language_label: feature matrix}
        :return: the posterior probabilities of the underlying classifier
        """
        classifier = self.prob_classifier
        return classifier.predict_proba(lX)
def _normalize(lX, l2=True):
    """
    Normalize every language-specific matrix with sklearn's `normalize`
    (called with its default settings).
    :param lX: a dictionary {language_label: matrix}
    :param l2: bool, when False the input dictionary is returned untouched
    :return: a new dictionary of normalized arrays, or lX itself if l2 is False
    """
    if not l2:
        return lX
    normalized = {}
    for lang, X in lX.items():
        normalized[lang] = normalize(np.asarray(X))
    return normalized