diff --git a/.gitignore b/.gitignore
index f450568..069fa69 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,6 @@ elsahar19_rca/__pycache__/*
 *.coverage
 .coverage
 scp_sync.py
-out/*
\ No newline at end of file
+out/*
+output/*
+*.log
\ No newline at end of file
diff --git a/conf.yaml b/conf.yaml
index eef0123..c109fad 100644
--- a/conf.yaml
+++ b/conf.yaml
@@ -7,6 +7,7 @@ debug_conf: &debug_conf
   datasets:
     - DATASET_NAME: rcv1
       DATASET_TARGET: CCAT
+    - DATASET_NAME: imdb
 
   plot_confs:
     debug:
@@ -49,14 +50,14 @@ main_conf: &main_conf
   DATASET_N_PREVS: 9
   datasets:
-    - DATASET_NAME: rcv1
-      DATASET_TARGET: CCAT
+    - DATASET_NAME: imdb
 
   datasets_bck:
     - DATASET_NAME: rcv1
       DATASET_TARGET: GCAT
     - DATASET_NAME: rcv1
       DATASET_TARGET: MCAT
-    - DATASET_NAME: imdb
+    - DATASET_NAME: rcv1
+      DATASET_TARGET: CCAT
 
   plot_confs:
     gs_vs_atc:
diff --git a/quacc/dataset.py b/quacc/dataset.py
index 2d3228c..3f6e179 100644
--- a/quacc/dataset.py
+++ b/quacc/dataset.py
@@ -34,9 +34,17 @@ class DatasetSample:
 
 
 class Dataset:
-    def __init__(self, name, n_prevalences=9, target=None):
+    def __init__(self, name, n_prevalences=9, prevs=None, target=None):
         self._name = name
         self._target = target
+
+        self.prevs = None
+        if prevs is not None:
+            prevs = np.unique([p for p in prevs if p > 0.0 and p < 1.0])
+            if prevs.shape[0] > 0:
+                self.prevs = np.sort(prevs)
+                self.n_prevs = self.prevs.shape[0]
+
         self.n_prevs = n_prevalences
 
     def __spambase(self):
@@ -93,10 +101,14 @@
         )
 
         # sample prevalences
-        prevalences = np.linspace(0.0, 1.0, num=self.n_prevs + 1, endpoint=False)[1:]
-        at_size = min(math.floor(len(all_train) * 0.5 / p) for p in prevalences)
+        if self.prevs is not None:
+            prevs = self.prevs
+        else:
+            prevs = np.linspace(0.0, 1.0, num=self.n_prevs + 1, endpoint=False)[1:]
+
+        at_size = min(math.floor(len(all_train) * 0.5 / p) for p in prevs)
         datasets = []
-        for p in prevalences:
+        for p in prevs:
             all_train_sampled = all_train.sampling(at_size, p, random_state=0)
             train, validation = all_train_sampled.split_stratified(
                 train_prop=TRAIN_VAL_PROP, random_state=0
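The prevalence-selection logic these hunks introduce, pulled out into a standalone sketch for review. select_prevs is a hypothetical name (the real code inlines this across Dataset.__init__ and Dataset.get): explicit prevalences are filtered to the open interval (0, 1), deduplicated and sorted, with the evenly spaced grid as fallback.

import numpy as np

# Hypothetical helper mirroring the hunks above (illustration only).
def select_prevs(prevs=None, n_prevalences=9):
    if prevs is not None:
        prevs = np.unique([p for p in prevs if 0.0 < p < 1.0])
        if prevs.shape[0] > 0:
            # np.unique already sorts; np.sort kept to mirror the diff
            return np.sort(prevs)
    # fallback: n_prevalences points in (0, 1), endpoints excluded
    return np.linspace(0.0, 1.0, num=n_prevalences + 1, endpoint=False)[1:]

print(select_prevs([0.8, 0.2, 0.2, 1.0]))  # [0.2 0.8]: 1.0 dropped, 0.2 deduplicated
print(select_prevs())                      # [0.1 0.2 ... 0.9]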
diff --git a/quacc/environment.py b/quacc/environment.py
index 1a7a832..d5447e3 100644
--- a/quacc/environment.py
+++ b/quacc/environment.py
@@ -8,6 +8,7 @@ defalut_env = {
     "PLOT_ESTIMATORS": [],
     "PLOT_STDEV": False,
     "DATASET_N_PREVS": 9,
+    "DATASET_PREVS": None,
     "OUT_DIR_NAME": "output",
     "OUT_DIR": None,
     "PLOT_DIR_NAME": "plot",
diff --git a/quacc/evaluation/comp.py b/quacc/evaluation/comp.py
index e5da34d..c0a5eba 100644
--- a/quacc/evaluation/comp.py
+++ b/quacc/evaluation/comp.py
@@ -1,22 +1,21 @@
-import logging as log
 import multiprocessing
 import time
+import traceback
 from typing import List
 
-import numpy as np
 import pandas as pd
 import quapy as qp
-from quapy.protocol import APP
-from sklearn.linear_model import LogisticRegression
 
 from quacc.dataset import Dataset
 from quacc.environment import env
 from quacc.evaluation import baseline, method
 from quacc.evaluation.report import CompReport, DatasetReport, EvaluationReport
-
-qp.environ["SAMPLE_SIZE"] = env.SAMPLE_SIZE
+from quacc.evaluation.worker import estimate_worker
+from quacc.logging import Logger
 
 pd.set_option("display.float_format", "{:.4f}".format)
+qp.environ["SAMPLE_SIZE"] = env.SAMPLE_SIZE
+log = Logger.logger()
 
 
 class CompEstimator:
@@ -41,38 +40,6 @@ class CompEstimator:
 CE = CompEstimator
 
 
-def fit_and_estimate(_estimate, train, validation, test, _env=None):
-    _env = env if _env is None else _env
-    model = LogisticRegression()
-
-    model.fit(*train.Xy)
-    protocol = APP(
-        test,
-        n_prevalences=_env.PROTOCOL_N_PREVS,
-        repeats=_env.PROTOCOL_REPEATS,
-        return_type="labelled_collection",
-    )
-    start = time.time()
-    try:
-        result = _estimate(model, validation, protocol)
-    except Exception as e:
-        log.error(f"Method {_estimate.__name__} failed. Exception: {e}")
-        return {
-            "name": _estimate.__name__,
-            "result": None,
-            "time": 0,
-        }
-
-    end = time.time()
-    log.info(f"{_estimate.__name__} finished [took {end-start:.4f}s]")
-
-    return {
-        "name": _estimate.__name__,
-        "result": result,
-        "time": end - start,
-    }
-
-
 def evaluate_comparison(
     dataset: Dataset, estimators=["OUR_BIN_SLD", "OUR_MUL_SLD"]
 ) -> EvaluationReport:
@@ -81,11 +48,14 @@ def evaluate_comparison(
     dr = DatasetReport(dataset.name)
     log.info(f"dataset {dataset.name}")
     for d in dataset():
-        log.info(f"train prev.: {np.around(d.train_prev, decimals=2)}")
+        log.info(
+            f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} started"
+        )
         tstart = time.time()
         tasks = [(estim, d.train, d.validation, d.test) for estim in CE[estimators]]
         results = [
-            pool.apply_async(fit_and_estimate, t, {"_env": env}) for t in tasks
+            pool.apply_async(estimate_worker, t, {"_env": env, "q": Logger.queue()})
+            for t in tasks
         ]
 
         results_got = []
@@ -95,22 +65,29 @@ def evaluate_comparison(
                 if r["result"] is not None:
                     results_got.append(r)
             except Exception as e:
-                log.error(
-                    f"Dataset sample {d.train[1]:.2f} of dataset {dataset.name} failed. Exception: {e}"
+                log.warning(
+                    f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} failed. Exception: {e}"
                 )
 
         tend = time.time()
         times = {r["name"]: r["time"] for r in results_got}
         times["tot"] = tend - tstart
         log.info(
-            f"Dataset sample {d.train[1]:.2f} of dataset {dataset.name} finished [took {times['tot']:.4f}s"
+            f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} finished [took {times['tot']:.4f}s]"
        )
-        dr += CompReport(
-            [r["result"] for r in results_got],
-            name=dataset.name,
-            train_prev=d.train_prev,
-            valid_prev=d.validation_prev,
-            times=times,
-        )
-
+        try:
+            cr = CompReport(
+                [r["result"] for r in results_got],
+                name=dataset.name,
+                train_prev=d.train_prev,
+                valid_prev=d.validation_prev,
+                times=times,
+            )
+        except Exception as e:
+            log.warning(
+                f"Dataset sample {d.train_prev[1]:.2f} of dataset {dataset.name} failed. Exception: {e}"
+            )
+            traceback.print_exc()
+            cr = None
+        dr += cr
     return dr
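In miniature, the dispatch-and-drain pattern evaluate_comparison now uses: apply_async hands back AsyncResult handles, .get() re-raises a worker's exception in the parent, and a failed task is logged and skipped instead of aborting the whole sample. A self-contained sketch with a synthetic work function (not part of this PR):

import multiprocessing

def work(x):
    # synthetic stand-in for estimate_worker
    if x == 2:
        raise ValueError("bad input")
    return {"name": f"task{x}", "result": x * x}

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        handles = [pool.apply_async(work, (x,)) for x in range(4)]
        got = []
        for h in handles:
            try:
                got.append(h.get())  # re-raises the worker's exception here
            except Exception as e:
                print(f"task failed: {e}")
    print([g["name"] for g in got])  # ['task0', 'task1', 'task3']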
Exception: {e}" + ) + traceback(e) + cr = None + dr += cr return dr diff --git a/quacc/evaluation/report.py b/quacc/evaluation/report.py index 56019a9..50ff5ad 100644 --- a/quacc/evaluation/report.py +++ b/quacc/evaluation/report.py @@ -280,6 +280,9 @@ class DatasetReport: return np.around([(1.0 - p, p) for p in self.prevs], decimals=2) def add(self, cr: CompReport): + if cr is None: + return + self.crs.append(cr) if self._dict is None: @@ -320,6 +323,11 @@ class DatasetReport: [self.s_dict[col][sp], cr_s_dict[col][sp]] ) + for sp in self.s_prevs: + for col, vals in self.s_dict.items(): + if sp not in vals: + vals[sp] = [] + for k, score in cr.fit_scores.items(): if k not in self.fit_scores: self.fit_scores[k] = [] diff --git a/quacc/evaluation/worker.py b/quacc/evaluation/worker.py new file mode 100644 index 0000000..0ab75e2 --- /dev/null +++ b/quacc/evaluation/worker.py @@ -0,0 +1,42 @@ +import time + +import quapy as qp +from quapy.protocol import APP +from sklearn.linear_model import LogisticRegression + +from quacc.logging import SubLogger + + +def estimate_worker(_estimate, train, validation, test, _env=None, q=None): + qp.environ["SAMPLE_SIZE"] = _env.SAMPLE_SIZE + SubLogger.setup(q) + log = SubLogger.logger() + + model = LogisticRegression() + + model.fit(*train.Xy) + protocol = APP( + test, + n_prevalences=_env.PROTOCOL_N_PREVS, + repeats=_env.PROTOCOL_REPEATS, + return_type="labelled_collection", + ) + start = time.time() + try: + result = _estimate(model, validation, protocol) + except Exception as e: + log.warning(f"Method {_estimate.__name__} failed. Exception: {e}") + return { + "name": _estimate.__name__, + "result": None, + "time": 0, + } + + end = time.time() + log.info(f"{_estimate.__name__} finished [took {end-start:.4f}s]") + + return { + "name": _estimate.__name__, + "result": result, + "time": end - start, + } diff --git a/quacc/logging.py b/quacc/logging.py new file mode 100644 index 0000000..efa41af --- /dev/null +++ b/quacc/logging.py @@ -0,0 +1,118 @@ +import logging +import logging.handlers +import multiprocessing +import threading + + +class Logger: + __logger_file = "quacc.log" + __logger_name = "queue_logger" + __manager = None + __queue = None + __thread = None + __setup = False + + @classmethod + def __logger_listener(cls, q): + while True: + record = q.get() + if record is None: + break + root = logging.getLogger("listener") + root.handle(record) + + @classmethod + def setup(cls): + if cls.__setup: + return + + # setup root + root = logging.getLogger("listener") + root.setLevel(logging.DEBUG) + rh = logging.FileHandler(cls.__logger_file, mode="a") + rh.setLevel(logging.DEBUG) + root.addHandler(rh) + root.info("-" * 100) + + # setup logger + if cls.__manager is None: + cls.__manager = multiprocessing.Manager() + + if cls.__queue is None: + cls.__queue = cls.__manager.Queue() + + logger = logging.getLogger(cls.__logger_name) + logger.setLevel(logging.DEBUG) + qh = logging.handlers.QueueHandler(cls.__queue) + qh.setLevel(logging.DEBUG) + qh.setFormatter( + logging.Formatter( + fmt="%(asctime)s| %(levelname)s: %(message)s", + datefmt="%d/%m/%y %H:%M:%S", + ) + ) + logger.addHandler(qh) + + # start listener + cls.__thread = threading.Thread( + target=cls.__logger_listener, + args=(cls.__queue,), + ) + cls.__thread.start() + + cls.__setup = True + + @classmethod + def queue(cls): + if not cls.__setup: + cls.setup() + + return cls.__queue + + @classmethod + def logger(cls): + if not cls.__setup: + cls.setup() + + return logging.getLogger(cls.__logger_name) + + 
diff --git a/quacc/main.py b/quacc/main.py
index 4a35dee..8a46e2a 100644
--- a/quacc/main.py
+++ b/quacc/main.py
@@ -1,12 +1,14 @@
-import logging as log
 import traceback
 from sys import platform
 
 import quacc.evaluation.comp as comp
 from quacc.dataset import Dataset
 from quacc.environment import env
+from quacc.logging import Logger
 from quacc.utils import create_dataser_dir
 
+log = Logger.logger()
+
 
 def toast():
     if platform == "win32":
@@ -22,6 +24,7 @@ def estimate_comparison():
             env.DATASET_NAME,
             target=env.DATASET_TARGET,
             n_prevalences=env.DATASET_N_PREVS,
+            prevs=env.DATASET_PREVS,
         )
         try:
             dr = comp.evaluate_comparison(dataset, estimators=env.COMP_ESTIMATORS)
@@ -46,14 +49,14 @@ def estimate_comparison():
 
 
 def main():
-    log.basicConfig(
-        filename="quacc.log",
-        filemode="a",
-        format="%(asctime)s| %(levelname)s: %(message)s",
-        datefmt="%d/%m/%y %H:%M:%S",
-    )
-    estimate_comparison()
+    try:
+        estimate_comparison()
+    except Exception as e:
+        log.error(f"estimate comparison failed. Exception: {e}")
+        traceback.print_exc()
+
     toast()
+    Logger.close()
 
 
 if __name__ == "__main__":
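A usage note on the new main(): Logger.close() runs on both the success and failure paths, and the None sentinel it enqueues is what unblocks the listener thread's q.get(), so join() can return and the interpreter exit (the listener is a non-daemon thread). A minimal sketch of that contract, assuming the Logger added above; run() is a synthetic stand-in for estimate_comparison():

from quacc.logging import Logger

log = Logger.logger()

def run():
    raise RuntimeError("simulated failure")  # synthetic

try:
    run()
except Exception as e:
    log.error(f"estimate comparison failed. Exception: {e}")
Logger.close()  # without this, the listener thread keeps the process alive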