
updating parallel policy to take n_jobs from environment (not yet tested)

Alejandro Moreo Fernandez 2022-06-14 09:35:39 +02:00
parent 82a01478ec
commit 2cc7db60cc
8 changed files with 37 additions and 23 deletions

View File

@@ -18,6 +18,8 @@
 procedure. The user can now specify "force", "auto", True or False, in order to actively decide whether to apply it
 or not.
+- n_jobs is now taken from the environment if set to None

 Things to fix:
 - clean functions like binary, aggregative, probabilistic, etc; those should be resolved via isinstance():
   this is not working; I don't know how to make the isinstance work. Looks like there is some problem with the
@@ -29,7 +31,7 @@ Things to fix:
 - Policies should be able to set their output to "labelled_collection" or "instances_prevalence" or something similar.
 - Policies should implement the "gen()" one, taking a reader function as an input, and a folder path maybe
 - Review all documentation, redo the Sphinx doc, update Wikis...
-- Resolve the OneVsAll thing (it is in base.py and in aggregative.py
+- Resolve the OneVsAll thing (it is in base.py and in aggregative.py)
 - Better handle the environment (e.g., with n_jobs)
 - test cross_generate_predictions and cancel cross_generate_predictions_depr
 - Add a proper log?

View File

@@ -18,7 +18,14 @@ environ = {
     'UNK_INDEX': 0,
     'PAD_TOKEN': '[PAD]',
     'PAD_INDEX': 1,
-    'SVMPERF_HOME': './svm_perf_quantification'
+    'SVMPERF_HOME': './svm_perf_quantification',
+    'N_JOBS': 1
 }
+
+
+def get_njobs(n_jobs):
+    return environ['N_JOBS'] if n_jobs is None else n_jobs
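
A minimal sketch of the intended resolution rule (helper and key names as in this diff; untested, as the commit message itself notes): an explicit value always wins, and None falls back to the process-wide setting.

    import quapy as qp

    qp.environ['N_JOBS'] = 4     # one global knob for parallelism
    print(qp.get_njobs(None))    # -> 4: None falls back to the environment
    print(qp.get_njobs(2))       # -> 2: an explicit value takes precedence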

View File

@@ -169,7 +169,7 @@ class IndexTransformer:
         self.pad = self.add_word(qp.environ['PAD_TOKEN'], qp.environ['PAD_INDEX'])
         return self

-    def transform(self, X, n_jobs=-1):
+    def transform(self, X, n_jobs=None):
         """
         Transforms the strings in `X` as lists of numerical ids
@@ -179,6 +179,7 @@ class IndexTransformer:
         """
         # given the number of tasks and the number of jobs, generates the slices for the parallel processes
         assert self.unk != -1, 'transform called before fit'
+        n_jobs = qp.get_njobs(n_jobs)
         indexed = map_parallel(func=self._index, args=X, n_jobs=n_jobs)
         return np.asarray(indexed)
@@ -186,7 +187,7 @@ class IndexTransformer:
         vocab = self.vocabulary_.copy()
         return [[vocab.prevalence(word, self.unk) for word in self.analyzer(doc)] for doc in tqdm(documents, 'indexing')]

-    def fit_transform(self, X, n_jobs=-1):
+    def fit_transform(self, X, n_jobs=None):
         """
         Fits the transform on `X` and transforms it.
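
Note that transform() and fit_transform() resolve n_jobs at call time rather than in the constructor, so they do track later changes to the environment. A hedged sketch (assuming IndexTransformer lives in quapy.data.preprocessing, as in the QuaPy layout):

    import quapy as qp
    from quapy.data.preprocessing import IndexTransformer

    texts = ['a dog barks', 'a cat meows']
    qp.environ['N_JOBS'] = 2
    indexer = IndexTransformer().fit(texts)
    X = indexer.transform(texts)  # n_jobs=None resolves to 2 here, at call time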

View File

@@ -207,6 +207,8 @@ def cross_generate_predictions(
         n_jobs
 ):
+    n_jobs = qp.get_njobs(n_jobs)
+
     if isinstance(val_split, int):
         assert fit_learner == True, \
             'the parameters for the adjustment cannot be estimated with kFCV with fit_learner=False'
@@ -331,10 +333,10 @@ class ACC(AggregativeQuantifier):
         :class:`quapy.data.base.LabelledCollection` (the split itself).
     """

-    def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=1):
+    def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=None):
         self.learner = learner
         self.val_split = val_split
-        self.n_jobs = n_jobs
+        self.n_jobs = qp.get_njobs(n_jobs)

     def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None):
         """
@@ -437,10 +439,10 @@ class PACC(AggregativeProbabilisticQuantifier):
         :class:`quapy.data.base.LabelledCollection` (the split itself).
     """

-    def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=1):
+    def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=None):
         self.learner = learner
         self.val_split = val_split
-        self.n_jobs = n_jobs
+        self.n_jobs = qp.get_njobs(n_jobs)

     def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None):
         """
@@ -769,10 +771,10 @@ class ThresholdOptimization(AggregativeQuantifier, BinaryQuantifier):
         :class:`quapy.data.base.LabelledCollection` (the split itself).
     """

-    def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=1):
+    def __init__(self, learner: BaseEstimator, val_split=0.4, n_jobs=None):
         self.learner = learner
         self.val_split = val_split
-        self.n_jobs = n_jobs
+        self.n_jobs = qp.get_njobs(n_jobs)

     def fit(self, data: LabelledCollection, fit_learner=True, val_split: Union[float, int, LabelledCollection] = None):
         self._check_binary(data, "Threshold Optimization")
@@ -1022,13 +1024,13 @@ class OneVsAll(AggregativeQuantifier):
     :param n_jobs: number of parallel workers
     """

-    def __init__(self, binary_quantifier, n_jobs=-1):
+    def __init__(self, binary_quantifier, n_jobs=None):
         assert isinstance(self.binary_quantifier, BaseQuantifier), \
             f'{self.binary_quantifier} does not seem to be a Quantifier'
         assert isinstance(self.binary_quantifier, AggregativeQuantifier), \
             f'{self.binary_quantifier} does not seem to be of type Aggregative'
         self.binary_quantifier = binary_quantifier
-        self.n_jobs = n_jobs
+        self.n_jobs = qp.get_njobs(n_jobs)

     def fit(self, data: LabelledCollection, fit_learner=True):
         assert not data.binary, \
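
For the quantifiers, by contrast, n_jobs is resolved once in __init__, so an instance keeps whatever the environment held at construction time. A small sketch of that consequence (ACC as in this file; a scikit-learn learner is assumed):

    import quapy as qp
    from sklearn.linear_model import LogisticRegression
    from quapy.method.aggregative import ACC

    qp.environ['N_JOBS'] = 8
    acc = ACC(LogisticRegression())  # n_jobs=None resolves to 8 here
    qp.environ['N_JOBS'] = 1         # no longer affects this instance
    assert acc.n_jobs == 8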

View File

@@ -1,6 +1,6 @@
 from abc import ABCMeta, abstractmethod
 from copy import deepcopy
+import quapy as qp
 from quapy.data import LabelledCollection
@@ -63,17 +63,18 @@ class BinaryQuantifier(BaseQuantifier):
         assert data.binary, f'{quantifier_name} works only on problems of binary classification. ' \
                             f'Use the class OneVsAll to enable {quantifier_name} to work on single-label data.'


 class OneVsAllGeneric:
     """
     Allows any binary quantifier to perform quantification on single-label datasets. The method maintains one binary
     quantifier for each class, and then l1-normalizes the outputs so that the class prevalences sum up to 1.
     """

-    def __init__(self, binary_quantifier, n_jobs=1):
+    def __init__(self, binary_quantifier, n_jobs=None):
         assert isinstance(binary_quantifier, BaseQuantifier), \
             f'{binary_quantifier} does not seem to be a Quantifier'
         self.binary_quantifier = binary_quantifier
-        self.n_jobs = n_jobs
+        self.n_jobs = qp.get_njobs(n_jobs)

     def fit(self, data: LabelledCollection, **kwargs):
         assert not data.binary, \

View File

@@ -72,7 +72,7 @@ class Ensemble(BaseQuantifier):
                  policy='ave',
                  max_sample_size=None,
                  val_split:Union[qp.data.LabelledCollection, float]=None,
-                 n_jobs=1,
+                 n_jobs=None,
                  verbose=False):
         assert policy in Ensemble.VALID_POLICIES, \
             f'unknown policy={policy}; valid are {Ensemble.VALID_POLICIES}'
@@ -84,7 +84,7 @@ class Ensemble(BaseQuantifier):
         self.red_size = red_size
         self.policy = policy
         self.val_split = val_split
-        self.n_jobs = n_jobs
+        self.n_jobs = qp.get_njobs(n_jobs)
         self.post_proba_fn = None
         self.verbose = verbose
         self.max_sample_size = max_sample_size

View File

@@ -37,7 +37,7 @@ class GridSearchQ(BaseQuantifier):
                  error: Union[Callable, str] = qp.error.mae,
                  refit=True,
                  timeout=-1,
-                 n_jobs=1,
+                 n_jobs=None,
                  verbose=False):

         self.model = model
@@ -45,7 +45,7 @@ class GridSearchQ(BaseQuantifier):
         self.protocol = protocol
         self.refit = refit
         self.timeout = timeout
-        self.n_jobs = n_jobs
+        self.n_jobs = qp.get_njobs(n_jobs)
         self.verbose = verbose
         self.__check_error(error)
         assert isinstance(protocol, AbstractProtocol), 'unknown protocol'
@@ -76,7 +76,6 @@ class GridSearchQ(BaseQuantifier):
         params_values = list(self.param_grid.values())
         protocol = self.protocol
-        n_jobs = self.n_jobs

         self.param_scores_ = {}
         self.best_score_ = None
@@ -84,7 +83,7 @@ class GridSearchQ(BaseQuantifier):
         tinit = time()

         hyper = [dict({k: values[i] for i, k in enumerate(params_keys)}) for values in itertools.product(*params_values)]
-        scores = qp.util.parallel(self._delayed_eval, ((params, training) for params in hyper), n_jobs=n_jobs)
+        scores = qp.util.parallel(self._delayed_eval, ((params, training) for params in hyper), n_jobs=self.n_jobs)

         for params, score, model in scores:
             if score is not None:

View File

@@ -11,7 +11,7 @@ import numpy as np
 from joblib import Parallel, delayed


-def _get_parallel_slices(n_tasks, n_jobs=-1):
+def _get_parallel_slices(n_tasks, n_jobs):
     if n_jobs == -1:
         n_jobs = multiprocessing.cpu_count()
     batch = int(n_tasks / n_jobs)
@@ -48,7 +48,9 @@ def parallel(func, args, n_jobs):
     """
     print('n_jobs',n_jobs)

     def func_dec(environ, *args):
-        qp.environ = environ
+        qp.environ = environ.copy()
+        qp.environ['N_JOBS'] = 1
+        print(f'setting n_jobs from {environ["N_JOBS"]} to 1')
         return func(*args)

     return Parallel(n_jobs=n_jobs)(
         delayed(func_dec)(qp.environ, args_i) for args_i in args
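
The copy-and-pin step in func_dec appears intended to avoid nested parallelism: each joblib worker receives a copy of the environment with N_JOBS forced to 1, so any qp.get_njobs(None) call made inside a worker resolves to a single job instead of spawning a pool of its own. A hedged sketch of the effect (untested, like the commit itself):

    import quapy as qp
    from quapy.util import parallel

    def task(x):
        # inside a worker, the copied environment pins N_JOBS to 1
        return x * qp.get_njobs(None)

    qp.environ['N_JOBS'] = 4
    print(parallel(task, [1, 2, 3], n_jobs=2))  # -> [1, 2, 3]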