result file format check, read, load, and evaluation with pandas

This commit is contained in:
Alejandro Moreo Fernandez 2021-10-22 19:03:15 +02:00
parent 646d21873f
commit 5f15b365fe
3 changed files with 256 additions and 120 deletions

View File

@ -1,6 +1,12 @@
import os.path
from typing import List, Tuple, Union
import pandas as pd
import quapy as qp import quapy as qp
import numpy as np import numpy as np
import sklearn import sklearn
import re
# def load_binary_raw_document(path): # def load_binary_raw_document(path):
@ -40,40 +46,162 @@ def gen_load_samples_T2B(path_dir:str, ground_truth_path:str = None):
class ResultSubmission: class ResultSubmission:
def __init__(self, team_name, run_name, task_name): DEV_LEN = 1000
assert isinstance(team_name, str) and team_name, \ TEST_LEN = 5000
f'invalid value encountered for team_name' ERROR_TOL = 1E-3
assert isinstance(run_name, str) and run_name, \
f'invalid value encountered for run_name' def __init__(self, categories: List[str]):
assert isinstance(task_name, str) and task_name in {'T1A', 'T1B', 'T2A', 'T2B'}, \ if not isinstance(categories, list) or len(categories) < 2:
f'invalid value encountered for task_name; valid values are T1A, T1B, T2A, and T2B' raise TypeError('wrong format for categories; a list with at least two category names (str) was expected')
self.team_name = team_name self.categories = categories
self.run_name = run_name self.df = pd.DataFrame(columns=['filename'] + list(categories))
self.task_name = task_name self.inferred_type = None
self.data = {}
def add(self, sample_name:str, prevalence_values:np.ndarray): def add(self, sample_name:str, prevalence_values:np.ndarray):
# assert the result is a valid sample_name (not repeated) if not isinstance(sample_name, str):
pass raise TypeError(f'error: expected str for sample_sample, found {type(sample_name)}')
if not isinstance(prevalence_values, np.ndarray):
raise TypeError(f'error: expected np.ndarray for prevalence_values, found {type(prevalence_values)}')
if self.inferred_type is None:
if sample_name.startswith('test'):
self.inferred_type = 'test'
elif sample_name.startswith('dev'):
self.inferred_type = 'dev'
else:
if not sample_name.startswith(self.inferred_type):
raise ValueError(f'error: sample "{sample_name}" is not a valid entry for type "{self.inferred_type}"')
if not re.match("(test|dev)_sample_\d+\.txt", sample_name):
raise ValueError(f'error: wrong format "{sample_name}"; right format is (test|dev)_sample_<number>.txt')
if sample_name in self.df.filename.values:
raise ValueError(f'error: prevalence values for "{sample_name}" already added')
if prevalence_values.ndim!=1 and prevalence_values.size != len(self.categories):
raise ValueError(f'error: wrong shape found for prevalence vector {prevalence_values}')
if (prevalence_values<0).any() or (prevalence_values>1).any():
raise ValueError(f'error: prevalence values out of range [0,1] for "{sample_name}"')
if np.abs(prevalence_values.sum()-1) > ResultSubmission.ERROR_TOL:
raise ValueError(f'error: prevalence values do not sum up to one for "{sample_name}"'
f'(error tolerance {ResultSubmission.ERROR_TOL})')
new_entry = dict([('filename',sample_name)]+[(col_i,prev_i) for col_i, prev_i in zip(self.categories, prevalence_values)])
self.df = self.df.append(new_entry, ignore_index=True)
def __len__(self): def __len__(self):
return len(self.data) return len(self.df)
@classmethod @classmethod
def load(cls, path:str)-> 'ResultSubmission': def load(cls, path: str) -> 'ResultSubmission':
pass df, inferred_type = ResultSubmission.check_file_format(path, return_inferred_type=True)
r = ResultSubmission(categories=df.columns.values.tolist())
r.inferred_type = inferred_type
r.df = df
return r
def dump(self, path:str): def dump(self, path:str):
# assert all samples are covered (check for test and dev accordingly) ResultSubmission.check_dataframe_format(self.df)
pass self.df.to_csv(path)
def get(self, sample_name:str): def get(self, sample_name:str):
pass sel = self.df.loc[self.df['filename'] == sample_name]
if sel.empty:
return None
else:
return sel.loc[:,self.df.columns[1]:].values.flatten()
@classmethod
def check_file_format(cls, path, return_inferred_type=False) -> Union[pd.DataFrame, Tuple[pd.DataFrame, str]]:
df = pd.read_csv(path, index_col=0)
return ResultSubmission.check_dataframe_format(df, path=path, return_inferred_type=return_inferred_type)
@classmethod
def check_dataframe_format(cls, df, path=None, return_inferred_type=False) -> Union[pd.DataFrame, Tuple[pd.DataFrame, str]]:
hint_path = '' # if given, show the data path in the error messages
if path is not None:
hint_path = f' in {path}'
if 'filename' not in df.columns or len(df.columns) < 3:
raise ValueError(f'wrong header{hint_path}, the format of the header should be ",filename,<cat_1>,...,<cat_n>"')
if df.empty:
raise ValueError(f'error{hint_path}: results file is empty')
elif len(df) == ResultSubmission.DEV_LEN:
inferred_type = 'dev'
expected_len = ResultSubmission.DEV_LEN
elif len(df) == ResultSubmission.TEST_LEN:
inferred_type = 'test'
expected_len = ResultSubmission.TEST_LEN
else:
raise ValueError(f'wrong number of prevalence values found{hint_path}; '
f'expected {ResultSubmission.DEV_LEN} for development sets and '
f'{ResultSubmission.TEST_LEN} for test sets; found {len(df)}')
set_names = frozenset(df.filename)
for i in range(expected_len):
if f'{inferred_type}_sample_{i}.txt' not in set_names:
raise ValueError(f'{hint_path} a file with {len(df)} entries is assumed to be of type '
f'"{inferred_type}" but entry {inferred_type}_sample_{i}.txt is missing '
f'(among perhaps many others)')
for category_name in df.columns[1:]:
if (df[category_name] < 0).any() or (df[category_name] > 1).any():
raise ValueError(f'{hint_path} column "{category_name}" contains values out of range [0,1]')
prevs = df.loc[:, df.columns[1]:].values
round_errors = np.abs(prevs.sum(axis=-1) - 1.) > ResultSubmission.ERROR_TOL
if round_errors.any():
raise ValueError(f'warning: prevalence values in rows with id {np.where(round_errors)[0].tolist()} '
f'do not sum up to 1 (error tolerance {ResultSubmission.ERROR_TOL}), '
f'probably due to some rounding errors.')
if return_inferred_type:
return df, inferred_type
else:
return df
def sort_categories(self):
self.df = self.df.reindex([self.df.columns[0]] + sorted(self.df.columns[1:]), axis=1)
self.categories = sorted(self.categories)
def evaluate_submission(ground_truth_prevs: ResultSubmission, submission_prevs: ResultSubmission):
pass def evaluate_submission(true_prevs: ResultSubmission, predicted_prevs: ResultSubmission, sample_size=1000, average=True):
if len(true_prevs) != len(predicted_prevs):
raise ValueError(f'size mismatch, groun truth has {len(true_prevs)} entries '
f'while predictions contain {len(predicted_prevs)} entries')
true_prevs.sort_categories()
predicted_prevs.sort_categories()
if true_prevs.categories != predicted_prevs.categories:
raise ValueError(f'these result files are not comparable since the categories are different')
ae, rae = [], []
for sample_name in true_prevs.df.filename.values:
ae.append(qp.error.mae(true_prevs.get(sample_name), predicted_prevs.get(sample_name)))
rae.append(qp.error.mrae(true_prevs.get(sample_name), predicted_prevs.get(sample_name), eps=sample_size))
ae = np.asarray(ae)
rae = np.asarray(rae)
if average:
return ae.mean(), rae.mean()
else:
return ae, rae
# r = ResultSubmission(['negative', 'positive'])
# from tqdm import tqdm
# for i in tqdm(range(1000), total=1000):
# r.add(f'dev_sample_{i}.txt', np.asarray([0.5, 0.5]))
# r.dump('./path.csv')
r = ResultSubmission.load('./data/T1A/public/dummy_submission.csv')
t = ResultSubmission.load('./data/T1A/public/dummy_submission (copy).csv')
# print(r.df)
# print(r.get('dev_sample_10.txt'))
print(evaluate_submission(r, t))
# s = ResultSubmission.load('./data/T1A/public/dummy_submission.csv')
#
# print(s)

