diff --git a/quacc/evaluation/comp.py b/quacc/evaluation/comp.py index 0ea0eef..956c77b 100644 --- a/quacc/evaluation/comp.py +++ b/quacc/evaluation/comp.py @@ -5,6 +5,7 @@ from traceback import print_exception as traceback import pandas as pd import quapy as qp +from joblib import Parallel, delayed from quacc.dataset import Dataset from quacc.environment import env @@ -21,49 +22,53 @@ def evaluate_comparison(dataset: Dataset, estimators=None) -> DatasetReport: log = Logger.logger() # with multiprocessing.Pool(1) as pool: __pool_size = round(os.cpu_count() * 0.8) - with multiprocessing.Pool(__pool_size) as pool: - dr = DatasetReport(dataset.name) - log.info(f"dataset {dataset.name} [pool size: {__pool_size}]") - for d in dataset(): - log.info( - f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} started" + # with multiprocessing.Pool(__pool_size) as pool: + dr = DatasetReport(dataset.name) + log.info(f"dataset {dataset.name} [pool size: {__pool_size}]") + for d in dataset(): + log.info( + f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} started" + ) + tasks = [ + WorkerArgs( + _estimate=estim, + train=d.train, + validation=d.validation, + test=d.test, + _env=env, + q=Logger.queue(), ) - tasks = [ - WorkerArgs( - _estimate=estim, - train=d.train, - validation=d.validation, - test=d.test, - _env=env, - q=Logger.queue(), - ) - for estim in CE.func[estimators] - ] - try: - tstart = time.time() - results = [ - r for r in pool.imap(estimate_worker, tasks) if r is not None - ] + for estim in CE.func[estimators] + ] + try: + tstart = time.time() + results = Parallel(n_jobs=1)(delayed(estimate_worker)(t) for t in tasks) + results = [r for r in results if r is not None] + # # r for r in pool.imap(estimate_worker, tasks) if r is not None + # r + # for r in map(estimate_worker, tasks) + # if r is not None + # ] - g_time = time.time() - tstart - log.info( - f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} finished " - f"[took {g_time:.4f}s]" - ) + g_time = time.time() - tstart + log.info( + f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} finished " + f"[took {g_time:.4f}s]" + ) - cr = CompReport( - results, - name=dataset.name, - train_prev=d.train_prev, - valid_prev=d.validation_prev, - g_time=g_time, - ) - dr += cr + cr = CompReport( + results, + name=dataset.name, + train_prev=d.train_prev, + valid_prev=d.validation_prev, + g_time=g_time, + ) + dr += cr - except Exception as e: - log.warning( - f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} failed. " - f"Exception: {e}" - ) - traceback(e) + except Exception as e: + log.warning( + f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} failed. " + f"Exception: {e}" + ) + traceback(e) return dr