import time

import numpy as np
from joblib import Parallel, delayed
from scipy.sparse import issparse
from sklearn.model_selection import GridSearchCV
from sklearn.multiclass import OneVsRestClassifier
from sklearn.preprocessing import normalize
from sklearn.svm import SVC


def _sort_if_sparse(X):
    """Sort the indices of a sparse matrix in place; no-op for dense input."""
    if issparse(X) and not X.has_sorted_indices:
        X.sort_indices()


def get_learner(calibrate=False, kernel="linear", C=1):
    """
    instantiate scikit Support Vector Classifier
    :param calibrate: boolean, whether to return posterior probabilities or not
    :param kernel: string,kernel to be applied to the SVC
    :param C: int or dict {'C': list of integer}, Regularization parameter
    :return: Support Vector Classifier
    """
    return SVC(
        kernel=kernel,
        probability=calibrate,
        cache_size=1000,
        C=C,
        random_state=1,
        gamma="auto",
        verbose=False,
    )


def _joblib_transform_multiling(transformer, lX, n_jobs=-1):
    """
    Apply `transformer` to each language-indexed matrix in `lX`.

    :param transformer: callable taking a single matrix
    :param lX: dictionary {language_label: matrix}
    :param n_jobs: int, number of concurrent workers (1 = sequential)
    :return: dictionary {language_label: transformer(matrix)}
    """
    if n_jobs == 1:
        return {lang: transformer(lX[lang]) for lang in lX.keys()}
    else:
        langs = list(lX.keys())
        transformations = Parallel(n_jobs=n_jobs)(
            delayed(transformer)(lX[lang]) for lang in langs
        )
        return {lang: transformations[i] for i, lang in enumerate(langs)}


class MonolingualClassifier:
    """
    Wraps a scikit-learn binary base learner into a multi-label classifier
    (one-vs-rest), with optional grid-search hyper-parameter optimization.
    """

    def __init__(self, base_learner, parameters=None, n_jobs=-1):
        self.learner = base_learner
        # list of param dicts for GridSearchCV, or None to skip optimization
        self.parameters = parameters
        self.model = None
        self.best_params_ = None
        self.n_jobs = n_jobs

    def fit(self, X, y):
        """
        Fit the classifier on a single language's data.

        :param X: feature matrix (sparse csr or dense)
        :param y: binary label matrix of shape (n_docs, n_classes)
        :return: self
        :raises NotImplementedError: if y is not in multi-label (2D) format
        """
        tinit = time.time()
        _sort_if_sparse(X)
        # record classes with no positive example in this language
        self.empty_categories = np.argwhere(np.sum(y, axis=0) == 0).flatten()

        # multi-class format
        if len(y.shape) == 2:
            if self.parameters is not None:
                # re-key the parameter grid so it addresses the estimator
                # nested inside OneVsRestClassifier
                self.parameters = [
                    {"estimator__" + key: params[key] for key in params.keys()}
                    for params in self.parameters
                ]
            self.model = OneVsRestClassifier(self.learner, n_jobs=self.n_jobs)
        else:
            self.model = self.learner
            raise NotImplementedError(
                "not working as a base-classifier for funneling if there are gaps in "
                "the labels across languages"
            )

        # parameter optimization?
        if self.parameters:
            self.model = GridSearchCV(
                self.model,
                param_grid=self.parameters,
                refit=True,
                cv=5,
                n_jobs=self.n_jobs,
                error_score=0,
                verbose=1,
            )

        self.model.fit(X, y)
        if isinstance(self.model, GridSearchCV):
            self.best_params_ = self.model.best_params_
            print("best parameters: ", self.best_params_)
        self.time = time.time() - tinit
        return self

    def decision_function(self, X):
        """Return the decision scores for each class."""
        assert self.model is not None, "predict called before fit"
        _sort_if_sparse(X)
        return self.model.decision_function(X)

    def predict_proba(self, X):
        """Return posterior probabilities; requires a probability-enabled learner."""
        assert self.model is not None, "predict called before fit"
        assert hasattr(
            self.model, "predict_proba"
        ), "the probability predictions are not enabled in this model"
        _sort_if_sparse(X)
        return self.model.predict_proba(X)

    def predict(self, X):
        """Return hard label predictions."""
        assert self.model is not None, "predict called before fit"
        _sort_if_sparse(X)
        return self.model.predict(X)

    def best_params(self):
        raise NotImplementedError


