From 345807977c68f68454f12c0c926b047eac3dc619 Mon Sep 17 00:00:00 2001 From: Lorenzo Volpi Date: Sat, 28 Oct 2023 16:14:37 +0200 Subject: [PATCH] logger implemented --- .gitignore | 4 +- conf.yaml | 9 ++-- quacc.log | 26 ----------- quacc/dataset.py | 20 +++++++-- quacc/environment.py | 1 + quacc/evaluation/comp.py | 77 ++++++++++----------------------- quacc/evaluation/worker.py | 42 ++++++++++++++++++ quacc/{logger.py => logging.py} | 19 ++++++-- quacc/main.py | 12 +++-- 9 files changed, 114 insertions(+), 96 deletions(-) delete mode 100644 quacc.log create mode 100644 quacc/evaluation/worker.py rename quacc/{logger.py => logging.py} (81%) diff --git a/.gitignore b/.gitignore index f450568..069fa69 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,6 @@ elsahar19_rca/__pycache__/* *.coverage .coverage scp_sync.py -out/* \ No newline at end of file +out/* +output/* +*.log \ No newline at end of file diff --git a/conf.yaml b/conf.yaml index 003dfd8..c109fad 100644 --- a/conf.yaml +++ b/conf.yaml @@ -7,6 +7,7 @@ debug_conf: &debug_conf datasets: - DATASET_NAME: rcv1 DATASET_TARGET: CCAT + - DATASET_NAME: imdb plot_confs: debug: @@ -49,14 +50,14 @@ main_conf: &main_conf DATASET_N_PREVS: 9 datasets: - - DATASET_NAME: rcv1 - DATASET_TARGET: CCAT + - DATASET_NAME: imdb datasets_bck: - DATASET_NAME: rcv1 DATASET_TARGET: GCAT - DATASET_NAME: rcv1 DATASET_TARGET: MCAT - - DATASET_NAME: imdb + - DATASET_NAME: rcv1 + DATASET_TARGET: CCAT plot_confs: gs_vs_atc: @@ -99,4 +100,4 @@ main_conf: &main_conf - atc_ne - doc_feat -exec: *debug_conf \ No newline at end of file +exec: *main_conf \ No newline at end of file diff --git a/quacc.log b/quacc.log deleted file mode 100644 index a6406a2..0000000 --- a/quacc.log +++ /dev/null @@ -1,26 +0,0 @@ -dataset rcv1_CCAT -28/10/23 00:45:46| INFO: dataset rcv1_CCAT -Dataset sample 0.50 of dataset rcv1_CCAT started -Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -28/10/23 00:45:50| INFO: Dataset sample 0.50 of dataset rcv1_CCAT started -Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -28/10/23 00:45:51| ERROR: Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -28/10/23 00:45:52| ERROR: Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -28/10/23 00:45:52| ERROR: Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -Dataset sample 0.50 of dataset rcv1_CCAT finished [took 1.8041s -28/10/23 00:45:52| ERROR: Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -28/10/23 00:45:52| INFO: Dataset sample 0.50 of dataset rcv1_CCAT finished [took 1.8041s -Configuration rcv1_CCAT_1prevs failed. Exception: too many indices for array: array is 1-dimensional, but 2 were indexed -28/10/23 00:45:52| ERROR: Configuration rcv1_CCAT_1prevs failed. Exception: too many indices for array: array is 1-dimensional, but 2 were indexed -dataset rcv1_CCAT -28/10/23 00:47:52| INFO: dataset rcv1_CCAT -Dataset sample 0.50 of dataset rcv1_CCAT started -Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -28/10/23 00:47:56| INFO: Dataset sample 0.50 of dataset rcv1_CCAT started -Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -28/10/23 00:47:57| ERROR: Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -Dataset sample 0.50 of dataset rcv1_CCAT failed. Exception: Queue objects should only be shared between processes through inheritance -Dataset sample 0.50 of dataset rcv1_CCAT finished [took 1.7186s diff --git a/quacc/dataset.py b/quacc/dataset.py index 9362da8..0faf577 100644 --- a/quacc/dataset.py +++ b/quacc/dataset.py @@ -34,9 +34,17 @@ class DatasetSample: class Dataset: - def __init__(self, name, n_prevalences=9, target=None): + def __init__(self, name, n_prevalences=9, prevs=None, target=None): self._name = name self._target = target + + self.prevs = None + if prevs is not None: + prevs = np.unique([p for p in prevs if p > 0.0 and p < 1.0]) + if prevs.shape[0] > 0: + self.prevs = np.sort(prevs) + self.n_prevs = self.prevs.shape[0] + self.n_prevs = n_prevalences def __spambase(self): @@ -92,10 +100,14 @@ class Dataset: ) # sample prevalences - prevalences = np.linspace(0.0, 1.0, num=self.n_prevs + 1, endpoint=False)[1:] - at_size = min(math.floor(len(all_train) * 0.5 / p) for p in prevalences) + if self.prevs is not None: + prevs = self.prevs + else: + prevs = np.linspace(0.0, 1.0, num=self.n_prevs + 1, endpoint=False)[1:] + + at_size = min(math.floor(len(all_train) * 0.5 / p) for p in prevs) datasets = [] - for p in prevalences: + for p in prevs: all_train_sampled = all_train.sampling(at_size, p, random_state=0) train, validation = all_train_sampled.split_stratified( train_prop=TRAIN_VAL_PROP, random_state=0 diff --git a/quacc/environment.py b/quacc/environment.py index 1a7a832..d5447e3 100644 --- a/quacc/environment.py +++ b/quacc/environment.py @@ -8,6 +8,7 @@ defalut_env = { "PLOT_ESTIMATORS": [], "PLOT_STDEV": False, "DATASET_N_PREVS": 9, + "DATASET_PREVS": None, "OUT_DIR_NAME": "output", "OUT_DIR": None, "PLOT_DIR_NAME": "plot", diff --git a/quacc/evaluation/comp.py b/quacc/evaluation/comp.py index 67b7f9d..c0a5eba 100644 --- a/quacc/evaluation/comp.py +++ b/quacc/evaluation/comp.py @@ -1,21 +1,21 @@ import multiprocessing import time +import traceback from typing import List import pandas as pd import quapy as qp -from quapy.protocol import APP -from sklearn.linear_model import LogisticRegression from quacc.dataset import Dataset from quacc.environment import env from quacc.evaluation import baseline, method from quacc.evaluation.report import CompReport, DatasetReport, EvaluationReport -from quacc.logger import Logger, SubLogger - -qp.environ["SAMPLE_SIZE"] = env.SAMPLE_SIZE +from quacc.evaluation.worker import estimate_worker +from quacc.logging import Logger pd.set_option("display.float_format", "{:.4f}".format) +qp.environ["SAMPLE_SIZE"] = env.SAMPLE_SIZE +log = Logger.logger() class CompEstimator: @@ -40,45 +40,9 @@ class CompEstimator: CE = CompEstimator -def fit_and_estimate(_estimate, train, validation, test, _env=None, q=None): - _env = env if _env is None else _env - SubLogger.setup(q) - log = SubLogger.logger() - - model = LogisticRegression() - - model.fit(*train.Xy) - protocol = APP( - test, - n_prevalences=_env.PROTOCOL_N_PREVS, - repeats=_env.PROTOCOL_REPEATS, - return_type="labelled_collection", - ) - start = time.time() - try: - result = _estimate(model, validation, protocol) - except Exception as e: - log.error(f"Method {_estimate.__name__} failed. Exception: {e}") - return { - "name": _estimate.__name__, - "result": None, - "time": 0, - } - - end = time.time() - log.info(f"{_estimate.__name__} finished [took {end-start:.4f}s]") - - return { - "name": _estimate.__name__, - "result": result, - "time": end - start, - } - - def evaluate_comparison( dataset: Dataset, estimators=["OUR_BIN_SLD", "OUR_MUL_SLD"] ) -> EvaluationReport: - log = Logger.logger() # with multiprocessing.Pool(1) as pool: with multiprocessing.Pool(len(estimators)) as pool: dr = DatasetReport(dataset.name) @@ -90,9 +54,7 @@ def evaluate_comparison( tstart = time.time() tasks = [(estim, d.train, d.validation, d.test) for estim in CE[estimators]] results = [ - pool.apply_async( - fit_and_estimate, t, {"_env": env, "q": Logger.queue()} - ) + pool.apply_async(estimate_worker, t, {"_env": env, "q": Logger.queue()}) for t in tasks ] @@ -103,7 +65,7 @@ def evaluate_comparison( if r["result"] is not None: results_got.append(r) except Exception as e: - log.error( + log.warning( f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} failed. Exception: {e}" ) @@ -111,14 +73,21 @@ def evaluate_comparison( times = {r["name"]: r["time"] for r in results_got} times["tot"] = tend - tstart log.info( - f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} finished [took {times['tot']:.4f}s" + f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} finished [took {times['tot']:.4f}s]" ) - dr += CompReport( - [r["result"] for r in results_got], - name=dataset.name, - train_prev=d.train_prev, - valid_prev=d.validation_prev, - times=times, - ) - + try: + cr = CompReport( + [r["result"] for r in results_got], + name=dataset.name, + train_prev=d.train_prev, + valid_prev=d.validation_prev, + times=times, + ) + except Exception as e: + log.warning( + f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} failed. Exception: {e}" + ) + traceback(e) + cr = None + dr += cr return dr diff --git a/quacc/evaluation/worker.py b/quacc/evaluation/worker.py new file mode 100644 index 0000000..0ab75e2 --- /dev/null +++ b/quacc/evaluation/worker.py @@ -0,0 +1,42 @@ +import time + +import quapy as qp +from quapy.protocol import APP +from sklearn.linear_model import LogisticRegression + +from quacc.logging import SubLogger + + +def estimate_worker(_estimate, train, validation, test, _env=None, q=None): + qp.environ["SAMPLE_SIZE"] = _env.SAMPLE_SIZE + SubLogger.setup(q) + log = SubLogger.logger() + + model = LogisticRegression() + + model.fit(*train.Xy) + protocol = APP( + test, + n_prevalences=_env.PROTOCOL_N_PREVS, + repeats=_env.PROTOCOL_REPEATS, + return_type="labelled_collection", + ) + start = time.time() + try: + result = _estimate(model, validation, protocol) + except Exception as e: + log.warning(f"Method {_estimate.__name__} failed. Exception: {e}") + return { + "name": _estimate.__name__, + "result": None, + "time": 0, + } + + end = time.time() + log.info(f"{_estimate.__name__} finished [took {end-start:.4f}s]") + + return { + "name": _estimate.__name__, + "result": result, + "time": end - start, + } diff --git a/quacc/logger.py b/quacc/logging.py similarity index 81% rename from quacc/logger.py rename to quacc/logging.py index 002b79b..efa41af 100644 --- a/quacc/logger.py +++ b/quacc/logging.py @@ -7,6 +7,7 @@ import threading class Logger: __logger_file = "quacc.log" __logger_name = "queue_logger" + __manager = None __queue = None __thread = None __setup = False @@ -17,7 +18,7 @@ class Logger: record = q.get() if record is None: break - root = logging.getLogger() + root = logging.getLogger("listener") root.handle(record) @classmethod @@ -26,13 +27,19 @@ class Logger: return # setup root - root = logging.getLogger() + root = logging.getLogger("listener") + root.setLevel(logging.DEBUG) rh = logging.FileHandler(cls.__logger_file, mode="a") + rh.setLevel(logging.DEBUG) root.addHandler(rh) + root.info("-" * 100) # setup logger + if cls.__manager is None: + cls.__manager = multiprocessing.Manager() + if cls.__queue is None: - cls.__queue = multiprocessing.Queue() + cls.__queue = cls.__manager.Queue() logger = logging.getLogger(cls.__logger_name) logger.setLevel(logging.DEBUG) @@ -70,9 +77,11 @@ class Logger: return logging.getLogger(cls.__logger_name) @classmethod - def join_listener(cls): + def close(cls): if cls.__setup and cls.__thread is not None: + cls.__queue.put(None) cls.__thread.join() + # cls.__manager.close() class SubLogger: @@ -88,7 +97,9 @@ class SubLogger: # setup root root = logging.getLogger() + root.setLevel(logging.DEBUG) rh = logging.handlers.QueueHandler(q) + rh.setLevel(logging.DEBUG) rh.setFormatter( logging.Formatter( fmt="%(asctime)s| %(levelname)s: %(message)s", diff --git a/quacc/main.py b/quacc/main.py index ab80119..8a46e2a 100644 --- a/quacc/main.py +++ b/quacc/main.py @@ -4,7 +4,7 @@ from sys import platform import quacc.evaluation.comp as comp from quacc.dataset import Dataset from quacc.environment import env -from quacc.logger import Logger +from quacc.logging import Logger from quacc.utils import create_dataser_dir log = Logger.logger() @@ -24,6 +24,7 @@ def estimate_comparison(): env.DATASET_NAME, target=env.DATASET_TARGET, n_prevalences=env.DATASET_N_PREVS, + prevs=env.DATASET_PREVS, ) try: dr = comp.evaluate_comparison(dataset, estimators=env.COMP_ESTIMATORS) @@ -48,9 +49,14 @@ def estimate_comparison(): def main(): - estimate_comparison() + try: + estimate_comparison() + except Exception as e: + log.error(f"estimate comparison failed. Exceprion: {e}") + traceback(e) + toast() - Logger.join_listener() + Logger.close() if __name__ == "__main__":