View File

@ -31,8 +31,6 @@ dev_prev = pd.read_csv(os.path.join(path_binary_vector, 'public', 'dev_prevalenc
print(dev_prev) print(dev_prev)
scores = {} scores = {}
for quantifier in [CC]: #, ACC, PCC, PACC, EMQ, HDy]: for quantifier in [CC]: #, ACC, PCC, PACC, EMQ, HDy]:

View File

@ -1,3 +1,4 @@
from abc import abstractmethod
from typing import List, Union from typing import List, Union
import numpy as np import numpy as np
@ -8,6 +9,112 @@ from sklearn.model_selection import train_test_split, RepeatedStratifiedKFold
from quapy.functional import artificial_prevalence_sampling, strprev from quapy.functional import artificial_prevalence_sampling, strprev
# class Sampling:
#
# @abstractmethod
# def load(cls, path: str, loader_func: callable, classes=None): ...
#
# @abstractmethod
# @property
# def __len__(self): ...
#
# @abstractmethod
# @property
# def prevalence(self): ...
#
# @abstractmethod
# @property
# def n_classes(self):
#
# @property
# def binary(self):
# return self.n_classes == 2
#
# def uniform_sampling_index(self, size):
# return np.random.choice(len(self), size, replace=False)
#
# def uniform_sampling(self, size):
# unif_index = self.uniform_sampling_index(size)
# return self.sampling_from_index(unif_index)
#
# def sampling(self, size, *prevs, shuffle=True):
# prev_index = self.sampling_index(size, *prevs, shuffle=shuffle)
# return self.sampling_from_index(prev_index)
#
# def sampling_from_index(self, index):
# documents = self.instances[index]
# labels = self.labels[index]
# return LabelledCollection(documents, labels, classes_=self.classes_)
#
# def split_stratified(self, train_prop=0.6, random_state=None):
# # with temp_seed(42):
# tr_docs, te_docs, tr_labels, te_labels = \
# train_test_split(self.instances, self.labels, train_size=train_prop, stratify=self.labels,
# random_state=random_state)
# return LabelledCollection(tr_docs, tr_labels), LabelledCollection(te_docs, te_labels)
#
# def artificial_sampling_generator(self, sample_size, n_prevalences=101, repeats=1):
# dimensions = self.n_classes
# for prevs in artificial_prevalence_sampling(dimensions, n_prevalences, repeats):
# yield self.sampling(sample_size, *prevs)
#
# def artificial_sampling_index_generator(self, sample_size, n_prevalences=101, repeats=1):
# dimensions = self.n_classes
# for prevs in artificial_prevalence_sampling(dimensions, n_prevalences, repeats):
# yield self.sampling_index(sample_size, *prevs)
#
# def natural_sampling_generator(self, sample_size, repeats=100):
# for _ in range(repeats):
# yield self.uniform_sampling(sample_size)
#
# def natural_sampling_index_generator(self, sample_size, repeats=100):
# for _ in range(repeats):
# yield self.uniform_sampling_index(sample_size)
#
# def __add__(self, other):
# if other is None:
# return self
# elif issparse(self.instances) and issparse(other.instances):
# join_instances = vstack([self.instances, other.instances])
# elif isinstance(self.instances, list) and isinstance(other.instances, list):
# join_instances = self.instances + other.instances
# elif isinstance(self.instances, np.ndarray) and isinstance(other.instances, np.ndarray):
# join_instances = np.concatenate([self.instances, other.instances])
# else:
# raise NotImplementedError('unsupported operation for collection types')
# labels = np.concatenate([self.labels, other.labels])
# return LabelledCollection(join_instances, labels)
#
# @property
# def Xy(self):
# return self.instances, self.labels
#
# def stats(self, show=True):
# ninstances = len(self)
# instance_type = type(self.instances[0])
# if instance_type == list:
# nfeats = len(self.instances[0])
# elif instance_type == np.ndarray or issparse(self.instances):
# nfeats = self.instances.shape[1]
# else:
# nfeats = '?'
# stats_ = {'instances': ninstances,
# 'type': instance_type,
# 'features': nfeats,
# 'classes': self.classes_,
# 'prevs': strprev(self.prevalence())}
# if show:
# print(f'#instances={stats_["instances"]}, type={stats_["type"]}, #features={stats_["features"]}, '
# f'#classes={stats_["classes"]}, prevs={stats_["prevs"]}')
# return stats_
#
# def kFCV(self, nfolds=5, nrepeats=1, random_state=0):
# kf = RepeatedStratifiedKFold(n_splits=nfolds, n_repeats=nrepeats, random_state=random_state)
# for train_index, test_index in kf.split(*self.Xy):
# train = self.sampling_from_index(train_index)
# test = self.sampling_from_index(test_index)
# yield train, test
class LabelledCollection: class LabelledCollection:
''' '''
A LabelledCollection is a set of objects each with a label associated to it. A LabelledCollection is a set of objects each with a label associated to it.
@ -176,104 +283,7 @@ class LabelledCollection:
yield train, test yield train, test
class MultilingualLabelledCollection:
def __init__(self, langs:List[str], labelledCollections:List[LabelledCollection]):
assert len(langs) == len(labelledCollections), 'length mismatch for langs and labelledCollection lists'
assert all(isinstance(lc, LabelledCollection) for lc in labelledCollections), 'unexpected type for labelledCollections'
assert all(labelledCollections[0].classes_ == lc_i.classes_ for lc_i in labelledCollections[1:]), \
'inconsistent classes found for some labelled collections'
self.llc = {l: lc for l, lc in zip(langs, labelledCollections)}
self.classes_=labelledCollections[0].classes_
@classmethod
def fromLangDict(cls, lang_labelledCollection:dict):
return MultilingualLabelledCollection(*list(zip(*list(lang_labelledCollection.items()))))
def langs(self):
return list(sorted(self.llc.keys()))
def __getitem__(self, lang)->LabelledCollection:
return self.llc[lang]
@classmethod
def load(cls, path: str, loader_func: callable):
return MultilingualLabelledCollection(*loader_func(path))
def __len__(self):
return sum(map(len, self.llc.values()))
def prevalence(self):
prev = np.asarray([lc.prevalence() * len(lc) for lc in self.llc.values()]).sum(axis=0)
return prev / prev.sum()
def language_prevalence(self):
lang_count = np.asarray([len(self.llc[l]) for l in self.langs()])
return lang_count / lang_count.sum()
def counts(self):
return np.asarray([lc.counts() for lc in self.llc.values()]).sum(axis=0)
@property
def n_classes(self):
return len(self.classes_)
@property
def binary(self):
return self.n_classes == 2
def __check_langs(self, l_dict:dict):
assert len(l_dict)==len(self.langs()), 'wrong number of languages'
assert all(l in l_dict for l in self.langs()), 'missing languages in l_sizes'
def __check_sizes(self, l_sizes: Union[int,dict]):
assert isinstance(l_sizes, int) or isinstance(l_sizes, dict), 'unexpected type for l_sizes'
if isinstance(l_sizes, int):
return {l:l_sizes for l in self.langs()}
self.__check_langs(l_sizes)
return l_sizes
def sampling_index(self, l_sizes: Union[int,dict], *prevs, shuffle=True):
l_sizes = self.__check_sizes(l_sizes)
return {l:lc.sampling_index(l_sizes[l], *prevs, shuffle=shuffle) for l,lc in self.llc.items()}
def uniform_sampling_index(self, l_sizes: Union[int, dict]):
l_sizes = self.__check_sizes(l_sizes)
return {l: lc.uniform_sampling_index(l_sizes[l]) for l,lc in self.llc.items()}
def uniform_sampling(self, l_sizes: Union[int, dict]):
l_sizes = self.__check_sizes(l_sizes)
return MultilingualLabelledCollection.fromLangDict(
{l: lc.uniform_sampling(l_sizes[l]) for l,lc in self.llc.items()}
)
def sampling(self, l_sizes: Union[int, dict], *prevs, shuffle=True):
l_sizes = self.__check_sizes(l_sizes)
return MultilingualLabelledCollection.fromLangDict(
{l: lc.sampling(l_sizes[l], *prevs, shuffle=shuffle) for l,lc in self.llc.items()}
)
def sampling_from_index(self, l_index:dict):
self.__check_langs(l_index)
return MultilingualLabelledCollection.fromLangDict(
{l: lc.sampling_from_index(l_index[l]) for l,lc in self.llc.items()}
)
def split_stratified(self, train_prop=0.6, random_state=None):
train, test = list(zip(*[self[l].split_stratified(train_prop, random_state) for l in self.langs()]))
return MultilingualLabelledCollection(self.langs(), train), MultilingualLabelledCollection(self.langs(), test)
def asLabelledCollection(self, return_langs=False):
lXy_list = [([l]*len(lc),*lc.Xy) for l, lc in self.llc.items()] # a list with (lang_i, Xi, yi)
ls,Xs,ys = list(zip(*lXy_list))
ls = np.concatenate(ls)
vertstack = vstack if issparse(Xs[0]) else np.vstack
Xs = vertstack(Xs)
ys = np.concatenate(ys)
lc = LabelledCollection(Xs, ys, classes_=self.classes_)
# return lc, ls if return_langs else lc
#
#
#
class Dataset: class Dataset:
def __init__(self, training: LabelledCollection, test: LabelledCollection, vocabulary: dict = None, name=''): def __init__(self, training: LabelledCollection, test: LabelledCollection, vocabulary: dict = None, name=''):