class NaivePolylingualClassifier:
    """
    Is a mere set of independet MonolingualClassifiers
    """

    def __init__(self, base_learner, parameters=None, n_jobs=-1):
        self.base_learner = base_learner
        self.parameters = parameters
        self.model = None
        self.n_jobs = n_jobs

    def fit(self, lX, ly):
        """
        trains the independent monolingual classifiers
        :param lX: a dictionary {language_label: X csr-matrix}
        :param ly: a dictionary {language_label: y np.array}
        :return: self
        """
        tinit = time.time()
        assert set(lX.keys()) == set(ly.keys()), "inconsistent language mappings in fit"
        langs = list(lX.keys())
        for lang in langs:
            _sort_if_sparse(lX[lang])
        # one independently-fitted MonolingualClassifier per language
        models = Parallel(n_jobs=self.n_jobs)(
            delayed(
                MonolingualClassifier(self.base_learner, parameters=self.parameters).fit
            )((lX[lang]), ly[lang])
            for lang in langs
        )
        self.model = {lang: models[i] for i, lang in enumerate(langs)}
        self.empty_categories = {
            lang: self.model[lang].empty_categories for lang in langs
        }
        self.time = time.time() - tinit
        return self

    def decision_function(self, lX):
        """
        :param lX: a dictionary {language_label: X csr-matrix}
        :return: a dictionary of classification scores for each class
        """
        assert self.model is not None, "predict called before fit"
        assert set(lX.keys()).issubset(
            set(self.model.keys())
        ), "unknown languages requested in decision function"
        langs = list(lX.keys())
        scores = Parallel(n_jobs=self.n_jobs)(
            delayed(self.model[lang].decision_function)(lX[lang]) for lang in langs
        )
        return {lang: scores[i] for i, lang in enumerate(langs)}

    def predict_proba(self, lX):
        """
        :param lX: a dictionary {language_label: X csr-matrix}
        :return: a dictionary of probabilities that each document belongs to each class
        """
        assert self.model is not None, "predict called before fit"
        if not set(lX.keys()).issubset(set(self.model.keys())):
            # restrict to the languages we actually have a model for
            langs = set(lX.keys()).intersection(set(self.model.keys()))
            scores = Parallel(n_jobs=self.n_jobs, max_nbytes=None)(
                delayed(self.model[lang].predict_proba)(lX[lang]) for lang in langs
            )
            return {lang: scores[i] for i, lang in enumerate(langs)}
        langs = list(lX.keys())
        scores = Parallel(n_jobs=self.n_jobs, max_nbytes=None)(
            delayed(self.model[lang].predict_proba)(lX[lang]) for lang in langs
        )
        return {lang: scores[i] for i, lang in enumerate(langs)}

    def predict(self, lX):
        """
        :param lX: a dictionary {language_label: X csr-matrix}
        :return: a dictionary of predictions
        """
        assert self.model is not None, "predict called before fit"
        assert set(lX.keys()).issubset(
            set(self.model.keys())
        ), "unknown languages requested in predict"
        if self.n_jobs == 1:
            # BUGFIX: MonolingualClassifier has no `transform` method; the
            # sequential path must call `predict` like the parallel path does.
            return {lang: self.model[lang].predict(lX[lang]) for lang in lX.keys()}
        else:
            langs = list(lX.keys())
            scores = Parallel(n_jobs=self.n_jobs)(
                delayed(self.model[lang].predict)(lX[lang]) for lang in langs
            )
            return {lang: scores[i] for i, lang in enumerate(langs)}

    def best_params(self):
        return {lang: model.best_params() for lang, model in self.model.items()}


class MetaClassifier:
    """
    Second-tier classifier: stacks the per-language document projections into a
    single matrix, standardizes it, and fits one MonolingualClassifier on top.
    """

    def __init__(
        self,
        meta_learner,
        meta_parameters=None,
        n_jobs=-1,
        standardize_range=None,
        verbose=True,
    ):
        self.n_jobs = n_jobs
        self.model = MonolingualClassifier(
            base_learner=meta_learner, parameters=meta_parameters, n_jobs=self.n_jobs
        )
        # optional slice restricting which columns get standardized
        self.standardize_range = standardize_range
        self.verbose = verbose

    def fit(self, lZ, lY):
        """
        :param lZ: a dictionary {language_label: Z projection matrix}
        :param lY: a dictionary {language_label: y label matrix}
        :return: self
        """
        tinit = time.time()
        Z, y = self.stack(lZ, lY)
        self.standardizer = StandardizeTransformer(range=self.standardize_range)
        Z = self.standardizer.fit_transform(Z)
        if self.verbose:
            print(f"- fitting the metaclassifier on data shape: {Z.shape}")
        self.model.fit(Z, y)
        self.time = time.time() - tinit
        # return self for consistency with the other fit methods in this module
        return self

    def stack(self, lZ, lY=None):
        """Vertically stack per-language matrices (and labels, when given)."""
        langs = list(lZ.keys())
        Z = np.vstack([lZ[lang] for lang in langs])
        if lY is not None:
            y = np.vstack([lY[lang] for lang in langs])
            return Z, y
        else:
            return Z

    def predict(self, lZ):
        lZ = _joblib_transform_multiling(
            self.standardizer.transform, lZ, n_jobs=self.n_jobs
        )
        return _joblib_transform_multiling(self.model.predict, lZ, n_jobs=self.n_jobs)

    def predict_proba(self, lZ):
        lZ = _joblib_transform_multiling(
            self.standardizer.transform, lZ, n_jobs=self.n_jobs
        )
        return _joblib_transform_multiling(
            self.model.predict_proba, lZ, n_jobs=self.n_jobs
        )


class StandardizeTransformer:
    """
    Z-score standardization (zero mean, unit variance), optionally restricted
    to a slice of columns; columns outside the slice are left unchanged.
    """

    def __init__(self, axis=0, range=None):
        """
        :param axis: axis along which mean/std are computed (0 = per column)
        :param range: optional slice of columns to standardize; None = all
        """
        assert range is None or isinstance(
            range, slice
        ), "wrong format for range, should either be None or a slice"
        self.axis = axis
        self.yetfit = False
        self.range = range

    def fit(self, X):
        """Estimate per-column mean and (clipped) standard deviation."""
        std = np.std(X, axis=self.axis, ddof=1)
        # clip to avoid division by ~0 for (near-)constant columns
        self.std = np.clip(std, 1e-5, None)
        self.mean = np.mean(X, axis=self.axis)
        if self.range is not None:
            # outside the slice: std=1, mean=0 -> identity transform
            ones = np.ones_like(self.std)
            zeros = np.zeros_like(self.mean)
            ones[self.range] = self.std[self.range]
            zeros[self.range] = self.mean[self.range]
            self.std = ones
            self.mean = zeros
        self.yetfit = True
        return self

    def transform(self, X):
        # BUGFIX: the original guard was a bare string expression (a no-op);
        # enforce the fit-before-transform contract explicitly.
        assert self.yetfit, "transform called before fit"
        return (X - self.mean) / self.std

    def fit_transform(self, X):
        return self.fit(X).transform(X)


class FeatureSet2Posteriors:
    """
    Takes care of recasting features outputted by the embedders to vecotrs of
    posterior probabilities by means of a multiclass SVM.
    """

    def __init__(self, verbose=True, l2=True, n_jobs=-1):
        """
        Init the class.
        :param verbose: bool, whether to print information while fitting
        :param l2: bool, whether to apply or not L2 normalization to the projection
        :param n_jobs: int, number of concurrent workers.
        """
        self.l2 = l2
        self.n_jobs = n_jobs
        self.prob_classifier = MetaClassifier(
            SVC(
                kernel="rbf",
                gamma="auto",
                probability=True,
                cache_size=1000,
                random_state=1,
            ),
            n_jobs=n_jobs,
            verbose=verbose,
        )

    def fit(self, lX, lY):
        self.prob_classifier.fit(lX, lY)
        return self

    def transform(self, lX):
        """Project features to (optionally L2-normalized) posterior probabilities."""
        lP = self.predict_proba(lX)
        lP = _normalize(lP, self.l2)
        return lP

    def fit_transform(self, lX, lY):
        return self.fit(lX, lY).transform(lX)

    def predict(self, lX):
        return self.prob_classifier.predict(lX)

    def predict_proba(self, lX):
        return self.prob_classifier.predict_proba(lX)


def _normalize(lX, l2=True):
    """L2-normalize each language matrix row-wise; pass through when l2=False."""
    return {lang: normalize(np.asarray(X)) for lang, X in lX.items()} if l2 else lX